Merge remote-tracking branch 'origin/master' into zero_copy_hdfs

This commit is contained in:
Zhichang Yu 2021-07-14 02:12:28 +00:00
commit fe5d17680e
160 changed files with 4133 additions and 281 deletions

5
.gitmodules vendored
View File

@ -193,7 +193,7 @@
url = https://github.com/danlark1/miniselect url = https://github.com/danlark1/miniselect
[submodule "contrib/rocksdb"] [submodule "contrib/rocksdb"]
path = contrib/rocksdb path = contrib/rocksdb
url = https://github.com/ClickHouse-Extras/rocksdb.git url = https://github.com/ClickHouse-Extras/rocksdb.git
[submodule "contrib/xz"] [submodule "contrib/xz"]
path = contrib/xz path = contrib/xz
url = https://github.com/xz-mirror/xz url = https://github.com/xz-mirror/xz
@ -231,3 +231,6 @@
[submodule "contrib/sqlite-amalgamation"] [submodule "contrib/sqlite-amalgamation"]
path = contrib/sqlite-amalgamation path = contrib/sqlite-amalgamation
url = https://github.com/azadkuh/sqlite-amalgamation url = https://github.com/azadkuh/sqlite-amalgamation
[submodule "contrib/s2geometry"]
path = contrib/s2geometry
url = https://github.com/ClickHouse-Extras/s2geometry.git

View File

@ -541,6 +541,7 @@ include (cmake/find/rocksdb.cmake)
include (cmake/find/libpqxx.cmake) include (cmake/find/libpqxx.cmake)
include (cmake/find/nuraft.cmake) include (cmake/find/nuraft.cmake)
include (cmake/find/yaml-cpp.cmake) include (cmake/find/yaml-cpp.cmake)
include (cmake/find/s2geometry.cmake)
if(NOT USE_INTERNAL_PARQUET_LIBRARY) if(NOT USE_INTERNAL_PARQUET_LIBRARY)
set (ENABLE_ORC OFF CACHE INTERNAL "") set (ENABLE_ORC OFF CACHE INTERNAL "")

View File

@ -0,0 +1,24 @@
option(ENABLE_S2_GEOMETRY "Enable S2 geometry library" ${ENABLE_LIBRARIES})
if (ENABLE_S2_GEOMETRY)
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/s2geometry")
message (WARNING "submodule contrib/s2geometry is missing. to fix try run: \n git submodule update --init --recursive")
set (ENABLE_S2_GEOMETRY 0)
set (USE_S2_GEOMETRY 0)
else()
if (OPENSSL_FOUND)
set (S2_GEOMETRY_LIBRARY s2)
set (S2_GEOMETRY_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/s2geometry/src/s2)
set (USE_S2_GEOMETRY 1)
else()
message (WARNING "S2 uses OpenSSL, but the latter is absent.")
endif()
endif()
if (NOT USE_S2_GEOMETRY)
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't enable S2 geometry library")
endif()
endif()
message (STATUS "Using s2geometry=${USE_S2_GEOMETRY} : ${S2_GEOMETRY_INCLUDE_DIR}")

View File

@ -1,4 +1,4 @@
option(ENABLE_SQLITE "Enalbe sqlite" ${ENABLE_LIBRARIES}) option(ENABLE_SQLITE "Enable sqlite" ${ENABLE_LIBRARIES})
if (NOT ENABLE_SQLITE) if (NOT ENABLE_SQLITE)
return() return()

View File

@ -1,4 +1,4 @@
option(ENABLE_STATS "Enalbe StatsLib library" ${ENABLE_LIBRARIES}) option(ENABLE_STATS "Enable StatsLib library" ${ENABLE_LIBRARIES})
if (ENABLE_STATS) if (ENABLE_STATS)
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/stats") if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/stats")

View File

@ -1,3 +1,4 @@
# Third-party libraries may have substandard code.
# Put all targets defined here and in added subfolders under "contrib/" folder in GUI-based IDEs by default. # Put all targets defined here and in added subfolders under "contrib/" folder in GUI-based IDEs by default.
# Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they will # Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they will
@ -10,10 +11,8 @@ else ()
endif () endif ()
unset (_current_dir_name) unset (_current_dir_name)
# Third-party libraries may have substandard code. set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -w")
# Also remove a possible source of nondeterminism. set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -w -D__DATE__= -D__TIME__= -D__TIMESTAMP__=")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w -D__DATE__= -D__TIME__= -D__TIMESTAMP__=")
if (WITH_COVERAGE) if (WITH_COVERAGE)
set (WITHOUT_COVERAGE_LIST ${WITHOUT_COVERAGE}) set (WITHOUT_COVERAGE_LIST ${WITHOUT_COVERAGE})
@ -333,3 +332,6 @@ if (USE_SQLITE)
add_subdirectory(sqlite-cmake) add_subdirectory(sqlite-cmake)
endif() endif()
if (USE_S2_GEOMETRY)
add_subdirectory(s2geometry-cmake)
endif()

1
contrib/s2geometry vendored Submodule

@ -0,0 +1 @@
Subproject commit 20ea540d81f4575a3fc0aea585aac611bcd03ede

View File

@ -0,0 +1,126 @@
set(S2_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/s2geometry/src")
set(S2_SRCS
"${S2_SOURCE_DIR}/s2/base/stringprintf.cc"
"${S2_SOURCE_DIR}/s2/base/strtoint.cc"
"${S2_SOURCE_DIR}/s2/encoded_s2cell_id_vector.cc"
"${S2_SOURCE_DIR}/s2/encoded_s2point_vector.cc"
"${S2_SOURCE_DIR}/s2/encoded_s2shape_index.cc"
"${S2_SOURCE_DIR}/s2/encoded_string_vector.cc"
"${S2_SOURCE_DIR}/s2/id_set_lexicon.cc"
"${S2_SOURCE_DIR}/s2/mutable_s2shape_index.cc"
"${S2_SOURCE_DIR}/s2/r2rect.cc"
"${S2_SOURCE_DIR}/s2/s1angle.cc"
"${S2_SOURCE_DIR}/s2/s1chord_angle.cc"
"${S2_SOURCE_DIR}/s2/s1interval.cc"
"${S2_SOURCE_DIR}/s2/s2boolean_operation.cc"
"${S2_SOURCE_DIR}/s2/s2builder.cc"
"${S2_SOURCE_DIR}/s2/s2builder_graph.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_closed_set_normalizer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_find_polygon_degeneracies.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_lax_polygon_layer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_s2point_vector_layer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_s2polygon_layer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_s2polyline_layer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_s2polyline_vector_layer.cc"
"${S2_SOURCE_DIR}/s2/s2builderutil_snap_functions.cc"
"${S2_SOURCE_DIR}/s2/s2cap.cc"
"${S2_SOURCE_DIR}/s2/s2cell.cc"
"${S2_SOURCE_DIR}/s2/s2cell_id.cc"
"${S2_SOURCE_DIR}/s2/s2cell_index.cc"
"${S2_SOURCE_DIR}/s2/s2cell_union.cc"
"${S2_SOURCE_DIR}/s2/s2centroids.cc"
"${S2_SOURCE_DIR}/s2/s2closest_cell_query.cc"
"${S2_SOURCE_DIR}/s2/s2closest_edge_query.cc"
"${S2_SOURCE_DIR}/s2/s2closest_point_query.cc"
"${S2_SOURCE_DIR}/s2/s2contains_vertex_query.cc"
"${S2_SOURCE_DIR}/s2/s2convex_hull_query.cc"
"${S2_SOURCE_DIR}/s2/s2coords.cc"
"${S2_SOURCE_DIR}/s2/s2crossing_edge_query.cc"
"${S2_SOURCE_DIR}/s2/s2debug.cc"
"${S2_SOURCE_DIR}/s2/s2earth.cc"
"${S2_SOURCE_DIR}/s2/s2edge_clipping.cc"
"${S2_SOURCE_DIR}/s2/s2edge_crosser.cc"
"${S2_SOURCE_DIR}/s2/s2edge_crossings.cc"
"${S2_SOURCE_DIR}/s2/s2edge_distances.cc"
"${S2_SOURCE_DIR}/s2/s2edge_tessellator.cc"
"${S2_SOURCE_DIR}/s2/s2error.cc"
"${S2_SOURCE_DIR}/s2/s2furthest_edge_query.cc"
"${S2_SOURCE_DIR}/s2/s2latlng.cc"
"${S2_SOURCE_DIR}/s2/s2latlng_rect.cc"
"${S2_SOURCE_DIR}/s2/s2latlng_rect_bounder.cc"
"${S2_SOURCE_DIR}/s2/s2lax_loop_shape.cc"
"${S2_SOURCE_DIR}/s2/s2lax_polygon_shape.cc"
"${S2_SOURCE_DIR}/s2/s2lax_polyline_shape.cc"
"${S2_SOURCE_DIR}/s2/s2loop.cc"
"${S2_SOURCE_DIR}/s2/s2loop_measures.cc"
"${S2_SOURCE_DIR}/s2/s2measures.cc"
"${S2_SOURCE_DIR}/s2/s2metrics.cc"
"${S2_SOURCE_DIR}/s2/s2max_distance_targets.cc"
"${S2_SOURCE_DIR}/s2/s2min_distance_targets.cc"
"${S2_SOURCE_DIR}/s2/s2padded_cell.cc"
"${S2_SOURCE_DIR}/s2/s2point_compression.cc"
"${S2_SOURCE_DIR}/s2/s2point_region.cc"
"${S2_SOURCE_DIR}/s2/s2pointutil.cc"
"${S2_SOURCE_DIR}/s2/s2polygon.cc"
"${S2_SOURCE_DIR}/s2/s2polyline.cc"
"${S2_SOURCE_DIR}/s2/s2polyline_alignment.cc"
"${S2_SOURCE_DIR}/s2/s2polyline_measures.cc"
"${S2_SOURCE_DIR}/s2/s2polyline_simplifier.cc"
"${S2_SOURCE_DIR}/s2/s2predicates.cc"
"${S2_SOURCE_DIR}/s2/s2projections.cc"
"${S2_SOURCE_DIR}/s2/s2r2rect.cc"
"${S2_SOURCE_DIR}/s2/s2region.cc"
"${S2_SOURCE_DIR}/s2/s2region_term_indexer.cc"
"${S2_SOURCE_DIR}/s2/s2region_coverer.cc"
"${S2_SOURCE_DIR}/s2/s2region_intersection.cc"
"${S2_SOURCE_DIR}/s2/s2region_union.cc"
"${S2_SOURCE_DIR}/s2/s2shape_index.cc"
"${S2_SOURCE_DIR}/s2/s2shape_index_buffered_region.cc"
"${S2_SOURCE_DIR}/s2/s2shape_index_measures.cc"
"${S2_SOURCE_DIR}/s2/s2shape_measures.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_build_polygon_boundaries.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_coding.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_contains_brute_force.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_edge_iterator.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_get_reference_point.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_range_iterator.cc"
"${S2_SOURCE_DIR}/s2/s2shapeutil_visit_crossing_edge_pairs.cc"
"${S2_SOURCE_DIR}/s2/s2text_format.cc"
"${S2_SOURCE_DIR}/s2/s2wedge_relations.cc"
"${S2_SOURCE_DIR}/s2/strings/ostringstream.cc"
"${S2_SOURCE_DIR}/s2/strings/serialize.cc"
# ClickHouse doesn't use strings from abseil.
# So, there is no duplicate symbols.
"${S2_SOURCE_DIR}/s2/third_party/absl/base/dynamic_annotations.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/base/internal/raw_logging.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/base/internal/throw_delegate.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/numeric/int128.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/ascii.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/match.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/numbers.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/str_cat.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/str_split.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/string_view.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/strip.cc"
"${S2_SOURCE_DIR}/s2/third_party/absl/strings/internal/memutil.cc"
"${S2_SOURCE_DIR}/s2/util/bits/bit-interleave.cc"
"${S2_SOURCE_DIR}/s2/util/bits/bits.cc"
"${S2_SOURCE_DIR}/s2/util/coding/coder.cc"
"${S2_SOURCE_DIR}/s2/util/coding/varint.cc"
"${S2_SOURCE_DIR}/s2/util/math/exactfloat/exactfloat.cc"
"${S2_SOURCE_DIR}/s2/util/math/mathutil.cc"
"${S2_SOURCE_DIR}/s2/util/units/length-units.cc"
)
add_library(s2 ${S2_SRCS})
if (OPENSSL_FOUND)
target_link_libraries(s2 PRIVATE ${OPENSSL_LIBRARIES})
endif()
target_include_directories(s2 SYSTEM BEFORE PUBLIC "${S2_SOURCE_DIR}/")
if(M_LIBRARY)
target_link_libraries(s2 PRIVATE ${M_LIBRARY})
endif()

View File

@ -380,6 +380,14 @@ function run_tests
01923_network_receive_time_metric_insert 01923_network_receive_time_metric_insert
01889_sqlite_read_write 01889_sqlite_read_write
# needs s2
01849_geoToS2
01851_s2_to_geo
01852_s2_get_neighbours
01853_s2_cells_intersect
01854_s2_cap_contains
01854_s2_cap_union
) )
time clickhouse-test --hung-check -j 8 --order=random --use-skip-list \ time clickhouse-test --hung-check -j 8 --order=random --use-skip-list \

View File

@ -79,6 +79,7 @@ SELECT library_name, license_type, license_path FROM system.licenses ORDER BY li
| re2 | BSD 3-clause | /contrib/re2/LICENSE | | re2 | BSD 3-clause | /contrib/re2/LICENSE |
| replxx | BSD 3-clause | /contrib/replxx/LICENSE.md | | replxx | BSD 3-clause | /contrib/replxx/LICENSE.md |
| rocksdb | BSD 3-clause | /contrib/rocksdb/LICENSE.leveldb | | rocksdb | BSD 3-clause | /contrib/rocksdb/LICENSE.leveldb |
| s2geometry | Apache | /contrib/s2geometry/LICENSE |
| sentry-native | MIT | /contrib/sentry-native/LICENSE | | sentry-native | MIT | /contrib/sentry-native/LICENSE |
| simdjson | Apache | /contrib/simdjson/LICENSE | | simdjson | Apache | /contrib/simdjson/LICENSE |
| snappy | Public Domain | /contrib/snappy/COPYING | | snappy | Public Domain | /contrib/snappy/COPYING |

View File

@ -1,6 +1,6 @@
--- ---
toc_priority: 12 toc_priority: 12
toc_title: MateriaziePostgreSQL toc_title: MaterializedPostgreSQL
--- ---
# MaterializedPostgreSQL {#materialize-postgresql} # MaterializedPostgreSQL {#materialize-postgresql}

View File

