Merge branch 'master' into integration-tests-7

This commit is contained in:
Mikhail f. Shiryaev 2022-07-26 14:12:37 +02:00 committed by GitHub
commit 3c07684c00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
88 changed files with 1006 additions and 307 deletions

View File

@ -1,12 +1,12 @@
### Table of Contents ### Table of Contents
**[ClickHouse release v22.7, 2022-07-21](#226)**<br> **[ClickHouse release v22.7, 2022-07-21](#227)**<br/>
**[ClickHouse release v22.6, 2022-06-16](#226)**<br> **[ClickHouse release v22.6, 2022-06-16](#226)**<br/>
**[ClickHouse release v22.5, 2022-05-19](#225)**<br> **[ClickHouse release v22.5, 2022-05-19](#225)**<br/>
**[ClickHouse release v22.4, 2022-04-20](#224)**<br> **[ClickHouse release v22.4, 2022-04-20](#224)**<br/>
**[ClickHouse release v22.3-lts, 2022-03-17](#223)**<br> **[ClickHouse release v22.3-lts, 2022-03-17](#223)**<br/>
**[ClickHouse release v22.2, 2022-02-17](#222)**<br> **[ClickHouse release v22.2, 2022-02-17](#222)**<br/>
**[ClickHouse release v22.1, 2022-01-18](#221)**<br> **[ClickHouse release v22.1, 2022-01-18](#221)**<br/>
**[Changelog for 2021](https://clickhouse.com/docs/en/whats-new/changelog/2021/)**<br> **[Changelog for 2021](https://clickhouse.com/docs/en/whats-new/changelog/2021/)**<br/>
### <a id="227"></a> ClickHouse release 22.7, 2022-07-21 ### <a id="227"></a> ClickHouse release 22.7, 2022-07-21

View File

@ -554,6 +554,16 @@ macro (clickhouse_add_executable target)
endif() endif()
endmacro() endmacro()
# With cross-compiling, all targets are built for the target platform, which is usually
# different from the host platform. This is problematic if a build artifact X (e.g. a file
# or an executable) is generated by running another executable Y previously produced in the
# build. This is solved by compiling and running Y for/on the host platform. Register such
# targets with:
#     add_native_target(<target> ...)
set_property (GLOBAL PROPERTY NATIVE_BUILD_TARGETS)

# Append one or more targets to the global NATIVE_BUILD_TARGETS list.
# The registered targets are built later in a separate native sub-build.
function (add_native_target)
    set_property (GLOBAL APPEND PROPERTY NATIVE_BUILD_TARGETS ${ARGV})
endfunction ()
set(ConfigIncludePath ${CMAKE_CURRENT_BINARY_DIR}/includes/configs CACHE INTERNAL "Path to generated configuration files.") set(ConfigIncludePath ${CMAKE_CURRENT_BINARY_DIR}/includes/configs CACHE INTERNAL "Path to generated configuration files.")
include_directories(${ConfigIncludePath}) include_directories(${ConfigIncludePath})
@ -568,3 +578,33 @@ add_subdirectory (tests)
add_subdirectory (utils) add_subdirectory (utils)
include (cmake/sanitize_target_link_libraries.cmake) include (cmake/sanitize_target_link_libraries.cmake)
# Build native (host-platform) targets if necessary.
# When cross-compiling (host system/processor differ from target), executables registered
# via add_native_target() cannot run on the host, so a separate "native" sub-build is
# configured and built here with the host toolchain's compilers.
get_property(NATIVE_BUILD_TARGETS GLOBAL PROPERTY NATIVE_BUILD_TARGETS)
if (NATIVE_BUILD_TARGETS
    AND NOT(
        CMAKE_HOST_SYSTEM_NAME STREQUAL CMAKE_SYSTEM_NAME
        AND CMAKE_HOST_SYSTEM_PROCESSOR STREQUAL CMAKE_SYSTEM_PROCESSOR
    )
)
    message (STATUS "Building native targets...")

    set (NATIVE_BUILD_DIR "${CMAKE_BINARY_DIR}/native")

    execute_process(
        COMMAND ${CMAKE_COMMAND} -E make_directory "${NATIVE_BUILD_DIR}"
        COMMAND_ECHO STDOUT
        RESULT_VARIABLE _native_mkdir_result)
    if (NOT _native_mkdir_result EQUAL 0)
        message (FATAL_ERROR "Failed to create native build directory: ${NATIVE_BUILD_DIR}")
    endif ()

    # Configure the native sub-build reusing only the compilers from this build.
    # NOTE(review): runs at configure time; other cache settings (e.g. CMAKE_BUILD_TYPE)
    # are intentionally not forwarded — confirm this is the desired behavior.
    execute_process(
        COMMAND ${CMAKE_COMMAND}
            "-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}"
            "-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}"
            ${CMAKE_SOURCE_DIR}
        WORKING_DIRECTORY "${NATIVE_BUILD_DIR}"
        COMMAND_ECHO STDOUT
        RESULT_VARIABLE _native_configure_result)
    if (NOT _native_configure_result EQUAL 0)
        message (FATAL_ERROR "Configuring native targets in ${NATIVE_BUILD_DIR} failed")
    endif ()

    # NATIVE_BUILD_TARGETS is a ;-list; it is left unquoted on purpose so each
    # registered target becomes a separate argument after --target.
    execute_process(
        COMMAND ${CMAKE_COMMAND} --build "${NATIVE_BUILD_DIR}" --target ${NATIVE_BUILD_TARGETS}
        COMMAND_ECHO STDOUT
        RESULT_VARIABLE _native_build_result)
    if (NOT _native_build_result EQUAL 0)
        message (FATAL_ERROR "Building native targets (${NATIVE_BUILD_TARGETS}) failed")
    endif ()
endif ()

View File

@ -156,7 +156,7 @@ endif()
add_contrib (sqlite-cmake sqlite-amalgamation) add_contrib (sqlite-cmake sqlite-amalgamation)
add_contrib (s2geometry-cmake s2geometry) add_contrib (s2geometry-cmake s2geometry)
add_contrib(c-ares-cmake c-ares) add_contrib (c-ares-cmake c-ares)
add_contrib (qpl-cmake qpl) add_contrib (qpl-cmake qpl)
# Put all targets defined here and in subdirectories under "contrib/<immediate-subdir>" folders in GUI-based IDEs. # Put all targets defined here and in subdirectories under "contrib/<immediate-subdir>" folders in GUI-based IDEs.

2
contrib/avro vendored

@ -1 +1 @@
Subproject commit e43c46e87fd32eafdc09471e95344555454c5ef8 Subproject commit 7832659ec986075d560f930c288e973c64679552

2
contrib/grpc vendored

@ -1 +1 @@
Subproject commit 5e23e96c0c02e451dbb291cf9f66231d02b6cdb6 Subproject commit 3f975ecab377cd5f739af780566596128f17bb74

View File

@ -0,0 +1,36 @@
---
sidebar_position: 1
sidebar_label: 2022
---
# 2022 Changelog
### ClickHouse release v22.6.4.35-stable FIXME as compared to v22.6.3.35-stable
#### Build/Testing/Packaging Improvement
* Backported in [#38822](https://github.com/ClickHouse/ClickHouse/issues/38822): - Change `all|noarch` packages to architecture-dependent - Fix some documentation for it - Push aarch64|arm64 packages to artifactory and release assets - Fixes [#36443](https://github.com/ClickHouse/ClickHouse/issues/36443). [#38580](https://github.com/ClickHouse/ClickHouse/pull/38580) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#38242](https://github.com/ClickHouse/ClickHouse/issues/38242): Fix possible crash in `Distributed` async insert in case of removing a replica from config. [#38029](https://github.com/ClickHouse/ClickHouse/pull/38029) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#38865](https://github.com/ClickHouse/ClickHouse/issues/38865): Fix s3 seekable reads with parallel read buffer. (Affected memory usage during query). Closes [#38258](https://github.com/ClickHouse/ClickHouse/issues/38258). [#38802](https://github.com/ClickHouse/ClickHouse/pull/38802) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Backported in [#38853](https://github.com/ClickHouse/ClickHouse/issues/38853): Update `simdjson`. This fixes [#38621](https://github.com/ClickHouse/ClickHouse/issues/38621). [#38838](https://github.com/ClickHouse/ClickHouse/pull/38838) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Backported in [#38942](https://github.com/ClickHouse/ClickHouse/issues/38942): - Fix settings profile with seconds unit. [#38896](https://github.com/ClickHouse/ClickHouse/pull/38896) ([Raúl Marín](https://github.com/Algunenano)).
* Backported in [#39063](https://github.com/ClickHouse/ClickHouse/issues/39063): Any allocations inside OvercommitTracker may lead to deadlock. Logging was not very informative so it's easier just to remove logging. Fixes [#37794](https://github.com/ClickHouse/ClickHouse/issues/37794). [#39030](https://github.com/ClickHouse/ClickHouse/pull/39030) ([Dmitry Novik](https://github.com/novikd)).
* Backported in [#39077](https://github.com/ClickHouse/ClickHouse/issues/39077): Fix bug in filesystem cache that could happen in some corner case which coincided with cache capacity hitting the limit. Closes [#39066](https://github.com/ClickHouse/ClickHouse/issues/39066). [#39070](https://github.com/ClickHouse/ClickHouse/pull/39070) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Backported in [#39151](https://github.com/ClickHouse/ClickHouse/issues/39151): Fix error `Block structure mismatch` which could happen for INSERT into table with attached MATERIALIZED VIEW and enabled setting `extremes = 1`. Closes [#29759](https://github.com/ClickHouse/ClickHouse/issues/29759) and [#38729](https://github.com/ClickHouse/ClickHouse/issues/38729). [#39125](https://github.com/ClickHouse/ClickHouse/pull/39125) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Backported in [#39275](https://github.com/ClickHouse/ClickHouse/issues/39275): Fixed error `Not found column Type in block` in selects with `PREWHERE` and read-in-order optimizations. [#39157](https://github.com/ClickHouse/ClickHouse/pull/39157) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* Backported in [#39371](https://github.com/ClickHouse/ClickHouse/issues/39371): Declare RabbitMQ queue without default arguments `x-max-length` and `x-overflow`. [#39259](https://github.com/ClickHouse/ClickHouse/pull/39259) ([rnbondarenko](https://github.com/rnbondarenko)).
* Backported in [#39352](https://github.com/ClickHouse/ClickHouse/issues/39352): Fix incorrect fetch postgresql tables query for PostgreSQL database engine. Closes [#33502](https://github.com/ClickHouse/ClickHouse/issues/33502). [#39283](https://github.com/ClickHouse/ClickHouse/pull/39283) ([Kseniia Sumarokova](https://github.com/kssenii)).
#### NO CL CATEGORY
* Backported in [#38685](https://github.com/ClickHouse/ClickHouse/issues/38685):. [#38449](https://github.com/ClickHouse/ClickHouse/pull/38449) ([Maksim Kita](https://github.com/kitaisreal)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Use native Map type for OpenTelemetry attributes [#38814](https://github.com/ClickHouse/ClickHouse/pull/38814) ([Ilya Yatsishin](https://github.com/qoega)).
* Retry docker buildx commands with progressive sleep in between [#38898](https://github.com/ClickHouse/ClickHouse/pull/38898) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Add docker_server.py running to backport and release CIs [#39011](https://github.com/ClickHouse/ClickHouse/pull/39011) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Fix meilisearch tests [#39110](https://github.com/ClickHouse/ClickHouse/pull/39110) ([Kseniia Sumarokova](https://github.com/kssenii)).

View File

@ -119,16 +119,9 @@ On CentOS, RedHat run `sudo yum install cmake ninja-build`.
If you use Arch or Gentoo, you probably know it yourself how to install CMake. If you use Arch or Gentoo, you probably know it yourself how to install CMake.
For installing CMake and Ninja on Mac OS X first install Homebrew and then install everything else via brew:
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
brew install cmake ninja
Next, check the version of CMake: `cmake --version`. If it is below 3.12, you should install a newer version from the website: https://cmake.org/download/.
## C++ Compiler {#c-compiler} ## C++ Compiler {#c-compiler}
Compilers Clang starting from version 11 is supported for building ClickHouse. Compilers Clang starting from version 12 is supported for building ClickHouse.
Clang should be used instead of gcc. Though, our continuous integration (CI) platform runs checks for about a dozen of build combinations. Clang should be used instead of gcc. Though, our continuous integration (CI) platform runs checks for about a dozen of build combinations.
@ -138,9 +131,6 @@ On Ubuntu/Debian you can use the automatic installation script (check [official
sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)" sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)"
``` ```
Mac OS X build is also supported. Just run `brew install llvm`
## The Building Process {#the-building-process} ## The Building Process {#the-building-process}
Now that you are ready to build ClickHouse we recommend you to create a separate directory `build` inside `ClickHouse` that will contain all of the build artefacts: Now that you are ready to build ClickHouse we recommend you to create a separate directory `build` inside `ClickHouse` that will contain all of the build artefacts:

View File

@ -692,9 +692,7 @@ auto s = std::string{"Hello"};
**1.** Virtual inheritance is not used. **1.** Virtual inheritance is not used.
**2.** Exception specifiers from C++03 are not used. **2.** Constructs which have convenient syntactic sugar in modern C++, e.g.
**3.** Constructs which have convenient syntactic sugar in modern C++, e.g.
``` ```
// Traditional way without syntactic sugar // Traditional way without syntactic sugar
@ -745,7 +743,7 @@ But other things being equal, cross-platform or portable code is preferred.
**2.** Language: C++20 (see the list of available [C++20 features](https://en.cppreference.com/w/cpp/compiler_support#C.2B.2B20_features)). **2.** Language: C++20 (see the list of available [C++20 features](https://en.cppreference.com/w/cpp/compiler_support#C.2B.2B20_features)).
**3.** Compiler: `clang`. At this time (April 2021), the code is compiled using clang version 11. (It can also be compiled using `gcc` version 10, but it's untested and not suitable for production usage). **3.** Compiler: `clang`. At the time of writing (July 2022), the code is compiled using clang version >= 12. (It can also be compiled using `gcc`, but it's untested and not suitable for production usage).
The standard library is used (`libc++`). The standard library is used (`libc++`).
@ -755,7 +753,7 @@ The standard library is used (`libc++`).
The CPU instruction set is the minimum supported set among our servers. Currently, it is SSE 4.2. The CPU instruction set is the minimum supported set among our servers. Currently, it is SSE 4.2.
**6.** Use `-Wall -Wextra -Werror` compilation flags. Also `-Weverything` is used with few exceptions. **6.** Use `-Wall -Wextra -Werror -Weverything` compilation flags with a few exceptions.
**7.** Use static linking with all libraries except those that are difficult to connect to statically (see the output of the `ldd` command). **7.** Use static linking with all libraries except those that are difficult to connect to statically (see the output of the `ldd` command).

View File

@ -81,11 +81,11 @@ $ ./src/unit_tests_dbms --gtest_filter=LocalAddress*
## Performance Tests {#performance-tests} ## Performance Tests {#performance-tests}
Performance tests allow to measure and compare performance of some isolated part of ClickHouse on synthetic queries. Tests are located at `tests/performance`. Each test is represented by `.xml` file with description of test case. Tests are run with `docker/test/performance-comparison` tool . See the readme file for invocation. Performance tests allow to measure and compare performance of some isolated part of ClickHouse on synthetic queries. Performance tests are located at `tests/performance/`. Each test is represented by an `.xml` file with a description of the test case. Tests are run with `docker/test/performance-comparison` tool . See the readme file for invocation.
Each test run one or multiple queries (possibly with combinations of parameters) in a loop. Each test run one or multiple queries (possibly with combinations of parameters) in a loop.
If you want to improve performance of ClickHouse in some scenario, and if improvements can be observed on simple queries, it is highly recommended to write a performance test. It always makes sense to use `perf top` or other `perf` tools during your tests. If you want to improve performance of ClickHouse in some scenario, and if improvements can be observed on simple queries, it is highly recommended to write a performance test. Also, it is recommended to write performance tests when you add or modify SQL functions which are relatively isolated and not too obscure. It always makes sense to use `perf top` or other `perf` tools during your tests.
## Test Tools and Scripts {#test-tools-and-scripts} ## Test Tools and Scripts {#test-tools-and-scripts}

View File

@ -482,9 +482,9 @@ For example:
## Projections {#projections} ## Projections {#projections}
Projections are like [materialized views](../../../sql-reference/statements/create/view.md#materialized) but defined in part-level. It provides consistency guarantees along with automatic usage in queries. Projections are like [materialized views](../../../sql-reference/statements/create/view.md#materialized) but defined in part-level. It provides consistency guarantees along with automatic usage in queries.
::: note
Projections are an experimental feature. To enable them you must set the [allow_experimental_projection_optimization](../../../operations/settings/settings.md#allow-experimental-projection-optimization) to `1`. See also the [force_optimize_projection](../../../operations/settings/settings.md#force-optimize-projection) setting. When you are implementing projections you should also consider the [force_optimize_projection](../../../operations/settings/settings.md#force-optimize-projection) setting.
:::
Projections are not supported in the `SELECT` statements with the [FINAL](../../../sql-reference/statements/select/from.md#select-from-final) modifier. Projections are not supported in the `SELECT` statements with the [FINAL](../../../sql-reference/statements/select/from.md#select-from-final) modifier.
### Projection Query {#projection-query} ### Projection Query {#projection-query}

View File

@ -67,7 +67,7 @@ Features:
### Grafana {#grafana} ### Grafana {#grafana}
[Grafana](https://grafana.com/grafana/plugins/vertamedia-clickhouse-datasource) is a platform for monitoring and visualization. [Grafana](https://grafana.com/grafana/plugins/grafana-clickhouse-datasource/) is a platform for monitoring and visualization.
"Grafana allows you to query, visualize, alert on and understand your metrics no matter where they are stored. Create, explore, and share dashboards with your team and foster a data driven culture. Trusted and loved by the community" &mdash; grafana.com. "Grafana allows you to query, visualize, alert on and understand your metrics no matter where they are stored. Create, explore, and share dashboards with your team and foster a data driven culture. Trusted and loved by the community" &mdash; grafana.com.

View File

@ -302,18 +302,34 @@ Default value: `ALL`.
Specifies [JOIN](../../sql-reference/statements/select/join.md) algorithm. Specifies [JOIN](../../sql-reference/statements/select/join.md) algorithm.
Several algorithms can be specified, and an available one would be chosen for a particular query based on kind/strictness and table engine.
Possible values: Possible values:
- `hash` — [Hash join algorithm](https://en.wikipedia.org/wiki/Hash_join) is used. - `default``hash` or `direct`, if possible (same as `direct,hash`)
- `partial_merge` — [Sort-merge algorithm](https://en.wikipedia.org/wiki/Sort-merge_join) is used.
- `prefer_partial_merge` — ClickHouse always tries to use `merge` join if possible.
- `auto` — ClickHouse tries to change `hash` join to `merge` join on the fly to avoid out of memory.
Default value: `hash`. - `hash` — [Hash join algorithm](https://en.wikipedia.org/wiki/Hash_join) is used. The most generic implementation that supports all combinations of kind and strictness and multiple join keys that are combined with `OR` in the `JOIN ON` section.
When using `hash` algorithm the right part of `JOIN` is uploaded into RAM. - `parallel_hash` - a variation of `hash` join that splits the data into buckets and builds several hashtables instead of one concurrently to speed up this process.
When using the `hash` algorithm, the right part of `JOIN` is uploaded into RAM.
- `partial_merge` — a variation of the [sort-merge algorithm](https://en.wikipedia.org/wiki/Sort-merge_join), where only the right table is fully sorted.
The `RIGHT JOIN` and `FULL JOIN` are supported only with `ALL` strictness (`SEMI`, `ANTI`, `ANY`, and `ASOF` are not supported).
When using `partial_merge` algorithm, ClickHouse sorts the data and dumps it to the disk. The `partial_merge` algorithm in ClickHouse differs slightly from the classic realization. First, ClickHouse sorts the right table by joining keys in blocks and creates a min-max index for sorted blocks. Then it sorts parts of the left table by `join key` and joins them over the right table. The min-max index is also used to skip unneeded right table blocks.
- `direct` - can be applied when the right storage supports key-value requests.
The `direct` algorithm performs a lookup in the right table using rows from the left table as keys. It's supported only by special storage such as [Dictionary](../../engines/table-engines/special/dictionary.md#dictionary) or [EmbeddedRocksDB](../../engines/table-engines/integrations/embedded-rocksdb.md) and only the `LEFT` and `INNER` JOINs.
- `auto` — try `hash` join and switch on the fly to another algorithm if the memory limit is violated.
- `full_sorting_merge` — [Sort-merge algorithm](https://en.wikipedia.org/wiki/Sort-merge_join) with full sorting joined tables before joining.
- `prefer_partial_merge` — ClickHouse always tries to use `partial_merge` join if possible, otherwise, it uses `hash`. *Deprecated*, same as `partial_merge,hash`.
When using `partial_merge` algorithm ClickHouse sorts the data and dumps it to the disk. The `merge` algorithm in ClickHouse differs a bit from the classic realization. First ClickHouse sorts the right table by [join key](../../sql-reference/statements/select/join.md#select-join) in blocks and creates min-max index for sorted blocks. Then it sorts parts of left table by `join key` and joins them over right table. The min-max index is also used to skip unneeded right table blocks.
## join_any_take_last_row {#settings-join_any_take_last_row} ## join_any_take_last_row {#settings-join_any_take_last_row}

View File

@ -656,8 +656,9 @@ ClickHouse может парсить только базовый формат `Y
Изменяет поведение операций, выполняемых со строгостью `ANY`. Изменяет поведение операций, выполняемых со строгостью `ANY`.
:::danger "Внимание" :::warning "Внимание"
Настройка применяется только для операций `JOIN`, выполняемых над таблицами с движком [Join](../../engines/table-engines/special/join.md). Настройка применяется только для операций `JOIN`, выполняемых над таблицами с движком [Join](../../engines/table-engines/special/join.md).
:::
Возможные значения: Возможные значения:
@ -2112,8 +2113,9 @@ SELECT * FROM test_table
Устанавливает приоритет ([nice](https://en.wikipedia.org/wiki/Nice_(Unix))) для потоков, исполняющих запросы. Планировщик ОС учитывает эти приоритеты при выборе следующего потока для исполнения на доступном ядре CPU. Устанавливает приоритет ([nice](https://en.wikipedia.org/wiki/Nice_(Unix))) для потоков, исполняющих запросы. Планировщик ОС учитывает эти приоритеты при выборе следующего потока для исполнения на доступном ядре CPU.
:::danger "Предупреждение" :::warning "Предупреждение"
Для использования этой настройки необходимо установить свойство `CAP_SYS_NICE`. Пакет `clickhouse-server` устанавливает его во время инсталляции. Некоторые виртуальные окружения не позволяют установить `CAP_SYS_NICE`. В этом случае, `clickhouse-server` выводит сообщение при запуске. Для использования этой настройки необходимо установить свойство `CAP_SYS_NICE`. Пакет `clickhouse-server` устанавливает его во время инсталляции. Некоторые виртуальные окружения не позволяют установить `CAP_SYS_NICE`. В этом случае, `clickhouse-server` выводит сообщение при запуске.
:::
Допустимые значения: Допустимые значения:

View File

@ -5,7 +5,7 @@ sidebar_label: AggregateFunction
# AggregateFunction {#data-type-aggregatefunction} # AggregateFunction {#data-type-aggregatefunction}
Агрегатные функции могут обладать определяемым реализацией промежуточным состоянием, которое может быть сериализовано в тип данных, соответствующий AggregateFunction(…), и быть записано в таблицу обычно посредством [материализованного представления] (../../sql-reference/statements/create.md#create-view). Чтобы получить промежуточное состояние, обычно используются агрегатные функции с суффиксом `-State`. Чтобы в дальнейшем получить агрегированные данные необходимо использовать те же агрегатные функции с суффиксом `-Merge`. Агрегатные функции могут обладать определяемым реализацией промежуточным состоянием, которое может быть сериализовано в тип данных, соответствующий AggregateFunction(…), и быть записано в таблицу обычно посредством [материализованного представления] (../../sql-reference/statements/create/view.md). Чтобы получить промежуточное состояние, обычно используются агрегатные функции с суффиксом `-State`. Чтобы в дальнейшем получить агрегированные данные необходимо использовать те же агрегатные функции с суффиксом `-Merge`.
`AggregateFunction(name, types_of_arguments…)` — параметрический тип данных. `AggregateFunction(name, types_of_arguments…)` — параметрический тип данных.
@ -63,5 +63,4 @@ SELECT uniqMerge(state) FROM (SELECT uniqState(UserID) AS state FROM table GROUP
## Пример использования {#primer-ispolzovaniia} ## Пример использования {#primer-ispolzovaniia}
Смотрите в описании движка [AggregatingMergeTree](../../sql-reference/data-types/aggregatefunction.md). Смотрите в описании движка [AggregatingMergeTree](../../engines/table-engines/mergetree-family/aggregatingmergetree.md).

View File

@ -18,7 +18,7 @@ option (ENABLE_CLICKHOUSE_SERVER "Server mode (main mode)" ${ENABLE_CLICKHOUSE_A
option (ENABLE_CLICKHOUSE_CLIENT "Client mode (interactive tui/shell that connects to the server)" option (ENABLE_CLICKHOUSE_CLIENT "Client mode (interactive tui/shell that connects to the server)"
${ENABLE_CLICKHOUSE_ALL}) ${ENABLE_CLICKHOUSE_ALL})
if (CLICKHOUSE_SPLIT_BINARY OR NOT ENABLE_UTILS) if (CLICKHOUSE_SPLIT_BINARY)
option (ENABLE_CLICKHOUSE_SELF_EXTRACTING "Self-extracting executable" OFF) option (ENABLE_CLICKHOUSE_SELF_EXTRACTING "Self-extracting executable" OFF)
else () else ()
option (ENABLE_CLICKHOUSE_SELF_EXTRACTING "Self-extracting executable" ON) option (ENABLE_CLICKHOUSE_SELF_EXTRACTING "Self-extracting executable" ON)
@ -434,6 +434,9 @@ else ()
endif () endif ()
set (CLICKHOUSE_BUNDLE) set (CLICKHOUSE_BUNDLE)
if (ENABLE_CLICKHOUSE_SELF_EXTRACTING)
list(APPEND CLICKHOUSE_BUNDLE self-extracting)
endif ()
if (ENABLE_CLICKHOUSE_SERVER) if (ENABLE_CLICKHOUSE_SERVER)
add_custom_target (clickhouse-server ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-server DEPENDS clickhouse) add_custom_target (clickhouse-server ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-server DEPENDS clickhouse)
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-server" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-server" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)

View File

@ -34,6 +34,10 @@
#include <base/bit_cast.h> #include <base/bit_cast.h>
#include <IO/ReadBufferFromFileDescriptor.h> #include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h> #include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <memory> #include <memory>
#include <cmath> #include <cmath>
#include <unistd.h> #include <unistd.h>
@ -95,6 +99,9 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int CANNOT_SEEK_THROUGH_FILE; extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int UNKNOWN_FORMAT_VERSION;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int TYPE_MISMATCH;
} }
@ -115,6 +122,12 @@ public:
/// Deterministically change seed to some other value. This can be used to generate more values than were in source. /// Deterministically change seed to some other value. This can be used to generate more values than were in source.
virtual void updateSeed() = 0; virtual void updateSeed() = 0;
/// Save into file. Binary, platform-dependent, version-dependent serialization.
virtual void serialize(WriteBuffer & out) const = 0;
/// Read from file
virtual void deserialize(ReadBuffer & in) = 0;
virtual ~IModel() = default; virtual ~IModel() = default;
}; };
@ -189,6 +202,8 @@ public:
void train(const IColumn &) override {} void train(const IColumn &) override {}
void finalize() override {} void finalize() override {}
void serialize(WriteBuffer &) const override {}
void deserialize(ReadBuffer &) override {}
ColumnPtr generate(const IColumn & column) override ColumnPtr generate(const IColumn & column) override
{ {
@ -230,6 +245,8 @@ public:
void train(const IColumn &) override {} void train(const IColumn &) override {}
void finalize() override {} void finalize() override {}
void serialize(WriteBuffer &) const override {}
void deserialize(ReadBuffer &) override {}
ColumnPtr generate(const IColumn & column) override ColumnPtr generate(const IColumn & column) override
{ {
@ -279,6 +296,8 @@ public:
void train(const IColumn &) override {} void train(const IColumn &) override {}
void finalize() override {} void finalize() override {}
void serialize(WriteBuffer &) const override {}
void deserialize(ReadBuffer &) override {}
ColumnPtr generate(const IColumn & column) override ColumnPtr generate(const IColumn & column) override
{ {
@ -311,6 +330,8 @@ class IdentityModel : public IModel
public: public:
void train(const IColumn &) override {} void train(const IColumn &) override {}
void finalize() override {} void finalize() override {}
void serialize(WriteBuffer &) const override {}
void deserialize(ReadBuffer &) override {}
ColumnPtr generate(const IColumn & column) override ColumnPtr generate(const IColumn & column) override
{ {
@ -395,6 +416,8 @@ public:
void train(const IColumn &) override {} void train(const IColumn &) override {}
void finalize() override {} void finalize() override {}
void serialize(WriteBuffer &) const override {}
void deserialize(ReadBuffer &) override {}
ColumnPtr generate(const IColumn & column) override ColumnPtr generate(const IColumn & column) override
{ {
@ -431,6 +454,8 @@ public:
void train(const IColumn &) override {} void train(const IColumn &) override {}
void finalize() override {} void finalize() override {}
void serialize(WriteBuffer &) const override {}
void deserialize(ReadBuffer &) override {}
ColumnPtr generate(const IColumn & column) override ColumnPtr generate(const IColumn & column) override
{ {
@ -469,6 +494,8 @@ public:
void train(const IColumn &) override {} void train(const IColumn &) override {}
void finalize() override {} void finalize() override {}
void serialize(WriteBuffer &) const override {}
void deserialize(ReadBuffer &) override {}
ColumnPtr generate(const IColumn & column) override ColumnPtr generate(const IColumn & column) override
{ {
@ -512,6 +539,26 @@ struct MarkovModelParameters
size_t frequency_add; size_t frequency_add;
double frequency_desaturate; double frequency_desaturate;
size_t determinator_sliding_window_size; size_t determinator_sliding_window_size;
/// Write the model hyper-parameters in a fixed field order.
/// Binary, platform-dependent format; must stay in sync with deserialize() below.
void serialize(WriteBuffer & out) const
{
writeBinary(order, out);
writeBinary(frequency_cutoff, out);
writeBinary(num_buckets_cutoff, out);
writeBinary(frequency_add, out);
writeBinary(frequency_desaturate, out);
writeBinary(determinator_sliding_window_size, out);
}
/// Read the hyper-parameters back in exactly the same field order as serialize().
void deserialize(ReadBuffer & in)
{
readBinary(order, in);
readBinary(frequency_cutoff, in);
readBinary(num_buckets_cutoff, in);
readBinary(frequency_add, in);
readBinary(frequency_desaturate, in);
readBinary(determinator_sliding_window_size, in);
}
}; };
@ -565,6 +612,39 @@ private:
return END; return END;
} }
/// Write the histogram: the two counters first, then the number of buckets,
/// then each bucket as a (key, value) pair.
void serialize(WriteBuffer & out) const
{
writeBinary(total, out);
writeBinary(count_end, out);
size_t size = buckets.size();
writeBinary(size, out);
for (const auto & elem : buckets)
{
writeBinary(elem.first, out);
writeBinary(elem.second, out);
}
}
/// Read a histogram produced by serialize(): counters, then `size` (key, value) pairs.
/// Pairs are inserted via emplace, so reading into a non-empty map merges by key.
void deserialize(ReadBuffer & in)
{
readBinary(total, in);
readBinary(count_end, in);
size_t size = 0;
readBinary(size, in);
buckets.reserve(size);
for (size_t i = 0; i < size; ++i)
{
Buckets::value_type elem;
readBinary(elem.first, in);
readBinary(elem.second, in);
buckets.emplace(std::move(elem));
}
}
}; };
using Table = HashMap<NGramHash, Histogram, TrivialHash>; using Table = HashMap<NGramHash, Histogram, TrivialHash>;
@ -621,6 +701,37 @@ public:
explicit MarkovModel(MarkovModelParameters params_) explicit MarkovModel(MarkovModelParameters params_)
: params(std::move(params_)), code_points(params.order, BEGIN) {} : params(std::move(params_)), code_points(params.order, BEGIN) {}
/// Write the whole model: parameters first, then the n-gram table as a
/// size-prefixed sequence of (hash key, histogram) entries.
void serialize(WriteBuffer & out) const
{
params.serialize(out);
size_t size = table.size();
writeBinary(size, out);
for (const auto & elem : table)
{
writeBinary(elem.getKey(), out);
elem.getMapped().serialize(out);
}
}
/// Read a model written by serialize(). Entries are inserted via table[key],
/// so deserializing into a non-empty table merges with existing entries.
void deserialize(ReadBuffer & in)
{
params.deserialize(in);
size_t size = 0;
readBinary(size, in);
table.reserve(size);
for (size_t i = 0; i < size; ++i)
{
NGramHash key{};
readBinary(key, in);
Histogram & histogram = table[key];
histogram.deserialize(in);
}
}
void consume(const char * data, size_t size) void consume(const char * data, size_t size)
{ {
/// First 'order' number of code points are pre-filled with BEGIN. /// First 'order' number of code points are pre-filled with BEGIN.
@ -655,7 +766,6 @@ public:
} }
} }
void finalize() void finalize()
{ {
if (params.num_buckets_cutoff) if (params.num_buckets_cutoff)
@ -878,6 +988,16 @@ public:
{ {
seed = hash(seed); seed = hash(seed);
} }
void serialize(WriteBuffer & out) const override
{
markov_model.serialize(out);
}
void deserialize(ReadBuffer & in) override
{
markov_model.deserialize(in);
}
}; };
@ -916,6 +1036,16 @@ public:
{ {
nested_model->updateSeed(); nested_model->updateSeed();
} }
void serialize(WriteBuffer & out) const override
{
nested_model->serialize(out);
}
void deserialize(ReadBuffer & in) override
{
nested_model->deserialize(in);
}
}; };
@ -954,6 +1084,16 @@ public:
{ {
nested_model->updateSeed(); nested_model->updateSeed();
} }
void serialize(WriteBuffer & out) const override
{
nested_model->serialize(out);
}
void deserialize(ReadBuffer & in) override
{
nested_model->deserialize(in);
}
}; };
@ -1046,6 +1186,18 @@ public:
for (auto & model : models) for (auto & model : models)
model->updateSeed(); model->updateSeed();
} }
void serialize(WriteBuffer & out) const
{
for (const auto & model : models)
model->serialize(out);
}
void deserialize(ReadBuffer & in)
{
for (auto & model : models)
model->deserialize(in);
}
}; };
} }
@ -1068,8 +1220,10 @@ try
("input-format", po::value<std::string>(), "input format of the initial table data") ("input-format", po::value<std::string>(), "input format of the initial table data")
("output-format", po::value<std::string>(), "default output format") ("output-format", po::value<std::string>(), "default output format")
("seed", po::value<std::string>(), "seed (arbitrary string), must be random string with at least 10 bytes length; note that a seed for each column is derived from this seed and a column name: you can obfuscate data for different tables and as long as you use identical seed and identical column names, the data for corresponding non-text columns for different tables will be transformed in the same way, so the data for different tables can be JOINed after obfuscation") ("seed", po::value<std::string>(), "seed (arbitrary string), must be random string with at least 10 bytes length; note that a seed for each column is derived from this seed and a column name: you can obfuscate data for different tables and as long as you use identical seed and identical column names, the data for corresponding non-text columns for different tables will be transformed in the same way, so the data for different tables can be JOINed after obfuscation")
("limit", po::value<UInt64>(), "if specified - stop after generating that number of rows") ("limit", po::value<UInt64>(), "if specified - stop after generating that number of rows; the limit can be also greater than the number of source dataset - in this case it will process the dataset in a loop more than one time, using different seeds on every iteration, generating result as large as needed")
("silent", po::value<bool>()->default_value(false), "don't print information messages to stderr") ("silent", po::value<bool>()->default_value(false), "don't print information messages to stderr")
("save", po::value<std::string>(), "save the models after training to the specified file. You can use --limit 0 to skip the generation step. The file is using binary, platform-dependent, opaque serialization format. The model parameters are saved, while the seed is not.")
("load", po::value<std::string>(), "load the models instead of training from the specified file. The table structure must match the saved file. The seed should be specified separately, while other model parameters are loaded.")
("order", po::value<UInt64>()->default_value(5), "order of markov model to generate strings") ("order", po::value<UInt64>()->default_value(5), "order of markov model to generate strings")
("frequency-cutoff", po::value<UInt64>()->default_value(5), "frequency cutoff for markov model: remove all buckets with count less than specified") ("frequency-cutoff", po::value<UInt64>()->default_value(5), "frequency cutoff for markov model: remove all buckets with count less than specified")
("num-buckets-cutoff", po::value<UInt64>()->default_value(0), "cutoff for number of different possible continuations for a context: remove all histograms with less than specified number of buckets") ("num-buckets-cutoff", po::value<UInt64>()->default_value(0), "cutoff for number of different possible continuations for a context: remove all histograms with less than specified number of buckets")
@ -1096,12 +1250,26 @@ try
return 0; return 0;
} }
if (options.count("save") && options.count("load"))
{
std::cerr << "The options --save and --load cannot be used together.\n";
return 1;
}
UInt64 seed = sipHash64(options["seed"].as<std::string>()); UInt64 seed = sipHash64(options["seed"].as<std::string>());
std::string structure = options["structure"].as<std::string>(); std::string structure = options["structure"].as<std::string>();
std::string input_format = options["input-format"].as<std::string>(); std::string input_format = options["input-format"].as<std::string>();
std::string output_format = options["output-format"].as<std::string>(); std::string output_format = options["output-format"].as<std::string>();
std::string load_from_file;
std::string save_into_file;
if (options.count("load"))
load_from_file = options["load"].as<std::string>();
else if (options.count("save"))
save_into_file = options["save"].as<std::string>();
UInt64 limit = 0; UInt64 limit = 0;
if (options.count("limit")) if (options.count("limit"))
limit = options["limit"].as<UInt64>(); limit = options["limit"].as<UInt64>();
@ -1117,7 +1285,7 @@ try
markov_model_params.frequency_desaturate = options["frequency-desaturate"].as<double>(); markov_model_params.frequency_desaturate = options["frequency-desaturate"].as<double>();
markov_model_params.determinator_sliding_window_size = options["determinator-sliding-window-size"].as<UInt64>(); markov_model_params.determinator_sliding_window_size = options["determinator-sliding-window-size"].as<UInt64>();
// Create header block /// Create the header block
std::vector<std::string> structure_vals; std::vector<std::string> structure_vals;
boost::split(structure_vals, structure, boost::algorithm::is_any_of(" ,"), boost::algorithm::token_compress_on); boost::split(structure_vals, structure, boost::algorithm::is_any_of(" ,"), boost::algorithm::token_compress_on);
@ -1143,6 +1311,7 @@ try
ReadBufferFromFileDescriptor file_in(STDIN_FILENO); ReadBufferFromFileDescriptor file_in(STDIN_FILENO);
WriteBufferFromFileDescriptor file_out(STDOUT_FILENO); WriteBufferFromFileDescriptor file_out(STDOUT_FILENO);
if (load_from_file.empty())
{ {
/// stdin must be seekable /// stdin must be seekable
auto res = lseek(file_in.getFD(), 0, SEEK_SET); auto res = lseek(file_in.getFD(), 0, SEEK_SET);
@ -1156,6 +1325,9 @@ try
/// Train step /// Train step
UInt64 source_rows = 0; UInt64 source_rows = 0;
bool rewind_needed = false;
if (load_from_file.empty())
{ {
if (!silent) if (!silent)
std::cerr << "Training models\n"; std::cerr << "Training models\n";
@ -1173,11 +1345,71 @@ try
if (!silent) if (!silent)
std::cerr << "Processed " << source_rows << " rows\n"; std::cerr << "Processed " << source_rows << " rows\n";
} }
obfuscator.finalize();
rewind_needed = true;
}
else
{
if (!silent)
std::cerr << "Loading models\n";
ReadBufferFromFile model_file_in(load_from_file);
CompressedReadBuffer model_in(model_file_in);
UInt8 version = 0;
readBinary(version, model_in);
if (version != 0)
throw Exception("Unknown version of the model file", ErrorCodes::UNKNOWN_FORMAT_VERSION);
readBinary(source_rows, model_in);
Names data_types = header.getDataTypeNames();
size_t header_size = 0;
readBinary(header_size, model_in);
if (header_size != data_types.size())
throw Exception("The saved model was created for different number of columns", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
for (size_t i = 0; i < header_size; ++i)
{
String type;
readBinary(type, model_in);
if (type != data_types[i])
throw Exception("The saved model was created for different types of columns", ErrorCodes::TYPE_MISMATCH);
}
obfuscator.deserialize(model_in);
} }
obfuscator.finalize(); if (!save_into_file.empty())
{
if (!silent)
std::cerr << "Saving models\n";
if (!limit) WriteBufferFromFile model_file_out(save_into_file);
CompressedWriteBuffer model_out(model_file_out, CompressionCodecFactory::instance().get("ZSTD", 1));
/// You can change version on format change, it is currently set to zero.
UInt8 version = 0;
writeBinary(version, model_out);
writeBinary(source_rows, model_out);
/// We are writing the data types for validation, because the models serialization depends on the data types.
Names data_types = header.getDataTypeNames();
size_t header_size = data_types.size();
writeBinary(header_size, model_out);
for (const auto & type : data_types)
writeBinary(type, model_out);
/// Write the models.
obfuscator.serialize(model_out);
model_out.finalize();
model_file_out.finalize();
}
if (!options.count("limit"))
limit = source_rows; limit = source_rows;
/// Generation step /// Generation step
@ -1187,7 +1419,8 @@ try
if (!silent) if (!silent)
std::cerr << "Generating data\n"; std::cerr << "Generating data\n";
file_in.seek(0, SEEK_SET); if (rewind_needed)
file_in.rewind();
Pipe pipe(context->getInputFormat(input_format, file_in, header, max_block_size)); Pipe pipe(context->getInputFormat(input_format, file_in, header, max_block_size));
@ -1220,6 +1453,7 @@ try
out_executor.finish(); out_executor.finish();
obfuscator.updateSeed(); obfuscator.updateSeed();
rewind_needed = true;
} }
return 0; return 0;

View File

@ -1,6 +1,18 @@
if (NOT(
CMAKE_HOST_SYSTEM_NAME STREQUAL CMAKE_SYSTEM_NAME
AND CMAKE_HOST_SYSTEM_PROCESSOR STREQUAL CMAKE_SYSTEM_PROCESSOR
)
)
set (COMPRESSOR "${CMAKE_BINARY_DIR}/native/utils/self-extracting-executable/pre_compressor")
set (DECOMPRESSOR "--decompressor=${CMAKE_BINARY_DIR}/utils/self-extracting-executable/decompressor")
else ()
set (COMPRESSOR "${CMAKE_BINARY_DIR}/utils/self-extracting-executable/compressor")
endif ()
add_custom_target (self-extracting ALL add_custom_target (self-extracting ALL
${CMAKE_COMMAND} -E remove clickhouse ${CMAKE_COMMAND} -E remove clickhouse
COMMAND ${CMAKE_BINARY_DIR}/utils/self-extracting-executable/compressor clickhouse ../clickhouse COMMAND ${COMPRESSOR} ${DECOMPRESSOR} clickhouse ../clickhouse
DEPENDS clickhouse compressor DEPENDS clickhouse compressor
) )

View File

@ -329,9 +329,9 @@ void QueryFuzzer::fuzzWindowFrame(ASTWindowDefinition & def)
case 0: case 0:
{ {
const auto r = fuzz_rand() % 3; const auto r = fuzz_rand() % 3;
def.frame_type = r == 0 ? WindowFrame::FrameType::Rows def.frame_type = r == 0 ? WindowFrame::FrameType::ROWS
: r == 1 ? WindowFrame::FrameType::Range : r == 1 ? WindowFrame::FrameType::RANGE
: WindowFrame::FrameType::Groups; : WindowFrame::FrameType::GROUPS;
break; break;
} }
case 1: case 1:
@ -385,7 +385,7 @@ void QueryFuzzer::fuzzWindowFrame(ASTWindowDefinition & def)
break; break;
} }
if (def.frame_type == WindowFrame::FrameType::Range if (def.frame_type == WindowFrame::FrameType::RANGE
&& def.frame_begin_type == WindowFrame::BoundaryType::Unbounded && def.frame_begin_type == WindowFrame::BoundaryType::Unbounded
&& def.frame_begin_preceding && def.frame_begin_preceding
&& def.frame_end_type == WindowFrame::BoundaryType::Current) && def.frame_end_type == WindowFrame::BoundaryType::Current)

View File

@ -359,7 +359,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_bytes_in_join, 0, "Maximum size of the hash table for JOIN (in number of bytes in memory).", 0) \ M(UInt64, max_bytes_in_join, 0, "Maximum size of the hash table for JOIN (in number of bytes in memory).", 0) \
M(OverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ M(OverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(Bool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.", IMPORTANT) \ M(Bool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.", IMPORTANT) \
M(JoinAlgorithm, join_algorithm, JoinAlgorithm::HASH, "Specify join algorithm: 'auto', 'hash', 'partial_merge', 'prefer_partial_merge', 'parallel_hash'. 'auto' tries to change HashJoin to MergeJoin on the fly to avoid out of memory.", 0) \ M(JoinAlgorithm, join_algorithm, JoinAlgorithm::DEFAULT, "Specify join algorithm.", 0) \
M(UInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \ M(UInt64, default_max_bytes_in_join, 1000000000, "Maximum size of right-side table if limit is required but max_bytes_in_join is not set.", 0) \
M(UInt64, partial_merge_join_left_table_buffer_bytes, 0, "If not 0 group left table blocks in bigger ones for left-side table in partial merge join. It uses up to 2x of specified memory per joining thread.", 0) \ M(UInt64, partial_merge_join_left_table_buffer_bytes, 0, "If not 0 group left table blocks in bigger ones for left-side table in partial merge join. It uses up to 2x of specified memory per joining thread.", 0) \
M(UInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \ M(UInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \
@ -696,9 +696,11 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, input_format_max_rows_to_read_for_schema_inference, 25000, "The maximum rows of data to read for automatic schema inference", 0) \ M(UInt64, input_format_max_rows_to_read_for_schema_inference, 25000, "The maximum rows of data to read for automatic schema inference", 0) \
M(Bool, input_format_csv_use_best_effort_in_schema_inference, true, "Use some tweaks and heuristics to infer schema in CSV format", 0) \ M(Bool, input_format_csv_use_best_effort_in_schema_inference, true, "Use some tweaks and heuristics to infer schema in CSV format", 0) \
M(Bool, input_format_tsv_use_best_effort_in_schema_inference, true, "Use some tweaks and heuristics to infer schema in TSV format", 0) \ M(Bool, input_format_tsv_use_best_effort_in_schema_inference, true, "Use some tweaks and heuristics to infer schema in TSV format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Allow to skip columns with unsupported types while schema inference for format Parquet", 0) \ M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Allow to skip columns with unsupported types while schema inference for format ORC", 0) \ M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference, false, "Allow to skip columns with unsupported types while schema inference for format Arrow", 0) \ M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \
M(Bool, input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Arrow", 0) \
M(String, column_names_for_schema_inference, "", "The list of column names to use in schema inference for formats without column names. The format: 'column1,column2,column3,...'", 0) \ M(String, column_names_for_schema_inference, "", "The list of column names to use in schema inference for formats without column names. The format: 'column1,column2,column3,...'", 0) \
M(Bool, input_format_json_read_bools_as_numbers, true, "Allow to parse bools as numbers in JSON input formats", 0) \ M(Bool, input_format_json_read_bools_as_numbers, true, "Allow to parse bools as numbers in JSON input formats", 0) \
M(Bool, input_format_protobuf_flatten_google_wrappers, false, "Enable Google wrappers for regular non-nested columns, e.g. google.protobuf.StringValue 'str' for String column 'str'. For Nullable columns empty wrappers are recognized as defaults, and missing as nulls", 0) \ M(Bool, input_format_protobuf_flatten_google_wrappers, false, "Enable Google wrappers for regular non-nested columns, e.g. google.protobuf.StringValue 'str' for String column 'str'. For Nullable columns empty wrappers are recognized as defaults, and missing as nulls", 0) \

View File

@ -31,7 +31,8 @@ IMPLEMENT_SETTING_ENUM(JoinStrictness, ErrorCodes::UNKNOWN_JOIN,
IMPLEMENT_SETTING_MULTI_ENUM(JoinAlgorithm, ErrorCodes::UNKNOWN_JOIN, IMPLEMENT_SETTING_MULTI_ENUM(JoinAlgorithm, ErrorCodes::UNKNOWN_JOIN,
{{"auto", JoinAlgorithm::AUTO}, {{"default", JoinAlgorithm::DEFAULT},
{"auto", JoinAlgorithm::AUTO},
{"hash", JoinAlgorithm::HASH}, {"hash", JoinAlgorithm::HASH},
{"partial_merge", JoinAlgorithm::PARTIAL_MERGE}, {"partial_merge", JoinAlgorithm::PARTIAL_MERGE},
{"prefer_partial_merge", JoinAlgorithm::PREFER_PARTIAL_MERGE}, {"prefer_partial_merge", JoinAlgorithm::PREFER_PARTIAL_MERGE},

View File

@ -38,7 +38,8 @@ DECLARE_SETTING_ENUM(JoinStrictness)
enum class JoinAlgorithm enum class JoinAlgorithm
{ {
AUTO = 0, DEFAULT = 0,
AUTO,
HASH, HASH,
PARTIAL_MERGE, PARTIAL_MERGE,
PREFER_PARTIAL_MERGE, PREFER_PARTIAL_MERGE,

View File

@ -29,6 +29,7 @@ namespace ErrorCodes
extern const int UNKNOWN_EXCEPTION; extern const int UNKNOWN_EXCEPTION;
extern const int INCORRECT_DATA; extern const int INCORRECT_DATA;
extern const int CAPN_PROTO_BAD_TYPE; extern const int CAPN_PROTO_BAD_TYPE;
extern const int BAD_ARGUMENTS;
} }
capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info) capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info)
@ -450,7 +451,7 @@ static DataTypePtr getEnumDataTypeFromEnumSchema(const capnp::EnumSchema & enum_
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "ClickHouse supports only 8 and 16-bit Enums"); throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "ClickHouse supports only 8 and 16-bit Enums");
} }
static DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type) static DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type, bool skip_unsupported_fields)
{ {
switch (capnp_type.which()) switch (capnp_type.which())
{ {
@ -483,24 +484,44 @@ static DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type)
case capnp::schema::Type::LIST: case capnp::schema::Type::LIST:
{ {
auto list_schema = capnp_type.asList(); auto list_schema = capnp_type.asList();
auto nested_type = getDataTypeFromCapnProtoType(list_schema.getElementType()); auto nested_type = getDataTypeFromCapnProtoType(list_schema.getElementType(), skip_unsupported_fields);
if (!nested_type)
return nullptr;
return std::make_shared<DataTypeArray>(nested_type); return std::make_shared<DataTypeArray>(nested_type);
} }
case capnp::schema::Type::STRUCT: case capnp::schema::Type::STRUCT:
{ {
auto struct_schema = capnp_type.asStruct(); auto struct_schema = capnp_type.asStruct();
if (struct_schema.getFields().size() == 0)
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Empty messages are not supported");
}
/// Check if it can be Nullable. /// Check if it can be Nullable.
if (checkIfStructIsNamedUnion(struct_schema)) if (checkIfStructIsNamedUnion(struct_schema))
{ {
auto fields = struct_schema.getUnionFields(); auto fields = struct_schema.getUnionFields();
if (fields.size() != 2 || (!fields[0].getType().isVoid() && !fields[1].getType().isVoid())) if (fields.size() != 2 || (!fields[0].getType().isVoid() && !fields[1].getType().isVoid()))
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unions are not supported"); throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unions are not supported");
}
auto value_type = fields[0].getType().isVoid() ? fields[1].getType() : fields[0].getType(); auto value_type = fields[0].getType().isVoid() ? fields[1].getType() : fields[0].getType();
if (value_type.isStruct() || value_type.isList()) if (value_type.isStruct() || value_type.isList())
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Tuples and Lists cannot be inside Nullable"); throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Tuples and Lists cannot be inside Nullable");
}
auto nested_type = getDataTypeFromCapnProtoType(value_type); auto nested_type = getDataTypeFromCapnProtoType(value_type, skip_unsupported_fields);
if (!nested_type)
return nullptr;
return std::make_shared<DataTypeNullable>(nested_type); return std::make_shared<DataTypeNullable>(nested_type);
} }
@ -512,17 +533,26 @@ static DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type)
Names nested_names; Names nested_names;
for (auto field : struct_schema.getNonUnionFields()) for (auto field : struct_schema.getNonUnionFields())
{ {
auto nested_type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields);
if (!nested_type)
continue;
nested_names.push_back(field.getProto().getName()); nested_names.push_back(field.getProto().getName());
nested_types.push_back(getDataTypeFromCapnProtoType(field.getType())); nested_types.push_back(nested_type);
} }
if (nested_types.empty())
return nullptr;
return std::make_shared<DataTypeTuple>(std::move(nested_types), std::move(nested_names)); return std::make_shared<DataTypeTuple>(std::move(nested_types), std::move(nested_names));
} }
default: default:
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unsupported CapnProtoType: {}", getCapnProtoFullTypeName(capnp_type)); throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unsupported CapnProtoType: {}", getCapnProtoFullTypeName(capnp_type));
}
} }
} }
NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema) NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema, bool skip_unsupported_fields)
{ {
if (checkIfStructContainsUnnamedUnion(schema)) if (checkIfStructContainsUnnamedUnion(schema))
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported"); throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported");
@ -531,9 +561,13 @@ NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema)
for (auto field : schema.getNonUnionFields()) for (auto field : schema.getNonUnionFields())
{ {
auto name = field.getProto().getName(); auto name = field.getProto().getName();
auto type = getDataTypeFromCapnProtoType(field.getType()); auto type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields);
names_and_types.emplace_back(name, type); if (type)
names_and_types.emplace_back(name, type);
} }
if (names_and_types.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot convert CapnProto schema to ClickHouse table schema, all fields have unsupported types");
return names_and_types; return names_and_types;
} }

View File

@ -38,7 +38,7 @@ capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Re
void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode); void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode);
NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema); NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema, bool skip_unsupported_fields);
} }
#endif #endif

View File

@ -110,6 +110,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.pretty.output_format_pretty_row_numbers = settings.output_format_pretty_row_numbers; format_settings.pretty.output_format_pretty_row_numbers = settings.output_format_pretty_row_numbers;
format_settings.protobuf.input_flatten_google_wrappers = settings.input_format_protobuf_flatten_google_wrappers; format_settings.protobuf.input_flatten_google_wrappers = settings.input_format_protobuf_flatten_google_wrappers;
format_settings.protobuf.output_nullables_with_google_wrappers = settings.output_format_protobuf_nullables_with_google_wrappers; format_settings.protobuf.output_nullables_with_google_wrappers = settings.output_format_protobuf_nullables_with_google_wrappers;
format_settings.protobuf.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference;
format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule; format_settings.regexp.escaping_rule = settings.format_regexp_escaping_rule;
format_settings.regexp.regexp = settings.format_regexp; format_settings.regexp.regexp = settings.format_regexp;
format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched; format_settings.regexp.skip_unmatched = settings.format_regexp_skip_unmatched;
@ -151,6 +152,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.orc.output_string_as_string = settings.output_format_orc_string_as_string; format_settings.orc.output_string_as_string = settings.output_format_orc_string_as_string;
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields; format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode; format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference;
format_settings.seekable_read = settings.input_format_allow_seeks; format_settings.seekable_read = settings.input_format_allow_seeks;
format_settings.msgpack.number_of_columns = settings.input_format_msgpack_number_of_columns; format_settings.msgpack.number_of_columns = settings.input_format_msgpack_number_of_columns;
format_settings.msgpack.output_uuid_representation = settings.output_format_msgpack_uuid_representation; format_settings.msgpack.output_uuid_representation = settings.output_format_msgpack_uuid_representation;

View File

@ -185,6 +185,7 @@ struct FormatSettings
* because Protobuf without delimiters is not generally useful. * because Protobuf without delimiters is not generally useful.
*/ */
bool allow_multiple_rows_without_delimiter = false; bool allow_multiple_rows_without_delimiter = false;
bool skip_fields_with_unsupported_types_in_schema_inference = false;
} protobuf; } protobuf;
struct struct
@ -255,6 +256,7 @@ struct FormatSettings
struct struct
{ {
EnumComparingMode enum_comparing_mode = EnumComparingMode::BY_VALUES; EnumComparingMode enum_comparing_mode = EnumComparingMode::BY_VALUES;
bool skip_fields_with_unsupported_types_in_schema_inference = false;
} capn_proto; } capn_proto;
enum class MsgPackUUIDRepresentation enum class MsgPackUUIDRepresentation

View File

@ -3427,19 +3427,23 @@ namespace
return std::make_shared<DataTypeEnum<Type>>(std::move(values)); return std::make_shared<DataTypeEnum<Type>>(std::move(values));
} }
NameAndTypePair getNameAndDataTypeFromField(const google::protobuf::FieldDescriptor * field_descriptor, bool allow_repeat = true) std::optional<NameAndTypePair> getNameAndDataTypeFromField(const google::protobuf::FieldDescriptor * field_descriptor, bool skip_unsupported_fields, bool allow_repeat = true)
{ {
if (allow_repeat && field_descriptor->is_map()) if (allow_repeat && field_descriptor->is_map())
{ {
auto name_and_type = getNameAndDataTypeFromField(field_descriptor, false); auto name_and_type = getNameAndDataTypeFromField(field_descriptor, skip_unsupported_fields, false);
const auto * tuple_type = assert_cast<const DataTypeTuple *>(name_and_type.type.get()); if (!name_and_type)
return {name_and_type.name, std::make_shared<DataTypeMap>(tuple_type->getElements())}; return std::nullopt;
const auto * tuple_type = assert_cast<const DataTypeTuple *>(name_and_type->type.get());
return NameAndTypePair{name_and_type->name, std::make_shared<DataTypeMap>(tuple_type->getElements())};
} }
if (allow_repeat && field_descriptor->is_repeated()) if (allow_repeat && field_descriptor->is_repeated())
{ {
auto name_and_type = getNameAndDataTypeFromField(field_descriptor, false); auto name_and_type = getNameAndDataTypeFromField(field_descriptor, skip_unsupported_fields, false);
return {name_and_type.name, std::make_shared<DataTypeArray>(name_and_type.type)}; if (!name_and_type)
return std::nullopt;
return NameAndTypePair{name_and_type->name, std::make_shared<DataTypeArray>(name_and_type->type)};
} }
switch (field_descriptor->type()) switch (field_descriptor->type())
@ -3447,31 +3451,35 @@ namespace
case FieldTypeId::TYPE_SFIXED32: case FieldTypeId::TYPE_SFIXED32:
case FieldTypeId::TYPE_SINT32: case FieldTypeId::TYPE_SINT32:
case FieldTypeId::TYPE_INT32: case FieldTypeId::TYPE_INT32:
return {field_descriptor->name(), std::make_shared<DataTypeInt32>()}; return NameAndTypePair{field_descriptor->name(), std::make_shared<DataTypeInt32>()};
case FieldTypeId::TYPE_SFIXED64: case FieldTypeId::TYPE_SFIXED64:
case FieldTypeId::TYPE_SINT64: case FieldTypeId::TYPE_SINT64:
case FieldTypeId::TYPE_INT64: case FieldTypeId::TYPE_INT64:
return {field_descriptor->name(), std::make_shared<DataTypeInt64>()}; return NameAndTypePair{field_descriptor->name(), std::make_shared<DataTypeInt64>()};
case FieldTypeId::TYPE_BOOL: case FieldTypeId::TYPE_BOOL:
return {field_descriptor->name(), std::make_shared<DataTypeUInt8>()}; return NameAndTypePair{field_descriptor->name(), std::make_shared<DataTypeUInt8>()};
case FieldTypeId::TYPE_FLOAT: case FieldTypeId::TYPE_FLOAT:
return {field_descriptor->name(), std::make_shared<DataTypeFloat32>()}; return NameAndTypePair{field_descriptor->name(), std::make_shared<DataTypeFloat32>()};
case FieldTypeId::TYPE_DOUBLE: case FieldTypeId::TYPE_DOUBLE:
return {field_descriptor->name(), std::make_shared<DataTypeFloat64>()}; return NameAndTypePair{field_descriptor->name(), std::make_shared<DataTypeFloat64>()};
case FieldTypeId::TYPE_UINT32: case FieldTypeId::TYPE_UINT32:
case FieldTypeId::TYPE_FIXED32: case FieldTypeId::TYPE_FIXED32:
return {field_descriptor->name(), std::make_shared<DataTypeUInt32>()}; return NameAndTypePair{field_descriptor->name(), std::make_shared<DataTypeUInt32>()};
case FieldTypeId::TYPE_UINT64: case FieldTypeId::TYPE_UINT64:
case FieldTypeId::TYPE_FIXED64: case FieldTypeId::TYPE_FIXED64:
return {field_descriptor->name(), std::make_shared<DataTypeUInt64>()}; return NameAndTypePair{field_descriptor->name(), std::make_shared<DataTypeUInt64>()};
case FieldTypeId::TYPE_BYTES: case FieldTypeId::TYPE_BYTES:
case FieldTypeId::TYPE_STRING: case FieldTypeId::TYPE_STRING:
return {field_descriptor->name(), std::make_shared<DataTypeString>()}; return NameAndTypePair{field_descriptor->name(), std::make_shared<DataTypeString>()};
case FieldTypeId::TYPE_ENUM: case FieldTypeId::TYPE_ENUM:
{ {
const auto * enum_descriptor = field_descriptor->enum_type(); const auto * enum_descriptor = field_descriptor->enum_type();
if (enum_descriptor->value_count() == 0) if (enum_descriptor->value_count() == 0)
{
if (skip_unsupported_fields)
return std::nullopt;
throw Exception("Empty enum field", ErrorCodes::BAD_ARGUMENTS); throw Exception("Empty enum field", ErrorCodes::BAD_ARGUMENTS);
}
int max_abs = std::abs(enum_descriptor->value(0)->number()); int max_abs = std::abs(enum_descriptor->value(0)->number());
for (int i = 1; i != enum_descriptor->value_count(); ++i) for (int i = 1; i != enum_descriptor->value_count(); ++i)
{ {
@ -3479,21 +3487,33 @@ namespace
max_abs = std::abs(enum_descriptor->value(i)->number()); max_abs = std::abs(enum_descriptor->value(i)->number());
} }
if (max_abs < 128) if (max_abs < 128)
return {field_descriptor->name(), getEnumDataType<Int8>(enum_descriptor)}; return NameAndTypePair{field_descriptor->name(), getEnumDataType<Int8>(enum_descriptor)};
else if (max_abs < 32768) else if (max_abs < 32768)
return {field_descriptor->name(), getEnumDataType<Int16>(enum_descriptor)}; return NameAndTypePair{field_descriptor->name(), getEnumDataType<Int16>(enum_descriptor)};
else else
{
if (skip_unsupported_fields)
return std::nullopt;
throw Exception("ClickHouse supports only 8-bit and 16-bit enums", ErrorCodes::BAD_ARGUMENTS); throw Exception("ClickHouse supports only 8-bit and 16-bit enums", ErrorCodes::BAD_ARGUMENTS);
}
} }
case FieldTypeId::TYPE_GROUP: case FieldTypeId::TYPE_GROUP:
case FieldTypeId::TYPE_MESSAGE: case FieldTypeId::TYPE_MESSAGE:
{ {
const auto * message_descriptor = field_descriptor->message_type(); const auto * message_descriptor = field_descriptor->message_type();
if (message_descriptor->field_count() == 1) if (message_descriptor->field_count() == 0)
{
if (skip_unsupported_fields)
return std::nullopt;
throw Exception("Empty messages are not supported", ErrorCodes::BAD_ARGUMENTS);
}
else if (message_descriptor->field_count() == 1)
{ {
const auto * nested_field_descriptor = message_descriptor->field(0); const auto * nested_field_descriptor = message_descriptor->field(0);
auto nested_name_and_type = getNameAndDataTypeFromField(nested_field_descriptor); auto nested_name_and_type = getNameAndDataTypeFromField(nested_field_descriptor, skip_unsupported_fields);
return {field_descriptor->name() + "_" + nested_name_and_type.name, nested_name_and_type.type}; if (!nested_name_and_type)
return std::nullopt;
return NameAndTypePair{field_descriptor->name() + "_" + nested_name_and_type->name, nested_name_and_type->type};
} }
else else
{ {
@ -3501,11 +3521,16 @@ namespace
Strings nested_names; Strings nested_names;
for (int i = 0; i != message_descriptor->field_count(); ++i) for (int i = 0; i != message_descriptor->field_count(); ++i)
{ {
auto nested_name_and_type = getNameAndDataTypeFromField(message_descriptor->field(i)); auto nested_name_and_type = getNameAndDataTypeFromField(message_descriptor->field(i), skip_unsupported_fields);
nested_types.push_back(nested_name_and_type.type); if (!nested_name_and_type)
nested_names.push_back(nested_name_and_type.name); continue;
nested_types.push_back(nested_name_and_type->type);
nested_names.push_back(nested_name_and_type->name);
} }
return {field_descriptor->name(), std::make_shared<DataTypeTuple>(std::move(nested_types), std::move(nested_names))};
if (nested_types.empty())
return std::nullopt;
return NameAndTypePair{field_descriptor->name(), std::make_shared<DataTypeTuple>(std::move(nested_types), std::move(nested_names))};
} }
} }
} }
@ -3540,11 +3565,16 @@ std::unique_ptr<ProtobufSerializer> ProtobufSerializer::create(
return ProtobufSerializerBuilder(writer).buildMessageSerializer(column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter, with_envelope, defaults_for_nullable_google_wrappers); return ProtobufSerializerBuilder(writer).buildMessageSerializer(column_names, data_types, missing_column_indices, message_descriptor, with_length_delimiter, with_envelope, defaults_for_nullable_google_wrappers);
} }
NamesAndTypesList protobufSchemaToCHSchema(const google::protobuf::Descriptor * message_descriptor) NamesAndTypesList protobufSchemaToCHSchema(const google::protobuf::Descriptor * message_descriptor, bool skip_unsupported_fields)
{ {
NamesAndTypesList schema; NamesAndTypesList schema;
for (int i = 0; i != message_descriptor->field_count(); ++i) for (int i = 0; i != message_descriptor->field_count(); ++i)
schema.push_back(getNameAndDataTypeFromField(message_descriptor->field(i))); {
if (auto name_and_type = getNameAndDataTypeFromField(message_descriptor->field(i), skip_unsupported_fields))
schema.push_back(*name_and_type);
}
if (schema.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot convert Protobuf schema to ClickHouse table schema, all fields have unsupported types");
return schema; return schema;
} }

View File

@ -54,7 +54,7 @@ public:
ProtobufWriter & writer); ProtobufWriter & writer);
}; };
NamesAndTypesList protobufSchemaToCHSchema(const google::protobuf::Descriptor * message_descriptor); NamesAndTypesList protobufSchemaToCHSchema(const google::protobuf::Descriptor * message_descriptor, bool skip_unsupported_fields);
} }
#endif #endif

View File

@ -337,6 +337,7 @@ public:
{ {
pos = pos_; pos = pos_;
end = end_; end = end_;
curr_split = 0;
} }
bool get(Pos & token_begin, Pos & token_end) bool get(Pos & token_begin, Pos & token_end)

View File

@ -22,7 +22,7 @@ namespace ErrorCodes
struct L1Distance struct L1Distance
{ {
static inline String name = "L1"; static constexpr auto name = "L1";
struct ConstParams {}; struct ConstParams {};
@ -53,7 +53,7 @@ struct L1Distance
struct L2Distance struct L2Distance
{ {
static inline String name = "L2"; static constexpr auto name = "L2";
struct ConstParams {}; struct ConstParams {};
@ -84,7 +84,7 @@ struct L2Distance
struct L2SquaredDistance : L2Distance struct L2SquaredDistance : L2Distance
{ {
static inline String name = "L2Squared"; static constexpr auto name = "L2Squared";
template <typename ResultType> template <typename ResultType>
static ResultType finalize(const State<ResultType> & state, const ConstParams &) static ResultType finalize(const State<ResultType> & state, const ConstParams &)
@ -95,7 +95,7 @@ struct L2SquaredDistance : L2Distance
struct LpDistance struct LpDistance
{ {
static inline String name = "Lp"; static constexpr auto name = "Lp";
struct ConstParams struct ConstParams
{ {
@ -130,7 +130,7 @@ struct LpDistance
struct LinfDistance struct LinfDistance
{ {
static inline String name = "Linf"; static constexpr auto name = "Linf";
struct ConstParams {}; struct ConstParams {};
@ -161,7 +161,7 @@ struct LinfDistance
struct CosineDistance struct CosineDistance
{ {
static inline String name = "Cosine"; static constexpr auto name = "Cosine";
struct ConstParams {}; struct ConstParams {};
@ -200,8 +200,7 @@ template <class Kernel>
class FunctionArrayDistance : public IFunction class FunctionArrayDistance : public IFunction
{ {
public: public:
static inline auto name = "array" + Kernel::name + "Distance"; String getName() const override { static auto name = String("array") + Kernel::name + "Distance"; return name; }
String getName() const override { return name; }
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayDistance<Kernel>>(); } static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayDistance<Kernel>>(); }
size_t getNumberOfArguments() const override { return 2; } size_t getNumberOfArguments() const override { return 2; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {}; } ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {}; }

View File

@ -21,7 +21,7 @@ namespace ErrorCodes
struct L1Norm struct L1Norm
{ {
static inline String name = "L1"; static constexpr auto name = "L1";
struct ConstParams {}; struct ConstParams {};
@ -46,7 +46,7 @@ struct L1Norm
struct L2Norm struct L2Norm
{ {
static inline String name = "L2"; static constexpr auto name = "L2";
struct ConstParams {}; struct ConstParams {};
@ -71,7 +71,7 @@ struct L2Norm
struct L2SquaredNorm : L2Norm struct L2SquaredNorm : L2Norm
{ {
static inline String name = "L2Squared"; static constexpr auto name = "L2Squared";
template <typename ResultType> template <typename ResultType>
inline static ResultType finalize(ResultType result, const ConstParams &) inline static ResultType finalize(ResultType result, const ConstParams &)
@ -83,7 +83,7 @@ struct L2SquaredNorm : L2Norm
struct LpNorm struct LpNorm
{ {
static inline String name = "Lp"; static constexpr auto name = "Lp";
struct ConstParams struct ConstParams
{ {
@ -112,7 +112,7 @@ struct LpNorm
struct LinfNorm struct LinfNorm
{ {
static inline String name = "Linf"; static constexpr auto name = "Linf";
struct ConstParams {}; struct ConstParams {};
@ -140,8 +140,7 @@ template <class Kernel>
class FunctionArrayNorm : public IFunction class FunctionArrayNorm : public IFunction
{ {
public: public:
static inline auto name = "array" + Kernel::name + "Norm"; String getName() const override { static auto name = String("array") + Kernel::name + "Norm"; return name; }
String getName() const override { return name; }
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayNorm<Kernel>>(); } static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayNorm<Kernel>>(); }
size_t getNumberOfArguments() const override { return 1; } size_t getNumberOfArguments() const override { return 1; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {}; } ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {}; }

View File

@ -1124,7 +1124,7 @@ template <class Traits>
class TupleOrArrayFunction : public IFunction class TupleOrArrayFunction : public IFunction
{ {
public: public:
static inline String name = Traits::name; static constexpr auto name = Traits::name;
explicit TupleOrArrayFunction(ContextPtr context_) explicit TupleOrArrayFunction(ContextPtr context_)
: IFunction() : IFunction()
@ -1173,7 +1173,7 @@ extern FunctionPtr createFunctionArrayCosineDistance(ContextPtr context_);
struct L1NormTraits struct L1NormTraits
{ {
static inline String name = "L1Norm"; static constexpr auto name = "L1Norm";
static constexpr auto CreateTupleFunction = FunctionL1Norm::create; static constexpr auto CreateTupleFunction = FunctionL1Norm::create;
static constexpr auto CreateArrayFunction = createFunctionArrayL1Norm; static constexpr auto CreateArrayFunction = createFunctionArrayL1Norm;
@ -1181,7 +1181,7 @@ struct L1NormTraits
struct L2NormTraits struct L2NormTraits
{ {
static inline String name = "L2Norm"; static constexpr auto name = "L2Norm";
static constexpr auto CreateTupleFunction = FunctionL2Norm::create; static constexpr auto CreateTupleFunction = FunctionL2Norm::create;
static constexpr auto CreateArrayFunction = createFunctionArrayL2Norm; static constexpr auto CreateArrayFunction = createFunctionArrayL2Norm;
@ -1189,7 +1189,7 @@ struct L2NormTraits
struct L2SquaredNormTraits struct L2SquaredNormTraits
{ {
static inline String name = "L2SquaredNorm"; static constexpr auto name = "L2SquaredNorm";
static constexpr auto CreateTupleFunction = FunctionL2SquaredNorm::create; static constexpr auto CreateTupleFunction = FunctionL2SquaredNorm::create;
static constexpr auto CreateArrayFunction = createFunctionArrayL2SquaredNorm; static constexpr auto CreateArrayFunction = createFunctionArrayL2SquaredNorm;
@ -1197,7 +1197,7 @@ struct L2SquaredNormTraits
struct LpNormTraits struct LpNormTraits
{ {
static inline String name = "LpNorm"; static constexpr auto name = "LpNorm";
static constexpr auto CreateTupleFunction = FunctionLpNorm::create; static constexpr auto CreateTupleFunction = FunctionLpNorm::create;
static constexpr auto CreateArrayFunction = createFunctionArrayLpNorm; static constexpr auto CreateArrayFunction = createFunctionArrayLpNorm;
@ -1205,7 +1205,7 @@ struct LpNormTraits
struct LinfNormTraits struct LinfNormTraits
{ {
static inline String name = "LinfNorm"; static constexpr auto name = "LinfNorm";
static constexpr auto CreateTupleFunction = FunctionLinfNorm::create; static constexpr auto CreateTupleFunction = FunctionLinfNorm::create;
static constexpr auto CreateArrayFunction = createFunctionArrayLinfNorm; static constexpr auto CreateArrayFunction = createFunctionArrayLinfNorm;
@ -1213,7 +1213,7 @@ struct LinfNormTraits
struct L1DistanceTraits struct L1DistanceTraits
{ {
static inline String name = "L1Distance"; static constexpr auto name = "L1Distance";
static constexpr auto CreateTupleFunction = FunctionL1Distance::create; static constexpr auto CreateTupleFunction = FunctionL1Distance::create;
static constexpr auto CreateArrayFunction = createFunctionArrayL1Distance; static constexpr auto CreateArrayFunction = createFunctionArrayL1Distance;
@ -1221,7 +1221,7 @@ struct L1DistanceTraits
struct L2DistanceTraits struct L2DistanceTraits
{ {
static inline String name = "L2Distance"; static constexpr auto name = "L2Distance";
static constexpr auto CreateTupleFunction = FunctionL2Distance::create; static constexpr auto CreateTupleFunction = FunctionL2Distance::create;
static constexpr auto CreateArrayFunction = createFunctionArrayL2Distance; static constexpr auto CreateArrayFunction = createFunctionArrayL2Distance;
@ -1229,7 +1229,7 @@ struct L2DistanceTraits
struct L2SquaredDistanceTraits struct L2SquaredDistanceTraits
{ {
static inline String name = "L2SquaredDistance"; static constexpr auto name = "L2SquaredDistance";
static constexpr auto CreateTupleFunction = FunctionL2SquaredDistance::create; static constexpr auto CreateTupleFunction = FunctionL2SquaredDistance::create;
static constexpr auto CreateArrayFunction = createFunctionArrayL2SquaredDistance; static constexpr auto CreateArrayFunction = createFunctionArrayL2SquaredDistance;
@ -1237,7 +1237,7 @@ struct L2SquaredDistanceTraits
struct LpDistanceTraits struct LpDistanceTraits
{ {
static inline String name = "LpDistance"; static constexpr auto name = "LpDistance";
static constexpr auto CreateTupleFunction = FunctionLpDistance::create; static constexpr auto CreateTupleFunction = FunctionLpDistance::create;
static constexpr auto CreateArrayFunction = createFunctionArrayLpDistance; static constexpr auto CreateArrayFunction = createFunctionArrayLpDistance;
@ -1245,7 +1245,7 @@ struct LpDistanceTraits
struct LinfDistanceTraits struct LinfDistanceTraits
{ {
static inline String name = "LinfDistance"; static constexpr auto name = "LinfDistance";
static constexpr auto CreateTupleFunction = FunctionLinfDistance::create; static constexpr auto CreateTupleFunction = FunctionLinfDistance::create;
static constexpr auto CreateArrayFunction = createFunctionArrayLinfDistance; static constexpr auto CreateArrayFunction = createFunctionArrayLinfDistance;
@ -1253,7 +1253,7 @@ struct LinfDistanceTraits
struct CosineDistanceTraits struct CosineDistanceTraits
{ {
static inline String name = "cosineDistance"; static constexpr auto name = "cosineDistance";
static constexpr auto CreateTupleFunction = FunctionCosineDistance::create; static constexpr auto CreateTupleFunction = FunctionCosineDistance::create;
static constexpr auto CreateArrayFunction = createFunctionArrayCosineDistance; static constexpr auto CreateArrayFunction = createFunctionArrayCosineDistance;

View File

@ -44,6 +44,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int CANNOT_SEEK_THROUGH_FILE; extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND; extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int UNKNOWN_FILE_SIZE;
} }
template <typename SessionPtr> template <typename SessionPtr>
@ -119,6 +120,7 @@ namespace detail
size_t offset_from_begin_pos = 0; size_t offset_from_begin_pos = 0;
Range read_range; Range read_range;
std::optional<size_t> file_size;
/// Delayed exception in case retries with partial content are not satisfiable. /// Delayed exception in case retries with partial content are not satisfiable.
std::exception_ptr exception; std::exception_ptr exception;
@ -201,11 +203,11 @@ namespace detail
size_t getFileSize() override size_t getFileSize() override
{ {
if (read_range.end) if (file_size)
return *read_range.end - getRangeBegin(); return *file_size;
Poco::Net::HTTPResponse response; Poco::Net::HTTPResponse response;
for (size_t i = 0; i < 10; ++i) for (size_t i = 0; i < settings.http_max_tries; ++i)
{ {
try try
{ {
@ -214,20 +216,30 @@ namespace detail
} }
catch (const Poco::Exception & e) catch (const Poco::Exception & e)
{ {
if (i == settings.http_max_tries - 1)
throw;
LOG_ERROR(log, "Failed to make HTTP_HEAD request to {}. Error: {}", uri.toString(), e.displayText()); LOG_ERROR(log, "Failed to make HTTP_HEAD request to {}. Error: {}", uri.toString(), e.displayText());
} }
} }
if (response.hasContentLength()) if (response.hasContentLength())
read_range.end = getRangeBegin() + response.getContentLength(); {
if (!read_range.end)
read_range.end = getRangeBegin() + response.getContentLength();
return *read_range.end; file_size = response.getContentLength();
return *file_size;
}
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", uri.toString());
} }
String getFileName() const override { return uri.toString(); } String getFileName() const override { return uri.toString(); }
enum class InitializeError enum class InitializeError
{ {
RETRIABLE_ERROR,
/// If error is not retriable, `exception` variable must be set. /// If error is not retriable, `exception` variable must be set.
NON_RETRIABLE_ERROR, NON_RETRIABLE_ERROR,
/// Allows to skip not found urls for globs /// Allows to skip not found urls for globs
@ -401,19 +413,30 @@ namespace detail
saved_uri_redirect = uri_redirect; saved_uri_redirect = uri_redirect;
} }
if (response.hasContentLength())
LOG_DEBUG(log, "Received response with content length: {}", response.getContentLength());
if (withPartialContent() && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT) if (withPartialContent() && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
{ {
/// Having `200 OK` instead of `206 Partial Content` is acceptable in case we retried with range.begin == 0. /// Having `200 OK` instead of `206 Partial Content` is acceptable in case we retried with range.begin == 0.
if (read_range.begin && *read_range.begin != 0) if (read_range.begin && *read_range.begin != 0)
{ {
if (!exception) if (!exception)
{
exception = std::make_exception_ptr(Exception( exception = std::make_exception_ptr(Exception(
ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE,
"Cannot read with range: [{}, {}]", "Cannot read with range: [{}, {}] (response status: {}, reason: {})",
*read_range.begin, *read_range.begin,
read_range.end ? *read_range.end : '-')); read_range.end ? toString(*read_range.end) : "-",
toString(response.getStatus()), response.getReason()));
}
/// Retry 200OK
if (response.getStatus() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_OK)
initialization_error = InitializeError::RETRIABLE_ERROR;
else
initialization_error = InitializeError::NON_RETRIABLE_ERROR;
initialization_error = InitializeError::NON_RETRIABLE_ERROR;
return; return;
} }
else if (read_range.end) else if (read_range.end)
@ -481,13 +504,25 @@ namespace detail
bool result = false; bool result = false;
size_t milliseconds_to_wait = settings.http_retry_initial_backoff_ms; size_t milliseconds_to_wait = settings.http_retry_initial_backoff_ms;
auto on_retriable_error = [&]()
{
retry_with_range_header = true;
impl.reset();
auto http_session = session->getSession();
http_session->reset();
sleepForMilliseconds(milliseconds_to_wait);
};
for (size_t i = 0; i < settings.http_max_tries; ++i) for (size_t i = 0; i < settings.http_max_tries; ++i)
{ {
exception = nullptr;
try try
{ {
if (!impl) if (!impl)
{ {
initialize(); initialize();
if (initialization_error == InitializeError::NON_RETRIABLE_ERROR) if (initialization_error == InitializeError::NON_RETRIABLE_ERROR)
{ {
assert(exception); assert(exception);
@ -497,6 +532,22 @@ namespace detail
{ {
return false; return false;
} }
else if (initialization_error == InitializeError::RETRIABLE_ERROR)
{
LOG_ERROR(
log,
"HTTP request to `{}` failed at try {}/{} with bytes read: {}/{}. "
"(Current backoff wait is {}/{} ms)",
uri.toString(), i + 1, settings.http_max_tries, getOffset(),
read_range.end ? toString(*read_range.end) : "unknown",
milliseconds_to_wait, settings.http_retry_max_backoff_ms);
assert(exception);
on_retriable_error();
continue;
}
assert(!exception);
if (use_external_buffer) if (use_external_buffer)
{ {
@ -531,12 +582,8 @@ namespace detail
milliseconds_to_wait, milliseconds_to_wait,
settings.http_retry_max_backoff_ms); settings.http_retry_max_backoff_ms);
retry_with_range_header = true; on_retriable_error();
exception = std::current_exception(); exception = std::current_exception();
impl.reset();
auto http_session = session->getSession();
http_session->reset();
sleepForMilliseconds(milliseconds_to_wait);
} }
milliseconds_to_wait = std::min(milliseconds_to_wait * 2, settings.http_retry_max_backoff_ms); milliseconds_to_wait = std::min(milliseconds_to_wait * 2, settings.http_retry_max_backoff_ms);

View File

@ -89,7 +89,6 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_IDENTIFIER; extern const int UNKNOWN_IDENTIFIER;
extern const int UNKNOWN_TYPE_OF_AST_NODE; extern const int UNKNOWN_TYPE_OF_AST_NODE;
extern const int UNSUPPORTED_METHOD;
} }
namespace namespace
@ -828,8 +827,8 @@ void ExpressionAnalyzer::makeWindowDescriptionFromAST(const Context & context_,
desc.full_sort_description.insert(desc.full_sort_description.end(), desc.full_sort_description.insert(desc.full_sort_description.end(),
desc.order_by.begin(), desc.order_by.end()); desc.order_by.begin(), desc.order_by.end());
if (definition.frame_type != WindowFrame::FrameType::Rows if (definition.frame_type != WindowFrame::FrameType::ROWS
&& definition.frame_type != WindowFrame::FrameType::Range) && definition.frame_type != WindowFrame::FrameType::RANGE)
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Window frame '{}' is not implemented (while processing '{}')", "Window frame '{}' is not implemented (while processing '{}')",
@ -1079,34 +1078,58 @@ static ActionsDAGPtr createJoinedBlockActions(ContextPtr context, const TableJoi
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false); return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
} }
static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block, ContextPtr context) std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block);
{
/// HashJoin with Dictionary optimisation
if (analyzed_join->tryInitDictJoin(right_sample_block, context))
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
bool allow_merge_join = analyzed_join->allowMergeJoin(); static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> analyzed_join, std::unique_ptr<QueryPlan> & joined_plan, ContextPtr context)
if (analyzed_join->forceHashJoin() || (analyzed_join->preferMergeJoin() && !allow_merge_join)) {
Block right_sample_block = joined_plan->getCurrentDataStream().header;
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
{
if (JoinPtr kvjoin = tryKeyValueJoin(analyzed_join, right_sample_block))
{
/// Do not need to execute plan for right part
joined_plan.reset();
return kvjoin;
}
/// It's not a hash join actually, that's why we check JoinAlgorithm::DIRECT
/// It's would be fixed in https://github.com/ClickHouse/ClickHouse/pull/38956
if (analyzed_join->tryInitDictJoin(right_sample_block, context))
{
joined_plan.reset();
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
}
}
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PARTIAL_MERGE) ||
analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PREFER_PARTIAL_MERGE))
{
if (MergeJoin::isSupported(analyzed_join))
return std::make_shared<MergeJoin>(analyzed_join, right_sample_block);
}
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::HASH) ||
/// partial_merge is preferred, but can't be used for specified kind of join, fallback to hash
analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PREFER_PARTIAL_MERGE) ||
analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PARALLEL_HASH))
{ {
if (analyzed_join->allowParallelHashJoin()) if (analyzed_join->allowParallelHashJoin())
{
return std::make_shared<ConcurrentHashJoin>(context, analyzed_join, context->getSettings().max_threads, right_sample_block); return std::make_shared<ConcurrentHashJoin>(context, analyzed_join, context->getSettings().max_threads, right_sample_block);
}
return std::make_shared<HashJoin>(analyzed_join, right_sample_block); return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
} }
else if (analyzed_join->forceMergeJoin() || (analyzed_join->preferMergeJoin() && allow_merge_join))
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE))
{ {
return std::make_shared<MergeJoin>(analyzed_join, right_sample_block); if (FullSortingMergeJoin::isSupported(analyzed_join))
return std::make_shared<FullSortingMergeJoin>(analyzed_join, right_sample_block);
} }
else if (analyzed_join->forceFullSortingMergeJoin())
{ if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::AUTO))
if (analyzed_join->getClauses().size() != 1) return std::make_shared<JoinSwitcher>(analyzed_join, right_sample_block);
throw Exception("Full sorting merge join is supported only for single-condition joins", ErrorCodes::NOT_IMPLEMENTED);
if (analyzed_join->isSpecialStorage()) throw Exception("Can't execute any of specified algorithms for specified strictness/kind and right storage type",
throw Exception("Full sorting merge join is not supported for special storage", ErrorCodes::NOT_IMPLEMENTED); ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<FullSortingMergeJoin>(analyzed_join, right_sample_block);
}
return std::make_shared<JoinSwitcher>(analyzed_join, right_sample_block);
} }
static std::unique_ptr<QueryPlan> buildJoinedPlan( static std::unique_ptr<QueryPlan> buildJoinedPlan(
@ -1164,27 +1187,26 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block) std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block)
{ {
auto error_or_null = [&](const String & msg) if (!analyzed_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
{
if (analyzed_join->isForcedAlgorithm(JoinAlgorithm::DIRECT))
throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD, "Can't use '{}' join algorithm: {}", JoinAlgorithm::DIRECT, msg);
return nullptr;
};
if (!analyzed_join->isAllowedAlgorithm(JoinAlgorithm::DIRECT))
return nullptr; return nullptr;
auto storage = analyzed_join->getStorageKeyValue(); auto storage = analyzed_join->getStorageKeyValue();
if (!storage) if (!storage)
return error_or_null("unsupported storage"); {
return nullptr;
}
if (!isInnerOrLeft(analyzed_join->kind())) if (!isInnerOrLeft(analyzed_join->kind()))
return error_or_null("illegal kind"); {
return nullptr;
}
if (analyzed_join->strictness() != ASTTableJoin::Strictness::All && if (analyzed_join->strictness() != ASTTableJoin::Strictness::All &&
analyzed_join->strictness() != ASTTableJoin::Strictness::Any && analyzed_join->strictness() != ASTTableJoin::Strictness::Any &&
analyzed_join->strictness() != ASTTableJoin::Strictness::RightAny) analyzed_join->strictness() != ASTTableJoin::Strictness::RightAny)
return error_or_null("illegal strictness"); {
return nullptr;
}
const auto & clauses = analyzed_join->getClauses(); const auto & clauses = analyzed_join->getClauses();
bool only_one_key = clauses.size() == 1 && bool only_one_key = clauses.size() == 1 &&
@ -1194,15 +1216,16 @@ std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> a
!clauses[0].on_filter_condition_right; !clauses[0].on_filter_condition_right;
if (!only_one_key) if (!only_one_key)
return error_or_null("multiple keys is not allowed"); {
return nullptr;
}
String key_name = clauses[0].key_names_right[0]; String key_name = clauses[0].key_names_right[0];
String original_key_name = analyzed_join->getOriginalName(key_name); String original_key_name = analyzed_join->getOriginalName(key_name);
const auto & storage_primary_key = storage->getPrimaryKey(); const auto & storage_primary_key = storage->getPrimaryKey();
if (storage_primary_key.size() != 1 || storage_primary_key[0] != original_key_name) if (storage_primary_key.size() != 1 || storage_primary_key[0] != original_key_name)
{ {
return error_or_null(fmt::format("key '{}'{} doesn't match storage '{}'", return nullptr;
key_name, (key_name != original_key_name ? " (aka '" + original_key_name + "')" : ""), fmt::join(storage_primary_key, ",")));
} }
return std::make_shared<DirectKeyValueJoin>(analyzed_join, right_sample_block, storage); return std::make_shared<DirectKeyValueJoin>(analyzed_join, right_sample_block, storage);
@ -1240,18 +1263,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeJoin(
joined_plan->addStep(std::move(converting_step)); joined_plan->addStep(std::move(converting_step));
} }
const Block & right_sample_block = joined_plan->getCurrentDataStream().header; JoinPtr join = chooseJoinAlgorithm(analyzed_join, joined_plan, getContext());
if (JoinPtr kvjoin = tryKeyValueJoin(analyzed_join, right_sample_block))
{
joined_plan.reset();
return kvjoin;
}
JoinPtr join = chooseJoinAlgorithm(analyzed_join, right_sample_block, getContext());
/// Do not make subquery for join over dictionary.
if (analyzed_join->getDictionaryReader())
joined_plan.reset();
return join; return join;
} }

View File

@ -34,14 +34,26 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::addJoinedBlock should not be called"); throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin::addJoinedBlock should not be called");
} }
void checkTypesOfKeys(const Block & left_block) const override static bool isSupported(const std::shared_ptr<TableJoin> & table_join)
{ {
if (table_join->getClauses().size() != 1) if (!table_join->oneDisjunct())
throw Exception(ErrorCodes::LOGICAL_ERROR, "FullSortingMergeJoin supports only one join key"); return false;
bool support_storage = !table_join->isSpecialStorage();
const auto & on_expr = table_join->getOnlyClause();
bool support_conditions = !on_expr.on_filter_condition_left && !on_expr.on_filter_condition_right;
/// Key column can change nullability and it's not handled on type conversion stage, so algorithm should be aware of it /// Key column can change nullability and it's not handled on type conversion stage, so algorithm should be aware of it
if (table_join->hasUsing() && table_join->joinUseNulls()) bool support_using_and_nulls = !table_join->hasUsing() || !table_join->joinUseNulls();
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "FullSortingMergeJoin doesn't support USING with join_use_nulls");
return support_conditions && support_using_and_nulls && support_storage;
}
void checkTypesOfKeys(const Block & left_block) const override
{
if (!isSupported(table_join))
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "FullSortingMergeJoin doesn't support specified query");
const auto & onexpr = table_join->getOnlyClause(); const auto & onexpr = table_join->getOnlyClause();
for (size_t i = 0; i < onexpr.key_names_left.size(); ++i) for (size_t i = 0; i < onexpr.key_names_left.size(); ++i)

View File

@ -718,7 +718,7 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
bool multiple_disjuncts = !table_join->oneDisjunct(); bool multiple_disjuncts = !table_join->oneDisjunct();
/// We could remove key columns for LEFT | INNER HashJoin but we should keep them for JoinSwitcher (if any). /// We could remove key columns for LEFT | INNER HashJoin but we should keep them for JoinSwitcher (if any).
bool save_key_columns = !table_join->forceHashJoin() || isRightOrFull(kind) || multiple_disjuncts; bool save_key_columns = table_join->isEnabledAlgorithm(JoinAlgorithm::AUTO) || isRightOrFull(kind) || multiple_disjuncts;
if (save_key_columns) if (save_key_columns)
{ {
saved_block_sample = right_table_keys.cloneEmpty(); saved_block_sample = right_table_keys.cloneEmpty();

View File

@ -59,11 +59,20 @@ void replaceJoinedTable(const ASTSelectQuery & select_query)
if (!join || !join->table_expression) if (!join || !join->table_expression)
return; return;
/// TODO: Push down for CROSS JOIN is not OK [disabled]
const auto & table_join = join->table_join->as<ASTTableJoin &>(); const auto & table_join = join->table_join->as<ASTTableJoin &>();
/// TODO: Push down for CROSS JOIN is not OK [disabled]
if (table_join.kind == ASTTableJoin::Kind::Cross) if (table_join.kind == ASTTableJoin::Kind::Cross)
return; return;
/* Do not push down predicates for ASOF because it can lead to incorrect results
* (for example, if we will filter a suitable row before joining and will choose another, not the closest row).
* ANY join behavior can also be different with this optimization,
* but it's ok because we don't guarantee which row to choose for ANY, unlike ASOF, where we have to pick the closest one.
*/
if (table_join.strictness == ASTTableJoin::Strictness::Asof)
return;
auto & table_expr = join->table_expression->as<ASTTableExpression &>(); auto & table_expr = join->table_expression->as<ASTTableExpression &>();
if (table_expr.database_and_table_name) if (table_expr.database_and_table_name)
{ {
@ -311,7 +320,8 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
{ {
table_join->setStorageJoin(storage_join); table_join->setStorageJoin(storage_join);
} }
else if (auto storage_dict = std::dynamic_pointer_cast<StorageDictionary>(storage); storage_dict) else if (auto storage_dict = std::dynamic_pointer_cast<StorageDictionary>(storage);
storage_dict && join_algorithm.isSet(JoinAlgorithm::DIRECT))
{ {
table_join->setStorageJoin(storage_dict); table_join->setStorageJoin(storage_dict);
} }

View File

@ -1135,6 +1135,20 @@ void MergeJoin::addConditionJoinColumn(Block & block, JoinTableSide block_side)
} }
} }
bool MergeJoin::isSupported(const std::shared_ptr<TableJoin> & table_join)
{
auto kind = table_join->kind();
auto strictness = table_join->strictness();
bool is_any = (strictness == ASTTableJoin::Strictness::Any);
bool is_all = (strictness == ASTTableJoin::Strictness::All);
bool is_semi = (strictness == ASTTableJoin::Strictness::Semi);
bool all_join = is_all && (isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind));
bool special_left = isInnerOrLeft(kind) && (is_any || is_semi);
return (all_join || special_left) && table_join->oneDisjunct();
}
MergeJoin::RightBlockInfo::RightBlockInfo(std::shared_ptr<Block> block_, size_t block_number_, size_t & skip_, RowBitmaps * bitmaps_) MergeJoin::RightBlockInfo::RightBlockInfo(std::shared_ptr<Block> block_, size_t block_number_, size_t & skip_, RowBitmaps * bitmaps_)
: block(block_) : block(block_)

View File

@ -37,6 +37,8 @@ public:
std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override; std::shared_ptr<NotJoinedBlocks> getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
static bool isSupported(const std::shared_ptr<TableJoin> & table_join);
private: private:
friend class NotJoinedMerge; friend class NotJoinedMerge;

View File

@ -363,7 +363,7 @@ void TableJoin::addJoinedColumnsAndCorrectTypesImpl(TColumns & left_columns, boo
* For `JOIN ON expr1 == expr2` we will infer common type later in makeTableJoin, * For `JOIN ON expr1 == expr2` we will infer common type later in makeTableJoin,
* when part of plan built and types of expression will be known. * when part of plan built and types of expression will be known.
*/ */
inferJoinKeyCommonType(left_columns, columns_from_joined_table, !isSpecialStorage(), forceFullSortingMergeJoin()); inferJoinKeyCommonType(left_columns, columns_from_joined_table, !isSpecialStorage(), isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE));
if (auto it = left_type_map.find(col.name); it != left_type_map.end()) if (auto it = left_type_map.find(col.name); it != left_type_map.end())
{ {
@ -409,18 +409,6 @@ bool TableJoin::oneDisjunct() const
return clauses.size() == 1; return clauses.size() == 1;
} }
bool TableJoin::allowMergeJoin() const
{
bool is_any = (strictness() == ASTTableJoin::Strictness::Any);
bool is_all = (strictness() == ASTTableJoin::Strictness::All);
bool is_semi = (strictness() == ASTTableJoin::Strictness::Semi);
bool all_join = is_all && (isInner(kind()) || isLeft(kind()) || isRight(kind()) || isFull(kind()));
bool special_left = isLeft(kind()) && (is_any || is_semi);
return (all_join || special_left) && oneDisjunct();
}
bool TableJoin::needStreamWithNonJoinedRows() const bool TableJoin::needStreamWithNonJoinedRows() const
{ {
if (strictness() == ASTTableJoin::Strictness::Asof || if (strictness() == ASTTableJoin::Strictness::Asof ||
@ -511,7 +499,7 @@ TableJoin::createConvertingActions(
const ColumnsWithTypeAndName & left_sample_columns, const ColumnsWithTypeAndName & left_sample_columns,
const ColumnsWithTypeAndName & right_sample_columns) const ColumnsWithTypeAndName & right_sample_columns)
{ {
inferJoinKeyCommonType(left_sample_columns, right_sample_columns, !isSpecialStorage(), forceFullSortingMergeJoin()); inferJoinKeyCommonType(left_sample_columns, right_sample_columns, !isSpecialStorage(), isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE));
NameToNameMap left_key_column_rename; NameToNameMap left_key_column_rename;
NameToNameMap right_key_column_rename; NameToNameMap right_key_column_rename;

View File

@ -193,24 +193,20 @@ public:
bool sameStrictnessAndKind(ASTTableJoin::Strictness, ASTTableJoin::Kind) const; bool sameStrictnessAndKind(ASTTableJoin::Strictness, ASTTableJoin::Kind) const;
const SizeLimits & sizeLimits() const { return size_limits; } const SizeLimits & sizeLimits() const { return size_limits; }
VolumePtr getTemporaryVolume() { return tmp_volume; } VolumePtr getTemporaryVolume() { return tmp_volume; }
bool allowMergeJoin() const;
bool isAllowedAlgorithm(JoinAlgorithm val) const { return join_algorithm.isSet(val) || join_algorithm.isSet(JoinAlgorithm::AUTO); } bool isEnabledAlgorithm(JoinAlgorithm val) const
bool isForcedAlgorithm(JoinAlgorithm val) const { return join_algorithm == MultiEnum<JoinAlgorithm>(val); } {
/// When join_algorithm = 'default' (not specified by user) we use hash or direct algorithm.
bool preferMergeJoin() const { return join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PREFER_PARTIAL_MERGE); } /// It's behaviour that was initially supported by clickhouse.
bool forceMergeJoin() const { return join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PARTIAL_MERGE); } bool is_enbaled_by_default = val == JoinAlgorithm::DEFAULT
|| val == JoinAlgorithm::HASH
|| val == JoinAlgorithm::DIRECT;
if (join_algorithm.isSet(JoinAlgorithm::DEFAULT) && is_enbaled_by_default)
return true;
return join_algorithm.isSet(val);
}
bool allowParallelHashJoin() const; bool allowParallelHashJoin() const;
bool forceFullSortingMergeJoin() const { return !isSpecialStorage() && join_algorithm.isSet(JoinAlgorithm::FULL_SORTING_MERGE); }
bool forceHashJoin() const
{
/// HashJoin always used for DictJoin
return dictionary_reader
|| join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::HASH)
|| join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PARALLEL_HASH);
}
bool joinUseNulls() const { return join_use_nulls; } bool joinUseNulls() const { return join_use_nulls; }
bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); } bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); }

View File

@ -4,6 +4,7 @@
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
#include <Core/SettingsEnums.h>
#include <Interpreters/TreeRewriter.h> #include <Interpreters/TreeRewriter.h>
#include <Interpreters/LogicalExpressionsOptimizer.h> #include <Interpreters/LogicalExpressionsOptimizer.h>
@ -683,7 +684,7 @@ bool tryJoinOnConst(TableJoin & analyzed_join, ASTPtr & on_expression, ContextPt
else else
return false; return false;
if (!analyzed_join.forceHashJoin()) if (!analyzed_join.isEnabledAlgorithm(JoinAlgorithm::HASH))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"JOIN ON constant ({}) supported only with join algorithm 'hash'", "JOIN ON constant ({}) supported only with join algorithm 'hash'",
queryToString(on_expression)); queryToString(on_expression));
@ -770,7 +771,7 @@ void collectJoinedColumns(TableJoin & analyzed_join, ASTTableJoin & table_join,
data.asofToJoinKeys(); data.asofToJoinKeys();
} }
if (!analyzed_join.oneDisjunct() && !analyzed_join.forceHashJoin()) if (!analyzed_join.oneDisjunct() && !analyzed_join.isEnabledAlgorithm(JoinAlgorithm::HASH))
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "Only `hash` join supports multiple ORs for keys in JOIN ON section"); throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "Only `hash` join supports multiple ORs for keys in JOIN ON section");
} }
} }

View File

@ -90,8 +90,8 @@ void WindowFrame::toString(WriteBuffer & buf) const
void WindowFrame::checkValid() const void WindowFrame::checkValid() const
{ {
// Check the validity of offsets. // Check the validity of offsets.
if (type == WindowFrame::FrameType::Rows if (type == WindowFrame::FrameType::ROWS
|| type == WindowFrame::FrameType::Groups) || type == WindowFrame::FrameType::GROUPS)
{ {
if (begin_type == BoundaryType::Offset if (begin_type == BoundaryType::Offset
&& !((begin_offset.getType() == Field::Types::UInt64 && !((begin_offset.getType() == Field::Types::UInt64
@ -197,7 +197,7 @@ void WindowDescription::checkValid() const
frame.checkValid(); frame.checkValid();
// RANGE OFFSET requires exactly one ORDER BY column. // RANGE OFFSET requires exactly one ORDER BY column.
if (frame.type == WindowFrame::FrameType::Range if (frame.type == WindowFrame::FrameType::RANGE
&& (frame.begin_type == WindowFrame::BoundaryType::Offset && (frame.begin_type == WindowFrame::BoundaryType::Offset
|| frame.end_type == WindowFrame::BoundaryType::Offset) || frame.end_type == WindowFrame::BoundaryType::Offset)
&& order_by.size() != 1) && order_by.size() != 1)

View File

@ -28,7 +28,7 @@ struct WindowFunctionDescription
struct WindowFrame struct WindowFrame
{ {
enum class FrameType { Rows, Groups, Range }; enum class FrameType { ROWS, GROUPS, RANGE };
enum class BoundaryType { Unbounded, Current, Offset }; enum class BoundaryType { Unbounded, Current, Offset };
// This flag signifies that the frame properties were not set explicitly by // This flag signifies that the frame properties were not set explicitly by
@ -36,7 +36,7 @@ struct WindowFrame
// for the default frame of RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. // for the default frame of RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
bool is_default = true; bool is_default = true;
FrameType type = FrameType::Range; FrameType type = FrameType::RANGE;
// UNBOUNDED FOLLOWING for the frame end is forbidden by the standard, but for // UNBOUNDED FOLLOWING for the frame end is forbidden by the standard, but for
// uniformity the begin_preceding still has to be set to true for UNBOUNDED // uniformity the begin_preceding still has to be set to true for UNBOUNDED

View File

@ -17,7 +17,7 @@ struct ASTWindowDefinition : public IAST
ASTPtr order_by; ASTPtr order_by;
bool frame_is_default = true; bool frame_is_default = true;
WindowFrame::FrameType frame_type = WindowFrame::FrameType::Range; WindowFrame::FrameType frame_type = WindowFrame::FrameType::RANGE;
WindowFrame::BoundaryType frame_begin_type = WindowFrame::BoundaryType::Unbounded; WindowFrame::BoundaryType frame_begin_type = WindowFrame::BoundaryType::Unbounded;
ASTPtr frame_begin_offset; ASTPtr frame_begin_offset;
bool frame_begin_preceding = true; bool frame_begin_preceding = true;

View File

@ -1198,15 +1198,15 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
node->frame_is_default = false; node->frame_is_default = false;
if (keyword_rows.ignore(pos, expected)) if (keyword_rows.ignore(pos, expected))
{ {
node->frame_type = WindowFrame::FrameType::Rows; node->frame_type = WindowFrame::FrameType::ROWS;
} }
else if (keyword_groups.ignore(pos, expected)) else if (keyword_groups.ignore(pos, expected))
{ {
node->frame_type = WindowFrame::FrameType::Groups; node->frame_type = WindowFrame::FrameType::GROUPS;
} }
else if (keyword_range.ignore(pos, expected)) else if (keyword_range.ignore(pos, expected))
{ {
node->frame_type = WindowFrame::FrameType::Range; node->frame_type = WindowFrame::FrameType::RANGE;
} }
else else
{ {

View File

@ -299,7 +299,7 @@ NamesAndTypesList CapnProtoSchemaReader::readSchema()
auto schema_parser = CapnProtoSchemaParser(); auto schema_parser = CapnProtoSchemaParser();
auto schema = schema_parser.getMessageSchema(schema_info); auto schema = schema_parser.getMessageSchema(schema_info);
return capnProtoSchemaToCHSchema(schema); return capnProtoSchemaToCHSchema(schema, format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference);
} }
void registerInputFormatCapnProto(FormatFactory & factory) void registerInputFormatCapnProto(FormatFactory & factory)

View File

@ -58,13 +58,14 @@ ProtobufListSchemaReader::ProtobufListSchemaReader(const FormatSettings & format
true, true,
format_settings.schema.is_server, format_settings.schema.is_server,
format_settings.schema.format_schema_path) format_settings.schema.format_schema_path)
, skip_unsopported_fields(format_settings.protobuf.skip_fields_with_unsupported_types_in_schema_inference)
{ {
} }
NamesAndTypesList ProtobufListSchemaReader::readSchema() NamesAndTypesList ProtobufListSchemaReader::readSchema()
{ {
const auto * message_descriptor = ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info, ProtobufSchemas::WithEnvelope::Yes); const auto * message_descriptor = ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info, ProtobufSchemas::WithEnvelope::Yes);
return protobufSchemaToCHSchema(message_descriptor); return protobufSchemaToCHSchema(message_descriptor, skip_unsopported_fields);
} }
void registerInputFormatProtobufList(FormatFactory & factory) void registerInputFormatProtobufList(FormatFactory & factory)

View File

@ -50,6 +50,7 @@ public:
private: private:
const FormatSchemaInfo schema_info; const FormatSchemaInfo schema_info;
bool skip_unsopported_fields;
}; };
} }

View File

@ -78,15 +78,15 @@ ProtobufSchemaReader::ProtobufSchemaReader(const FormatSettings & format_setting
format_settings.schema.format_schema, format_settings.schema.format_schema,
"Protobuf", "Protobuf",
true, true,
format_settings.schema.is_server, format_settings.schema.is_server, format_settings.schema.format_schema_path)
format_settings.schema.format_schema_path) , skip_unsupported_fields(format_settings.protobuf.skip_fields_with_unsupported_types_in_schema_inference)
{ {
} }
NamesAndTypesList ProtobufSchemaReader::readSchema() NamesAndTypesList ProtobufSchemaReader::readSchema()
{ {
const auto * message_descriptor = ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info, ProtobufSchemas::WithEnvelope::No); const auto * message_descriptor = ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info, ProtobufSchemas::WithEnvelope::No);
return protobufSchemaToCHSchema(message_descriptor); return protobufSchemaToCHSchema(message_descriptor, skip_unsupported_fields);
} }
void registerProtobufSchemaReader(FormatFactory & factory) void registerProtobufSchemaReader(FormatFactory & factory)

View File

@ -57,6 +57,7 @@ public:
private: private:
const FormatSchemaInfo schema_info; const FormatSchemaInfo schema_info;
bool skip_unsupported_fields;
}; };
} }

View File

@ -263,7 +263,7 @@ WindowTransform::WindowTransform(const Block & input_header_,
// Choose a row comparison function for RANGE OFFSET frame based on the // Choose a row comparison function for RANGE OFFSET frame based on the
// type of the ORDER BY column. // type of the ORDER BY column.
if (window_description.frame.type == WindowFrame::FrameType::Range if (window_description.frame.type == WindowFrame::FrameType::RANGE
&& (window_description.frame.begin_type && (window_description.frame.begin_type
== WindowFrame::BoundaryType::Offset == WindowFrame::BoundaryType::Offset
|| window_description.frame.end_type || window_description.frame.end_type
@ -612,10 +612,10 @@ void WindowTransform::advanceFrameStart()
case WindowFrame::BoundaryType::Offset: case WindowFrame::BoundaryType::Offset:
switch (window_description.frame.type) switch (window_description.frame.type)
{ {
case WindowFrame::FrameType::Rows: case WindowFrame::FrameType::ROWS:
advanceFrameStartRowsOffset(); advanceFrameStartRowsOffset();
break; break;
case WindowFrame::FrameType::Range: case WindowFrame::FrameType::RANGE:
advanceFrameStartRangeOffset(); advanceFrameStartRangeOffset();
break; break;
default: default:
@ -659,14 +659,14 @@ bool WindowTransform::arePeers(const RowNumber & x, const RowNumber & y) const
return true; return true;
} }
if (window_description.frame.type == WindowFrame::FrameType::Rows) if (window_description.frame.type == WindowFrame::FrameType::ROWS)
{ {
// For ROWS frame, row is only peers with itself (checked above); // For ROWS frame, row is only peers with itself (checked above);
return false; return false;
} }
// For RANGE and GROUPS frames, rows that compare equal w/ORDER BY are peers. // For RANGE and GROUPS frames, rows that compare equal w/ORDER BY are peers.
assert(window_description.frame.type == WindowFrame::FrameType::Range); assert(window_description.frame.type == WindowFrame::FrameType::RANGE);
const size_t n = order_by_indices.size(); const size_t n = order_by_indices.size();
if (n == 0) if (n == 0)
{ {
@ -844,10 +844,10 @@ void WindowTransform::advanceFrameEnd()
case WindowFrame::BoundaryType::Offset: case WindowFrame::BoundaryType::Offset:
switch (window_description.frame.type) switch (window_description.frame.type)
{ {
case WindowFrame::FrameType::Rows: case WindowFrame::FrameType::ROWS:
advanceFrameEndRowsOffset(); advanceFrameEndRowsOffset();
break; break;
case WindowFrame::FrameType::Range: case WindowFrame::FrameType::RANGE:
advanceFrameEndRangeOffset(); advanceFrameEndRangeOffset();
break; break;
default: default:

View File

@ -19,6 +19,19 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
MergeFromLogEntryTask::MergeFromLogEntryTask(
ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry_,
StorageReplicatedMergeTree & storage_,
IExecutableTask::TaskResultCallback & task_result_callback_)
: ReplicatedMergeMutateTaskBase(
&Poco::Logger::get(
storage_.getStorageID().getShortName() + "::" + selected_entry_->log_entry->new_part_name + " (MergeFromLogEntryTask)"),
storage_,
selected_entry_,
task_result_callback_)
{
}
ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare() ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
{ {

View File

@ -17,9 +17,10 @@ namespace DB
class MergeFromLogEntryTask : public ReplicatedMergeMutateTaskBase class MergeFromLogEntryTask : public ReplicatedMergeMutateTaskBase
{ {
public: public:
template <class Callback> MergeFromLogEntryTask(
MergeFromLogEntryTask(ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry_, StorageReplicatedMergeTree & storage_, Callback && task_result_callback_) ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry_,
: ReplicatedMergeMutateTaskBase(&Poco::Logger::get("MergeFromLogEntryTask"), storage_, selected_entry_, task_result_callback_) {} StorageReplicatedMergeTree & storage_,
IExecutableTask::TaskResultCallback & task_result_callback_);
UInt64 getPriority() override { return priority; } UInt64 getPriority() override { return priority; }

View File

@ -60,6 +60,7 @@ MergeListElement::MergeListElement(
, thread_id{getThreadId()} , thread_id{getThreadId()}
, merge_type{future_part->merge_type} , merge_type{future_part->merge_type}
, merge_algorithm{MergeAlgorithm::Undecided} , merge_algorithm{MergeAlgorithm::Undecided}
, description{"to apply mutate/merge in " + query_id}
{ {
for (const auto & source_part : future_part->parts) for (const auto & source_part : future_part->parts)
{ {
@ -77,7 +78,7 @@ MergeListElement::MergeListElement(
is_mutation = (result_part_info.getDataVersion() != source_data_version); is_mutation = (result_part_info.getDataVersion() != source_data_version);
} }
memory_tracker.setDescription("Mutate/Merge"); memory_tracker.setDescription(description.c_str());
/// MemoryTracker settings should be set here, because /// MemoryTracker settings should be set here, because
/// later (see MemoryTrackerThreadSwitcher) /// later (see MemoryTrackerThreadSwitcher)
/// parent memory tracker will be changed, and if merge executed from the /// parent memory tracker will be changed, and if merge executed from the

View File

@ -113,7 +113,6 @@ struct MergeListElement : boost::noncopyable
/// Updated only for Vertical algorithm /// Updated only for Vertical algorithm
std::atomic<UInt64> columns_written{}; std::atomic<UInt64> columns_written{};
MemoryTracker memory_tracker{VariableContext::Process};
/// Used to adjust ThreadStatus::untracked_memory_limit /// Used to adjust ThreadStatus::untracked_memory_limit
UInt64 max_untracked_memory; UInt64 max_untracked_memory;
/// Used to avoid losing any allocation context /// Used to avoid losing any allocation context
@ -126,6 +125,11 @@ struct MergeListElement : boost::noncopyable
/// Detected after merge already started /// Detected after merge already started
std::atomic<MergeAlgorithm> merge_algorithm; std::atomic<MergeAlgorithm> merge_algorithm;
/// Description used for logging
/// Needs to outlive memory_tracker since it's used in its destructor
const String description{"Mutate/Merge"};
MemoryTracker memory_tracker{VariableContext::Process};
MergeListElement( MergeListElement(
const StorageID & table_id_, const StorageID & table_id_,
FutureMergedMutatedPartPtr future_part, FutureMergedMutatedPartPtr future_part,

View File

@ -14,7 +14,6 @@ class StorageMergeTree;
class MergePlainMergeTreeTask : public IExecutableTask class MergePlainMergeTreeTask : public IExecutableTask
{ {
public: public:
template <class Callback>
MergePlainMergeTreeTask( MergePlainMergeTreeTask(
StorageMergeTree & storage_, StorageMergeTree & storage_,
StorageMetadataPtr metadata_snapshot_, StorageMetadataPtr metadata_snapshot_,
@ -22,14 +21,14 @@ public:
Names deduplicate_by_columns_, Names deduplicate_by_columns_,
MergeMutateSelectedEntryPtr merge_mutate_entry_, MergeMutateSelectedEntryPtr merge_mutate_entry_,
TableLockHolder table_lock_holder_, TableLockHolder table_lock_holder_,
Callback && task_result_callback_) IExecutableTask::TaskResultCallback & task_result_callback_)
: storage(storage_) : storage(storage_)
, metadata_snapshot(std::move(metadata_snapshot_)) , metadata_snapshot(std::move(metadata_snapshot_))
, deduplicate(deduplicate_) , deduplicate(deduplicate_)
, deduplicate_by_columns(std::move(deduplicate_by_columns_)) , deduplicate_by_columns(std::move(deduplicate_by_columns_))
, merge_mutate_entry(std::move(merge_mutate_entry_)) , merge_mutate_entry(std::move(merge_mutate_entry_))
, table_lock_holder(std::move(table_lock_holder_)) , table_lock_holder(std::move(table_lock_holder_))
, task_result_callback(std::forward<Callback>(task_result_callback_)) , task_result_callback(task_result_callback_)
{ {
for (auto & item : merge_mutate_entry->future_part->parts) for (auto & item : merge_mutate_entry->future_part->parts)
priority += item->getBytesOnDisk(); priority += item->getBytesOnDisk();

View File

@ -1092,8 +1092,8 @@ protected:
/// Strongly connected with two fields above. /// Strongly connected with two fields above.
/// Every task that is finished will ask to assign a new one into an executor. /// Every task that is finished will ask to assign a new one into an executor.
/// These callbacks will be passed to the constructor of each task. /// These callbacks will be passed to the constructor of each task.
std::function<void(bool)> common_assignee_trigger; IExecutableTask::TaskResultCallback common_assignee_trigger;
std::function<void(bool)> moves_assignee_trigger; IExecutableTask::TaskResultCallback moves_assignee_trigger;
using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator; using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator;
using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator; using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator;

View File

@ -22,18 +22,17 @@ class StorageMergeTree;
class MutatePlainMergeTreeTask : public IExecutableTask class MutatePlainMergeTreeTask : public IExecutableTask
{ {
public: public:
template <class Callback>
MutatePlainMergeTreeTask( MutatePlainMergeTreeTask(
StorageMergeTree & storage_, StorageMergeTree & storage_,
StorageMetadataPtr metadata_snapshot_, StorageMetadataPtr metadata_snapshot_,
MergeMutateSelectedEntryPtr merge_mutate_entry_, MergeMutateSelectedEntryPtr merge_mutate_entry_,
TableLockHolder table_lock_holder_, TableLockHolder table_lock_holder_,
Callback && task_result_callback_) IExecutableTask::TaskResultCallback & task_result_callback_)
: storage(storage_) : storage(storage_)
, metadata_snapshot(std::move(metadata_snapshot_)) , metadata_snapshot(std::move(metadata_snapshot_))
, merge_mutate_entry(std::move(merge_mutate_entry_)) , merge_mutate_entry(std::move(merge_mutate_entry_))
, table_lock_holder(std::move(table_lock_holder_)) , table_lock_holder(std::move(table_lock_holder_))
, task_result_callback(std::forward<Callback>(task_result_callback_)) , task_result_callback(task_result_callback_)
{ {
for (auto & part : merge_mutate_entry->future_part->parts) for (auto & part : merge_mutate_entry->future_part->parts)
priority += part->getBytesOnDisk(); priority += part->getBytesOnDisk();

View File

@ -16,19 +16,20 @@ class StorageReplicatedMergeTree;
class ReplicatedMergeMutateTaskBase : public IExecutableTask class ReplicatedMergeMutateTaskBase : public IExecutableTask
{ {
public: public:
template <class Callback>
ReplicatedMergeMutateTaskBase( ReplicatedMergeMutateTaskBase(
Poco::Logger * log_, Poco::Logger * log_,
StorageReplicatedMergeTree & storage_, StorageReplicatedMergeTree & storage_,
ReplicatedMergeTreeQueue::SelectedEntryPtr & selected_entry_, ReplicatedMergeTreeQueue::SelectedEntryPtr & selected_entry_,
Callback && task_result_callback_) IExecutableTask::TaskResultCallback & task_result_callback_)
: selected_entry(selected_entry_) : selected_entry(selected_entry_)
, entry(*selected_entry->log_entry) , entry(*selected_entry->log_entry)
, log(log_) , log(log_)
, storage(storage_) , storage(storage_)
/// This is needed to ask an asssignee to assign a new merge/mutate operation /// This is needed to ask an asssignee to assign a new merge/mutate operation
/// It takes bool argument and true means that current task is successfully executed. /// It takes bool argument and true means that current task is successfully executed.
, task_result_callback(task_result_callback_) {} , task_result_callback(task_result_callback_)
{
}
~ReplicatedMergeMutateTaskBase() override = default; ~ReplicatedMergeMutateTaskBase() override = default;
void onCompleted() override; void onCompleted() override;

View File

@ -938,8 +938,9 @@ bool StorageMergeTree::merge(
return false; return false;
/// Copying a vector of columns `deduplicate bu columns. /// Copying a vector of columns `deduplicate bu columns.
IExecutableTask::TaskResultCallback f = [](bool) {};
auto task = std::make_shared<MergePlainMergeTreeTask>( auto task = std::make_shared<MergePlainMergeTreeTask>(
*this, metadata_snapshot, deduplicate, deduplicate_by_columns, merge_mutate_entry, table_lock_holder, [](bool){}); *this, metadata_snapshot, deduplicate, deduplicate_by_columns, merge_mutate_entry, table_lock_holder, f);
task->setCurrentTransaction(MergeTreeTransactionHolder{}, MergeTreeTransactionPtr{txn}); task->setCurrentTransaction(MergeTreeTransactionHolder{}, MergeTreeTransactionPtr{txn});

View File

@ -5,7 +5,7 @@ from helpers.cluster import ClickHouseCluster
import time import time
from kazoo.client import KazooClient from kazoo.client import KazooClient, KazooRetry
CLUSTER_SIZE = 5 CLUSTER_SIZE = 5
QUORUM_SIZE = CLUSTER_SIZE // 2 + 1 QUORUM_SIZE = CLUSTER_SIZE // 2 + 1
@ -52,8 +52,11 @@ def started_cluster():
def get_fake_zk(nodename, timeout=30.0): def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient( _fake_zk_instance = KazooClient(
hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout hosts=cluster.get_instance_ip(nodename) + ":9181",
timeout=timeout,
command_retry=KazooRetry(max_tries=10),
) )
_fake_zk_instance.start() _fake_zk_instance.start()
return _fake_zk_instance return _fake_zk_instance
@ -117,7 +120,7 @@ def test_cluster_recovery(started_cluster):
data_in_cluster = [] data_in_cluster = []
def add_data(zk, path, data): def add_data(zk, path, data):
zk.create(path, data.encode()) zk.retry(zk.create, path, data.encode())
data_in_cluster.append((path, data)) data_in_cluster.append((path, data))
def assert_all_data(zk): def assert_all_data(zk):

View File

@ -1,4 +1,12 @@
import pytest import pytest
# FIXME Tests with MaterializedPostgresSQL are temporarily disabled
# https://github.com/ClickHouse/ClickHouse/issues/36898
# https://github.com/ClickHouse/ClickHouse/issues/38677
# https://github.com/ClickHouse/ClickHouse/pull/39272#issuecomment-1190087190
pytestmark = pytest.mark.skip
import time import time
import os.path as p import os.path as p
import random import random

View File

@ -1,4 +1,12 @@
import pytest import pytest
# FIXME Tests with MaterializedPostgresSQL are temporarily disabled
# https://github.com/ClickHouse/ClickHouse/issues/36898
# https://github.com/ClickHouse/ClickHouse/issues/38677
# https://github.com/ClickHouse/ClickHouse/pull/39272#issuecomment-1190087190
pytestmark = pytest.mark.skip
import time import time
import psycopg2 import psycopg2
import os.path as p import os.path as p

View File

@ -1,4 +1,12 @@
import pytest import pytest
# FIXME Tests with MaterializedPostgresSQL are temporarily disabled
# https://github.com/ClickHouse/ClickHouse/issues/36898
# https://github.com/ClickHouse/ClickHouse/issues/38677
# https://github.com/ClickHouse/ClickHouse/pull/39272#issuecomment-1190087190
pytestmark = pytest.mark.skip
import time import time
import psycopg2 import psycopg2
import os.path as p import os.path as p

View File

@ -46,7 +46,6 @@ age String
isOnline Enum8(\'offline\' = 0, \'online\' = 1) isOnline Enum8(\'offline\' = 0, \'online\' = 1)
someRatio Float64 someRatio Float64
visitTime UInt64 visitTime UInt64
newMessage Tuple(empty Array(Tuple()), z Float32)
randomBigNumber Int64 randomBigNumber Int64
newFieldInt Array(Int32) newFieldInt Array(Int32)
color Array(Float32) color Array(Float32)

View File

@ -3,3 +3,5 @@
['1','2,3'] ['1','2,3']
['1','2','3'] ['1','2','3']
['1','2','3'] ['1','2','3']
['expr1','1+1=2']
['expr2','2+2=4=1+3']

View File

@ -6,3 +6,5 @@ select splitByChar(',', '1,2,3', 3);
select splitByChar(',', '1,2,3', -2); -- { serverError 44 } select splitByChar(',', '1,2,3', -2); -- { serverError 44 }
select splitByChar(',', '1,2,3', ''); -- { serverError 43 } select splitByChar(',', '1,2,3', ''); -- { serverError 43 }
SELECT splitByChar('=', s, 1) FROM values('s String', 'expr1=1+1=2', 'expr2=2+2=4=1+3')

View File

@ -47,16 +47,16 @@ SELECT '--- totals';
SELECT rdb.key % 2, sum(k), max(value2) FROM t2 INNER JOIN rdb ON rdb.key == t2.k GROUP BY (rdb.key % 2) WITH TOTALS; SELECT rdb.key % 2, sum(k), max(value2) FROM t2 INNER JOIN rdb ON rdb.key == t2.k GROUP BY (rdb.key % 2) WITH TOTALS;
SELECT '---'; SELECT '---';
SELECT * FROM t1 RIGHT JOIN rdb ON rdb.key == t1.k; -- { serverError UNSUPPORTED_METHOD } SELECT * FROM t1 RIGHT JOIN rdb ON rdb.key == t1.k; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 RIGHT JOIN rdb ON rdb.key == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash'; SELECT * FROM t1 RIGHT JOIN rdb ON rdb.key == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash';
SELECT * FROM t1 FULL JOIN rdb ON rdb.key == t1.k; -- { serverError UNSUPPORTED_METHOD } SELECT * FROM t1 FULL JOIN rdb ON rdb.key == t1.k; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 FULL JOIN rdb ON rdb.key == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash'; SELECT * FROM t1 FULL JOIN rdb ON rdb.key == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash';
SELECT * FROM t1 INNER JOIN rdb ON rdb.key + 1 == t1.k; -- { serverError UNSUPPORTED_METHOD } SELECT * FROM t1 INNER JOIN rdb ON rdb.key + 1 == t1.k; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 INNER JOIN rdb ON rdb.key + 1 == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash'; SELECT * FROM t1 INNER JOIN rdb ON rdb.key + 1 == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash';
SELECT * FROM t1 INNER JOIN (SELECT * FROM rdb) AS rdb ON rdb.key == t1.k; -- { serverError UNSUPPORTED_METHOD } SELECT * FROM t1 INNER JOIN (SELECT * FROM rdb) AS rdb ON rdb.key == t1.k; -- { serverError NOT_IMPLEMENTED }
SELECT * FROM t1 INNER JOIN (SELECT * FROM rdb) AS rdb ON rdb.key == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash'; SELECT * FROM t1 INNER JOIN (SELECT * FROM rdb) AS rdb ON rdb.key == t1.k FORMAT Null SETTINGS join_algorithm = 'direct,hash';
DROP TABLE IF EXISTS rdb; DROP TABLE IF EXISTS rdb;

View File

@ -0,0 +1,8 @@
OK
OK
OK
OK
str String
text String
str String
text String

View File

@ -0,0 +1,37 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel, no-replicated-database
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
USER_FILES_PATH=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
touch $USER_FILES_PATH/data.capnp
SCHEMADIR=$(clickhouse-client --query "select * from file('data.capnp', 'CapnProto', 'val1 char') settings format_schema='nonexist:Message'" 2>&1 | grep Exception | grep -oP "file \K.*(?=/nonexist.capnp)")
CLIENT_SCHEMADIR=$CURDIR/format_schemas
SERVER_SCHEMADIR=test_02327
mkdir -p $SCHEMADIR/$SERVER_SCHEMADIR
cp -r $CLIENT_SCHEMADIR/02327_* $SCHEMADIR/$SERVER_SCHEMADIR/
$CLICKHOUSE_CLIENT --query="desc file(data.pb) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="desc file(data.capnp) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="create table test_protobuf engine=File(Protobuf) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="create table test_capnp engine=File(CapnProto) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="desc file(data.pb) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty', input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference=1";
$CLICKHOUSE_CLIENT --query="desc file(data.capnp) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty', input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference=1";
$CLICKHOUSE_CLIENT --query="drop table if exists test_protobuf";
$CLICKHOUSE_CLIENT --query="create table test_protobuf engine=File(Protobuf) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty', input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference=1";
$CLICKHOUSE_CLIENT --query="desc test_protobuf";
$CLICKHOUSE_CLIENT --query="drop table test_protobuf";
$CLICKHOUSE_CLIENT --query="drop table if exists test_capnp";
$CLICKHOUSE_CLIENT --query="create table test_capnp engine=File(CapnProto) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty', input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference=1";
$CLICKHOUSE_CLIENT --query="desc test_capnp";
$CLICKHOUSE_CLIENT --query="drop table test_capnp";
rm -rf ${SCHEMADIR:?}/${SERVER_SCHEMADIR:?}

View File

@ -0,0 +1,17 @@
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (c1 Int32, c2 Int32) ENGINE MergeTree ORDER BY c1;
INSERT INTO t1 (c1, c2) VALUES (1, 10), (1, 20), (1, 30);
DROP TABLE IF EXISTS t2;
CREATE TABLE t2 (c1 Int32, c2 Int32, c3 String) ENGINE MergeTree ORDER BY (c1, c2, c3);
INSERT INTO t2 (c1, c2, c3) VALUES (1, 5, 'a'), (1, 15, 'b'), (1, 25, 'c');
SET enable_optimize_predicate_expression = 1;
WITH
v1 AS (SELECT t1.c2, t2.c2, t2.c3 FROM t1 ASOF JOIN t2 USING (c1, c2))
SELECT count() FROM v1 WHERE c3 = 'b';
SET enable_optimize_predicate_expression = 0;
WITH
v1 AS (SELECT t1.c2, t2.c2, t2.c3 FROM t1 ASOF JOIN t2 USING (c1, c2))
SELECT count() FROM v1 WHERE c3 = 'b';

View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
for i in $(seq 1 10);
do
$CLICKHOUSE_CLIENT -q "drop table if exists t_avro_$i"
$CLICKHOUSE_CLIENT -q "create table t_avro_$i (x UInt32, s String) engine=File(Avro)"
done
for i in $(seq 1 10);
do
$CLICKHOUSE_CLIENT -q "insert into t_avro_$i select number, 'str' from numbers(1000) settings engine_file_truncate_on_insert=1" > /dev/null &
done
sleep 5
for i in $(seq 1 10);
do
$CLICKHOUSE_CLIENT-q "drop table t_avro_$i"
done

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
USER_FILES_PATH=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
cp $CURDIR/data_avro/corrupted.avro $USER_FILES_PATH/
$CLICKHOUSE_CLIENT -q "select * from file(corrupted.avro)" 2>&1 | grep -F -q "Cannot read compressed data" && echo "OK" || echo "FAIL"

Binary file not shown.

View File

@ -75,11 +75,6 @@ message AltPerson {
male = 0; male = 0;
female = 1; female = 1;
}; };
message Dummy {
message Empty {};
repeated Empty empty = 1;
float z = 2;
};
repeated int32 location = 101 [packed=false]; repeated int32 location = 101 [packed=false];
float pi = 103; float pi = 103;
bytes uuid = 300; bytes uuid = 300;
@ -92,7 +87,6 @@ message AltPerson {
OnlineStatus isOnline = 1; OnlineStatus isOnline = 1;
double someRatio = 100; double someRatio = 100;
fixed64 visitTime = 15; fixed64 visitTime = 15;
Dummy newMessage = 1000;
sfixed64 randomBigNumber = 140; sfixed64 randomBigNumber = 140;
repeated int32 newFieldInt = 104; repeated int32 newFieldInt = 104;
repeated float color = 14; repeated float color = 14;

View File

@ -0,0 +1,11 @@
@0x9ef128e10a8010b8;
struct Empty
{
}
struct MessageWithEmpty
{
tuple1 @0 : Empty;
text @1 : Text;
}

View File

@ -0,0 +1,9 @@
syntax = "proto3";
message Empty {
}
message MessageWithEmpty {
Empty empty = 1;
string str = 2;
};

View File

@ -0,0 +1,3 @@
403499
1000 320 171 23
2500 569 354 13

View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --max_threads 1 --query="SELECT URL, Title, SearchPhrase FROM test.hits LIMIT 1000" > "${CLICKHOUSE_TMP}"/data.tsv
$CLICKHOUSE_OBFUSCATOR --structure "URL String, Title String, SearchPhrase String" --input-format TSV --output-format TSV --seed hello --limit 0 --save "${CLICKHOUSE_TMP}"/model.bin < "${CLICKHOUSE_TMP}"/data.tsv 2>/dev/null
wc -c < "${CLICKHOUSE_TMP}"/model.bin
$CLICKHOUSE_OBFUSCATOR --structure "URL String, Title String, SearchPhrase String" --input-format TSV --output-format TSV --seed hello --limit 2500 --load "${CLICKHOUSE_TMP}"/model.bin < "${CLICKHOUSE_TMP}"/data.tsv > "${CLICKHOUSE_TMP}"/data2500.tsv 2>/dev/null
rm "${CLICKHOUSE_TMP}"/model.bin
$CLICKHOUSE_LOCAL --structure "URL String, Title String, SearchPhrase String" --input-format TSV --output-format TSV --query "SELECT count(), uniq(URL), uniq(Title), uniq(SearchPhrase) FROM table" < "${CLICKHOUSE_TMP}"/data.tsv
$CLICKHOUSE_LOCAL --structure "URL String, Title String, SearchPhrase String" --input-format TSV --output-format TSV --query "SELECT count(), uniq(URL), uniq(Title), uniq(SearchPhrase) FROM table" < "${CLICKHOUSE_TMP}"/data2500.tsv
rm "${CLICKHOUSE_TMP}"/data.tsv
rm "${CLICKHOUSE_TMP}"/data2500.tsv

View File

@ -9,6 +9,10 @@ else()
endif() endif()
include(../cmake/limit_jobs.cmake) include(../cmake/limit_jobs.cmake)
if (ENABLE_CLICKHOUSE_SELF_EXTRACTING)
add_subdirectory (self-extracting-executable)
endif ()
# Utils used in package # Utils used in package
add_subdirectory (config-processor) add_subdirectory (config-processor)
add_subdirectory (report) add_subdirectory (report)
@ -32,7 +36,6 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
add_subdirectory (check-mysql-binlog) add_subdirectory (check-mysql-binlog)
add_subdirectory (keeper-bench) add_subdirectory (keeper-bench)
add_subdirectory (graphite-rollup) add_subdirectory (graphite-rollup)
add_subdirectory (self-extracting-executable)
if (TARGET ch_contrib::nuraft) if (TARGET ch_contrib::nuraft)
add_subdirectory (keeper-data-dumper) add_subdirectory (keeper-data-dumper)

View File

@ -1,7 +1,9 @@
v22.7.1.2484-stable 2022-07-21 v22.7.1.2484-stable 2022-07-21
v22.6.4.35-stable 2022-07-25
v22.6.3.35-stable 2022-07-06 v22.6.3.35-stable 2022-07-06
v22.6.2.12-stable 2022-06-29 v22.6.2.12-stable 2022-06-29
v22.6.1.1985-stable 2022-06-16 v22.6.1.1985-stable 2022-06-16
v22.5.3.21-stable 2022-07-25
v22.5.2.53-stable 2022-07-07 v22.5.2.53-stable 2022-07-07
v22.5.1.2079-stable 2022-05-19 v22.5.1.2079-stable 2022-05-19
v22.4.6.53-stable 2022-07-07 v22.4.6.53-stable 2022-07-07
@ -9,6 +11,7 @@ v22.4.5.9-stable 2022-05-06
v22.4.4.7-stable 2022-04-29 v22.4.4.7-stable 2022-04-29
v22.4.3.3-stable 2022-04-26 v22.4.3.3-stable 2022-04-26
v22.4.2.1-stable 2022-04-22 v22.4.2.1-stable 2022-04-22
v22.3.9.19-lts 2022-07-25
v22.3.8.39-lts 2022-07-07 v22.3.8.39-lts 2022-07-07
v22.3.7.28-lts 2022-06-20 v22.3.7.28-lts 2022-06-20
v22.3.6.5-lts 2022-05-06 v22.3.6.5-lts 2022-05-06

1 v22.7.1.2484-stable 2022-07-21
2 v22.6.4.35-stable 2022-07-25
3 v22.6.3.35-stable 2022-07-06
4 v22.6.2.12-stable 2022-06-29
5 v22.6.1.1985-stable 2022-06-16
6 v22.5.3.21-stable 2022-07-25
7 v22.5.2.53-stable 2022-07-07
8 v22.5.1.2079-stable 2022-05-19
9 v22.4.6.53-stable 2022-07-07
11 v22.4.4.7-stable 2022-04-29
12 v22.4.3.3-stable 2022-04-26
13 v22.4.2.1-stable 2022-04-22
14 v22.3.9.19-lts 2022-07-25
15 v22.3.8.39-lts 2022-07-07
16 v22.3.7.28-lts 2022-06-20
17 v22.3.6.5-lts 2022-05-06

View File

@ -1,6 +1,8 @@
add_executable (pre_compressor compressor.cpp) add_executable (pre_compressor compressor.cpp)
target_link_libraries(pre_compressor PUBLIC ch_contrib::zstd) target_link_libraries(pre_compressor PUBLIC ch_contrib::zstd)
add_native_target (pre_compressor)
add_executable (decompressor decompressor.cpp) add_executable (decompressor decompressor.cpp)
target_link_libraries(decompressor PUBLIC ch_contrib::zstd) target_link_libraries(decompressor PUBLIC ch_contrib::zstd)

View File

@ -9,17 +9,17 @@
#include <cerrno> #include <cerrno>
#include <memory> #include <memory>
#include <iostream> #include <iostream>
#if defined OS_DARWIN
// dependencies
#include <machine/endian.h>
#include <libkern/OSByteOrder.h>
// define 64 bit macros
#define htole64(x) OSSwapHostToLittleInt64(x)
#if (defined(OS_DARWIN) || defined(OS_FREEBSD)) && defined(__GNUC__)
# include <machine/endian.h>
#else #else
#include <endian.h> # include <endian.h>
#endif
#if defined OS_DARWIN
# include <libkern/OSByteOrder.h>
// define 64 bit macros
# define htole64(x) OSSwapHostToLittleInt64(x)
#endif #endif
#include "types.h" #include "types.h"
@ -103,12 +103,14 @@ int compress(int in_fd, int out_fd, int level, off_t & pointer, const struct sta
if (ZSTD_isError(check_result)) if (ZSTD_isError(check_result))
{ {
std::cerr << "Error (ZSTD): " << check_result << " " << ZSTD_getErrorName(check_result) << std::endl; std::cerr << "Error (ZSTD): " << check_result << " " << ZSTD_getErrorName(check_result) << std::endl;
ZSTD_freeCCtx(cctx);
return 1; return 1;
} }
check_result = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1); check_result = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
if (ZSTD_isError(check_result)) if (ZSTD_isError(check_result))
{ {
std::cerr << "Error (ZSTD): " << check_result << " " << ZSTD_getErrorName(check_result) << std::endl; std::cerr << "Error (ZSTD): " << check_result << " " << ZSTD_getErrorName(check_result) << std::endl;
ZSTD_freeCCtx(cctx);
return 1; return 1;
} }
@ -129,11 +131,13 @@ int compress(int in_fd, int out_fd, int level, off_t & pointer, const struct sta
if (output == MAP_FAILED) if (output == MAP_FAILED)
{ {
perror(nullptr); perror(nullptr);
ZSTD_freeCCtx(cctx);
return 1; return 1;
} }
if (-1 == lseek(out_fd, 0, SEEK_END)) if (-1 == lseek(out_fd, 0, SEEK_END))
{ {
perror(nullptr); perror(nullptr);
ZSTD_freeCCtx(cctx);
return 1; return 1;
} }
@ -154,6 +158,7 @@ int compress(int in_fd, int out_fd, int level, off_t & pointer, const struct sta
perror(nullptr); perror(nullptr);
if (0 != munmap(output, 2 * max_block_size)) if (0 != munmap(output, 2 * max_block_size))
perror(nullptr); perror(nullptr);
ZSTD_freeCCtx(cctx);
return 1; return 1;
} }
@ -161,6 +166,7 @@ int compress(int in_fd, int out_fd, int level, off_t & pointer, const struct sta
if (current_block_size != write_data(out_fd, output, current_block_size)) if (current_block_size != write_data(out_fd, output, current_block_size))
{ {
perror(nullptr); perror(nullptr);
ZSTD_freeCCtx(cctx);
return 1; return 1;
} }
pointer += current_block_size; pointer += current_block_size;
@ -172,8 +178,11 @@ int compress(int in_fd, int out_fd, int level, off_t & pointer, const struct sta
0 != munmap(output, 2 * max_block_size)) 0 != munmap(output, 2 * max_block_size))
{ {
perror(nullptr); perror(nullptr);
ZSTD_freeCCtx(cctx);
return 1; return 1;
} }
ZSTD_freeCCtx(cctx);
return 0; return 0;
} }
@ -363,7 +372,14 @@ int copy_decompressor_self(const char *self, int output_fd)
return 1; return 1;
} }
int decompressor_size = atoi(size_str); char * end = nullptr;
int decompressor_size = strtol(size_str, &end, 10);
if (*end != 0)
{
std::cerr << "Error: unable to extract decompressor" << std::endl;
close(input_fd);
return 1;
}
if (-1 == lseek(input_fd, -(decompressor_size + 15), SEEK_END)) if (-1 == lseek(input_fd, -(decompressor_size + 15), SEEK_END))
{ {
@ -407,7 +423,7 @@ int copy_decompressor_file(const char *path, int output_fd)
inline void usage(FILE * out, const char * name) inline void usage(FILE * out, const char * name)
{ {
fprintf(out, (void)fprintf(out,
"%s [--level=<level>] [--decompressor=<path>] <output_file> <input_file> [... <input_file>]\n" "%s [--level=<level>] [--decompressor=<path>] <output_file> <input_file> [... <input_file>]\n"
"\t--level - compression level, max is %d, negative - prefer speed over compression\n" "\t--level - compression level, max is %d, negative - prefer speed over compression\n"
"\t default is 5\n" "\t default is 5\n"

View File

@ -1,9 +1,9 @@
#include <zstd.h> #include <zstd.h>
#include <sys/mman.h> #include <sys/mman.h>
#if defined OS_DARWIN #if defined(OS_DARWIN) || defined(OS_FREEBSD)
#include <sys/mount.h> # include <sys/mount.h>
#else #else
#include <sys/statfs.h> # include <sys/statfs.h>
#endif #endif
#include <fcntl.h> #include <fcntl.h>
#include <sys/wait.h> #include <sys/wait.h>
@ -12,17 +12,17 @@
#include <cstdio> #include <cstdio>
#include <cstring> #include <cstring>
#include <iostream> #include <iostream>
#if defined OS_DARWIN
// dependencies
#include <machine/endian.h>
#include <libkern/OSByteOrder.h>
// define 64 bit macros
#define le64toh(x) OSSwapLittleToHostInt64(x)
#if (defined(OS_DARWIN) || defined(OS_FREEBSD)) && defined(__GNUC__)
# include <machine/endian.h>
#else #else
#include <endian.h> # include <endian.h>
#endif
#if defined OS_DARWIN
# include <libkern/OSByteOrder.h>
// define 64 bit macros
# define le64toh(x) OSSwapLittleToHostInt64(x)
#endif #endif
#include "types.h" #include "types.h"
@ -151,6 +151,8 @@ int decompress(char * input, char * output, off_t start, off_t end, size_t max_n
--number_of_forks; --number_of_forks;
} }
ZSTD_freeDCtx(dctx);
/// If error happen end of processed part will not reach end /// If error happen end of processed part will not reach end
if (in_pointer < end || error_happened) if (in_pointer < end || error_happened)
return 1; return 1;