@ -1213,7 +1213,15 @@ Default value: `3`.
## output_format_json_quote_64bit_integers {#session_settings-output_format_json_quote_64bit_integers} ## output_format_json_quote_64bit_integers {#session_settings-output_format_json_quote_64bit_integers}
If the value is true, integers appear in quotes when using JSON\* Int64 and UInt64 formats (for compatibility with most JavaScript implementations); otherwise, integers are output without the quotes. Controls quoting of 64-bit or bigger [integers](../../sql-reference/data-types/int-uint.md) (like `UInt64` or `Int128`) when they are output in a [JSON](../../interfaces/formats.md#json) format.
Such integers are enclosed in quotes by default. This behavior is compatible with most JavaScript implementations.
Possible values:
- 0 — Integers are output without quotes.
- 1 — Integers are enclosed in quotes.
Default value: 1.
## output_format_json_quote_denormals {#settings-output_format_json_quote_denormals} ## output_format_json_quote_denormals {#settings-output_format_json_quote_denormals}

View File

@ -8,12 +8,11 @@ Columns:
- `table` ([String](../../sql-reference/data-types/string.md)) — Table name. - `table` ([String](../../sql-reference/data-types/string.md)) — Table name.
- `name` ([String](../../sql-reference/data-types/string.md)) — Index name. - `name` ([String](../../sql-reference/data-types/string.md)) — Index name.
- `type` ([String](../../sql-reference/data-types/string.md)) — Index type. - `type` ([String](../../sql-reference/data-types/string.md)) — Index type.
- `expr` ([String](../../sql-reference/data-types/string.md)) — Expression used to calculate the index. - `expr` ([String](../../sql-reference/data-types/string.md)) — Expression for the index calculation.
- `granularity` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of granules in the block. - `granularity` ([UInt64](../../sql-reference/data-types/int-uint.md)) — The number of granules in the block.
**Example** **Example**
```sql ```sql
SELECT * FROM system.data_skipping_indices LIMIT 2 FORMAT Vertical; SELECT * FROM system.data_skipping_indices LIMIT 2 FORMAT Vertical;
``` ```

View File

@ -34,7 +34,7 @@ Input table:
Query: Query:
``` sql ``` sql
SELECT medianDeterministic(val, 1) FROM t SELECT medianDeterministic(val, 1) FROM t;
``` ```
Result: Result:

View File

@ -12,9 +12,6 @@ toc_title: Map(key, value)
- `key` — The key part of the pair. [String](../../sql-reference/data-types/string.md) or [Integer](../../sql-reference/data-types/int-uint.md). - `key` — The key part of the pair. [String](../../sql-reference/data-types/string.md) or [Integer](../../sql-reference/data-types/int-uint.md).
- `value` — The value part of the pair. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md) or [Array](../../sql-reference/data-types/array.md). - `value` — The value part of the pair. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md) or [Array](../../sql-reference/data-types/array.md).
!!! warning "Warning"
Currently `Map` data type is an experimental feature. To work with it you must set `allow_experimental_map_type = 1`.
To get the value from an `a Map('key', 'value')` column, use `a['key']` syntax. This lookup works now with a linear complexity. To get the value from an `a Map('key', 'value')` column, use `a['key']` syntax. This lookup works now with a linear complexity.
**Examples** **Examples**

View File

@ -195,6 +195,41 @@ Result:
└────────────────────┘ └────────────────────┘
``` ```
## h3ToGeo {#h3togeo}
Returns `(lon, lat)` that corresponds to the provided H3 index.
**Syntax**
``` sql
h3ToGeo(h3Index)
```
**Arguments**
- `h3Index` — H3 Index. Type: [UInt64](../../../sql-reference/data-types/int-uint.md).
**Returned values**
- `lon` — Longitude. Type: [Float64](../../../sql-reference/data-types/float.md).
- `lat` — Latitude. Type: [Float64](../../../sql-reference/data-types/float.md).
**Example**
Query:
``` sql
SELECT h3ToGeo(644325524701193974) coordinates;
```
Result:
``` text
┌─coordinates───────────────────────────┐
│ (37.79506616830252,55.71290243145668) │
└───────────────────────────────────────┘
```
## h3kRing {#h3kring} ## h3kRing {#h3kring}
Lists all the [H3](#h3index) hexagons in the raduis of `k` from the given hexagon in random order. Lists all the [H3](#h3index) hexagons in the raduis of `k` from the given hexagon in random order.

View File

@ -306,3 +306,49 @@ Result:
└───────────────────────────────────────────────────────────────────────────────────────────────────────┘ └───────────────────────────────────────────────────────────────────────────────────────────────────────┘
``` ```
## toJSONString {#tojsonstring}
Serializes a value to its JSON representation. Various data types and nested structures are supported.
64-bit [integers](../../sql-reference/data-types/int-uint.md) or bigger (like `UInt64` or `Int128`) are enclosed in quotes by default. [output_format_json_quote_64bit_integers](../../operations/settings/settings.md#session_settings-output_format_json_quote_64bit_integers) controls this behavior.
Special values `NaN` and `inf` are replaced with `null`. Enable [output_format_json_quote_denormals](../../operations/settings/settings.md#settings-output_format_json_quote_denormals) setting to show them.
When serializing an [Enum](../../sql-reference/data-types/enum.md) value, the function outputs its name.
**Syntax**
``` sql
toJSONString(value)
```
**Arguments**
- `value` — Value to serialize. Value may be of any data type.
**Returned value**
- JSON representation of the value.
Type: [String](../../sql-reference/data-types/string.md).
**Example**
The first example shows serialization of a [Map](../../sql-reference/data-types/map.md).
The second example shows some special values wrapped into a [Tuple](../../sql-reference/data-types/tuple.md).
Query:
``` sql
SELECT toJSONString(map('key1', 1, 'key2', 2));
SELECT toJSONString(tuple(1.25, NULL, NaN, +inf, -inf, [])) SETTINGS output_format_json_quote_denormals = 1;
```
Result:
``` text
{"key1":1,"key2":2}
[1.25,null,"nan","inf","-inf",[]]
```
**See Also**
- [output_format_json_quote_64bit_integers](../../operations/settings/settings.md#session_settings-output_format_json_quote_64bit_integers)
- [output_format_json_quote_denormals](../../operations/settings/settings.md#settings-output_format_json_quote_denormals)

View File

@ -49,6 +49,7 @@ ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'passwor
| DATE, NEWDATE | [Date](../../sql-reference/data-types/date.md) | | DATE, NEWDATE | [Date](../../sql-reference/data-types/date.md) |
| DATETIME, TIMESTAMP | [DateTime](../../sql-reference/data-types/datetime.md) | | DATETIME, TIMESTAMP | [DateTime](../../sql-reference/data-types/datetime.md) |
| DATETIME2, TIMESTAMP2 | [DateTime64](../../sql-reference/data-types/datetime64.md) | | DATETIME2, TIMESTAMP2 | [DateTime64](../../sql-reference/data-types/datetime64.md) |
| ENUM | [Enum](../../sql-reference/data-types/enum.md) |
| STRING | [String](../../sql-reference/data-types/string.md) | | STRING | [String](../../sql-reference/data-types/string.md) |
| VARCHAR, VAR_STRING | [String](../../sql-reference/data-types/string.md) | | VARCHAR, VAR_STRING | [String](../../sql-reference/data-types/string.md) |
| BLOB | [String](../../sql-reference/data-types/string.md) | | BLOB | [String](../../sql-reference/data-types/string.md) |

View File

@ -1204,8 +1204,15 @@ load_balancing = round_robin
Работает для форматов JSONEachRow и TSKV. Работает для форматов JSONEachRow и TSKV.
## output_format_json_quote_64bit_integers {#session_settings-output_format_json_quote_64bit_integers} ## output_format_json_quote_64bit_integers {#session_settings-output_format_json_quote_64bit_integers}
Управляет кавычками при выводе 64-битных или более [целых чисел](../../sql-reference/data-types/int-uint.md) (например, `UInt64` или `Int128`) в формате [JSON](../../interfaces/formats.md#json).
По умолчанию такие числа заключаются в кавычки. Это поведение соответствует большинству реализаций JavaScript.
Если значение истинно, то при использовании JSON\* форматов UInt64 и Int64 числа выводятся в кавычках (из соображений совместимости с большинством реализаций JavaScript), иначе - без кавычек. Возможные значения:
- 0 — числа выводятся без кавычек.
- 1 — числа выводятся в кавычках.
Значение по умолчанию: 1.
## output_format_json_quote_denormals {#settings-output_format_json_quote_denormals} ## output_format_json_quote_denormals {#settings-output_format_json_quote_denormals}

View File

@ -0,0 +1,38 @@
# system.data_skipping_indices {#system-data-skipping-indices}
Содержит информацию о существующих индексах пропуска данных во всех таблицах.
Столбцы:
- `database` ([String](../../sql-reference/data-types/string.md)) — имя базы данных.
- `table` ([String](../../sql-reference/data-types/string.md)) — имя таблицы.
- `name` ([String](../../sql-reference/data-types/string.md)) — имя индекса.
- `type` ([String](../../sql-reference/data-types/string.md)) — тип индекса.
- `expr` ([String](../../sql-reference/data-types/string.md)) — выражение, используемое для вычисления индекса.
- `granularity` ([UInt64](../../sql-reference/data-types/int-uint.md)) — количество гранул в блоке данных.
**Пример**
```sql
SELECT * FROM system.data_skipping_indices LIMIT 2 FORMAT Vertical;
```
```text
Row 1:
──────
database: default
table: user_actions
name: clicks_idx
type: minmax
expr: clicks
granularity: 1
Row 2:
──────
database: default
table: users
name: contacts_null_idx
type: minmax
expr: assumeNotNull(contacts_null)
granularity: 1
```

View File

@ -4,7 +4,6 @@
Функции: Функции:
- `median` — синоним для [quantile](../../../sql-reference/aggregate-functions/reference/quantile.md#quantile). - `median` — синоним для [quantile](../../../sql-reference/aggregate-functions/reference/quantile.md#quantile).
- `medianDeterministic` — синоним для [quantileDeterministic](../../../sql-reference/aggregate-functions/reference/quantiledeterministic.md#quantiledeterministic). - `medianDeterministic` — синоним для [quantileDeterministic](../../../sql-reference/aggregate-functions/reference/quantiledeterministic.md#quantiledeterministic).
- `medianExact` — синоним для [quantileExact](../../../sql-reference/aggregate-functions/reference/quantileexact.md#quantileexact). - `medianExact` — синоним для [quantileExact](../../../sql-reference/aggregate-functions/reference/quantileexact.md#quantileexact).
@ -31,7 +30,7 @@
Запрос: Запрос:
``` sql ``` sql
SELECT medianDeterministic(val, 1) FROM t SELECT medianDeterministic(val, 1) FROM t;
``` ```
Результат: Результат:
@ -41,4 +40,3 @@ SELECT medianDeterministic(val, 1) FROM t
│ 1.5 │ │ 1.5 │
└─────────────────────────────┘ └─────────────────────────────┘
``` ```

View File

@ -12,9 +12,6 @@ toc_title: Map(key, value)
- `key` — ключ. [String](../../sql-reference/data-types/string.md) или [Integer](../../sql-reference/data-types/int-uint.md). - `key` — ключ. [String](../../sql-reference/data-types/string.md) или [Integer](../../sql-reference/data-types/int-uint.md).
- `value` — значение. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md) или [Array](../../sql-reference/data-types/array.md). - `value` — значение. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md) или [Array](../../sql-reference/data-types/array.md).
!!! warning "Предупреждение"
Сейчас использование типа данных `Map` является экспериментальной возможностью. Чтобы использовать этот тип данных, включите настройку `allow_experimental_map_type = 1`.
Чтобы получить значение из колонки `a Map('key', 'value')`, используйте синтаксис `a['key']`. В настоящее время такая подстановка работает по алгоритму с линейной сложностью. Чтобы получить значение из колонки `a Map('key', 'value')`, используйте синтаксис `a['key']`. В настоящее время такая подстановка работает по алгоритму с линейной сложностью.
**Примеры** **Примеры**

View File

@ -306,3 +306,51 @@ SELECT JSONExtractKeysAndValuesRaw('{"a": [-100, 200.0], "b":{"c": {"d": "hello"
│ [('d','"hello"'),('f','"world"')] │ │ [('d','"hello"'),('f','"world"')] │
└───────────────────────────────────────────────────────────────────────────────────────────────────────┘ └───────────────────────────────────────────────────────────────────────────────────────────────────────┘
``` ```
## toJSONString {#tojsonstring}
Сериализует значение в JSON представление. Поддерживаются различные типы данных и вложенные структуры.
По умолчанию 64-битные [целые числа](../../sql-reference/data-types/int-uint.md) и более (например, `UInt64` или `Int128`) заключаются в кавычки. Настройка [output_format_json_quote_64bit_integers](../../operations/settings/settings.md#session_settings-output_format_json_quote_64bit_integers) управляет этим поведением.
Специальные значения `NaN` и `inf` заменяются на `null`. Чтобы они отображались, включите настройку [output_format_json_quote_denormals](../../operations/settings/settings.md#settings-output_format_json_quote_denormals).
Когда сериализуется значение [Enum](../../sql-reference/data-types/enum.md), то функция выводит его имя.
**Синтаксис**
``` sql
toJSONString(value)
```
**Аргументы**
- `value` — значение, которое необходимо сериализовать. Может быть любого типа.
**Возвращаемое значение**
- JSON представление значения.
Тип: [String](../../sql-reference/data-types/string.md).
**Пример**
Первый пример показывает сериализацию [Map](../../sql-reference/data-types/map.md).
Во втором примере есть специальные значения, обернутые в [Tuple](../../sql-reference/data-types/tuple.md).
Запрос:
``` sql
SELECT toJSONString(map('key1', 1, 'key2', 2));
SELECT toJSONString(tuple(1.25, NULL, NaN, +inf, -inf, [])) SETTINGS output_format_json_quote_denormals = 1;
```
Результат:
``` text
{"key1":1,"key2":2}
[1.25,null,"nan","inf","-inf",[]]
```
**Смотрите также**
- [output_format_json_quote_64bit_integers](../../operations/settings/settings.md#session_settings-output_format_json_quote_64bit_integers)
- [output_format_json_quote_denormals](../../operations/settings/settings.md#settings-output_format_json_quote_denormals)

View File

@ -419,6 +419,11 @@ if (USE_AWS_S3)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${AWS_S3_INCLUDE_DIR}) target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${AWS_S3_INCLUDE_DIR})
endif() endif()
if (USE_S2_GEOMETRY)
dbms_target_link_libraries (PUBLIC ${S2_GEOMETRY_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${S2_GEOMETRY_INCLUDE_DIR})
endif()
if (USE_BROTLI) if (USE_BROTLI)
target_link_libraries (clickhouse_common_io PRIVATE ${BROTLI_LIBRARY}) target_link_libraries (clickhouse_common_io PRIVATE ${BROTLI_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${BROTLI_INCLUDE_DIR}) target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${BROTLI_INCLUDE_DIR})

View File

@ -559,7 +559,8 @@
M(589, DISTRIBUTED_BROKEN_BATCH_FILES) \ M(589, DISTRIBUTED_BROKEN_BATCH_FILES) \
M(590, CANNOT_SYSCONF) \ M(590, CANNOT_SYSCONF) \
M(591, SQLITE_ENGINE_ERROR) \ M(591, SQLITE_ENGINE_ERROR) \
M(592, ZERO_COPY_REPLICATION_ERROR) \ M(592, DATA_ENCRYPTION_ERROR) \
M(593, ZERO_COPY_REPLICATION_ERROR) \
\ \
M(998, POSTGRESQL_CONNECTION_FAILURE) \ M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \ M(999, KEEPER_EXCEPTION) \

View File

@ -26,13 +26,14 @@ namespace ErrorCodes
MySQLClient::MySQLClient(const String & host_, UInt16 port_, const String & user_, const String & password_) MySQLClient::MySQLClient(const String & host_, UInt16 port_, const String & user_, const String & password_)
: host(host_), port(port_), user(user_), password(std::move(password_)) : host(host_), port(port_), user(user_), password(std::move(password_))
{ {
client_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_PLUGIN_AUTH | CLIENT_SECURE_CONNECTION; mysql_context.client_capabilities = CLIENT_PROTOCOL_41 | CLIENT_PLUGIN_AUTH | CLIENT_SECURE_CONNECTION;
} }
MySQLClient::MySQLClient(MySQLClient && other) MySQLClient::MySQLClient(MySQLClient && other)
: host(std::move(other.host)), port(other.port), user(std::move(other.user)), password(std::move(other.password)) : host(std::move(other.host)), port(other.port), user(std::move(other.user)), password(std::move(other.password))
, client_capability_flags(other.client_capability_flags) , mysql_context(other.mysql_context)
{ {
mysql_context.sequence_id = 0;
} }
void MySQLClient::connect() void MySQLClient::connect()
@ -56,7 +57,7 @@ void MySQLClient::connect()
in = std::make_shared<ReadBufferFromPocoSocket>(*socket); in = std::make_shared<ReadBufferFromPocoSocket>(*socket);
out = std::make_shared<WriteBufferFromPocoSocket>(*socket); out = std::make_shared<WriteBufferFromPocoSocket>(*socket);
packet_endpoint = std::make_shared<PacketEndpoint>(*in, *out, seq); packet_endpoint = mysql_context.makeEndpoint(*in, *out);
handshake(); handshake();
} }
@ -68,7 +69,7 @@ void MySQLClient::disconnect()
socket->close(); socket->close();
socket = nullptr; socket = nullptr;
connected = false; connected = false;
seq = 0; mysql_context.sequence_id = 0;
} }
/// https://dev.mysql.com/doc/internals/en/connection-phase-packets.html /// https://dev.mysql.com/doc/internals/en/connection-phase-packets.html
@ -87,10 +88,10 @@ void MySQLClient::handshake()
String auth_plugin_data = native41.getAuthPluginData(); String auth_plugin_data = native41.getAuthPluginData();
HandshakeResponse handshake_response( HandshakeResponse handshake_response(
client_capability_flags, MAX_PACKET_LENGTH, charset_utf8, user, "", auth_plugin_data, mysql_native_password); mysql_context.client_capabilities, MAX_PACKET_LENGTH, charset_utf8, user, "", auth_plugin_data, mysql_native_password);
packet_endpoint->sendPacket<HandshakeResponse>(handshake_response, true); packet_endpoint->sendPacket<HandshakeResponse>(handshake_response, true);
ResponsePacket packet_response(client_capability_flags, true); ResponsePacket packet_response(mysql_context.client_capabilities, true);
packet_endpoint->receivePacket(packet_response); packet_endpoint->receivePacket(packet_response);
packet_endpoint->resetSequenceId(); packet_endpoint->resetSequenceId();
@ -105,7 +106,7 @@ void MySQLClient::writeCommand(char command, String query)
WriteCommand write_command(command, query); WriteCommand write_command(command, query);
packet_endpoint->sendPacket<WriteCommand>(write_command, true); packet_endpoint->sendPacket<WriteCommand>(write_command, true);
ResponsePacket packet_response(client_capability_flags); ResponsePacket packet_response(mysql_context.client_capabilities);
packet_endpoint->receivePacket(packet_response); packet_endpoint->receivePacket(packet_response);
switch (packet_response.getType()) switch (packet_response.getType())
{ {
@ -124,7 +125,7 @@ void MySQLClient::registerSlaveOnMaster(UInt32 slave_id)
RegisterSlave register_slave(slave_id); RegisterSlave register_slave(slave_id);
packet_endpoint->sendPacket<RegisterSlave>(register_slave, true); packet_endpoint->sendPacket<RegisterSlave>(register_slave, true);
ResponsePacket packet_response(client_capability_flags); ResponsePacket packet_response(mysql_context.client_capabilities);
packet_endpoint->receivePacket(packet_response); packet_endpoint->receivePacket(packet_response);
packet_endpoint->resetSequenceId(); packet_endpoint->resetSequenceId();
if (packet_response.getType() == PACKET_ERR) if (packet_response.getType() == PACKET_ERR)

View File

@ -45,9 +45,7 @@ private:
String password; String password;
bool connected = false; bool connected = false;
UInt32 client_capability_flags = 0; MySQLWireContext mysql_context;
uint8_t seq = 0;
const UInt8 charset_utf8 = 33; const UInt8 charset_utf8 = 33;
const String mysql_native_password = "mysql_native_password"; const String mysql_native_password = "mysql_native_password";

View File

@ -68,4 +68,15 @@ String PacketEndpoint::packetToText(const String & payload)
} }
MySQLProtocol::PacketEndpointPtr MySQLWireContext::makeEndpoint(WriteBuffer & out)
{
return MySQLProtocol::PacketEndpoint::create(out, sequence_id);
}
MySQLProtocol::PacketEndpointPtr MySQLWireContext::makeEndpoint(ReadBuffer & in, WriteBuffer & out)
{
return MySQLProtocol::PacketEndpoint::create(in, out, sequence_id);
}
} }

View File

@ -5,6 +5,7 @@
#include "IMySQLReadPacket.h" #include "IMySQLReadPacket.h"
#include "IMySQLWritePacket.h" #include "IMySQLWritePacket.h"
#include "IO/MySQLPacketPayloadReadBuffer.h" #include "IO/MySQLPacketPayloadReadBuffer.h"
#include <common/shared_ptr_helper.h>
namespace DB namespace DB
{ {
@ -15,19 +16,13 @@ namespace MySQLProtocol
/* Writes and reads packets, keeping sequence-id. /* Writes and reads packets, keeping sequence-id.
* Throws ProtocolError, if packet with incorrect sequence-id was received. * Throws ProtocolError, if packet with incorrect sequence-id was received.
*/ */
class PacketEndpoint class PacketEndpoint : public shared_ptr_helper<PacketEndpoint>
{ {
public: public:
uint8_t & sequence_id; uint8_t & sequence_id;
ReadBuffer * in; ReadBuffer * in;
WriteBuffer * out; WriteBuffer * out;
/// For writing.
PacketEndpoint(WriteBuffer & out_, uint8_t & sequence_id_);
/// For reading and writing.
PacketEndpoint(ReadBuffer & in_, WriteBuffer & out_, uint8_t & sequence_id_);
MySQLPacketPayloadReadBuffer getPayload(); MySQLPacketPayloadReadBuffer getPayload();
void receivePacket(IMySQLReadPacket & packet); void receivePacket(IMySQLReadPacket & packet);
@ -48,8 +43,29 @@ public:
/// Converts packet to text. Is used for debug output. /// Converts packet to text. Is used for debug output.
static String packetToText(const String & payload); static String packetToText(const String & payload);
protected:
/// For writing.
PacketEndpoint(WriteBuffer & out_, uint8_t & sequence_id_);
/// For reading and writing.
PacketEndpoint(ReadBuffer & in_, WriteBuffer & out_, uint8_t & sequence_id_);
friend struct shared_ptr_helper<PacketEndpoint>;
};
using PacketEndpointPtr = std::shared_ptr<PacketEndpoint>;
}
struct MySQLWireContext
{
uint8_t sequence_id = 0;
uint32_t client_capabilities = 0;
size_t max_packet_size = 0;
MySQLProtocol::PacketEndpointPtr makeEndpoint(WriteBuffer & out);
MySQLProtocol::PacketEndpointPtr makeEndpoint(ReadBuffer & in, WriteBuffer & out);
}; };
} }
}

View File

@ -1,4 +1,7 @@
#include "Connection.h" #include "Connection.h"
#if USE_LIBPQXX
#include <common/logger_useful.h> #include <common/logger_useful.h>
namespace postgres namespace postgres
@ -72,3 +75,5 @@ void Connection::connect()
updateConnection(); updateConnection();
} }
} }
#endif

View File

@ -1,5 +1,11 @@
#pragma once #pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <pqxx/pqxx> // Y_IGNORE #include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h> #include <Core/Types.h>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
@ -45,3 +51,5 @@ private:
Poco::Logger * log; Poco::Logger * log;
}; };
} }
#endif

View File

@ -1,5 +1,11 @@
#pragma once #pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <pqxx/pqxx> // Y_IGNORE #include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h> #include <Core/Types.h>
#include <common/BorrowedObjectPool.h> #include <common/BorrowedObjectPool.h>
@ -35,3 +41,5 @@ private:
using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>; using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>;
} }
#endif

View File

@ -1,4 +1,7 @@
#include "PoolWithFailover.h" #include "PoolWithFailover.h"
#if USE_LIBPQXX
#include "Utils.h" #include "Utils.h"
#include <Common/parseRemoteDescription.h> #include <Common/parseRemoteDescription.h>
#include <Common/Exception.h> #include <Common/Exception.h>
@ -136,3 +139,5 @@ ConnectionHolderPtr PoolWithFailover::get()
throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas"); throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, "Unable to connect to any of the replicas");
} }
} }
#endif

View File

@ -1,5 +1,12 @@
#pragma once #pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include "ConnectionHolder.h" #include "ConnectionHolder.h"
#include <mutex> #include <mutex>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
@ -63,3 +70,5 @@ private:
using PoolWithFailoverPtr = std::shared_ptr<PoolWithFailover>; using PoolWithFailoverPtr = std::shared_ptr<PoolWithFailover>;
} }
#endif

View File

@ -1,4 +1,7 @@
#include "Utils.h" #include "Utils.h"
#if USE_LIBPQXX
#include <IO/Operators.h> #include <IO/Operators.h>
namespace postgres namespace postgres
@ -17,3 +20,5 @@ ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, S
} }
} }
#endif

View File

@ -1,5 +1,11 @@
#pragma once #pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <pqxx/pqxx> // Y_IGNORE #include <pqxx/pqxx> // Y_IGNORE
#include <Core/Types.h> #include <Core/Types.h>
#include "Connection.h" #include "Connection.h"
@ -15,3 +21,5 @@ namespace postgres
{ {
ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password); ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password);
} }
#endif

View File

@ -31,6 +31,10 @@ SRCS(
MySQL/PacketsProtocolText.cpp MySQL/PacketsProtocolText.cpp
MySQL/PacketsReplication.cpp MySQL/PacketsReplication.cpp
NamesAndTypes.cpp NamesAndTypes.cpp
PostgreSQL/Connection.cpp
PostgreSQL/PoolWithFailover.cpp
PostgreSQL/Utils.cpp
PostgreSQL/insertPostgreSQLValue.cpp
PostgreSQLProtocol.cpp PostgreSQLProtocol.cpp
QueryProcessingStage.cpp QueryProcessingStage.cpp
Settings.cpp Settings.cpp

View File

@ -8,7 +8,7 @@
#include <Core/ExternalResultDescription.h> #include <Core/ExternalResultDescription.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <sqlite3.h> #include <sqlite3.h> // Y_IGNORE
namespace DB namespace DB

View File

@ -9,7 +9,7 @@
#include <Databases/DatabasesCommon.h> #include <Databases/DatabasesCommon.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <sqlite3.h> #include <sqlite3.h> // Y_IGNORE
namespace DB namespace DB

View File

@ -7,7 +7,7 @@
#if USE_SQLITE #if USE_SQLITE
#include <Storages/StorageSQLite.h> #include <Storages/StorageSQLite.h>
#include <sqlite3.h> #include <sqlite3.h> // Y_IGNORE
namespace DB namespace DB

View File

@ -206,9 +206,9 @@ void DiskDecorator::startup()
delegate->startup(); delegate->startup();
} }
void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context) void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map)
{ {
delegate->applyNewSettings(config, context); delegate->applyNewSettings(config, context, config_prefix, map);
} }
} }

View File

@ -70,7 +70,7 @@ public:
SyncGuardPtr getDirectorySyncGuard(const String & path) const override; SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void shutdown() override; void shutdown() override;
void startup() override; void startup() override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context) override; void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
protected: protected:
Executor & getExecutor() override; Executor & getExecutor() override;

201
src/Disks/DiskEncrypted.cpp Normal file
View File

@ -0,0 +1,201 @@
#include <Disks/DiskEncrypted.h>
#if USE_SSL
#include <Disks/DiskFactory.h>
#include <IO/FileEncryptionCommon.h>
#include <IO/ReadBufferFromEncryptedFile.h>
#include <IO/WriteBufferFromEncryptedFile.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DISK_INDEX;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int LOGICAL_ERROR;
}
using DiskEncryptedPtr = std::shared_ptr<DiskEncrypted>;
using namespace FileEncryption;
class DiskEncryptedReservation : public IReservation
{
public:
DiskEncryptedReservation(DiskEncryptedPtr disk_, std::unique_ptr<IReservation> reservation_)
: disk(std::move(disk_)), reservation(std::move(reservation_))
{
}
UInt64 getSize() const override { return reservation->getSize(); }
DiskPtr getDisk(size_t i) const override
{
if (i != 0)
throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
return disk;
}
Disks getDisks() const override { return {disk}; }
void update(UInt64 new_size) override { reservation->update(new_size); }
private:
DiskEncryptedPtr disk;
std::unique_ptr<IReservation> reservation;
};
ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
{
auto reservation = delegate->reserve(bytes);
if (!reservation)
return {};
return std::make_unique<DiskEncryptedReservation>(std::static_pointer_cast<DiskEncrypted>(shared_from_this()), std::move(reservation));
}
DiskEncrypted::DiskEncrypted(const String & name_, DiskPtr disk_, const String & key_, const String & path_)
: DiskDecorator(disk_)
, name(name_), key(key_), disk_path(path_)
, disk_absolute_path(delegate->getPath() + disk_path)
{
initialize();
}
void DiskEncrypted::initialize()
{
// use wrapped_disk as an EncryptedDisk store
if (disk_path.empty())
return;
if (disk_path.back() != '/')
throw Exception("Disk path must ends with '/', but '" + disk_path + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
delegate->createDirectories(disk_path);
}
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const
{
auto wrapped_path = wrappedPath(path);
auto buffer = delegate->readFile(wrapped_path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
String iv;
size_t offset = 0;
if (exists(path) && getFileSize(path))
{
iv = readIV(kIVSize, *buffer);
offset = kIVSize;
}
else
iv = randomString(kIVSize);
return std::make_unique<ReadBufferFromEncryptedFile>(buf_size, std::move(buffer), iv, key, offset);
}
std::unique_ptr<WriteBufferFromFileBase> DiskEncrypted::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
String iv;
size_t start_offset = 0;
auto wrapped_path = wrappedPath(path);
if (mode == WriteMode::Append && exists(path) && getFileSize(path))
{
auto read_buffer = delegate->readFile(wrapped_path, kIVSize);
iv = readIV(kIVSize, *read_buffer);
start_offset = getFileSize(path);
}
else
iv = randomString(kIVSize);
auto buffer = delegate->writeFile(wrapped_path, buf_size, mode);
return std::make_unique<WriteBufferFromEncryptedFile>(buf_size, std::move(buffer), iv, key, start_offset);
}
size_t DiskEncrypted::getFileSize(const String & path) const
{
auto wrapped_path = wrappedPath(path);
size_t size = delegate->getFileSize(wrapped_path);
return size > kIVSize ? (size - kIVSize) : 0;
}
void DiskEncrypted::truncateFile(const String & path, size_t size)
{
auto wrapped_path = wrappedPath(path);
delegate->truncateFile(wrapped_path, size ? (size + kIVSize) : 0);
}
SyncGuardPtr DiskEncrypted::getDirectorySyncGuard(const String & path) const
{
auto wrapped_path = wrappedPath(path);
return delegate->getDirectorySyncGuard(wrapped_path);
}
void DiskEncrypted::applyNewSettings(
const Poco::Util::AbstractConfiguration & config,
ContextPtr /*context*/,
const String & config_prefix,
const DisksMap & map)
{
String wrapped_disk_name = config.getString(config_prefix + ".disk", "");
if (wrapped_disk_name.empty())
throw Exception("The wrapped disk name can not be empty. An encrypted disk is a wrapper over another disk. "
"Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
key = config.getString(config_prefix + ".key", "");
if (key.empty())
throw Exception("Encrypted disk key can not be empty. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
auto wrapped_disk = map.find(wrapped_disk_name);
if (wrapped_disk == map.end())
throw Exception("The wrapped disk must have been announced earlier. No disk with name " + wrapped_disk_name + ". Disk " + name,
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
delegate = wrapped_disk->second;
disk_path = config.getString(config_prefix + ".path", "");
initialize();
}
void registerDiskEncrypted(DiskFactory & factory)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr /*context*/,
const DisksMap & map) -> DiskPtr {
String wrapped_disk_name = config.getString(config_prefix + ".disk", "");
if (wrapped_disk_name.empty())
throw Exception("The wrapped disk name can not be empty. An encrypted disk is a wrapper over another disk. "
"Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
String key = config.getString(config_prefix + ".key", "");
if (key.empty())
throw Exception("Encrypted disk key can not be empty. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (key.size() != cipherKeyLength(defaultCipher()))
throw Exception("Expected key with size " + std::to_string(cipherKeyLength(defaultCipher())) + ", got key with size " + std::to_string(key.size()),
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
auto wrapped_disk = map.find(wrapped_disk_name);
if (wrapped_disk == map.end())
throw Exception("The wrapped disk must have been announced earlier. No disk with name " + wrapped_disk_name + ". Disk " + name,
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
String relative_path = config.getString(config_prefix + ".path", "");
return std::make_shared<DiskEncrypted>(name, wrapped_disk->second, key, relative_path);
};
factory.registerDiskType("encrypted", creator);
}
}
#endif

229
src/Disks/DiskEncrypted.h Normal file
View File

@ -0,0 +1,229 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_SSL
#include <Disks/IDisk.h>
#include <Disks/DiskDecorator.h>
namespace DB
{
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
class DiskEncrypted : public DiskDecorator
{
public:
DiskEncrypted(const String & name_, DiskPtr disk_, const String & key_, const String & path_);
const String & getName() const override { return name; }
const String & getPath() const override { return disk_absolute_path; }
ReservationPtr reserve(UInt64 bytes) override;
bool exists(const String & path) const override
{
auto wrapped_path = wrappedPath(path);
return delegate->exists(wrapped_path);
}
bool isFile(const String & path) const override
{
auto wrapped_path = wrappedPath(path);
return delegate->isFile(wrapped_path);
}
bool isDirectory(const String & path) const override
{
auto wrapped_path = wrappedPath(path);
return delegate->isDirectory(wrapped_path);
}
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->createDirectory(wrapped_path);
}
void createDirectories(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->createDirectories(wrapped_path);
}
void clearDirectory(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->clearDirectory(wrapped_path);
}
void moveDirectory(const String & from_path, const String & to_path) override
{
auto wrapped_from_path = wrappedPath(from_path);
auto wrapped_to_path = wrappedPath(to_path);
delegate->moveDirectory(wrapped_from_path, wrapped_to_path);
}
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override
{
auto wrapped_path = wrappedPath(path);
return delegate->iterateDirectory(wrapped_path);
}
void createFile(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->createFile(wrapped_path);
}
void moveFile(const String & from_path, const String & to_path) override
{
auto wrapped_from_path = wrappedPath(from_path);
auto wrapped_to_path = wrappedPath(to_path);
delegate->moveFile(wrapped_from_path, wrapped_to_path);
}
void replaceFile(const String & from_path, const String & to_path) override
{
auto wrapped_from_path = wrappedPath(from_path);
auto wrapped_to_path = wrappedPath(to_path);
delegate->replaceFile(wrapped_from_path, wrapped_to_path);
}
void listFiles(const String & path, std::vector<String> & file_names) override
{
auto wrapped_path = wrappedPath(path);
delegate->listFiles(wrapped_path, file_names);
}
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override
{
IDisk::copy(from_path, to_disk, to_path);
}
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size,
WriteMode mode) override;
void removeFile(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeFile(wrapped_path);
}
void removeFileIfExists(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeFileIfExists(wrapped_path);
}
void removeDirectory(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeDirectory(wrapped_path);
}
void removeRecursive(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeRecursive(wrapped_path);
}
void removeSharedFile(const String & path, bool flag) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeSharedFile(wrapped_path, flag);
}
void removeSharedRecursive(const String & path, bool flag) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeSharedRecursive(wrapped_path, flag);
}
void removeSharedFileIfExists(const String & path, bool flag) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeSharedFileIfExists(wrapped_path, flag);
}
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override
{
auto wrapped_path = wrappedPath(path);
delegate->setLastModified(wrapped_path, timestamp);
}
Poco::Timestamp getLastModified(const String & path) override
{
auto wrapped_path = wrappedPath(path);
return delegate->getLastModified(wrapped_path);
}
void setReadOnly(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->setReadOnly(wrapped_path);
}
void createHardLink(const String & src_path, const String & dst_path) override
{
auto wrapped_src_path = wrappedPath(src_path);
auto wrapped_dst_path = wrappedPath(dst_path);
delegate->createHardLink(wrapped_src_path, wrapped_dst_path);
}
void truncateFile(const String & path, size_t size) override;
String getUniqueId(const String & path) const override
{
auto wrapped_path = wrappedPath(path);
return delegate->getUniqueId(wrapped_path);
}
void onFreeze(const String & path) override
{
auto wrapped_path = wrappedPath(path);
delegate->onFreeze(wrapped_path);
}
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
DiskType::Type getType() const override { return DiskType::Type::Encrypted; }
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
private:
void initialize();
String wrappedPath(const String & path) const
{
// if path starts_with disk_path -> got already wrapped path
if (!disk_path.empty() && path.starts_with(disk_path))
return path;
return disk_path + path;
}
String name;
String key;
String disk_path;
String disk_absolute_path;
};
}
#endif

View File

@ -24,7 +24,8 @@ DiskPtr DiskFactory::create(
const String & name, const String & name,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
ContextPtr context) const ContextPtr context,
const DisksMap & map) const
{ {
const auto disk_type = config.getString(config_prefix + ".type", "local"); const auto disk_type = config.getString(config_prefix + ".type", "local");
@ -33,7 +34,7 @@ DiskPtr DiskFactory::create(
throw Exception{"DiskFactory: the disk '" + name + "' has unknown disk type: " + disk_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG}; throw Exception{"DiskFactory: the disk '" + name + "' has unknown disk type: " + disk_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
const auto & disk_creator = found->second; const auto & disk_creator = found->second;
return disk_creator(name, config, config_prefix, context); return disk_creator(name, config, config_prefix, context, map);
} }
} }

View File

@ -8,12 +8,14 @@
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <functional> #include <functional>
#include <map>
#include <unordered_map> #include <unordered_map>
namespace DB namespace DB
{ {
using DisksMap = std::map<String, DiskPtr>;
/** /**
* Disk factory. Responsible for creating new disk objects. * Disk factory. Responsible for creating new disk objects.
*/ */
@ -24,7 +26,8 @@ public:
const String & name, const String & name,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
ContextPtr context)>; ContextPtr context,
const DisksMap & map)>;
static DiskFactory & instance(); static DiskFactory & instance();
@ -34,7 +37,8 @@ public:
const String & name, const String & name,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
ContextPtr context) const; ContextPtr context,
const DisksMap & map) const;
private: private:
using DiskTypeRegistry = std::unordered_map<String, Creator>; using DiskTypeRegistry = std::unordered_map<String, Creator>;

View File

@ -367,7 +367,8 @@ void registerDiskLocal(DiskFactory & factory)
auto creator = [](const String & name, auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
ContextPtr context) -> DiskPtr { ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr {
String path = config.getString(config_prefix + ".path", ""); String path = config.getString(config_prefix + ".path", "");
if (name == "default") if (name == "default")
{ {

View File

@ -450,7 +450,8 @@ void registerDiskMemory(DiskFactory & factory)
auto creator = [](const String & name, auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & /*config*/, const Poco::Util::AbstractConfiguration & /*config*/,
const String & /*config_prefix*/, const String & /*config_prefix*/,
ContextPtr /*context*/) -> DiskPtr { return std::make_shared<DiskMemory>(name); }; ContextPtr /*context*/,
const DisksMap & /*map*/) -> DiskPtr { return std::make_shared<DiskMemory>(name); };
factory.registerDiskType("memory", creator); factory.registerDiskType("memory", creator);
} }

View File

@ -37,7 +37,7 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
auto disk_config_prefix = config_prefix + "." + disk_name; auto disk_config_prefix = config_prefix + "." + disk_name;
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context)); disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context, disks));
} }
if (!has_default_disk) if (!has_default_disk)
disks.emplace(default_disk_name, std::make_shared<DiskLocal>(default_disk_name, context->getPath(), 0)); disks.emplace(default_disk_name, std::make_shared<DiskLocal>(default_disk_name, context->getPath(), 0));
@ -62,16 +62,16 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII)) if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
auto disk_config_prefix = config_prefix + "." + disk_name;
if (result->getDisksMap().count(disk_name) == 0) if (result->getDisksMap().count(disk_name) == 0)
{ {
auto disk_config_prefix = config_prefix + "." + disk_name; result->addToDiskMap(disk_name, factory.create(disk_name, config, disk_config_prefix, context, result->getDisksMap()));
result->addToDiskMap(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
} }
else else
{ {
auto disk = old_disks_minus_new_disks[disk_name]; auto disk = old_disks_minus_new_disks[disk_name];
disk->applyNewSettings(config, context); disk->applyNewSettings(config, context, disk_config_prefix, result->getDisksMap());
old_disks_minus_new_disks.erase(disk_name); old_disks_minus_new_disks.erase(disk_name);
} }

View File

@ -12,7 +12,6 @@ namespace DB
class DiskSelector; class DiskSelector;
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>; using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;
using DisksMap = std::map<String, DiskPtr>;
/// Parse .xml configuration and store information about disks /// Parse .xml configuration and store information about disks
/// Mostly used for introspection. /// Mostly used for introspection.

View File

@ -12,7 +12,8 @@ struct DiskType
Local, Local,
RAM, RAM,
S3, S3,
HDFS HDFS,
Encrypted
}; };
static String toString(Type disk_type) static String toString(Type disk_type)
{ {
@ -26,6 +27,8 @@ struct DiskType
return "s3"; return "s3";
case Type::HDFS: case Type::HDFS:
return "hdfs"; return "hdfs";
case Type::Encrypted:
return "encrypted";
} }
__builtin_unreachable(); __builtin_unreachable();
} }

View File

@ -178,7 +178,8 @@ void registerDiskHDFS(DiskFactory & factory)
auto creator = [](const String & name, auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
ContextPtr context_) -> DiskPtr ContextPtr context_,
const DisksMap & /*map*/) -> DiskPtr
{ {
fs::path disk = fs::path(context_->getPath()) / "disks" / name; fs::path disk = fs::path(context_->getPath()) / "disks" / name;
fs::create_directories(disk); fs::create_directories(disk);

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
#include <Interpreters/Context.h>
#include <Core/Defines.h> #include <Core/Defines.h>
#include <common/types.h> #include <common/types.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
@ -237,7 +238,7 @@ public:
virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const; virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const;
/// Applies new settings for disk in runtime. /// Applies new settings for disk in runtime.
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr) {} virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) { }
protected: protected:
friend class DiskDecorator; friend class DiskDecorator;

View File

@ -921,7 +921,7 @@ void DiskS3::onFreeze(const String & path)
revision_file_buf.finalize(); revision_file_buf.finalize();
} }
void DiskS3::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context) void DiskS3::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &)
{ {
auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context); auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context);

View File

@ -112,7 +112,7 @@ public:
/// Dumps current revision counter into file 'revision.txt' at given path. /// Dumps current revision counter into file 'revision.txt' at given path.
void onFreeze(const String & path) override; void onFreeze(const String & path) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context) override; void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &) override;
private: private:
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata); void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);

View File

@ -167,7 +167,8 @@ void registerDiskS3(DiskFactory & factory)
auto creator = [](const String & name, auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
ContextPtr context) -> DiskPtr { ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr {
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint"))); S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
if (uri.key.back() != '/') if (uri.key.back() != '/')
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS); throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);

View File

@ -16,6 +16,10 @@ void registerDiskMemory(DiskFactory & factory);
void registerDiskS3(DiskFactory & factory); void registerDiskS3(DiskFactory & factory);
#endif #endif
#if USE_SSL
void registerDiskEncrypted(DiskFactory & factory);
#endif
#if USE_HDFS #if USE_HDFS
void registerDiskHDFS(DiskFactory & factory); void registerDiskHDFS(DiskFactory & factory);
#endif #endif
@ -32,6 +36,10 @@ void registerDisks()
registerDiskS3(factory); registerDiskS3(factory);
#endif #endif
#if USE_SSL
registerDiskEncrypted(factory);
#endif
#if USE_HDFS #if USE_HDFS
registerDiskHDFS(factory); registerDiskHDFS(factory);
#endif #endif

View File

@ -10,6 +10,7 @@ PEERDIR(
SRCS( SRCS(
DiskCacheWrapper.cpp DiskCacheWrapper.cpp
DiskDecorator.cpp DiskDecorator.cpp
DiskEncrypted.cpp
DiskFactory.cpp DiskFactory.cpp
DiskLocal.cpp DiskLocal.cpp
DiskMemory.cpp DiskMemory.cpp

View File

@ -33,6 +33,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int FORMAT_IS_NOT_SUITABLE_FOR_INPUT; extern const int FORMAT_IS_NOT_SUITABLE_FOR_INPUT;
extern const int FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT; extern const int FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT;
extern const int UNSUPPORTED_METHOD;
} }
const FormatFactory::Creators & FormatFactory::getCreators(const String & name) const const FormatFactory::Creators & FormatFactory::getCreators(const String & name) const
@ -207,6 +208,9 @@ BlockOutputStreamPtr FormatFactory::getOutputStreamParallelIfPossible(
WriteCallback callback, WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const const std::optional<FormatSettings> & _format_settings) const
{ {
if (context->getMySQLProtocolContext() && name != "MySQLWire")
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "MySQL protocol does not support custom output formats");
const auto & output_getter = getCreators(name).output_processor_creator; const auto & output_getter = getCreators(name).output_processor_creator;
const Settings & settings = context->getSettingsRef(); const Settings & settings = context->getSettingsRef();
@ -309,7 +313,10 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
{ {
const auto & output_getter = getCreators(name).output_processor_creator; const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter) if (!output_getter)
throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT); throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT, "Format {} is not suitable for output (with processors)", name);
if (context->getMySQLProtocolContext() && name != "MySQLWire")
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "MySQL protocol does not support custom output formats");
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context); auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
@ -344,7 +351,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(
{ {
const auto & output_getter = getCreators(name).output_processor_creator; const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter) if (!output_getter)
throw Exception("Format " + name + " is not suitable for output (with processors)", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT); throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT, "Format {} is not suitable for output (with processors)", name);
if (context->hasQueryContext() && context->getSettingsRef().log_queries) if (context->hasQueryContext() && context->getSettingsRef().log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Format, name); context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Format, name);

View File

@ -124,3 +124,6 @@ endif()
# Signed integer overflow on user-provided data inside boost::geometry - ignore. # Signed integer overflow on user-provided data inside boost::geometry - ignore.
set_source_files_properties("pointInPolygon.cpp" PROPERTIES COMPILE_FLAGS -fno-sanitize=signed-integer-overflow) set_source_files_properties("pointInPolygon.cpp" PROPERTIES COMPILE_FLAGS -fno-sanitize=signed-integer-overflow)
# target_link_libraries(clickhouse_functions PRIVATE ${S2_LIBRARY})
target_include_directories(clickhouse_functions SYSTEM PUBLIC ${S2_GEOMETRY_INCLUDE_DIR})

View File

@ -6,4 +6,5 @@
#cmakedefine01 USE_SIMDJSON #cmakedefine01 USE_SIMDJSON
#cmakedefine01 USE_RAPIDJSON #cmakedefine01 USE_RAPIDJSON
#cmakedefine01 USE_H3 #cmakedefine01 USE_H3
#cmakedefine01 USE_S2_GEOMETRY
#cmakedefine01 USE_FASTOPS #cmakedefine01 USE_FASTOPS

View File

@ -46,20 +46,23 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isFloat64()) if (!WhichDataType(arg).isFloat64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be Float64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be Float64",
arg->getName(), 1, getName());
arg = arguments[1].get(); arg = arguments[1].get();
if (!WhichDataType(arg).isFloat64()) if (!WhichDataType(arg).isFloat64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(2) + " of function " + getName() + ". Must be Float64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be Float64",
arg->getName(), 2, getName());
arg = arguments[2].get(); arg = arguments[2].get();
if (!WhichDataType(arg).isUInt8()) if (!WhichDataType(arg).isUInt8())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(3) + " of function " + getName() + ". Must be UInt8", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt8",
arg->getName(), 3, getName());
return std::make_shared<DataTypeUInt64>(); return std::make_shared<DataTypeUInt64>();
} }

111
src/Functions/geoToS2.cpp Normal file
View File

@ -0,0 +1,111 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_S2_GEOMETRY
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Common/typeid_cast.h>
#include <Common/NaNUtils.h>
#include <common/range.h>
#include "s2_fwd.h"
class S2CellId;
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{
/**
* Accepts points of the form (longitude, latitude)
* Returns s2 identifier
*/
class FunctionGeoToS2 : public IFunction
{
public:
static constexpr auto name = "geoToS2";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionGeoToS2>();
}
std::string getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 2; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (size_t i = 0; i < getNumberOfArguments(); ++i)
{
const auto * arg = arguments[i].get();
if (!WhichDataType(arg).isFloat64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be Float64",
arg->getName(), i, getName());
}
return std::make_shared<DataTypeUInt64>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_lon = arguments[0].column.get();
const auto * col_lat = arguments[1].column.get();
auto dst = ColumnVector<UInt64>::create();
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
{
const Float64 lon = col_lon->getFloat64(row);
const Float64 lat = col_lat->getFloat64(row);
if (isNaN(lon) || isNaN(lat))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Arguments must not be NaN");
if (!(isFinite(lon) && isFinite(lat)))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Arguments must not be infinite");
/// S2 acceptes point as (latitude, longitude)
S2LatLng lat_lng = S2LatLng::FromDegrees(lat, lon);
S2CellId id(lat_lng);
dst_data[row] = id.id();
}
return dst;
}
};
}
void registerFunctionGeoToS2(FunctionFactory & factory)
{
factory.registerFunction<FunctionGeoToS2>();
}
}
#endif

View File

@ -44,8 +44,9 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt8()) if (!WhichDataType(arg).isUInt8())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt8", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt8",
arg->getName(), 1, getName());
return std::make_shared<DataTypeFloat64>(); return std::make_shared<DataTypeFloat64>();
} }
@ -62,8 +63,10 @@ public:
{ {
const int resolution = col_hindex->getUInt(row); const int resolution = col_hindex->getUInt(row);
if (resolution > MAX_H3_RES) if (resolution > MAX_H3_RES)
throw Exception("The argument 'resolution' (" + toString(resolution) + ") of function " + getName() throw Exception(
+ " is out of bounds because the maximum resolution in H3 library is " + toString(MAX_H3_RES), ErrorCodes::ARGUMENT_OUT_OF_BOUND); ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"The argument 'resolution' ({}) of function {} is out of bounds because the maximum resolution in H3 library is ",
resolution, getName(), MAX_H3_RES);
// Numerical constant is 180 degrees / pi / Earth radius, Earth radius is from h3 sources // Numerical constant is 180 degrees / pi / Earth radius, Earth radius is from h3 sources
Float64 res = 8.99320592271288084e-6 * getHexagonEdgeLengthAvgM(resolution); Float64 res = 8.99320592271288084e-6 * getHexagonEdgeLengthAvgM(resolution);

View File

@ -49,8 +49,9 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt8()) if (!WhichDataType(arg).isUInt8())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt8", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt8",
arg->getName(), 1, getName());
return std::make_shared<DataTypeFloat64>(); return std::make_shared<DataTypeFloat64>();
} }
@ -67,8 +68,10 @@ public:
{ {
const UInt64 resolution = col_hindex->getUInt(row); const UInt64 resolution = col_hindex->getUInt(row);
if (resolution > MAX_H3_RES) if (resolution > MAX_H3_RES)
throw Exception("The argument 'resolution' (" + toString(resolution) + ") of function " + getName() throw Exception(
+ " is out of bounds because the maximum resolution in H3 library is " + toString(MAX_H3_RES), ErrorCodes::ARGUMENT_OUT_OF_BOUND); ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"The argument 'resolution' ({}) of function {} is out of bounds because the maximum resolution in H3 library is ",
resolution, getName(), MAX_H3_RES);
Float64 res = getHexagonEdgeLengthAvgM(resolution); Float64 res = getHexagonEdgeLengthAvgM(resolution);

View File

@ -41,8 +41,9 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64()) if (!WhichDataType(arg).isUInt64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), 1, getName());
return std::make_shared<DataTypeUInt8>(); return std::make_shared<DataTypeUInt8>();
} }

View File

@ -41,8 +41,9 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64()) if (!WhichDataType(arg).isUInt64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), 1, getName());
return std::make_shared<DataTypeUInt8>(); return std::make_shared<DataTypeUInt8>();
} }

View File

@ -44,8 +44,9 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt8()) if (!WhichDataType(arg).isUInt8())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt8", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt8",
arg->getName(), 1, getName());
return std::make_shared<DataTypeFloat64>(); return std::make_shared<DataTypeFloat64>();
} }
@ -62,8 +63,10 @@ public:
{ {
const UInt64 resolution = col_hindex->getUInt(row); const UInt64 resolution = col_hindex->getUInt(row);
if (resolution > MAX_H3_RES) if (resolution > MAX_H3_RES)
throw Exception("The argument 'resolution' (" + toString(resolution) + ") of function " + getName() throw Exception(
+ " is out of bounds because the maximum resolution in H3 library is " + toString(MAX_H3_RES), ErrorCodes::ARGUMENT_OUT_OF_BOUND); ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"The argument 'resolution' ({}) of function {} is out of bounds because the maximum resolution in H3 library is ",
resolution, getName(), MAX_H3_RES);
Float64 res = getHexagonAreaAvgM2(resolution); Float64 res = getHexagonAreaAvgM2(resolution);

View File

@ -41,14 +41,16 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64()) if (!WhichDataType(arg).isUInt64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), 1, getName());
arg = arguments[1].get(); arg = arguments[1].get();
if (!WhichDataType(arg).isUInt64()) if (!WhichDataType(arg).isUInt64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(2) + " of function " + getName() + ". Must be UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), 2, getName());
return std::make_shared<DataTypeUInt8>(); return std::make_shared<DataTypeUInt8>();
} }

View File

@ -41,8 +41,9 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64()) if (!WhichDataType(arg).isUInt64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), 1, getName());
return std::make_shared<DataTypeUInt8>(); return std::make_shared<DataTypeUInt8>();
} }

View File

@ -50,14 +50,16 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64()) if (!WhichDataType(arg).isUInt64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), 1, getName());
arg = arguments[1].get(); arg = arguments[1].get();
if (!WhichDataType(arg).isUInt8()) if (!WhichDataType(arg).isUInt8())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(2) + " of function " + getName() + ". Must be UInt8", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt8",
arg->getName(), 2, getName());
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()); return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
} }
@ -81,14 +83,17 @@ public:
const UInt8 child_resolution = col_resolution->getUInt(row); const UInt8 child_resolution = col_resolution->getUInt(row);
if (child_resolution > MAX_H3_RES) if (child_resolution > MAX_H3_RES)
throw Exception("The argument 'resolution' (" + toString(child_resolution) + ") of function " + getName() throw Exception(
+ " is out of bounds because the maximum resolution in H3 library is " + toString(MAX_H3_RES), ErrorCodes::ARGUMENT_OUT_OF_BOUND); ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"The argument 'resolution' ({}) of function {} is out of bounds because the maximum resolution in H3 library is {}",
toString(child_resolution), getName(), toString(MAX_H3_RES));
const size_t vec_size = cellToChildrenSize(parent_hindex, child_resolution); const size_t vec_size = cellToChildrenSize(parent_hindex, child_resolution);
if (vec_size > MAX_ARRAY_SIZE) if (vec_size > MAX_ARRAY_SIZE)
throw Exception("The result of function" + getName() throw Exception(
+ " (array of " + toString(vec_size) + " elements) will be too large with resolution argument = " ErrorCodes::TOO_LARGE_ARRAY_SIZE,
+ toString(child_resolution), ErrorCodes::TOO_LARGE_ARRAY_SIZE); "The result of function {} (array of {} elements) will be too large with resolution argument = {}",
getName(), toString(vec_size), toString(child_resolution));
hindex_vec.resize(vec_size); hindex_vec.resize(vec_size);
cellToChildren(parent_hindex, child_resolution, hindex_vec.data()); cellToChildren(parent_hindex, child_resolution, hindex_vec.data());

View File

@ -44,14 +44,16 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64()) if (!WhichDataType(arg).isUInt64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), 1, getName());
arg = arguments[1].get(); arg = arguments[1].get();
if (!WhichDataType(arg).isUInt8()) if (!WhichDataType(arg).isUInt8())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(2) + " of function " + getName() + ". Must be UInt8", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt8",
arg->getName(), 2, getName());
return std::make_shared<DataTypeUInt64>(); return std::make_shared<DataTypeUInt64>();
} }
@ -71,8 +73,10 @@ public:
const UInt8 resolution = col_resolution->getUInt(row); const UInt8 resolution = col_resolution->getUInt(row);
if (resolution > MAX_H3_RES) if (resolution > MAX_H3_RES)
throw Exception("The argument 'resolution' (" + toString(resolution) + ") of function " + getName() throw Exception(
+ " is out of bounds because the maximum resolution in H3 library is " + toString(MAX_H3_RES), ErrorCodes::ARGUMENT_OUT_OF_BOUND); ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"The argument 'resolution' ({}) of function {} is out of bounds because the maximum resolution in H3 library is {}",
toString(resolution), getName(), toString(MAX_H3_RES));
UInt64 res = cellToParent(hindex, resolution); UInt64 res = cellToParent(hindex, resolution);

View File

@ -42,8 +42,9 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64()) if (!WhichDataType(arg).isUInt64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), 1, getName());
return std::make_shared<DataTypeString>(); return std::make_shared<DataTypeString>();
} }
@ -67,16 +68,14 @@ public:
const UInt64 hindex = col_hindex->getUInt(i); const UInt64 hindex = col_hindex->getUInt(i);
if (!isValidCell(hindex)) if (!isValidCell(hindex))
{ throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Invalid H3 index: {}", hindex);
throw Exception("Invalid H3 index: " + std::to_string(hindex), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
h3ToString(hindex, pos, H3_INDEX_STRING_LENGTH); h3ToString(hindex, pos, H3_INDEX_STRING_LENGTH);
// move to end of the index // move to end of the index
while (*pos != '\0') while (*pos != '\0')
{
pos++; pos++;
}
vec_offsets[i] = ++pos - begin; vec_offsets[i] = ++pos - begin;
} }
vec_res.resize(pos - begin); vec_res.resize(pos - begin);

View File

@ -47,14 +47,16 @@ public:
const auto * arg = arguments[0].get(); const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64()) if (!WhichDataType(arg).isUInt64())
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), 1, getName());
arg = arguments[1].get(); arg = arguments[1].get();
if (!isInteger(arg)) if (!isInteger(arg))
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(2) + " of function " + getName() + ". Must be integer", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); "Illegal type {} of argument {} of function {}. Must be integer",
arg->getName(), 2, getName());
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()); return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
} }

96
src/Functions/h3toGeo.cpp Normal file
View File

@ -0,0 +1,96 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_H3
#include <array>
#include <math.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Common/typeid_cast.h>
#include <h3api.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{
/// Implements the function h3ToGeo which takes a single argument (h3Index)
/// and returns the longitude and latitude that correspond to the provided h3 index
class FunctionH3ToGeo : public IFunction
{
public:
static constexpr auto name = "h3ToGeo";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionH3ToGeo>(); }
std::string getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), 1, getName());
return std::make_shared<DataTypeTuple>(
DataTypes{std::make_shared<DataTypeFloat64>(), std::make_shared<DataTypeFloat64>()},
Strings{"longitude", "latitude"});
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_index = arguments[0].column.get();
auto latitude = ColumnFloat64::create(input_rows_count);
auto longitude = ColumnFloat64::create(input_rows_count);
ColumnFloat64::Container & lon_data = longitude->getData();
ColumnFloat64::Container & lat_data = latitude->getData();
for (size_t row = 0; row < input_rows_count; ++row)
{
H3Index h3index = col_index->getUInt(row);
LatLng coord{};
cellToLatLng(h3index,&coord);
lon_data[row] = radsToDegs(coord.lng);
lat_data[row] = radsToDegs(coord.lat);
}
MutableColumns columns;
columns.emplace_back(std::move(longitude));
columns.emplace_back(std::move(latitude));
return ColumnTuple::create(std::move(columns));
}
};
}
void registerFunctionH3ToGeo(FunctionFactory & factory)
{
factory.registerFunction<FunctionH3ToGeo>();
}
}
#endif

View File

@ -28,6 +28,7 @@ void registerFunctionSvg(FunctionFactory & factory);
#if USE_H3 #if USE_H3
void registerFunctionGeoToH3(FunctionFactory &); void registerFunctionGeoToH3(FunctionFactory &);
void registerFunctionH3ToGeo(FunctionFactory &);
void registerFunctionH3EdgeAngle(FunctionFactory &); void registerFunctionH3EdgeAngle(FunctionFactory &);
void registerFunctionH3EdgeLengthM(FunctionFactory &); void registerFunctionH3EdgeLengthM(FunctionFactory &);
void registerFunctionH3GetResolution(FunctionFactory &); void registerFunctionH3GetResolution(FunctionFactory &);
@ -42,6 +43,19 @@ void registerFunctionH3ToString(FunctionFactory &);
void registerFunctionH3HexAreaM2(FunctionFactory &); void registerFunctionH3HexAreaM2(FunctionFactory &);
#endif #endif
#if USE_S2_GEOMETRY
void registerFunctionGeoToS2(FunctionFactory &);
void registerFunctionS2ToGeo(FunctionFactory &);
void registerFunctionS2GetNeighbors(FunctionFactory &);
void registerFunctionS2CellsIntersect(FunctionFactory &);
void registerFunctionS2CapContains(FunctionFactory &);
void registerFunctionS2CapUnion(FunctionFactory &);
void registerFunctionS2RectAdd(FunctionFactory &);
void registerFunctionS2RectContains(FunctionFactory &);
void registerFunctionS2RectUnion(FunctionFactory &);
void registerFunctionS2RectIntersection(FunctionFactory &);
#endif
void registerFunctionsGeo(FunctionFactory & factory) void registerFunctionsGeo(FunctionFactory & factory)
{ {
@ -66,6 +80,7 @@ void registerFunctionsGeo(FunctionFactory & factory)
#if USE_H3 #if USE_H3
registerFunctionGeoToH3(factory); registerFunctionGeoToH3(factory);
registerFunctionH3ToGeo(factory);
registerFunctionH3EdgeAngle(factory); registerFunctionH3EdgeAngle(factory);
registerFunctionH3EdgeLengthM(factory); registerFunctionH3EdgeLengthM(factory);
registerFunctionH3GetResolution(factory); registerFunctionH3GetResolution(factory);
@ -79,6 +94,19 @@ void registerFunctionsGeo(FunctionFactory & factory)
registerFunctionH3ToString(factory); registerFunctionH3ToString(factory);
registerFunctionH3HexAreaM2(factory); registerFunctionH3HexAreaM2(factory);
#endif #endif
#if USE_S2_GEOMETRY
registerFunctionGeoToS2(factory);
registerFunctionS2ToGeo(factory);
registerFunctionS2GetNeighbors(factory);
registerFunctionS2CellsIntersect(factory);
registerFunctionS2CapContains(factory);
registerFunctionS2CapUnion(factory);
registerFunctionS2RectAdd(factory);
registerFunctionS2RectContains(factory);
registerFunctionS2RectUnion(factory);
registerFunctionS2RectIntersection(factory);
#endif
} }
} }

View File

@ -0,0 +1,132 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_S2_GEOMETRY
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <Functions/FunctionFactory.h>
#include <Common/typeid_cast.h>
#include <Common/NaNUtils.h>
#include <common/range.h>
#include "s2_fwd.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
namespace
{
/**
* The cap represents a portion of the sphere that has been cut off by a plane.
* It is defined by a point on a sphere and a radius in degrees.
* Imagine that we draw a line through the center of the sphere and our point.
* An infinite number of planes pass through this line, but any plane will intersect the cap in two points.
* Thus the angle is defined by one of this points and the entire line.
* So, the radius of Pi/2 defines a hemisphere and the radius of Pi defines a whole sphere.
*
* This function returns whether a cap contains a point.
*/
class FunctionS2CapContains : public IFunction
{
public:
static constexpr auto name = "s2CapContains";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionS2CapContains>();
}
std::string getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 3; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (size_t index = 0; index < getNumberOfArguments(); ++index)
{
const auto * arg = arguments[index].get();
/// Radius
if (index == 1)
{
if (!WhichDataType(arg).isFloat64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be Float64",
arg->getName(), 2, getName());
}
else if (!WhichDataType(arg).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), index + 1, getName());
}
return std::make_shared<DataTypeUInt8>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_center = arguments[0].column.get();
const auto * col_degrees = arguments[1].column.get();
const auto * col_point = arguments[2].column.get();
auto dst = ColumnUInt8::create();
auto & dst_data = dst->getData();
dst_data.reserve(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
{
const auto center = S2CellId(col_center->getUInt(row));
const Float64 degrees = col_degrees->getFloat64(row);
const auto point = S2CellId(col_point->getUInt(row));
if (isNaN(degrees))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Radius of the cap must not be nan");
if (std::isinf(degrees))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Radius of the cap must not be infinite");
if (!center.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Center is not valid");
if (!point.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Point is not valid");
S1Angle angle = S1Angle::Degrees(degrees);
S2Cap cap(center.ToPoint(), angle);
dst_data.emplace_back(cap.Contains(point.ToPoint()));
}
return dst;
}
};
}
void registerFunctionS2CapContains(FunctionFactory & factory)
{
factory.registerFunction<FunctionS2CapContains>();
}
}
#endif

View File

@ -0,0 +1,141 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_S2_GEOMETRY
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <Functions/FunctionFactory.h>
#include <Common/typeid_cast.h>
#include <Common/NaNUtils.h>
#include <common/range.h>
#include "s2_fwd.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
namespace
{
/**
* The cap represents a portion of the sphere that has been cut off by a plane.
* See comment for s2CapContains function.
* This function returns the smallest cap that contains both of input caps.
* It is represented by identifier of the center and a radius.
*/
class FunctionS2CapUnion : public IFunction
{
public:
static constexpr auto name = "s2CapUnion";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionS2CapUnion>();
}
std::string getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 4; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (size_t index = 0; index < getNumberOfArguments(); ++index)
{
const auto * arg = arguments[index].get();
if (index == 1 || index == 3)
{
if (!WhichDataType(arg).isFloat64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be Float64",
arg->getName(), index + 1, getName());
}
else if (!WhichDataType(arg).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), index + 1, getName()
);
}
DataTypePtr center = std::make_shared<DataTypeUInt64>();
DataTypePtr radius = std::make_shared<DataTypeFloat64>();
return std::make_shared<DataTypeTuple>(DataTypes{center, radius});
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_center1 = arguments[0].column.get();
const auto * col_radius1 = arguments[1].column.get();
const auto * col_center2 = arguments[2].column.get();
const auto * col_radius2 = arguments[3].column.get();
auto col_res_center = ColumnUInt64::create();
auto col_res_radius = ColumnFloat64::create();
auto & vec_res_center = col_res_center->getData();
vec_res_center.reserve(input_rows_count);
auto & vec_res_radius = col_res_radius->getData();
vec_res_radius.reserve(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
{
const UInt64 first_center = col_center1->getUInt(row);
const Float64 first_radius = col_radius1->getFloat64(row);
const UInt64 second_center = col_center2->getUInt(row);
const Float64 second_radius = col_radius2->getFloat64(row);
if (isNaN(first_radius) || isNaN(second_radius))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Radius of the cap must not be nan");
if (std::isinf(first_radius) || std::isinf(second_radius))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Radius of the cap must not be infinite");
auto first_center_cell = S2CellId(first_center);
auto second_center_cell = S2CellId(second_center);
if (!first_center_cell.is_valid() || !second_center_cell.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Center of the cap is not valid");
S2Cap cap1(first_center_cell.ToPoint(), S1Angle::Degrees(first_radius));
S2Cap cap2(second_center_cell.ToPoint(), S1Angle::Degrees(second_radius));
S2Cap cap_union = cap1.Union(cap2);
vec_res_center.emplace_back(S2CellId(cap_union.center()).id());
vec_res_radius.emplace_back(cap_union.GetRadius().degrees());
}
return ColumnTuple::create(Columns{std::move(col_res_center), std::move(col_res_radius)});
}
};
}
void registerFunctionS2CapUnion(FunctionFactory & factory)
{
factory.registerFunction<FunctionS2CapUnion>();
}
}
#endif

View File

@ -0,0 +1,104 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_S2_GEOMETRY
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <Functions/FunctionFactory.h>
#include <Common/typeid_cast.h>
#include <common/range.h>
#include "s2_fwd.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
namespace
{
/**
* Each cell in s2 library is a quadrilateral bounded by four geodesics.
*/
class FunctionS2CellsIntersect : public IFunction
{
public:
static constexpr auto name = "s2CellsIntersect";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionS2CellsIntersect>();
}
std::string getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 2; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (size_t i = 0; i < getNumberOfArguments(); ++i)
{
const auto * arg = arguments[i].get();
if (!WhichDataType(arg).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), i, getName());
}
return std::make_shared<DataTypeUInt8>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_id_first = arguments[0].column.get();
const auto * col_id_second = arguments[1].column.get();
auto dst = ColumnUInt8::create();
auto & dst_data = dst->getData();
dst_data.reserve(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
{
const UInt64 id_first = col_id_first->getInt(row);
const UInt64 id_second = col_id_second->getInt(row);
auto first_cell = S2CellId(id_first);
auto second_cell = S2CellId(id_second);
if (!first_cell.is_valid() || !second_cell.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cell is not valid");
dst_data.emplace_back(S2CellId(id_first).intersects(S2CellId(id_second)));
}
return dst;
}
};
}
void registerFunctionS2CellsIntersect(FunctionFactory & factory)
{
factory.registerFunction<FunctionS2CellsIntersect>();
}
}
#endif

View File

@ -0,0 +1,111 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_S2_GEOMETRY
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <Functions/FunctionFactory.h>
#include <Common/typeid_cast.h>
#include <common/range.h>
#include "s2_fwd.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
namespace
{
/**
* Each cell in s2 library is a quadrilateral bounded by four geodesics.
* So, each cell has 4 neighbors
*/
class FunctionS2GetNeighbors : public IFunction
{
public:
static constexpr auto name = "s2GetNeighbors";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionS2GetNeighbors>();
}
std::string getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be Float64",
arg->getName(), 1, getName());
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_id = arguments[0].column.get();
auto dst = ColumnArray::create(ColumnUInt64::create());
auto & dst_data = dst->getData();
auto & dst_offsets = dst->getOffsets();
dst_offsets.resize(input_rows_count);
size_t current_offset = 0;
for (const auto row : collections::range(0, input_rows_count))
{
const UInt64 id = col_id->getUInt(row);
S2CellId cell_id(id);
if (!cell_id.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cell is not valid");
S2CellId neighbors[4];
cell_id.GetEdgeNeighbors(neighbors);
dst_data.reserve(dst_data.size() + 4);
for (auto & neighbor : neighbors)
{
++current_offset;
dst_data.insert(neighbor.id());
}
dst_offsets[row] = current_offset;
}
return dst;
}
};
}
void registerFunctionS2GetNeighbors(FunctionFactory & factory)
{
factory.registerFunction<FunctionS2GetNeighbors>();
}
}
#endif

115
src/Functions/s2RectAdd.cpp Normal file
View File

@ -0,0 +1,115 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_S2_GEOMETRY
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <Functions/FunctionFactory.h>
#include <Common/typeid_cast.h>
#include <common/range.h>
#include "s2_fwd.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
namespace
{
class FunctionS2RectAdd : public IFunction
{
public:
static constexpr auto name = "s2RectAdd";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionS2RectAdd>();
}
std::string getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 4; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (size_t index = 0; index < getNumberOfArguments(); ++index)
{
const auto * arg = arguments[index].get();
if (!WhichDataType(arg).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), index, getName());
}
DataTypePtr element = std::make_shared<DataTypeUInt64>();
return std::make_shared<DataTypeTuple>(DataTypes{element, element});
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_lo = arguments[0].column.get();
const auto * col_hi = arguments[1].column.get();
const auto * col_point = arguments[2].column.get();
auto col_res_first = ColumnUInt64::create();
auto col_res_second = ColumnUInt64::create();
auto & vec_res_first = col_res_first->getData();
vec_res_first.reserve(input_rows_count);
auto & vec_res_second = col_res_second->getData();
vec_res_second.reserve(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
{
const auto lo = S2CellId(col_lo->getUInt(row));
const auto hi = S2CellId(col_hi->getUInt(row));
const auto point = S2CellId(col_point->getUInt(row));
if (!lo.is_valid() || !hi.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Rectangle is not valid");
if (!point.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Point is not valid");
S2LatLngRect rect(lo.ToLatLng(), hi.ToLatLng());
rect.AddPoint(point.ToPoint());
vec_res_first.emplace_back(S2CellId(rect.lo()).id());
vec_res_second.emplace_back(S2CellId(rect.hi()).id());
}
return ColumnTuple::create(Columns{std::move(col_res_first), std::move(col_res_second)});
}
};
}
void registerFunctionS2RectAdd(FunctionFactory & factory)
{
factory.registerFunction<FunctionS2RectAdd>();
}
}
#endif

View File

@ -0,0 +1,105 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_S2_GEOMETRY
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <Functions/FunctionFactory.h>
#include <Common/typeid_cast.h>
#include <common/range.h>
#include "s2_fwd.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
namespace
{
class FunctionS2RectContains : public IFunction
{
public:
static constexpr auto name = "s2RectContains";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionS2RectContains>();
}
std::string getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 4; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (size_t i = 0; i < getNumberOfArguments(); ++i)
{
const auto * arg = arguments[i].get();
if (!WhichDataType(arg).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), i, getName());
}
return std::make_shared<DataTypeUInt8>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_lo = arguments[0].column.get();
const auto * col_hi = arguments[1].column.get();
const auto * col_point = arguments[2].column.get();
auto dst = ColumnVector<UInt8>::create();
auto & dst_data = dst->getData();
dst_data.reserve(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
{
const auto lo = S2CellId(col_lo->getUInt(row));
const auto hi = S2CellId(col_hi->getUInt(row));
const auto point = S2CellId(col_point->getUInt(row));
if (!lo.is_valid() || !hi.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Rectangle is not valid");
if (!point.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Point is not valid");
S2LatLngRect rect(lo.ToLatLng(), hi.ToLatLng());
dst_data.emplace_back(rect.Contains(point.ToLatLng()));
}
return dst;
}
};
}
void registerFunctionS2RectContains(FunctionFactory & factory)
{
factory.registerFunction<FunctionS2RectContains>();
}
}
#endif

View File

@ -0,0 +1,121 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_S2_GEOMETRY
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <Functions/FunctionFactory.h>
#include <Common/typeid_cast.h>
#include <common/range.h>
#include "s2_fwd.h"
class S2CellId;
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
namespace
{
class FunctionS2RectIntersection : public IFunction
{
public:
static constexpr auto name = "s2RectIntersection";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionS2RectIntersection>();
}
std::string getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 4; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (size_t i = 0; i < getNumberOfArguments(); ++i)
{
const auto * arg = arguments[i].get();
if (!WhichDataType(arg).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), i, getName());
}
DataTypePtr element = std::make_shared<DataTypeUInt64>();
return std::make_shared<DataTypeTuple>(DataTypes{element, element});
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_lo1 = arguments[0].column.get();
const auto * col_hi1 = arguments[1].column.get();
const auto * col_lo2 = arguments[2].column.get();
const auto * col_hi2 = arguments[3].column.get();
auto col_res_first = ColumnUInt64::create();
auto col_res_second = ColumnUInt64::create();
auto & vec_res_first = col_res_first->getData();
vec_res_first.reserve(input_rows_count);
auto & vec_res_second = col_res_second->getData();
vec_res_second.reserve(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
{
const auto lo1 = S2CellId(col_lo1->getUInt(row));
const auto hi1 = S2CellId(col_hi1->getUInt(row));
const auto lo2 = S2CellId(col_lo2->getUInt(row));
const auto hi2 = S2CellId(col_hi2->getUInt(row));
if (!lo1.is_valid() || !hi1.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "First rectangle is not valid");
if (!lo2.is_valid() || !hi2.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second rectangle is not valid");
S2LatLngRect rect1(lo1.ToLatLng(), hi1.ToLatLng());
S2LatLngRect rect2(lo2.ToLatLng(), hi2.ToLatLng());
S2LatLngRect rect_intersection = rect1.Intersection(rect2);
vec_res_first.emplace_back(S2CellId(rect_intersection.lo()).id());
vec_res_second.emplace_back(S2CellId(rect_intersection.hi()).id());
}
return ColumnTuple::create(Columns{std::move(col_res_first), std::move(col_res_second)});
}
};
}
void registerFunctionS2RectIntersection(FunctionFactory & factory)
{
factory.registerFunction<FunctionS2RectIntersection>();
}
}
#endif

View File

@ -0,0 +1,119 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_S2_GEOMETRY
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <Functions/FunctionFactory.h>
#include <Common/typeid_cast.h>
#include <common/range.h>
#include "s2_fwd.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
namespace
{
class FunctionS2RectUnion : public IFunction
{
public:
static constexpr auto name = "s2RectUnion";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionS2RectUnion>();
}
std::string getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 4; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (size_t i = 0; i < getNumberOfArguments(); ++i)
{
const auto * arg = arguments[i].get();
if (!WhichDataType(arg).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be UInt64",
arg->getName(), i + 1, getName());
}
DataTypePtr element = std::make_shared<DataTypeUInt64>();
return std::make_shared<DataTypeTuple>(DataTypes{element, element});
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_lo1 = arguments[0].column.get();
const auto * col_hi1 = arguments[1].column.get();
const auto * col_lo2 = arguments[2].column.get();
const auto * col_hi2 = arguments[3].column.get();
auto col_res_first = ColumnUInt64::create();
auto col_res_second = ColumnUInt64::create();
auto & vec_res_first = col_res_first->getData();
vec_res_first.reserve(input_rows_count);
auto & vec_res_second = col_res_second->getData();
vec_res_second.reserve(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
{
const auto lo1 = S2CellId(col_lo1->getUInt(row));
const auto hi1 = S2CellId(col_hi1->getUInt(row));
const auto lo2 = S2CellId(col_lo2->getUInt(row));
const auto hi2 = S2CellId(col_hi2->getUInt(row));
if (!lo1.is_valid() || !hi1.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "First rectangle is not valid");
if (!lo2.is_valid() || !hi2.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second rectangle is not valid");
S2LatLngRect rect1(lo1.ToLatLng(), hi1.ToLatLng());
S2LatLngRect rect2(lo2.ToLatLng(), hi2.ToLatLng());
S2LatLngRect rect_union = rect1.Union(rect2);
vec_res_first.emplace_back(S2CellId(rect_union.lo()).id());
vec_res_second.emplace_back(S2CellId(rect_union.hi()).id());
}
return ColumnTuple::create(Columns{std::move(col_res_first), std::move(col_res_second)});
}
};
}
void registerFunctionS2RectUnion(FunctionFactory & factory)
{
factory.registerFunction<FunctionS2RectUnion>();
}
}
#endif

110
src/Functions/s2ToGeo.cpp Normal file
View File

@ -0,0 +1,110 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_S2_GEOMETRY
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <Functions/FunctionFactory.h>
#include <Common/typeid_cast.h>
#include <common/range.h>
#include "s2_fwd.h"
class S2CellId;
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
}
namespace
{
/**
* Returns a point (longitude, latitude) in degrees
*/
class FunctionS2ToGeo : public IFunction
{
public:
static constexpr auto name = "s2ToGeo";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionS2ToGeo>();
}
std::string getName() const override
{
return name;
}
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}. Must be Float64",
arg->getName(), 1, getName());
DataTypePtr element = std::make_shared<DataTypeFloat64>();
return std::make_shared<DataTypeTuple>(DataTypes{element, element});
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_id = arguments[0].column.get();
auto col_longitude = ColumnFloat64::create();
auto col_latitude = ColumnFloat64::create();
auto & longitude = col_longitude->getData();
longitude.reserve(input_rows_count);
auto & latitude = col_latitude->getData();
latitude.reserve(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
{
const auto id = S2CellId(col_id->getUInt(row));
if (!id.is_valid())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Point is not valid");
S2Point point = id.ToPoint();
S2LatLng ll(point);
longitude.emplace_back(ll.lng().degrees());
latitude.emplace_back(ll.lat().degrees());
}
return ColumnTuple::create(Columns{std::move(col_longitude), std::move(col_latitude)});
}
};
}
void registerFunctionS2ToGeo(FunctionFactory & factory)
{
factory.registerFunction<FunctionS2ToGeo>();
}
}
#endif

16
src/Functions/s2_fwd.h Normal file
View File

@ -0,0 +1,16 @@
#pragma once
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wambiguous-reversed-operator"
#endif
#include <s2/s2latlng.h> // Y_IGNORE
#include <s2/s2cell_id.h> // Y_IGNORE
#include <s2/s2point.h> // Y_IGNORE
#include <s2/s2latlng_rect.h> // Y_IGNORE
#include <s2/s2cap.h> // Y_IGNORE
#include <s2/s1angle.h> // Y_IGNORE
#ifdef __clang__
#pragma clang diagnostic pop
#endif

View File

@ -277,6 +277,7 @@ SRCS(
gcd.cpp gcd.cpp
generateUUIDv4.cpp generateUUIDv4.cpp
geoToH3.cpp geoToH3.cpp
geoToS2.cpp
geohashDecode.cpp geohashDecode.cpp
geohashEncode.cpp geohashEncode.cpp
geohashesInBox.cpp geohashesInBox.cpp
@ -300,6 +301,7 @@ SRCS(
h3ToParent.cpp h3ToParent.cpp
h3ToString.cpp h3ToString.cpp
h3kRing.cpp h3kRing.cpp
h3toGeo.cpp
hasColumnInTable.cpp hasColumnInTable.cpp
hasThreadFuzzer.cpp hasThreadFuzzer.cpp
hasToken.cpp hasToken.cpp
@ -455,6 +457,15 @@ SRCS(
runningConcurrency.cpp runningConcurrency.cpp
runningDifference.cpp runningDifference.cpp
runningDifferenceStartingWithFirstValue.cpp runningDifferenceStartingWithFirstValue.cpp
s2CapContains.cpp
s2CapUnion.cpp
s2CellsIntersect.cpp
s2GetNeighbors.cpp
s2RectAdd.cpp
s2RectContains.cpp
s2RectIntersection.cpp
s2RectUnion.cpp
s2ToGeo.cpp
sigmoid.cpp sigmoid.cpp
sign.cpp sign.cpp
sin.cpp sin.cpp

View File

@ -0,0 +1,269 @@
#include <IO/FileEncryptionCommon.h>
#if USE_SSL
#include <IO/ReadHelpers.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <cassert>
#include <random>
namespace DB
{
namespace ErrorCodes
{
extern const int DATA_ENCRYPTION_ERROR;
}
namespace FileEncryption
{
namespace
{
String toBigEndianString(UInt128 value)
{
WriteBufferFromOwnString out;
writeBinaryBigEndian(value, out);
return std::move(out.str());
}
UInt128 fromBigEndianString(const String & str)
{
ReadBufferFromMemory in{str.data(), str.length()};
UInt128 result;
readBinaryBigEndian(result, in);
return result;
}
}
InitVector::InitVector(const String & iv_) : iv(fromBigEndianString(iv_)) {}
const String & InitVector::str() const
{
local = toBigEndianString(iv + counter);
return local;
}
Encryption::Encryption(const String & iv_, const EncryptionKey & key_, size_t offset_)
: evp_cipher(defaultCipher())
, init_vector(iv_)
, key(key_)
, block_size(cipherIVLength(evp_cipher))
{
if (iv_.size() != cipherIVLength(evp_cipher))
throw DB::Exception("Expected iv with size " + std::to_string(cipherIVLength(evp_cipher)) + ", got iv with size " + std::to_string(iv_.size()),
DB::ErrorCodes::DATA_ENCRYPTION_ERROR);
if (key_.size() != cipherKeyLength(evp_cipher))
throw DB::Exception("Expected key with size " + std::to_string(cipherKeyLength(evp_cipher)) + ", got iv with size " + std::to_string(key_.size()),
DB::ErrorCodes::DATA_ENCRYPTION_ERROR);
offset = offset_;
}
size_t Encryption::partBlockSize(size_t size, size_t off) const
{
assert(off < block_size);
/// write the part as usual block
if (off == 0)
return 0;
return off + size <= block_size ? size : (block_size - off) % block_size;
}
void Encryptor::encrypt(const char * plaintext, WriteBuffer & buf, size_t size)
{
if (!size)
return;
auto iv = InitVector(init_vector);
auto off = blockOffset(offset);
iv.set(blocks(offset));
size_t part_size = partBlockSize(size, off);
if (off)
{
buf.write(encryptPartialBlock(plaintext, part_size, iv, off).data(), part_size);
offset += part_size;
size -= part_size;
iv.inc();
}
if (size)
{
buf.write(encryptNBytes(plaintext + part_size, size, iv).data(), size);
offset += size;
}
}
String Encryptor::encryptPartialBlock(const char * partial_block, size_t size, const InitVector & iv, size_t off) const
{
if (size > block_size)
throw Exception("Expected partial block, got block with size > block_size: size = " + std::to_string(size) + " and offset = " + std::to_string(off),
ErrorCodes::DATA_ENCRYPTION_ERROR);
String plaintext(block_size, '\0');
for (size_t i = 0; i < size; ++i)
plaintext[i + off] = partial_block[i];
return String(encryptNBytes(plaintext.data(), block_size, iv), off, size);
}
String Encryptor::encryptNBytes(const char * data, size_t bytes, const InitVector & iv) const
{
String ciphertext(bytes, '\0');
auto * ciphertext_ref = ciphertext.data();
auto evp_ctx_ptr = std::unique_ptr<EVP_CIPHER_CTX, decltype(&::EVP_CIPHER_CTX_free)>(EVP_CIPHER_CTX_new(), &EVP_CIPHER_CTX_free);
auto * evp_ctx = evp_ctx_ptr.get();
if (EVP_EncryptInit_ex(evp_ctx, evp_cipher, nullptr, nullptr, nullptr) != 1)
throw Exception("Failed to initialize encryption context with cipher", ErrorCodes::DATA_ENCRYPTION_ERROR);
if (EVP_EncryptInit_ex(evp_ctx, nullptr, nullptr,
reinterpret_cast<const unsigned char*>(key.str().data()),
reinterpret_cast<const unsigned char*>(iv.str().data())) != 1)
throw Exception("Failed to set key and IV for encryption", ErrorCodes::DATA_ENCRYPTION_ERROR);
int output_len = 0;
if (EVP_EncryptUpdate(evp_ctx,
reinterpret_cast<unsigned char*>(ciphertext_ref), &output_len,
reinterpret_cast<const unsigned char*>(data), static_cast<int>(bytes)) != 1)
throw Exception("Failed to encrypt", ErrorCodes::DATA_ENCRYPTION_ERROR);
ciphertext_ref += output_len;
int final_output_len = 0;
if (EVP_EncryptFinal_ex(evp_ctx,
reinterpret_cast<unsigned char*>(ciphertext_ref), &final_output_len) != 1)
throw Exception("Failed to fetch ciphertext", ErrorCodes::DATA_ENCRYPTION_ERROR);
if (output_len < 0 || final_output_len < 0 || static_cast<size_t>(output_len) + static_cast<size_t>(final_output_len) != bytes)
throw Exception("Only part of the data was encrypted", ErrorCodes::DATA_ENCRYPTION_ERROR);
return ciphertext;
}
void Decryptor::decrypt(const char * ciphertext, BufferBase::Position buf, size_t size, size_t off)
{
if (!size)
return;
auto iv = InitVector(init_vector);
iv.set(blocks(off));
off = blockOffset(off);
size_t part_size = partBlockSize(size, off);
if (off)
{
decryptPartialBlock(buf, ciphertext, part_size, iv, off);
size -= part_size;
if (part_size + off == block_size)
iv.inc();
}
if (size)
decryptNBytes(buf, ciphertext + part_size, size, iv);
}
void Decryptor::decryptPartialBlock(BufferBase::Position & to, const char * partial_block, size_t size, const InitVector & iv, size_t off) const
{
if (size > block_size)
throw Exception("Expecter partial block, got block with size > block_size: size = " + std::to_string(size) + " and offset = " + std::to_string(off),
ErrorCodes::DATA_ENCRYPTION_ERROR);
String ciphertext(block_size, '\0');
String plaintext(block_size, '\0');
for (size_t i = 0; i < size; ++i)
ciphertext[i + off] = partial_block[i];
auto * plaintext_ref = plaintext.data();
decryptNBytes(plaintext_ref, ciphertext.data(), off + size, iv);
for (size_t i = 0; i < size; ++i)
*(to++) = plaintext[i + off];
}
void Decryptor::decryptNBytes(BufferBase::Position & to, const char * data, size_t bytes, const InitVector & iv) const
{
auto evp_ctx_ptr = std::unique_ptr<EVP_CIPHER_CTX, decltype(&::EVP_CIPHER_CTX_free)>(EVP_CIPHER_CTX_new(), &EVP_CIPHER_CTX_free);
auto * evp_ctx = evp_ctx_ptr.get();
if (EVP_DecryptInit_ex(evp_ctx, evp_cipher, nullptr, nullptr, nullptr) != 1)
throw Exception("Failed to initialize decryption context with cipher", ErrorCodes::DATA_ENCRYPTION_ERROR);
if (EVP_DecryptInit_ex(evp_ctx, nullptr, nullptr,
reinterpret_cast<const unsigned char*>(key.str().data()),
reinterpret_cast<const unsigned char*>(iv.str().data())) != 1)
throw Exception("Failed to set key and IV for decryption", ErrorCodes::DATA_ENCRYPTION_ERROR);
int output_len = 0;
if (EVP_DecryptUpdate(evp_ctx,
reinterpret_cast<unsigned char*>(to), &output_len,
reinterpret_cast<const unsigned char*>(data), static_cast<int>(bytes)) != 1)
throw Exception("Failed to decrypt", ErrorCodes::DATA_ENCRYPTION_ERROR);
to += output_len;
int final_output_len = 0;
if (EVP_DecryptFinal_ex(evp_ctx,
reinterpret_cast<unsigned char*>(to), &final_output_len) != 1)
throw Exception("Failed to fetch plaintext", ErrorCodes::DATA_ENCRYPTION_ERROR);
if (output_len < 0 || final_output_len < 0 || static_cast<size_t>(output_len) + static_cast<size_t>(final_output_len) != bytes)
throw Exception("Only part of the data was decrypted", ErrorCodes::DATA_ENCRYPTION_ERROR);
}
String readIV(size_t size, ReadBuffer & in)
{
String iv(size, 0);
in.readStrict(reinterpret_cast<char *>(iv.data()), size);
return iv;
}
String randomString(size_t size)
{
String iv(size, 0);
std::random_device rd;
std::mt19937 gen{rd()};
std::uniform_int_distribution<size_t> dis;
char * ptr = iv.data();
while (size)
{
auto value = dis(gen);
size_t n = std::min(size, sizeof(value));
memcpy(ptr, &value, n);
ptr += n;
size -= n;
}
return iv;
}
void writeIV(const String & iv, WriteBuffer & out)
{
out.write(iv.data(), iv.length());
}
size_t cipherKeyLength(const EVP_CIPHER * evp_cipher)
{
return static_cast<size_t>(EVP_CIPHER_key_length(evp_cipher));
}
size_t cipherIVLength(const EVP_CIPHER * evp_cipher)
{
return static_cast<size_t>(EVP_CIPHER_iv_length(evp_cipher));
}
const EVP_CIPHER * defaultCipher()
{
return EVP_aes_128_ctr();
}
}
}
#endif

View File

@ -0,0 +1,104 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_SSL
#include <Core/Types.h>
#include <openssl/evp.h>
namespace DB
{
class ReadBuffer;
class WriteBuffer;
namespace FileEncryption
{
constexpr size_t kIVSize = sizeof(UInt128);
class InitVector
{
public:
InitVector(const String & iv_);
const String & str() const;
void inc() { ++counter; }
void inc(size_t n) { counter += n; }
void set(size_t n) { counter = n; }
private:
UInt128 iv;
UInt128 counter = 0;
mutable String local;
};
class EncryptionKey
{
public:
EncryptionKey(const String & key_) : key(key_) { }
size_t size() const { return key.size(); }
const String & str() const { return key; }
private:
String key;
};
class Encryption
{
public:
Encryption(const String & iv_, const EncryptionKey & key_, size_t offset_);
protected:
size_t blockOffset(size_t pos) const { return pos % block_size; }
size_t blocks(size_t pos) const { return pos / block_size; }
size_t partBlockSize(size_t size, size_t off) const;
const EVP_CIPHER * get() const { return evp_cipher; }
const EVP_CIPHER * evp_cipher;
const String init_vector;
const EncryptionKey key;
size_t block_size;
/// absolute offset
size_t offset = 0;
};
class Encryptor : public Encryption
{
public:
using Encryption::Encryption;
void encrypt(const char * plaintext, WriteBuffer & buf, size_t size);
private:
String encryptPartialBlock(const char * partial_block, size_t size, const InitVector & iv, size_t off) const;
String encryptNBytes(const char * data, size_t bytes, const InitVector & iv) const;
};
class Decryptor : public Encryption
{
public:
Decryptor(const String & iv_, const EncryptionKey & key_) : Encryption(iv_, key_, 0) { }
void decrypt(const char * ciphertext, char * buf, size_t size, size_t off);
private:
void decryptPartialBlock(char *& to, const char * partial_block, size_t size, const InitVector & iv, size_t off) const;
void decryptNBytes(char *& to, const char * data, size_t bytes, const InitVector & iv) const;
};
String readIV(size_t size, ReadBuffer & in);
String randomString(size_t size);
void writeIV(const String & iv, WriteBuffer & out);
size_t cipherKeyLength(const EVP_CIPHER * evp_cipher);
size_t cipherIVLength(const EVP_CIPHER * evp_cipher);
const EVP_CIPHER * defaultCipher();
}
}
#endif

View File

@ -0,0 +1,101 @@
#include <IO/ReadBufferFromEncryptedFile.h>
#if USE_SSL
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
ReadBufferFromEncryptedFile::ReadBufferFromEncryptedFile(
size_t buf_size_,
std::unique_ptr<ReadBufferFromFileBase> in_,
const String & init_vector_,
const FileEncryption::EncryptionKey & key_,
const size_t iv_offset_)
: ReadBufferFromFileBase(buf_size_, nullptr, 0)
, in(std::move(in_))
, buf_size(buf_size_)
, decryptor(FileEncryption::Decryptor(init_vector_, key_))
, iv_offset(iv_offset_)
{
}
off_t ReadBufferFromEncryptedFile::seek(off_t off, int whence)
{
if (whence == SEEK_CUR)
{
if (off < 0 && -off > getPosition())
throw Exception("SEEK_CUR shift out of bounds", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (!working_buffer.empty() && static_cast<size_t>(offset() + off) < working_buffer.size())
{
pos += off;
return getPosition();
}
else
start_pos = off + getPosition();
}
else if (whence == SEEK_SET)
{
if (off < 0)
throw Exception("SEEK_SET underflow: off = " + std::to_string(off), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (!working_buffer.empty() && static_cast<size_t>(off) >= start_pos
&& static_cast<size_t>(off) < (start_pos + working_buffer.size()))
{
pos = working_buffer.begin() + (off - start_pos);
return getPosition();
}
else
start_pos = off;
}
else
throw Exception("ReadBufferFromEncryptedFile::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
initialize();
return start_pos;
}
bool ReadBufferFromEncryptedFile::nextImpl()
{
if (in->eof())
return false;
if (initialized)
start_pos += working_buffer.size();
initialize();
return true;
}
void ReadBufferFromEncryptedFile::initialize()
{
size_t in_pos = start_pos + iv_offset;
String data;
data.resize(buf_size);
size_t data_size = 0;
in->seek(in_pos, SEEK_SET);
while (data_size < buf_size && !in->eof())
{
auto size = in->read(data.data() + data_size, buf_size - data_size);
data_size += size;
in_pos += size;
in->seek(in_pos, SEEK_SET);
}
data.resize(data_size);
working_buffer.resize(data_size);
decryptor.decrypt(data.data(), working_buffer.begin(), data_size, start_pos);
pos = working_buffer.begin();
initialized = true;
}
}
#endif

View File

@ -0,0 +1,50 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_SSL
#include <IO/ReadBufferFromFileBase.h>
#include <IO/FileEncryptionCommon.h>
namespace DB
{
class ReadBufferFromEncryptedFile : public ReadBufferFromFileBase
{
public:
ReadBufferFromEncryptedFile(
size_t buf_size_,
std::unique_ptr<ReadBufferFromFileBase> in_,
const String & init_vector_,
const FileEncryption::EncryptionKey & key_,
const size_t iv_offset_);
off_t seek(off_t off, int whence) override;
off_t getPosition() override { return start_pos + offset(); }
std::string getFileName() const override { return in->getFileName(); }
private:
bool nextImpl() override;
void initialize();
std::unique_ptr<ReadBufferFromFileBase> in;
size_t buf_size;
FileEncryption::Decryptor decryptor;
bool initialized = false;
// current working_buffer.begin() offset from decrypted file
size_t start_pos = 0;
size_t iv_offset = 0;
};
}
#endif

View File

@ -45,20 +45,27 @@ bool ReadBufferFromS3::nextImpl()
{ {
Stopwatch watch; Stopwatch watch;
bool next_result = false; bool next_result = false;
auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
if (!impl) if (impl)
{
/// `impl` has been initialized earlier and now we're at the end of the current portion of data.
impl->position() = position();
assert(!impl->hasPendingData());
}
else
{
/// `impl` is not initialized and we're about to read the first portion of data.
impl = initialize(); impl = initialize();
next_result = impl->hasPendingData();
}
for (size_t attempt = 0; attempt < max_single_read_retries; ++attempt) auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
{ {
try try
{ {
/// Try to read a next portion of data.
next_result = impl->next(); next_result = impl->next();
/// FIXME. 1. Poco `istream` cannot read less than buffer_size or this state is being discarded during
/// istream <-> iostream conversion. `gcount` always contains 0,
/// that's why we always have error "Cannot read from istream at offset 0".
break; break;
} }
catch (const Exception & e) catch (const Exception & e)
@ -68,24 +75,26 @@ bool ReadBufferFromS3::nextImpl()
LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}", LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
bucket, key, getPosition(), attempt, e.message()); bucket, key, getPosition(), attempt, e.message());
/// Pause before next attempt.
std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds);
sleep_time_with_backoff_milliseconds *= 2;
/// Try to reinitialize `impl`.
impl.reset(); impl.reset();
impl = initialize(); impl = initialize();
next_result = impl->hasPendingData();
} }
std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds);
sleep_time_with_backoff_milliseconds *= 2;
} }
watch.stop(); watch.stop();
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
if (!next_result) if (!next_result)
return false; return false;
working_buffer = internal_buffer = impl->buffer(); BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`
pos = working_buffer.begin();
ProfileEvents::increment(ProfileEvents::S3ReadBytes, internal_buffer.size());
ProfileEvents::increment(ProfileEvents::S3ReadBytes, working_buffer.size());
offset += working_buffer.size(); offset += working_buffer.size();
return true; return true;

View File

@ -921,6 +921,17 @@ readBinaryBigEndian(T & x, ReadBuffer & buf) /// Assuming little endian archi
x = __builtin_bswap64(x); x = __builtin_bswap64(x);
} }
template <typename T>
inline std::enable_if_t<is_big_int_v<T>, void>
readBinaryBigEndian(T & x, ReadBuffer & buf) /// Assuming little endian architecture.
{
for (size_t i = 0; i != std::size(x.items); ++i)
{
auto & item = x.items[std::size(x.items) - i - 1];
readBinaryBigEndian(item, buf);
}
}
/// Generic methods to read value in text tab-separated format. /// Generic methods to read value in text tab-separated format.
template <typename T> template <typename T>

View File

@ -0,0 +1,79 @@
#include <IO/WriteBufferFromEncryptedFile.h>
#if USE_SSL
#include <Common/MemoryTracker.h>
namespace DB
{
WriteBufferFromEncryptedFile::WriteBufferFromEncryptedFile(
size_t buf_size_,
std::unique_ptr<WriteBufferFromFileBase> out_,
const String & init_vector_,
const FileEncryption::EncryptionKey & key_,
const size_t & file_size)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, out(std::move(out_))
, flush_iv(!file_size)
, iv(init_vector_)
, encryptor(FileEncryption::Encryptor(init_vector_, key_, file_size))
{
}
WriteBufferFromEncryptedFile::~WriteBufferFromEncryptedFile()
{
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
finish();
}
void WriteBufferFromEncryptedFile::finish()
{
if (finished)
return;
try
{
finishImpl();
out->finalize();
finished = true;
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
finished = true;
throw;
}
}
void WriteBufferFromEncryptedFile::finishImpl()
{
/// If buffer has pending data - write it.
next();
out->finalize();
}
void WriteBufferFromEncryptedFile::sync()
{
/// If buffer has pending data - write it.
next();
out->sync();
}
void WriteBufferFromEncryptedFile::nextImpl()
{
if (!offset())
return;
if (flush_iv)
{
FileEncryption::writeIV(iv, *out);
flush_iv = false;
}
encryptor.encrypt(working_buffer.begin(), *out, offset());
}
}
#endif

View File

@ -0,0 +1,47 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_SSL
#include <IO/WriteBufferFromFileBase.h>
#include <IO/FileEncryptionCommon.h>
namespace DB
{
class WriteBufferFromEncryptedFile : public WriteBufferFromFileBase
{
public:
WriteBufferFromEncryptedFile(
size_t buf_size_,
std::unique_ptr<WriteBufferFromFileBase> out_,
const String & init_vector_,
const FileEncryption::EncryptionKey & key_,
const size_t & file_size);
~WriteBufferFromEncryptedFile() override;
void sync() override;
void finalize() override { finish(); }
std::string getFileName() const override { return out->getFileName(); }
private:
void nextImpl() override;
void finish();
void finishImpl();
bool finished = false;
std::unique_ptr<WriteBufferFromFileBase> out;
bool flush_iv;
String iv;
FileEncryption::Encryptor encryptor;
};
}
#endif

View File

@ -1099,6 +1099,17 @@ writeBinaryBigEndian(T x, WriteBuffer & buf) /// Assuming little endian archi
writePODBinary(x, buf); writePODBinary(x, buf);
} }
template <typename T>
inline std::enable_if_t<is_big_int_v<T>, void>
writeBinaryBigEndian(const T & x, WriteBuffer & buf) /// Assuming little endian architecture.
{
for (size_t i = 0; i != std::size(x.items); ++i)
{
const auto & item = x.items[std::size(x.items) - i - 1];
writeBinaryBigEndian(item, buf);
}
}
struct PcgSerializer struct PcgSerializer
{ {
static void serializePcg32(const pcg32_fast & rng, WriteBuffer & buf) static void serializePcg32(const pcg32_fast & rng, WriteBuffer & buf)

View File

@ -0,0 +1,150 @@
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_SSL
#include <gtest/gtest.h>
#include <IO/WriteBufferFromString.h>
#include <IO/FileEncryptionCommon.h>
using namespace DB;
using namespace DB::FileEncryption;
struct InitVectorTestParam
{
const std::string_view comment;
const String init;
UInt128 adder;
UInt128 setter;
const String after_inc;
const String after_add;
const String after_set;
};
class InitVectorTest : public ::testing::TestWithParam<InitVectorTestParam> {};
String string_ends_with(size_t size, String str)
{
String res(size, 0);
res.replace(size - str.size(), str.size(), str);
return res;
}
static std::ostream & operator << (std::ostream & ostr, const InitVectorTestParam & param)
{
return ostr << param.comment;
}
TEST_P(InitVectorTest, InitVector)
{
const auto & param = GetParam();
auto iv = InitVector(param.init);
ASSERT_EQ(param.init, iv.str());
iv.inc();
ASSERT_EQ(param.after_inc, iv.str());
iv.inc(param.adder);
ASSERT_EQ(param.after_add, iv.str());
iv.set(param.setter);
ASSERT_EQ(param.after_set, iv.str());
iv.set(0);
ASSERT_EQ(param.init, iv.str());
}
INSTANTIATE_TEST_SUITE_P(InitVectorInputs,
InitVectorTest,
::testing::ValuesIn(std::initializer_list<InitVectorTestParam>{
{
"Basic init vector test. Get zero-string, add 0, set 0",
String(16, 0),
0,
0,
string_ends_with(16, "\x1"),
string_ends_with(16, "\x1"),
String(16, 0),
},
{
"Init vector test. Get zero-string, add 85, set 1024",
String(16, 0),
85,
1024,
string_ends_with(16, "\x1"),
string_ends_with(16, "\x56"),
string_ends_with(16, String("\x4\0", 2)),
},
{
"Long init vector test",
"\xa8\x65\x9c\x73\xf8\x5d\x83\xb4\x5c\xa6\x8c\x19\xf4\x77\x80\xe1",
3349249125638641,
1698923461902341,
"\xa8\x65\x9c\x73\xf8\x5d\x83\xb4\x5c\xa6\x8c\x19\xf4\x77\x80\xe2",
"\xa8\x65\x9c\x73\xf8\x5d\x83\xb4\x5c\xb2\x72\x39\xc8\xdd\x62\xd3",
String("\xa8\x65\x9c\x73\xf8\x5d\x83\xb4\x5c\xac\x95\x43\x65\xea\x00\xe6", 16)
},
})
);
TEST(FileEncryption, Encryption)
{
String iv(16, 0);
EncryptionKey key("1234567812345678");
String input = "abcd1234efgh5678ijkl";
String expected = "\xfb\x8a\x9e\x66\x82\x72\x1b\xbe\x6b\x1d\xd8\x98\xc5\x8c\x63\xee\xcd\x36\x4a\x50";
String result(expected.size(), 0);
for (size_t i = 0; i <= expected.size(); ++i)
{
auto buf = WriteBufferFromString(result);
auto encryptor = Encryptor(iv, key, 0);
encryptor.encrypt(input.data(), buf, i);
ASSERT_EQ(expected.substr(0, i), result.substr(0, i));
}
size_t offset = 25;
String offset_expected = "\x6c\x67\xe4\xf5\x8f\x86\xb0\x19\xe5\xcd\x53\x59\xe0\xc6\x01\x5e\xc1\xfd\x60\x9d";
for (size_t i = 0; i <= expected.size(); ++i)
{
auto buf = WriteBufferFromString(result);
auto encryptor = Encryptor(iv, key, offset);
encryptor.encrypt(input.data(), buf, i);
ASSERT_EQ(offset_expected.substr(0, i), result.substr(0, i));
}
}
TEST(FileEncryption, Decryption)
{
String iv(16, 0);
EncryptionKey key("1234567812345678");
String expected = "abcd1234efgh5678ijkl";
String input = "\xfb\x8a\x9e\x66\x82\x72\x1b\xbe\x6b\x1d\xd8\x98\xc5\x8c\x63\xee\xcd\x36\x4a\x50";
auto decryptor = Decryptor(iv, key);
String result(expected.size(), 0);
for (size_t i = 0; i <= expected.size(); ++i)
{
decryptor.decrypt(input.data(), result.data(), i, 0);
ASSERT_EQ(expected.substr(0, i), result.substr(0, i));
}
size_t offset = 25;
String offset_input = "\x6c\x67\xe4\xf5\x8f\x86\xb0\x19\xe5\xcd\x53\x59\xe0\xc6\x01\x5e\xc1\xfd\x60\x9d";
for (size_t i = 0; i <= expected.size(); ++i)
{
decryptor.decrypt(offset_input.data(), result.data(), i, offset);
ASSERT_EQ(expected.substr(0, i), result.substr(0, i));
}
}
#endif

View File

@ -26,6 +26,7 @@ SRCS(
CascadeWriteBuffer.cpp CascadeWriteBuffer.cpp
CompressionMethod.cpp CompressionMethod.cpp
DoubleConverter.cpp DoubleConverter.cpp
FileEncryptionCommon.cpp
HTTPChunkedReadBuffer.cpp HTTPChunkedReadBuffer.cpp
HTTPCommon.cpp HTTPCommon.cpp
HashingWriteBuffer.cpp HashingWriteBuffer.cpp
@ -44,6 +45,7 @@ SRCS(
NullWriteBuffer.cpp NullWriteBuffer.cpp
PeekableReadBuffer.cpp PeekableReadBuffer.cpp
Progress.cpp Progress.cpp
ReadBufferFromEncryptedFile.cpp
ReadBufferFromFile.cpp ReadBufferFromFile.cpp
ReadBufferFromFileBase.cpp ReadBufferFromFileBase.cpp
ReadBufferFromFileDecorator.cpp ReadBufferFromFileDecorator.cpp
@ -55,6 +57,7 @@ SRCS(
SeekAvoidingReadBuffer.cpp SeekAvoidingReadBuffer.cpp
TimeoutSetter.cpp TimeoutSetter.cpp
UseSSL.cpp UseSSL.cpp
WriteBufferFromEncryptedFile.cpp
WriteBufferFromFile.cpp WriteBufferFromFile.cpp
WriteBufferFromFileBase.cpp WriteBufferFromFileBase.cpp
WriteBufferFromFileDecorator.cpp WriteBufferFromFileDecorator.cpp

View File

@ -149,6 +149,7 @@ void executeQuery(
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{ OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr, sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name, sharding_key_column_name,
shard_info, shard_info,
not_optimized_cluster->getSlotToShard(), not_optimized_cluster->getSlotToShard(),

View File

@ -2355,11 +2355,6 @@ OutputFormatPtr Context::getOutputFormatParallelIfPossible(const String & name,
return FormatFactory::instance().getOutputFormatParallelIfPossible(name, buf, sample, shared_from_this()); return FormatFactory::instance().getOutputFormatParallelIfPossible(name, buf, sample, shared_from_this());
} }
OutputFormatPtr Context::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const
{
return FormatFactory::instance().getOutputFormat(name, buf, sample, shared_from_this());
}
time_t Context::getUptimeSeconds() const time_t Context::getUptimeSeconds() const
{ {
@ -2732,4 +2727,18 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const
return ignored_part_uuids; return ignored_part_uuids;
} }
void Context::setMySQLProtocolContext(MySQLWireContext * mysql_context)
{
assert(session_context.lock().get() == this);
assert(!mysql_protocol_context);
assert(mysql_context);
mysql_protocol_context = mysql_context;
}
MySQLWireContext * Context::getMySQLProtocolContext() const
{
assert(!mysql_protocol_context || session_context.lock().get());
return mysql_protocol_context;
}
} }

Some files were not shown because too many files have changed in this diff Show More