Merge branch 'master' into replicated_database_improvements

This commit is contained in:
Alexander Tokmakov 2022-07-06 19:54:53 +02:00
commit 388872550a
224 changed files with 6508 additions and 897 deletions

3
.gitmodules vendored
View File

@ -265,6 +265,9 @@
[submodule "contrib/hashidsxx"]
path = contrib/hashidsxx
url = https://github.com/schoentoon/hashidsxx.git
[submodule "contrib/nats-io"]
path = contrib/nats-io
url = https://github.com/ClickHouse/nats.c.git
[submodule "contrib/vectorscan"]
path = contrib/vectorscan
url = https://github.com/VectorCamp/vectorscan.git

View File

@ -223,11 +223,25 @@ if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
endif ()
endif()
if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE"
OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO"
OR CMAKE_BUILD_TYPE_UC STREQUAL "MINSIZEREL")
set (OMIT_HEAVY_DEBUG_SYMBOLS_DEFAULT ON)
else()
set (OMIT_HEAVY_DEBUG_SYMBOLS_DEFAULT OFF)
endif()
# Provides faster linking and lower binary size.
# Tradeoff is the inability to debug some source files with e.g. gdb
# (empty stack frames and no local variables)."
option(OMIT_HEAVY_DEBUG_SYMBOLS
"Do not generate debugger info for heavy modules (ClickHouse functions and dictionaries, some contrib)"
${OMIT_HEAVY_DEBUG_SYMBOLS_DEFAULT})
if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
set(USE_DEBUG_HELPERS ON)
endif()
option(USE_DEBUG_HELPERS "Enable debug helpers" ${USE_DEBUG_HELPERS})
option(BUILD_STANDALONE_KEEPER "Build keeper as small standalone binary" OFF)
if (NOT BUILD_STANDALONE_KEEPER)
option(CREATE_KEEPER_SYMLINK "Create symlink for clickhouse-keeper to main server binary" ON)

View File

@ -62,9 +62,10 @@ execute_process(COMMAND uname -m OUTPUT_VARIABLE ARCH)
# By default, prefer clang on Linux
# But note, that you still may change the compiler with -DCMAKE_C_COMPILER/-DCMAKE_CXX_COMPILER.
if (OS MATCHES "Linux"
# some build systems may use CC/CXX env variables
AND "$ENV{CC}" STREQUAL ""
AND "$ENV{CXX}" STREQUAL "")
AND "$ENV{CXX}" STREQUAL ""
AND NOT DEFINED CMAKE_C_COMPILER
AND NOT DEFINED CMAKE_CXX_COMPILER)
find_program(CLANG_PATH clang)
if (CLANG_PATH)
set(CMAKE_C_COMPILER "clang" CACHE INTERNAL "")
@ -87,8 +88,7 @@ if (OS MATCHES "Linux"
set (CMAKE_TOOLCHAIN_FILE "cmake/linux/toolchain-aarch64.cmake" CACHE INTERNAL "")
elseif (ARCH MATCHES "^(ppc64le.*|PPC64LE.*)")
set (CMAKE_TOOLCHAIN_FILE "cmake/linux/toolchain-ppc64le.cmake" CACHE INTERNAL "")
else ()
else ()
message (FATAL_ERROR "Unsupported architecture: ${ARCH}")
endif ()
endif()

View File

@ -37,7 +37,7 @@ The following versions of ClickHouse server are currently being supported with s
We're extremely grateful for security researchers and users that report vulnerabilities to the ClickHouse Open Source Community. All reports are thoroughly investigated by developers.
To report a potential vulnerability in ClickHouse please send the details about it to [security@clickhouse.com](mailto:security@clickhouse.com).
To report a potential vulnerability in ClickHouse please send the details about it to [security@clickhouse.com](mailto:security@clickhouse.com). We do not offer any financial rewards for reporting issues to us using this method. Alternatively, you can also submit your findings through our public bug bounty program hosted by [Bugcrowd](https://bugcrowd.com/clickhouse) and be rewarded for it as per the program scope and rules of engagement.
### When Should I Report a Vulnerability?

View File

@ -134,6 +134,7 @@ add_contrib (krb5-cmake krb5)
add_contrib (cyrus-sasl-cmake cyrus-sasl) # for krb5
add_contrib (libgsasl-cmake libgsasl) # requires krb5
add_contrib (librdkafka-cmake librdkafka) # requires: libgsasl
add_contrib (nats-io-cmake nats-io)
add_contrib (libhdfs3-cmake libhdfs3) # requires: protobuf, krb5
add_contrib (hive-metastore-cmake hive-metastore) # requires: thrift/avro/arrow/libhdfs3
add_contrib (cppkafka-cmake cppkafka)

View File

@ -462,5 +462,7 @@ foreach (TOOL ${PARQUET_TOOLS})
endforeach ()
# The library is large - avoid bloat.
target_compile_options (_arrow PRIVATE -g0)
target_compile_options (_parquet PRIVATE -g0)
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options (_arrow PRIVATE -g0)
target_compile_options (_parquet PRIVATE -g0)
endif()

View File

@ -114,7 +114,9 @@ endif()
target_link_libraries(_aws_s3 PRIVATE _aws_s3_checksums)
# The library is large - avoid bloat.
target_compile_options (_aws_s3 PRIVATE -g0)
target_compile_options (_aws_s3_checksums PRIVATE -g0)
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options (_aws_s3 PRIVATE -g0)
target_compile_options (_aws_s3_checksums PRIVATE -g0)
endif()
add_library(ch_contrib::aws_s3 ALIAS _aws_s3)

View File

@ -171,6 +171,8 @@ target_include_directories (_curl SYSTEM PUBLIC
target_link_libraries (_curl PRIVATE OpenSSL::SSL)
# The library is large - avoid bloat (XXX: is it?)
target_compile_options (_curl PRIVATE -g0)
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options (_curl PRIVATE -g0)
endif()
add_library (ch_contrib::curl ALIAS _curl)

@ -1 +1 @@
Subproject commit ffd86a32874e5c08a143019aad1aaf0907294c9f
Subproject commit a304ec48dcf15d942607032151f7e9ee504b5dcf

View File

@ -14,8 +14,11 @@ add_library(_protobuf-mutator
${LIBRARY_DIR}/src/text_format.cc
${LIBRARY_DIR}/src/utf8_fix.cc)
target_include_directories(_protobuf-mutator BEFORE INTERFACE "${LIBRARY_DIR}")
target_include_directories(_protobuf-mutator BEFORE INTERFACE "${ClickHouse_SOURCE_DIR}/contrib/protobuf/src")
# codegen_select_fuzzer includes <libfuzzer/libfuzzer_macro.h>...
target_include_directories(_protobuf-mutator BEFORE PUBLIC "${LIBRARY_DIR}/src")
# ... which includes <port/protobuf.h>
target_include_directories(_protobuf-mutator BEFORE PUBLIC "${LIBRARY_DIR}")
target_include_directories(_protobuf-mutator BEFORE PUBLIC "${ClickHouse_SOURCE_DIR}/contrib/protobuf/src")
target_link_libraries(_protobuf-mutator ch_contrib::protobuf)

1
contrib/nats-io vendored Submodule

@ -0,0 +1 @@
Subproject commit 6b2227f36757da090321e2d317569d2bd42c4cc1

View File

@ -0,0 +1,59 @@
option (ENABLE_NATS "Enable NATS" ${ENABLE_LIBRARIES})
if (OS_FREEBSD)
set(ENABLE_NATS OFF)
message (STATUS "Using internal nats-io library on FreeBSD is not supported")
endif()
if (NOT ENABLE_NATS)
message(STATUS "Not using nats-io")
return()
endif()
set(NATS_IO_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/nats-io/src")
if(UNIX)
set(NATS_PLATFORM_INCLUDE "unix")
elseif(WIN32)
set(NATS_PLATFORM_INCLUDE "apple")
endif()
file(GLOB PS_SOURCES "${NATS_IO_SOURCE_DIR}/${NATS_PLATFORM_INCLUDE}/*.c")
set(SRCS
"${NATS_IO_SOURCE_DIR}/asynccb.c"
"${NATS_IO_SOURCE_DIR}/buf.c"
"${NATS_IO_SOURCE_DIR}/comsock.c"
"${NATS_IO_SOURCE_DIR}/conn.c"
"${NATS_IO_SOURCE_DIR}/crypto.c"
"${NATS_IO_SOURCE_DIR}/hash.c"
"${NATS_IO_SOURCE_DIR}/js.c"
"${NATS_IO_SOURCE_DIR}/jsm.c"
"${NATS_IO_SOURCE_DIR}/kv.c"
"${NATS_IO_SOURCE_DIR}/msg.c"
"${NATS_IO_SOURCE_DIR}/nats.c"
"${NATS_IO_SOURCE_DIR}/natstime.c"
"${NATS_IO_SOURCE_DIR}/nkeys.c"
"${NATS_IO_SOURCE_DIR}/nuid.c"
"${NATS_IO_SOURCE_DIR}/opts.c"
"${NATS_IO_SOURCE_DIR}/parser.c"
"${NATS_IO_SOURCE_DIR}/pub.c"
"${NATS_IO_SOURCE_DIR}/srvpool.c"
"${NATS_IO_SOURCE_DIR}/stats.c"
"${NATS_IO_SOURCE_DIR}/status.c"
"${NATS_IO_SOURCE_DIR}/sub.c"
"${NATS_IO_SOURCE_DIR}/timer.c"
"${NATS_IO_SOURCE_DIR}/url.c"
"${NATS_IO_SOURCE_DIR}/util.c"
)
add_library(_nats_io ${SRCS} ${PS_SOURCES})
add_library(ch_contrib::nats_io ALIAS _nats_io)
target_include_directories(_nats_io SYSTEM PUBLIC ${NATS_IO_SOURCE_DIR})
target_include_directories(_nats_io SYSTEM PUBLIC ${NATS_IO_SOURCE_DIR}/adapters)
target_include_directories(_nats_io SYSTEM PUBLIC ${NATS_IO_SOURCE_DIR}/include)
target_include_directories(_nats_io SYSTEM PUBLIC ${NATS_IO_SOURCE_DIR}/${NATS_PLATFORM_INCLUDE})
target_link_libraries(_nats_io
PRIVATE OpenSSL::Crypto OpenSSL::SSL ch_contrib::uv
)

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 0e32cb42db76ddaa76848470219056908053b676
Subproject commit 9fec8e11dbb6a352e1cfba8cc9e23ebd7fb77310

2
contrib/simdjson vendored

@ -1 +1 @@
Subproject commit de196dd7a3a16e4056b0551ffa3b85c2f52581e1
Subproject commit 1075e8609c4afa253162d441437af929c29e31bb

View File

@ -268,10 +268,13 @@ endif()
add_library (_vectorscan ${SRCS})
target_compile_options (_vectorscan PRIVATE
-g0 # library has too much debug information
-fno-sanitize=undefined # assume the library takes care of itself
-O2 -fno-strict-aliasing -fno-omit-frame-pointer -fvisibility=hidden # options from original build system
)
# library has too much debug information
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options (_vectorscan PRIVATE -g0)
endif()
# Include version header manually generated by running the original build system
target_include_directories (_vectorscan SYSTEM PRIVATE common)

View File

@ -2,131 +2,138 @@
## What is ClickHouse?
ClickHouse is an open-source column-oriented database management system that allows generating analytical data reports in real time.
ClickHouse is an open-source column-oriented database management system that allows the generation of analytical data reports in real-time.
ClickHouse manages extremely large volumes of data in a stable and sustainable manner. It currently powers [Yandex.Metrica](https://metrica.yandex.com/), worlds [second largest](http://w3techs.com/technologies/overview/traffic_analysis/all) web analytics platform, with over 13 trillion database records and over 20 billion events a day, generating customized reports on-the-fly, directly from non-aggregated data. This system was successfully implemented at [CERNs LHCb experiment](https://www.yandex.com/company/press_center/press_releases/2012/2012-04-10/) to store and process metadata on 10bn events with over 1000 attributes per event registered in 2011.
ClickHouse manages extremely large volumes of data. It currently powers [Yandex.Metrica](https://metrica.yandex.com/), the worlds [second-largest](http://w3techs.com/technologies/overview/traffic_analysis/all) web analytics platform, with over 13 trillion database records and over 20 billion events a day, generating customized reports on-the-fly, directly from non-aggregated data. This system was successfully implemented at [CERNs LHCb experiment](https://www.yandex.com/company/press_center/press_releases/2012/2012-04-10/) to store and process metadata on 10bn events with over 1000 attributes per event registered in 2011.
For more information and documentation see https://clickhouse.com/.
## How to use this image
### start server instance
```bash
$ docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
```
By default ClickHouse will be accessible only via docker network. See the [networking section below](#networking).
By default, ClickHouse will be accessible only via the Docker network. See the [networking section below](#networking).
By default, starting above server instance will be run as default user without password.
By default, starting above server instance will be run as the `default` user without a password.
### connect to it from a native client
```bash
$ docker run -it --rm --link some-clickhouse-server:clickhouse-server --entrypoint clickhouse-client clickhouse/clickhouse-server --host clickhouse-server
docker run -it --rm --link some-clickhouse-server:clickhouse-server --entrypoint clickhouse-client clickhouse/clickhouse-server --host clickhouse-server
# OR
$ docker exec -it some-clickhouse-server clickhouse-client
docker exec -it some-clickhouse-server clickhouse-client
```
More information about [ClickHouse client](https://clickhouse.com/docs/en/interfaces/cli/).
More information about the [ClickHouse client](https://clickhouse.com/docs/en/interfaces/cli/).
### connect to it using curl
```bash
echo "SELECT 'Hello, ClickHouse!'" | docker run -i --rm --link some-clickhouse-server:clickhouse-server curlimages/curl 'http://clickhouse-server:8123/?query=' -s --data-binary @-
```
More information about [ClickHouse HTTP Interface](https://clickhouse.com/docs/en/interfaces/http/).
### stopping / removing the containter
### stopping / removing the container
```bash
$ docker stop some-clickhouse-server
$ docker rm some-clickhouse-server
docker stop some-clickhouse-server
docker rm some-clickhouse-server
```
### networking
You can expose you ClickHouse running in docker by [mapping particular port](https://docs.docker.com/config/containers/container-networking/) from inside container to a host ports:
You can expose your ClickHouse running in docker by [mapping a particular port](https://docs.docker.com/config/containers/container-networking/) from inside the container using host ports:
```bash
$ docker run -d -p 18123:8123 -p19000:9000 --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
$ echo 'SELECT version()' | curl 'http://localhost:18123/' --data-binary @-
docker run -d -p 18123:8123 -p19000:9000 --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
echo 'SELECT version()' | curl 'http://localhost:18123/' --data-binary @-
20.12.3.3
```
or by allowing container to use [host ports directly](https://docs.docker.com/network/host/) using `--network=host` (also allows archiving better network performance):
or by allowing the container to use [host ports directly](https://docs.docker.com/network/host/) using `--network=host` (also allows archiving better network performance):
```bash
$ docker run -d --network=host --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
$ echo 'SELECT version()' | curl 'http://localhost:8123/' --data-binary @-
docker run -d --network=host --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
echo 'SELECT version()' | curl 'http://localhost:8123/' --data-binary @-
20.12.3.3
```
### Volumes
Typically you may want to mount the following folders inside your container to archieve persistency:
Typically you may want to mount the following folders inside your container to achieve persistency:
* `/var/lib/clickhouse/` - main folder where ClickHouse stores the data
* `/val/log/clickhouse-server/` - logs
* `/var/log/clickhouse-server/` - logs
```bash
$ docker run -d \
-v $(realpath ./ch_data):/var/lib/clickhouse/ \
-v $(realpath ./ch_logs):/var/log/clickhouse-server/ \
--name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
docker run -d \
-v $(realpath ./ch_data):/var/lib/clickhouse/ \
-v $(realpath ./ch_logs):/var/log/clickhouse-server/ \
--name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
```
You may also want to mount:
* `/etc/clickhouse-server/config.d/*.xml` - files with server configuration adjustmenets
* `/etc/clickhouse-server/usert.d/*.xml` - files with use settings adjustmenets
* `/etc/clickhouse-server/users.d/*.xml` - files with user settings adjustmenets
* `/docker-entrypoint-initdb.d/` - folder with database initialization scripts (see below).
### Linux capabilities
ClickHouse has some advanced functionality which requite enabling several [linux capabilities](https://man7.org/linux/man-pages/man7/capabilities.7.html).
ClickHouse has some advanced functionality, which requires enabling several [Linux capabilities](https://man7.org/linux/man-pages/man7/capabilities.7.html).
It is optional and can be enabled using the following [docker command line agruments](https://docs.docker.com/engine/reference/run/#runtime-privilege-and-linux-capabilities):
These are optional and can be enabled using the following [docker command-line arguments](https://docs.docker.com/engine/reference/run/#runtime-privilege-and-linux-capabilities):
```bash
$ docker run -d \
--cap-add=SYS_NICE --cap-add=NET_ADMIN --cap-add=IPC_LOCK \
--name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
docker run -d \
--cap-add=SYS_NICE --cap-add=NET_ADMIN --cap-add=IPC_LOCK \
--name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
```
## Configuration
Container exposes 8123 port for [HTTP interface](https://clickhouse.com/docs/en/interfaces/http_interface/) and 9000 port for [native client](https://clickhouse.com/docs/en/interfaces/tcp/).
The container exposes port 8123 for the [HTTP interface](https://clickhouse.com/docs/en/interfaces/http_interface/) and port 9000 for the [native client](https://clickhouse.com/docs/en/interfaces/tcp/).
ClickHouse configuration represented with a file "config.xml" ([documentation](https://clickhouse.com/docs/en/operations/configuration_files/))
ClickHouse configuration is represented with a file "config.xml" ([documentation](https://clickhouse.com/docs/en/operations/configuration_files/))
### Start server instance with custom configuration
```bash
$ docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 -v /path/to/your/config.xml:/etc/clickhouse-server/config.xml clickhouse/clickhouse-server
docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 -v /path/to/your/config.xml:/etc/clickhouse-server/config.xml clickhouse/clickhouse-server
```
### Start server as custom user
```
### Start server as a custom user
```bash
# $(pwd)/data/clickhouse should exist and be owned by current user
$ docker run --rm --user ${UID}:${GID} --name some-clickhouse-server --ulimit nofile=262144:262144 -v "$(pwd)/logs/clickhouse:/var/log/clickhouse-server" -v "$(pwd)/data/clickhouse:/var/lib/clickhouse" clickhouse/clickhouse-server
docker run --rm --user ${UID}:${GID} --name some-clickhouse-server --ulimit nofile=262144:262144 -v "$(pwd)/logs/clickhouse:/var/log/clickhouse-server" -v "$(pwd)/data/clickhouse:/var/lib/clickhouse" clickhouse/clickhouse-server
```
When you use the image with mounting local directories inside you probably would like to not mess your directory tree with files owner and permissions. Then you could use `--user` argument. In this case, you should mount every necessary directory (`/var/lib/clickhouse` and `/var/log/clickhouse-server`) inside the container. Otherwise, image will complain and not start.
When you use the image with local directories mounted, you probably want to specify the user to maintain the proper file ownership. Use the `--user` argument and mount `/var/lib/clickhouse` and `/var/log/clickhouse-server` inside the container. Otherwise, the image will complain and not start.
### Start server from root (useful in case of userns enabled)
```
$ docker run --rm -e CLICKHOUSE_UID=0 -e CLICKHOUSE_GID=0 --name clickhouse-server-userns -v "$(pwd)/logs/clickhouse:/var/log/clickhouse-server" -v "$(pwd)/data/clickhouse:/var/lib/clickhouse" clickhouse/clickhouse-server
```bash
docker run --rm -e CLICKHOUSE_UID=0 -e CLICKHOUSE_GID=0 --name clickhouse-server-userns -v "$(pwd)/logs/clickhouse:/var/log/clickhouse-server" -v "$(pwd)/data/clickhouse:/var/lib/clickhouse" clickhouse/clickhouse-server
```
### How to create default database and user on starting
Sometimes you may want to create user (user named `default` is used by default) and database on image starting. You can do it using environment variables `CLICKHOUSE_DB`, `CLICKHOUSE_USER`, `CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT` and `CLICKHOUSE_PASSWORD`:
Sometimes you may want to create a user (user named `default` is used by default) and database on image start. You can do it using environment variables `CLICKHOUSE_DB`, `CLICKHOUSE_USER`, `CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT` and `CLICKHOUSE_PASSWORD`:
```
$ docker run --rm -e CLICKHOUSE_DB=my_database -e CLICKHOUSE_USER=username -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 -e CLICKHOUSE_PASSWORD=password -p 9000:9000/tcp clickhouse/clickhouse-server
```bash
docker run --rm -e CLICKHOUSE_DB=my_database -e CLICKHOUSE_USER=username -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 -e CLICKHOUSE_PASSWORD=password -p 9000:9000/tcp clickhouse/clickhouse-server
```
## How to extend this image
If you would like to do additional initialization in an image derived from this one, add one or more `*.sql`, `*.sql.gz`, or `*.sh` scripts under `/docker-entrypoint-initdb.d`. After the entrypoint calls `initdb` it will run any `*.sql` files, run any executable `*.sh` scripts, and source any non-executable `*.sh` scripts found in that directory to do further initialization before starting the service.
Also you can provide environment variables `CLICKHOUSE_USER` & `CLICKHOUSE_PASSWORD` that will be used for clickhouse-client during initialization.
To perform additional initialization in an image derived from this one, add one or more `*.sql`, `*.sql.gz`, or `*.sh` scripts under `/docker-entrypoint-initdb.d`. After the entrypoint calls `initdb`, it will run any `*.sql` files, run any executable `*.sh` scripts, and source any non-executable `*.sh` scripts found in that directory to do further initialization before starting the service.
Also, you can provide environment variables `CLICKHOUSE_USER` & `CLICKHOUSE_PASSWORD` that will be used for clickhouse-client during initialization.
For example, to add an additional user and database, add the following to `/docker-entrypoint-initdb.d/init-db.sh`:
@ -135,11 +142,12 @@ For example, to add an additional user and database, add the following to `/dock
set -e
clickhouse client -n <<-EOSQL
CREATE DATABASE docker;
CREATE TABLE docker.docker (x Int32) ENGINE = Log;
CREATE DATABASE docker;
CREATE TABLE docker.docker (x Int32) ENGINE = Log;
EOSQL
```
## License
View [license information](https://github.com/ClickHouse/ClickHouse/blob/master/LICENSE) for the software contained in this image.

View File

@ -63,6 +63,7 @@ RUN python3 -m pip install \
PyMySQL \
aerospike==4.0.0 \
avro==1.10.2 \
asyncio \
cassandra-driver \
confluent-kafka==1.5.0 \
dict2xml \
@ -75,6 +76,7 @@ RUN python3 -m pip install \
kazoo \
lz4 \
minio \
nats-py \
protobuf \
psycopg2-binary==2.8.6 \
pymongo==3.11.0 \

View File

@ -0,0 +1,7 @@
version: '2.3'
services:
nats1:
image: nats
ports:
- "${NATS_EXTERNAL_PORT}:${NATS_INTERNAL_PORT}"
command: "-p 4444 --user click --pass house"

View File

@ -164,18 +164,3 @@ ClickHouse is available in pre-built binaries and packages. Binaries are portabl
They are built for stable, prestable and testing releases as long as for every commit to master and for every pull request.
To find the freshest build from `master`, go to [commits page](https://github.com/ClickHouse/ClickHouse/commits/master), click on the first green check mark or red cross near commit, and click to the “Details” link right after “ClickHouse Build Check”.
## Faster builds for development: Split build configuration {#split-build}
Normally, ClickHouse is statically linked into a single static `clickhouse` binary with minimal dependencies. This is convenient for distribution, but it means that on every change the entire binary needs to be linked, which is slow and may be inconvenient for development. There is an alternative configuration which instead creates dynamically loaded shared libraries and separate binaries `clickhouse-server`, `clickhouse-client` etc., allowing for faster incremental builds. To use it, add the following flags to your `cmake` invocation:
```
-DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1 -DCLICKHOUSE_SPLIT_BINARY=1
```
Note that the split build has several drawbacks:
* There is no single `clickhouse` binary, and you have to run `clickhouse-server`, `clickhouse-client`, etc.
* Risk of segfault if you run any of the programs while rebuilding the project.
* You cannot run the integration tests since they only work a single complete binary.
* You can't easily copy the binaries elsewhere. Instead of moving a single binary you'll need to copy all binaries and libraries.
[Original article](https://clickhouse.com/docs/en/development/build/) <!--hide-->

View File

@ -6,93 +6,14 @@ description: A list of third-party libraries used
# Third-Party Libraries Used
The list of third-party libraries:
| Library name | License type |
|:-|:-|
| abseil-cpp | [Apache](https://github.com/ClickHouse-Extras/abseil-cpp/blob/4f3b686f86c3ebaba7e4e926e62a79cb1c659a54/LICENSE) |
| AMQP-CPP | [Apache](https://github.com/ClickHouse-Extras/AMQP-CPP/blob/1a6c51f4ac51ac56610fa95081bd2f349911375a/LICENSE) |
| arrow | [Apache](https://github.com/ClickHouse-Extras/arrow/blob/078e21bad344747b7656ef2d7a4f7410a0a303eb/LICENSE.txt) |
| avro | [Apache](https://github.com/ClickHouse-Extras/avro/blob/e43c46e87fd32eafdc09471e95344555454c5ef8/LICENSE.txt) |
| aws | [Apache](https://github.com/ClickHouse-Extras/aws-sdk-cpp/blob/7d48b2c8193679cc4516e5bd68ae4a64b94dae7d/LICENSE.txt) |
| aws-c-common | [Apache](https://github.com/ClickHouse-Extras/aws-c-common/blob/736a82d1697c108b04a277e66438a7f4e19b6857/LICENSE) |
| aws-c-event-stream | [Apache](https://github.com/ClickHouse-Extras/aws-c-event-stream/blob/3bc33662f9ccff4f4cbcf9509cc78c26e022fde0/LICENSE) |
| aws-checksums | [Apache](https://github.com/ClickHouse-Extras/aws-checksums/blob/519d6d9093819b6cf89ffff589a27ef8f83d0f65/LICENSE) |
| base58 | [MIT](https://github.com/ClickHouse/base-x/blob/3e58874643c087f57e82b0ff03825c933fab945a/LICENSE) |
| base64 | [BSD 2-clause](https://github.com/ClickHouse-Extras/Turbo-Base64/blob/af9b331f2b4f30b41c70f3a571ff904a8251c1d3/LICENSE) |
| boost | [Boost](https://github.com/ClickHouse-Extras/boost/blob/9cf09dbfd55a5c6202dedbdf40781a51b02c2675/LICENSE_1_0.txt) |
| boringssl | [BSD](https://github.com/ClickHouse-Extras/boringssl/blob/a6a2e2ab3e44d97ce98e51c558e989f211de7eb3/LICENSE) |
| brotli | [MIT](https://github.com/google/brotli/blob/63be8a99401992075c23e99f7c84de1c653e39e2/LICENSE) |
| capnproto | [MIT](https://github.com/capnproto/capnproto/blob/a00ccd91b3746ef2ab51d40fe3265829949d1ace/LICENSE) |
| cassandra | [Apache](https://github.com/ClickHouse-Extras/cpp-driver/blob/eb9b68dadbb4417a2c132ad4a1c2fa76e65e6fc1/LICENSE.txt) |
| cctz | [Apache](https://github.com/ClickHouse-Extras/cctz/blob/c0f1bcb97fd2782f7c3f972fadd5aad5affac4b8/LICENSE.txt) |
| cityhash102 | [MIT](https://github.com/ClickHouse/ClickHouse/blob/master/contrib/cityhash102/COPYING) |
| cppkafka | [BSD 2-clause](https://github.com/mfontanini/cppkafka/blob/5a119f689f8a4d90d10a9635e7ee2bee5c127de1/LICENSE) |
| croaring | [Apache](https://github.com/RoaringBitmap/CRoaring/blob/2c867e9f9c9e2a3a7032791f94c4c7ae3013f6e0/LICENSE) |
| curl | [Apache](https://github.com/curl/curl/blob/3b8bbbbd1609c638a3d3d0acb148a33dedb67be3/docs/LICENSE-MIXING.md) |
| cyrus-sasl | [BSD 2-clause](https://github.com/ClickHouse-Extras/cyrus-sasl/blob/e6466edfd638cc5073debe941c53345b18a09512/COPYING) |
| double-conversion | [BSD 3-clause](https://github.com/google/double-conversion/blob/cf2f0f3d547dc73b4612028a155b80536902ba02/LICENSE) |
| dragonbox | [Apache](https://github.com/ClickHouse-Extras/dragonbox/blob/923705af6fd953aa948fc175f6020b15f7359838/LICENSE-Apache2-LLVM) |
| fast_float | [Apache](https://github.com/fastfloat/fast_float/blob/7eae925b51fd0f570ccd5c880c12e3e27a23b86f/LICENSE) |
| fastops | [MIT](https://github.com/ClickHouse-Extras/fastops/blob/88752a5e03cf34639a4a37a4b41d8b463fffd2b5/LICENSE) |
| flatbuffers | [Apache](https://github.com/ClickHouse-Extras/flatbuffers/blob/eb3f827948241ce0e701516f16cd67324802bce9/LICENSE.txt) |
| fmtlib | [Unknown](https://github.com/fmtlib/fmt/blob/c108ee1d590089ccf642fc85652b845924067af2/LICENSE.rst) |
| gcem | [Apache](https://github.com/kthohr/gcem/blob/8d4f1b5d76ea8f6ff12f3f4f34cda45424556b00/LICENSE) |
| googletest | [BSD 3-clause](https://github.com/google/googletest/blob/e7e591764baba0a0c3c9ad0014430e7a27331d16/LICENSE) |
| grpc | [Apache](https://github.com/ClickHouse-Extras/grpc/blob/60c986e15cae70aade721d26badabab1f822fdd6/LICENSE) |
| h3 | [Apache](https://github.com/ClickHouse-Extras/h3/blob/c7f46cfd71fb60e2fefc90e28abe81657deff735/LICENSE) |
| vectorscan | [Boost](https://github.com/ClickHouse-Extras/hyperscan/blob/73695e419c27af7fe2a099c7aa57931cc02aea5d/LICENSE) |
| icu | [Public Domain](https://github.com/unicode-org/icu/blob/a56dde820dc35665a66f2e9ee8ba58e75049b668/icu4c/LICENSE) |
| icudata | [Public Domain](https://github.com/ClickHouse-Extras/icudata/blob/72d9a4a7febc904e2b0a534ccb25ae40fac5f1e5/LICENSE) |
| jemalloc | [BSD 2-clause](https://github.com/ClickHouse-Extras/jemalloc/blob/e6891d9746143bf2cf617493d880ba5a0b9a3efd/COPYING) |
| krb5 | [MIT](https://github.com/ClickHouse-Extras/krb5/blob/5149dea4e2be0f67707383d2682b897c14631374/src/lib/gssapi/LICENSE) |
| libc-headers | [LGPL](https://github.com/ClickHouse-Extras/libc-headers/blob/a720b7105a610acbd7427eea475a5b6810c151eb/LICENSE) |
| libcpuid | [BSD 2-clause](https://github.com/ClickHouse-Extras/libcpuid/blob/8db3b8d2d32d22437f063ce692a1b9bb15e42d18/COPYING) |
| libcxx | [Apache](https://github.com/ClickHouse-Extras/libcxx/blob/2fa892f69acbaa40f8a18c6484854a6183a34482/LICENSE.TXT) |
| libcxxabi | [Apache](https://github.com/ClickHouse-Extras/libcxxabi/blob/df8f1e727dbc9e2bedf2282096fa189dc3fe0076/LICENSE.TXT) |
| libdivide | [zLib](https://github.com/ClickHouse/ClickHouse/blob/master/contrib/libdivide/LICENSE.txt) |
| libfarmhash | [MIT](https://github.com/ClickHouse/ClickHouse/blob/master/contrib/libfarmhash/COPYING) |
| libgsasl | [LGPL](https://github.com/ClickHouse-Extras/libgsasl/blob/383ee28e82f69fa16ed43b48bd9c8ee5b313ab84/LICENSE) |
| libhdfs3 | [Apache](https://github.com/ClickHouse-Extras/libhdfs3/blob/095b9d48b400abb72d967cb0539af13b1e3d90cf/LICENSE.txt) |
| libmetrohash | [Apache](https://github.com/ClickHouse/ClickHouse/blob/master/contrib/libmetrohash/LICENSE) |
| libpq | [Unknown](https://github.com/ClickHouse-Extras/libpq/blob/e071ea570f8985aa00e34f5b9d50a3cfe666327e/COPYRIGHT) |
| libpqxx | [BSD 3-clause](https://github.com/ClickHouse-Extras/libpqxx/blob/357608d11b7a1961c3fb7db2ef9a5dbb2e87da77/COPYING) |
| librdkafka | [MIT](https://github.com/ClickHouse-Extras/librdkafka/blob/b8554f1682062c85ba519eb54ef2f90e02b812cb/LICENSE.murmur2) |
| libunwind | [Apache](https://github.com/ClickHouse-Extras/libunwind/blob/6b816d2fba3991f8fd6aaec17d92f68947eab667/LICENSE.TXT) |
| libuv | [BSD](https://github.com/ClickHouse-Extras/libuv/blob/e2e9b7e9f978ce8a1367b5fe781d97d1ce9f94ab/LICENSE) |
| llvm | [Apache](https://github.com/ClickHouse-Extras/llvm/blob/e5751459412bce1391fb7a2e9bbc01e131bf72f1/llvm/LICENSE.TXT) |
| lz4 | [BSD](https://github.com/lz4/lz4/blob/f39b79fb02962a1cd880bbdecb6dffba4f754a11/LICENSE) |
| mariadb-connector-c | [LGPL](https://github.com/ClickHouse-Extras/mariadb-connector-c/blob/5f4034a3a6376416504f17186c55fe401c6d8e5e/COPYING.LIB) |
| miniselect | [Boost](https://github.com/danlark1/miniselect/blob/be0af6bd0b6eb044d1acc4f754b229972d99903a/LICENSE_1_0.txt) |
| msgpack-c | [Boost](https://github.com/msgpack/msgpack-c/blob/46684265d50b5d1b062d4c5c428ba08462844b1d/LICENSE_1_0.txt) |
| murmurhash | [Public Domain](https://github.com/ClickHouse/ClickHouse/blob/master/contrib/murmurhash/LICENSE) |
| NuRaft | [Apache](https://github.com/ClickHouse-Extras/NuRaft/blob/7ecb16844af6a9c283ad432d85ecc2e7d1544676/LICENSE) |
| openldap | [Unknown](https://github.com/ClickHouse-Extras/openldap/blob/0208811b6043ca06fda8631a5e473df1ec515ccb/LICENSE) |
| orc | [Apache](https://github.com/ClickHouse-Extras/orc/blob/0a936f6bbdb9303308973073f8623b5a8d82eae1/LICENSE) |
| poco | [Boost](https://github.com/ClickHouse-Extras/poco/blob/7351c4691b5d401f59e3959adfc5b4fa263b32da/LICENSE) |
| protobuf | [BSD 3-clause](https://github.com/ClickHouse-Extras/protobuf/blob/75601841d172c73ae6bf4ce8121f42b875cdbabd/LICENSE) |
| rapidjson | [MIT](https://github.com/ClickHouse-Extras/rapidjson/blob/c4ef90ccdbc21d5d5a628d08316bfd301e32d6fa/bin/jsonschema/LICENSE) |
| re2 | [BSD 3-clause](https://github.com/google/re2/blob/13ebb377c6ad763ca61d12dd6f88b1126bd0b911/LICENSE) |
| replxx | [BSD 3-clause](https://github.com/ClickHouse-Extras/replxx/blob/c81be6c68b146f15f2096b7ef80e3f21fe27004c/LICENSE.md) |
| rocksdb | [BSD 3-clause](https://github.com/ClickHouse-Extras/rocksdb/blob/b6480c69bf3ab6e298e0d019a07fd4f69029b26a/LICENSE.leveldb) |
| s2geometry | [Apache](https://github.com/ClickHouse-Extras/s2geometry/blob/20ea540d81f4575a3fc0aea585aac611bcd03ede/LICENSE) |
| sentry-native | [MIT](https://github.com/ClickHouse-Extras/sentry-native/blob/94644e92f0a3ff14bd35ed902a8622a2d15f7be4/LICENSE) |
| simdjson | [Apache](https://github.com/simdjson/simdjson/blob/8df32cea3359cb30120795da6020b3b73da01d38/LICENSE) |
| snappy | [Public Domain](https://github.com/google/snappy/blob/3f194acb57e0487531c96b97af61dcbd025a78a3/COPYING) |
| sparsehash-c11 | [BSD 3-clause](https://github.com/sparsehash/sparsehash-c11/blob/cf0bffaa456f23bc4174462a789b90f8b6f5f42f/LICENSE) |
| stats | [Apache](https://github.com/kthohr/stats/blob/b6dd459c10a88c7ea04693c007e9e35820c5d9ad/LICENSE) |
| thrift | [Apache](https://github.com/apache/thrift/blob/010ccf0a0c7023fea0f6bf4e4078ebdff7e61982/LICENSE) |
| unixodbc | [LGPL](https://github.com/ClickHouse-Extras/UnixODBC/blob/b0ad30f7f6289c12b76f04bfb9d466374bb32168/COPYING) |
| xz | [Public Domain](https://github.com/xz-mirror/xz/blob/869b9d1b4edd6df07f819d360d306251f8147353/COPYING) |
| zlib-ng | [zLib](https://github.com/ClickHouse-Extras/zlib-ng/blob/6a5e93b9007782115f7f7e5235dedc81c4f1facb/LICENSE.md) |
| zstd | [BSD](https://github.com/facebook/zstd/blob/a488ba114ec17ea1054b9057c26a046fc122b3b6/LICENSE) |
The list of third-party libraries can be obtained by the following query:
ClickHouse utilizes third-party libraries for different purposes, e.g., to connect to other databases, to decode (encode) data during load (save) from (to) disk or to implement certain specialized SQL functions. To be independent of the available libraries in the target system, each third-party library is imported as a Git submodule into ClickHouse's source tree and compiled and linked with ClickHouse. A list of third-party libraries and their licenses can be obtained by the following query:
``` sql
SELECT library_name, license_type, license_path FROM system.licenses ORDER BY library_name COLLATE 'en';
```
(Note that the listed libraries are the ones located in the `contrib/` directory of the ClickHouse repository. Depending on the build options, some of of the libraries may have not been compiled, and as a result, their functionality may not be available at runtime.
[Example](https://play.clickhouse.com/play?user=play#U0VMRUNUIGxpYnJhcnlfbmFtZSwgbGljZW5zZV90eXBlLCBsaWNlbnNlX3BhdGggRlJPTSBzeXN0ZW0ubGljZW5zZXMgT1JERVIgQlkgbGlicmFyeV9uYW1lIENPTExBVEUgJ2VuJw==)
## Adding new third-party libraries and maintaining patches in third-party libraries {#adding-third-party-libraries}

View File

@ -276,3 +276,23 @@ Testing will commence as soon as ClickHouse employees label your PR with a tag
The system will prepare ClickHouse binary builds for your pull request individually. To retrieve these builds click the “Details” link next to “ClickHouse build check” entry in the list of checks. There you will find direct links to the built .deb packages of ClickHouse which you can deploy even on your production servers (if you have no fear).
Most probably some of the builds will fail at first times. This is due to the fact that we check builds both with gcc as well as with clang, with almost all of existing warnings (always with the `-Werror` flag) enabled for clang. On that same page, you can find all of the build logs so that you do not have to build ClickHouse in all of the possible ways.
## Faster builds for development: Split build configuration {#split-build}
ClickHouse is normally statically linked into a single static `clickhouse` binary with minimal dependencies. This is convenient for distribution, but it means that for every change the entire binary needs to be re-linked, which is slow and inconvenient for development. As an alternative, you can instead build dynamically linked shared libraries and separate binaries `clickhouse-server`, `clickhouse-client` etc., allowing for faster incremental builds. To use it, add the following flags to your `cmake` invocation:
```
-DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1 -DCLICKHOUSE_SPLIT_BINARY=1
```
Note that the split build has several drawbacks:
* There is no single `clickhouse` binary, and you have to run `clickhouse-server`, `clickhouse-client`, etc.
* Risk of segfault if you run any of the programs while rebuilding the project.
* You cannot run the integration tests since they only work a single complete binary.
* You can't easily copy the binaries elsewhere. Instead of moving a single binary you'll need to copy all binaries and libraries.
If you are not interested in functionality provided by third-party libraries, you can further speed up the build using `cmake` options
```
-DENABLE_LIBRARIES=0 -DENABLE_EMBEDDED_COMPILER=0
```
In case of problems with any of the development options, you are on your own!

View File

@ -0,0 +1,163 @@
---
sidebar_position: 14
sidebar_label: NATS
---
# NATS Engine {#redisstreams-engine}
This engine allows integrating ClickHouse with [NATS](https://nats.io/).
`NATS` lets you:
- Publish or subcribe to message subjects.
- Process new messages as they become available.
## Creating a Table {#table_engine-redisstreams-creating-a-table}
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = NATS SETTINGS
nats_url = 'host:port',
nats_subjects = 'subject1,subject2,...',
nats_format = 'data_format'[,]
[nats_row_delimiter = 'delimiter_symbol',]
[nats_schema = '',]
[nats_num_consumers = N,]
[nats_queue_group = 'group_name',]
[nats_secure = false,]
[nats_max_reconnect = N,]
[nats_reconnect_wait = N,]
[nats_server_list = 'host1:port1,host2:port2,...',]
[nats_skip_broken_messages = N,]
[nats_max_block_size = N,]
[nats_flush_interval_ms = N,]
[nats_username = 'user',]
[nats_password = 'password']
[redis_password = 'clickhouse']
```
Required parameters:
- `nats_url` host:port (for example, `localhost:5672`)..
- `nats_subjects` List of subject for NATS table to subscribe/publsh to. Supports wildcard subjects like `foo.*.bar` or `baz.>`
- `nats_format` Message format. Uses the same notation as the SQL `FORMAT` function, such as `JSONEachRow`. For more information, see the [Formats](../../../interfaces/formats.md) section.
Optional parameters:
- `nats_row_delimiter` Delimiter character, which ends the message.
- `nats_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `nats_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `nats_queue_group` Name for queue group of NATS subscribers. Default is the table name.
- `nats_max_reconnect` Maximum amount of reconnection attempts per try to connect to NATS. Default: `5`.
- `nats_reconnect_wait` Amount of time in milliseconds to sleep between each reconnect attempt. Default: `5000`.
- `nats_server_list` - Server list for connection. Can be specified to connect to NATS cluster.
- `nats_skip_broken_messages` - NATS message parser tolerance to schema-incompatible messages per block. Default: `0`. If `nats_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
- `nats_max_block_size` - Number of row collected by poll(s) for flushing data from NATS.
- `nats_flush_interval_ms` - Timeout for flushing data read from NATS.
- `nats_username` - NATS username.
- `nats_password` - NATS password.
- `nats_token` - NATS auth token.
SSL connection:
For secure connection use `nats_secure = 1`.
The default behaviour of the used library is not to check if the created TLS connection is sufficiently secure. Whether the certificate is expired, self-signed, missing or invalid: the connection is simply permitted. More strict checking of certificates can possibly be implemented in the future.
Writing to NATS table:
If table reads only from one subject, any insert will publish to the same subject.
However, if table reads from multiple subjects, we need to specify which subject we want to publish to.
That is why whenever inserting into table with multiple subjects, setting `stream_like_engine_insert_queue` is needed.
You can select one of the subjects the table reads from and publish your data there. For example:
``` sql
CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subjects = 'subject1,subject2',
nats_format = 'JSONEachRow';
INSERT INTO queue
SETTINGS stream_like_engine_insert_queue = 'subject2'
VALUES (1, 1);
```
Also format settings can be added along with nats-related settings.
Example:
``` sql
CREATE TABLE queue (
key UInt64,
value UInt64,
date DateTime
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subjects = 'subject1',
nats_format = 'JSONEachRow',
date_time_input_format = 'best_effort';
```
The NATS server configuration can be added using the ClickHouse config file.
More specifically you can add Redis password for NATS engine:
``` xml
<nats>
<user>click</user>
<password>house</password>
<token>clickhouse</token>
</nats>
```
## Description {#description}
`SELECT` is not particularly useful for reading messages (except for debugging), because each message can be read only once. It is more practical to create real-time threads using [materialized views](../../../sql-reference/statements/create/view.md). To do this:
1. Use the engine to create a NATS consumer and consider it a data stream.
2. Create a table with the desired structure.
3. Create a materialized view that converts data from the engine and puts it into a previously created table.
When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background. This allows you to continually receive messages from NATS and convert them to the required format using `SELECT`.
One NATS table can have as many materialized views as you like, they do not read data from the table directly, but receive new records (in blocks), this way you can write to several tables with different detail level (with grouping - aggregation and without).
Example:
``` sql
CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = NATS
SETTINGS nats_url = 'localhost:4444',
nats_subjects = 'subject1',
nats_format = 'JSONEachRow',
date_time_input_format = 'best_effort';
CREATE TABLE daily (key UInt64, value UInt64)
ENGINE = MergeTree() ORDER BY key;
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT key, value FROM queue;
SELECT key, value FROM daily ORDER BY key;
```
To stop receiving streams data or to change the conversion logic, detach the materialized view:
``` sql
DETACH TABLE consumer;
ATTACH TABLE consumer;
```
If you want to change the target table by using `ALTER`, we recommend disabling the material view to avoid discrepancies between the target table and the data from the view.
## Virtual Columns {#virtual-columns}
- `_subject` - NATS message subject.
[Original article](https://clickhouse.com/docs/en/engines/table-engines/integrations/nats/) <!--hide-->

View File

@ -11,68 +11,69 @@ results of a `SELECT`, and to perform `INSERT`s into a file-backed table.
The supported formats are:
| Format | Input | Output |
|-------------------------------------------------------------------------------------------|-------|--------|
| [TabSeparated](#tabseparated) | ✔ | ✔ |
| [TabSeparatedRaw](#tabseparatedraw) | ✔ | ✔ |
| [TabSeparatedWithNames](#tabseparatedwithnames) | ✔ | ✔ |
| [TabSeparatedWithNamesAndTypes](#tabseparatedwithnamesandtypes) | ✔ | ✔ |
| [TabSeparatedRawWithNames](#tabseparatedrawwithnames) | ✔ | ✔ |
| [TabSeparatedRawWithNamesAndTypes](#tabseparatedrawwithnamesandtypes) | ✔ | ✔ |
| [Template](#format-template) | ✔ | ✔ |
| [TemplateIgnoreSpaces](#templateignorespaces) | ✔ | ✗ |
| [CSV](#csv) | ✔ | ✔ |
| [CSVWithNames](#csvwithnames) | ✔ | ✔ |
| [CSVWithNamesAndTypes](#csvwithnamesandtypes) | ✔ | ✔ |
| [CustomSeparated](#format-customseparated) | ✔ | ✔ |
| [CustomSeparatedWithNames](#customseparatedwithnames) | ✔ | ✔ |
| [CustomSeparatedWithNamesAndTypes](#customseparatedwithnamesandtypes) | ✔ | ✔ |
| [Values](#data-format-values) | ✔ | ✔ |
| [Vertical](#vertical) | ✗ | ✔ |
| [JSON](#json) | ✗ | ✔ |
| [JSONAsString](#jsonasstring) | ✔ | ✗ |
| [JSONStrings](#jsonstrings) | ✗ | ✔ |
| [JSONColumns](#jsoncolumns) | ✔ | ✔ |
| [JSONColumnsWithMetadata](#jsoncolumnswithmetadata) | ✗ | ✔ |
| [JSONCompact](#jsoncompact) | ✗ | ✔ |
| [JSONCompactStrings](#jsoncompactstrings) | ✗ | ✔ |
| [JSONCompactColumns](#jsoncompactcolumns) | ✔ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
| [JSONEachRowWithProgress](#jsoneachrowwithprogress) | ✗ | ✔ |
| [JSONStringsEachRow](#jsonstringseachrow) | ✔ | ✔ |
| [JSONStringsEachRowWithProgress](#jsonstringseachrowwithprogress) | ✗ | ✔ |
| [JSONCompactEachRow](#jsoncompacteachrow) | ✔ | ✔ |
| [JSONCompactEachRowWithNames](#jsoncompacteachrowwithnames) | ✔ | ✔ |
| [JSONCompactEachRowWithNamesAndTypes](#jsoncompacteachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONCompactStringsEachRow](#jsoncompactstringseachrow) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNames](#jsoncompactstringseachrowwithnames) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNamesAndTypes](#jsoncompactstringseachrowwithnamesandtypes) | ✔ | ✔ |
| [TSKV](#tskv) | ✔ | ✔ |
| [Pretty](#pretty) | ✗ | ✔ |
| [PrettyCompact](#prettycompact) | ✗ | ✔ |
| [PrettyCompactMonoBlock](#prettycompactmonoblock) | ✗ | ✔ |
| [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ |
| [PrettySpace](#prettyspace) | ✗ | ✔ |
| [Prometheus](#prometheus) | ✗ | ✔ |
| [Protobuf](#protobuf) | ✔ | ✔ |
| [ProtobufSingle](#protobufsingle) | ✔ | ✔ |
| [Avro](#data-format-avro) | ✔ | ✔ |
| [AvroConfluent](#data-format-avro-confluent) | ✔ | ✗ |
| [Parquet](#data-format-parquet) | ✔ | ✔ |
| [Arrow](#data-format-arrow) | ✔ | ✔ |
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
| [ORC](#data-format-orc) | ✔ | ✔ |
| [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNames](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [Native](#native) | ✔ | ✔ |
| [Null](#null) | ✗ | ✔ |
| [XML](#xml) | ✗ | ✔ |
| [CapnProto](#capnproto) | ✔ | ✔ |
| [LineAsString](#lineasstring) | ✔ | ✗ |
| [Regexp](#data-format-regexp) | ✔ | ✗ |
| [RawBLOB](#rawblob) | ✔ | ✔ |
| [MsgPack](#msgpack) | ✔ | ✔ |
| [MySQLDump](#mysqldump) | ✔ | ✗ |
|-------------------------------------------------------------------------------------------|------|--------|
| [TabSeparated](#tabseparated) | ✔ | ✔ |
| [TabSeparatedRaw](#tabseparatedraw) | ✔ | ✔ |
| [TabSeparatedWithNames](#tabseparatedwithnames) | ✔ | ✔ |
| [TabSeparatedWithNamesAndTypes](#tabseparatedwithnamesandtypes) | ✔ | ✔ |
| [TabSeparatedRawWithNames](#tabseparatedrawwithnames) | ✔ | ✔ |
| [TabSeparatedRawWithNamesAndTypes](#tabseparatedrawwithnamesandtypes) | ✔ | ✔ |
| [Template](#format-template) | ✔ | ✔ |
| [TemplateIgnoreSpaces](#templateignorespaces) | ✔ | ✗ |
| [CSV](#csv) | ✔ | ✔ |
| [CSVWithNames](#csvwithnames) | ✔ | ✔ |
| [CSVWithNamesAndTypes](#csvwithnamesandtypes) | ✔ | ✔ |
| [CustomSeparated](#format-customseparated) | ✔ | ✔ |
| [CustomSeparatedWithNames](#customseparatedwithnames) | ✔ | ✔ |
| [CustomSeparatedWithNamesAndTypes](#customseparatedwithnamesandtypes) | ✔ | ✔ |
| [SQLInsert](#sqlinsert) | ✗ | ✔ |
| [Values](#data-format-values) | ✔ | ✔ |
| [Vertical](#vertical) | ✗ | ✔ |
| [JSON](#json) | ✗ | ✔ |
| [JSONAsString](#jsonasstring) | ✔ | ✗ |
| [JSONStrings](#jsonstrings) | ✗ | ✔ |
| [JSONColumns](#jsoncolumns) | ✔ | ✔ |
| [JSONColumnsWithMetadata](#jsoncolumnswithmetadata) | ✗ | ✔ |
| [JSONCompact](#jsoncompact) | ✗ | ✔ |
| [JSONCompactStrings](#jsoncompactstrings) | ✗ | ✔ |
| [JSONCompactColumns](#jsoncompactcolumns) | ✔ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
| [JSONEachRowWithProgress](#jsoneachrowwithprogress) | ✗ | ✔ |
| [JSONStringsEachRow](#jsonstringseachrow) | ✔ | ✔ |
| [JSONStringsEachRowWithProgress](#jsonstringseachrowwithprogress) | ✗ | ✔ |
| [JSONCompactEachRow](#jsoncompacteachrow) | ✔ | ✔ |
| [JSONCompactEachRowWithNames](#jsoncompacteachrowwithnames) | ✔ | ✔ |
| [JSONCompactEachRowWithNamesAndTypes](#jsoncompacteachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONCompactStringsEachRow](#jsoncompactstringseachrow) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNames](#jsoncompactstringseachrowwithnames) | ✔ | ✔ |
| [JSONCompactStringsEachRowWithNamesAndTypes](#jsoncompactstringseachrowwithnamesandtypes) | ✔ | ✔ |
| [TSKV](#tskv) | ✔ | ✔ |
| [Pretty](#pretty) | ✗ | ✔ |
| [PrettyCompact](#prettycompact) | ✗ | ✔ |
| [PrettyCompactMonoBlock](#prettycompactmonoblock) | ✗ | ✔ |
| [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ |
| [PrettySpace](#prettyspace) | ✗ | ✔ |
| [Prometheus](#prometheus) | ✗ | ✔ |
| [Protobuf](#protobuf) | ✔ | ✔ |
| [ProtobufSingle](#protobufsingle) | ✔ | ✔ |
| [Avro](#data-format-avro) | ✔ | ✔ |
| [AvroConfluent](#data-format-avro-confluent) | ✔ | ✗ |
| [Parquet](#data-format-parquet) | ✔ | ✔ |
| [Arrow](#data-format-arrow) | ✔ | ✔ |
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
| [ORC](#data-format-orc) | ✔ | ✔ |
| [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNames](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [Native](#native) | ✔ | ✔ |
| [Null](#null) | ✗ | ✔ |
| [XML](#xml) | ✗ | ✔ |
| [CapnProto](#capnproto) | ✔ | ✔ |
| [LineAsString](#lineasstring) | ✔ | ✗ |
| [Regexp](#data-format-regexp) | ✔ | ✗ |
| [RawBLOB](#rawblob) | ✔ | ✔ |
| [MsgPack](#msgpack) | ✔ | ✔ |
| [MySQLDump](#mysqldump) | ✔ | ✗ |
You can control some format processing parameters with the ClickHouse settings. For more information read the [Settings](../operations/settings/settings.md) section.
@ -468,6 +469,34 @@ Also prints the header row with column names, similar to [TabSeparatedWithNames]
Also prints two header rows with column names and types, similar to [TabSeparatedWithNamesAndTypes](#tabseparatedwithnamesandtypes).
## SQLInsert {#sqlinsert}
Outputs data as a sequence of `INSERT INTO table (columns...) VALUES (...), (...) ...;` statements.
Example:
```sql
SELECT number AS x, number + 1 AS y, 'Hello' AS z FROM numbers(10) FORMAT SQLInsert SETTINGS output_format_sql_insert_max_batch_size = 2
```
```sql
INSERT INTO table (x, y, z) VALUES (0, 1, 'Hello'), (1, 2, 'Hello');
INSERT INTO table (x, y, z) VALUES (2, 3, 'Hello'), (3, 4, 'Hello');
INSERT INTO table (x, y, z) VALUES (4, 5, 'Hello'), (5, 6, 'Hello');
INSERT INTO table (x, y, z) VALUES (6, 7, 'Hello'), (7, 8, 'Hello');
INSERT INTO table (x, y, z) VALUES (8, 9, 'Hello'), (9, 10, 'Hello');
```
To read data output by this format ypu can use [MySQLDump](#mysqldump) input format.
### SQLInsert format settings {#sqlinsert-format-settings}
- [output_format_sql_insert_max_batch_size](../operations/settings/settings.md#output_format_sql_insert_max_batch_size) - The maximum number of rows in one INSERT statement. Default value - `65505`.
- [output_format_sql_insert_table_name](../operations/settings/settings.md#output_format_sql_insert_table_name) - The name of table in the output INSERT query. Default value - `'table'`.
- [output_format_sql_insert_include_column_names](../operations/settings/settings.md#output_format_sql_insert_include_column_names) - Include column names in INSERT query. Default value - `true`.
- [output_format_sql_insert_use_replace](../operations/settings/settings.md#output_format_sql_insert_use_replace) - Use REPLACE statement instead of INSERT. Default value - `false`.
- [output_format_sql_insert_quote_names](../operations/settings/settings.md#output_format_sql_insert_quote_names) - Quote column names with "\`" characters . Default value - `true`.
## JSON {#json}
Outputs data in JSON format. Besides data tables, it also outputs column names and types, along with some additional information: the total number of output rows, and the number of rows that could have been output if there werent a LIMIT. Example:

View File

@ -4637,3 +4637,35 @@ Possible values:
- 1 — Enabled.
Default value: 1.
## SQLInsert format settings {$sqlinsert-format-settings}
### output_format_sql_insert_max_batch_size {#output_format_sql_insert_max_batch_size}
The maximum number of rows in one INSERT statement.
Default value: `65505`.
### output_format_sql_insert_table_name {#output_format_sql_insert_table_name}
The name of table that will be used in the output INSERT statement.
Default value: `'table''`.
### output_format_sql_insert_include_column_names {#output_format_sql_insert_include_column_names}
Include column names in INSERT statement.
Default value: `true`.
### output_format_sql_insert_use_replace {#output_format_sql_insert_use_replace}
Use REPLACE keyword instead of INSERT.
Default value: `false`.
### output_format_sql_insert_quote_names {#output_format_sql_insert_quote_names}
Quote column names with "`" characters
Default value: `true`.

View File

@ -10,7 +10,7 @@ Creates a user defined function from a lambda expression. The expression must co
**Syntax**
```sql
CREATE FUNCTION name AS (parameter0, ...) -> expression
CREATE FUNCTION name [ON CLUSTER cluster] AS (parameter0, ...) -> expression
```
A function can have an arbitrary number of parameters.

View File

@ -105,7 +105,7 @@ System functions can not be dropped.
**Syntax**
``` sql
DROP FUNCTION [IF EXISTS] function_name
DROP FUNCTION [IF EXISTS] function_name [on CLUSTER cluster]
```
**Example**

View File

@ -5,12 +5,12 @@ sidebar_position: 101
---
# 我能把 ClickHouse 当做Key-value 键值存储来使用吗? {#can-i-use-clickhouse-as-a-key-value-storage}.
简短的回答是 **不能**关键值的工作量是在列表中的最高位置时,**不能**{.text-danger}使用ClickHouse的情况。它是一个[OLAP](../../faq/general/olap.md)系统,毕竟有很多优秀的键值存储系统在那里
简短的回答是 **不能**键值类型负载是ClickHouse最<span class="text-danger">**不适合**</span>的多种场景之一。ClickHouse 毕竟只是一个[OLAP](../../faq/general/olap.md)系统,对于这类负载来说,目前还是有很多优秀的键值存储系统可供选择
然而,可能在某些情况下使用ClickHouse进行类似键值的查询仍然是有意义的。通常是一些低预算的产品主要的工作负载是分析性的很适合ClickHouse但也有一些次要的过程需要一个键值模式请求吞吐量不是很高没有严格的延迟要求。如果你有无限的预算,你会为这样的次要工作负载安装一个次要的键值数据库,但实际上,多维护一个存储系统(监控、备份等)会有额外的成本,这可能是值得避免的。
然而在某些情况下使用ClickHouse进行类似键值的查询仍然是有意义的。通常一些主要的工作负载是分析性的比较适合使用Clickhouse低预算的产品中也有一些次要的操作是需要使用键值模式的同时这些操作的请求吞吐量不会很高没有严格的延迟要求。如果你有无限的预算,你会为这样的次要工作负载安装一个次要的键值数据库,但实际上,多维护一个存储系统(监控、备份等)会有额外的成本,这是可以考虑避免的。
如果你决定违背建议对ClickHouse运行一些类似键值的查询这里有一些提示。
如果你决定不遵从这些建议想要使用ClickHouse运行一些类似键值的查询那么这里有一些提示。
- ClickHouse中点查询昂贵的关键原因是其稀疏的主索引[MergeTree表引擎家族]../../engines/table-engines/mergetree-family/mergetree.md。这个索引不能指向每一行具体的数据相反它指向每N行系统必须从邻近的N行扫描到所需的行沿途读取过多的数据。在一个键值场景中通过`index_granularity`的设置来减少N的值可能是有用的。
- ClickHouse将每一列保存在一组单独的文件中所以要组装一个完整的行它需要通过这些文件中的每一个。它们的数量随着列数的增加而线性增加,所以在键值场景中,可能值得避免使用许多列,并将所有的有效数据放在一个单一的`String`列中并以某种序列化格式如JSON、Protobuf或任何有效的格式进行编码。
- ClickHouse中点查询开销大的关键原因是MergeTree表引擎家族[MergeTree表引擎家族]../../engines/table-engines/mergetree-family/mergetree.md采用的稀疏主索引。这个索引不能指向每一行具体的数据相反它指向每N行系统必须从邻近的N行扫描到所需的行沿途读取过多的数据。在一个键值场景中通过`index_granularity`的设置来减少N的值可能是有用的。
- ClickHouse将每一列保存在一组单独的文件中所以要组装一个完整的行它需要访问文件组中的每一个文件。访问数据数量会随着列数的增加而线性增加,所以在键值场景中,需要避免使用许多列,并将所有的有效数据放在一个单一的`String`列中并以某种序列化格式如JSON、Protobuf或任何有效的格式进行编码。
- 还有一种方法,使用[Join](../../engines/table-engines/special/join.md)表引擎代替正常的`MergeTree`表和[joinGet](../../sql-reference/functions/other-functions.md#joinget) 函数来检索数据。它可以提供更好的查询性能,但可能有一些可用性和可靠性问题。下面是一个[使用实例](https://github.com/ClickHouse/ClickHouse/blob/master/tests/queries/0_stateless/00800_versatile_storage_join.sql#L49-L51)。

View File

@ -59,7 +59,7 @@ void setUserAndGroup(std::string arg_uid, std::string arg_gid)
throwFromErrno(fmt::format("Cannot do 'getgrnam_r' to obtain gid from group name ({})", arg_gid), ErrorCodes::SYSTEM_ERROR);
if (!result)
throw Exception("Group {} is not found in the system", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Group {} is not found in the system", arg_gid);
gid = entry.gr_gid;
}
@ -84,7 +84,7 @@ void setUserAndGroup(std::string arg_uid, std::string arg_gid)
throwFromErrno(fmt::format("Cannot do 'getpwnam_r' to obtain uid from user name ({})", arg_uid), ErrorCodes::SYSTEM_ERROR);
if (!result)
throw Exception("User {} is not found in the system", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "User {} is not found in the system", arg_uid);
uid = entry.pw_uid;
}

View File

@ -96,6 +96,10 @@ if (TARGET ch_contrib::rdkafka)
add_headers_and_sources(dbms Storages/Kafka)
endif()
if (TARGET ch_contrib::nats_io)
add_headers_and_sources(dbms Storages/NATS)
endif()
add_headers_and_sources(dbms Storages/MeiliSearch)
if (TARGET ch_contrib::amqp_cpp)
@ -371,6 +375,10 @@ if (TARGET ch_contrib::rdkafka)
dbms_target_link_libraries(PRIVATE ch_contrib::rdkafka ch_contrib::cppkafka)
endif()
if (TARGET ch_contrib::nats_io)
dbms_target_link_libraries(PRIVATE ch_contrib::nats_io)
endif()
if (TARGET ch_contrib::sasl2)
dbms_target_link_libraries(PRIVATE ch_contrib::sasl2)
endif()

View File

@ -1834,9 +1834,21 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
bool ClientBase::processQueryText(const String & text)
{
if (exit_strings.end() != exit_strings.find(trim(text, [](char c) { return isWhitespaceASCII(c) || c == ';'; })))
auto trimmed_input = trim(text, [](char c) { return isWhitespaceASCII(c) || c == ';'; });
if (exit_strings.end() != exit_strings.find(trimmed_input))
return false;
if (trimmed_input.starts_with("\\i"))
{
size_t skip_prefix_size = std::strlen("\\i");
auto file_name = trim(
trimmed_input.substr(skip_prefix_size, trimmed_input.size() - skip_prefix_size),
[](char c) { return isWhitespaceASCII(c); });
return processMultiQueryFromFile(file_name);
}
if (!is_multiquery)
{
assert(!query_fuzzer_runs);
@ -2019,6 +2031,17 @@ void ClientBase::runInteractive()
}
bool ClientBase::processMultiQueryFromFile(const String & file_name)
{
String queries_from_file;
ReadBufferFromFile in(file_name);
readStringUntilEOF(queries_from_file, in);
return executeMultiQuery(queries_from_file);
}
void ClientBase::runNonInteractive()
{
if (delayed_interactive)
@ -2026,23 +2049,13 @@ void ClientBase::runNonInteractive()
if (!queries_files.empty())
{
auto process_multi_query_from_file = [&](const String & file)
{
String queries_from_file;
ReadBufferFromFile in(file);
readStringUntilEOF(queries_from_file, in);
return executeMultiQuery(queries_from_file);
};
for (const auto & queries_file : queries_files)
{
for (const auto & interleave_file : interleave_queries_files)
if (!process_multi_query_from_file(interleave_file))
if (!processMultiQueryFromFile(interleave_file))
return;
if (!process_multi_query_from_file(queries_file))
if (!processMultiQueryFromFile(queries_file))
return;
}

View File

@ -154,6 +154,7 @@ private:
protected:
static bool isSyncInsertWithData(const ASTInsertQuery & insert_query, const ContextPtr & context);
bool processMultiQueryFromFile(const String & file_name);
bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
bool is_multiquery = false;

View File

@ -715,29 +715,37 @@ ColumnPtr ColumnNullable::replicate(const Offsets & offsets) const
template <bool negative>
void ColumnNullable::applyNullMapImpl(const ColumnUInt8 & map)
void ColumnNullable::applyNullMapImpl(const NullMap & map)
{
NullMap & arr1 = getNullMapData();
const NullMap & arr2 = map.getData();
NullMap & arr = getNullMapData();
if (arr1.size() != arr2.size())
if (arr.size() != map.size())
throw Exception{"Inconsistent sizes of ColumnNullable objects", ErrorCodes::LOGICAL_ERROR};
for (size_t i = 0, size = arr1.size(); i < size; ++i)
arr1[i] |= negative ^ arr2[i];
for (size_t i = 0, size = arr.size(); i < size; ++i)
arr[i] |= negative ^ map[i];
}
void ColumnNullable::applyNullMap(const ColumnUInt8 & map)
void ColumnNullable::applyNullMap(const NullMap & map)
{
applyNullMapImpl<false>(map);
}
void ColumnNullable::applyNegatedNullMap(const ColumnUInt8 & map)
void ColumnNullable::applyNullMap(const ColumnUInt8 & map)
{
applyNullMapImpl<false>(map.getData());
}
void ColumnNullable::applyNegatedNullMap(const NullMap & map)
{
applyNullMapImpl<true>(map);
}
void ColumnNullable::applyNegatedNullMap(const ColumnUInt8 & map)
{
applyNullMapImpl<true>(map.getData());
}
void ColumnNullable::applyNullMap(const ColumnNullable & other)
{

View File

@ -199,7 +199,9 @@ public:
/// columns.
void applyNullMap(const ColumnNullable & other);
void applyNullMap(const ColumnUInt8 & map);
void applyNullMap(const NullMap & map);
void applyNegatedNullMap(const ColumnUInt8 & map);
void applyNegatedNullMap(const NullMap & map);
/// Check that size of null map equals to size of nested column.
void checkConsistency() const;
@ -209,7 +211,7 @@ private:
WrappedPtr null_map;
template <bool negative>
void applyNullMapImpl(const ColumnUInt8 & map);
void applyNullMapImpl(const NullMap & map);
int compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint, const Collator * collator=nullptr) const;

View File

@ -70,6 +70,9 @@ size_t Epoll::getManyReady(int max_events, epoll_event * events_out, bool blocki
if (ready_size == -1 && errno != EINTR)
throwFromErrno("Error in epoll_wait", DB::ErrorCodes::EPOLL_ERROR);
if (errno == EINTR)
LOG_TEST(&Poco::Logger::get("Epoll"), "EINTR");
}
while (ready_size <= 0 && (ready_size != 0 || blocking));

View File

@ -633,6 +633,7 @@
M(662, FS_METADATA_ERROR) \
M(663, INCONSISTENT_METADATA_FOR_BACKUP) \
M(664, ACCESS_STORAGE_DOESNT_ALLOW_BACKUP) \
M(665, CANNOT_CONNECT_NATS) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -6,6 +6,8 @@
#include <fcntl.h>
#include <unistd.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -70,6 +72,8 @@ void TimerDescriptor::drain() const
if (errno != EINTR)
throwFromErrno("Cannot drain timer_fd", ErrorCodes::CANNOT_READ_FROM_SOCKET);
else
LOG_TEST(&Poco::Logger::get("TimerDescriptor"), "EINTR");
}
}
}

View File

@ -281,6 +281,13 @@ struct SetResponse : virtual Response
size_t bytesSize() const override { return sizeof(stat); }
};
enum class ListRequestType : uint8_t
{
ALL,
PERSISTENT_ONLY,
EPHEMERAL_ONLY
};
struct ListRequest : virtual Request
{
String path;
@ -492,6 +499,7 @@ public:
virtual void list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) = 0;

View File

@ -1,3 +1,4 @@
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/ZooKeeper/TestKeeper.h>
#include <Common/setThreadName.h>
#include <Common/StringUtils/StringUtils.h>
@ -119,12 +120,17 @@ struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
}
};
struct TestKeeperListRequest final : ListRequest, TestKeeperRequest
struct TestKeeperListRequest : ListRequest, TestKeeperRequest
{
ResponsePtr createResponse() const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperFilteredListRequest final : TestKeeperListRequest
{
ListRequestType list_request_type;
};
struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest
{
TestKeeperCheckRequest() = default;
@ -390,8 +396,18 @@ std::pair<ResponsePtr, Undo> TestKeeperListRequest::process(TestKeeper::Containe
child_it != container.end() && startsWith(child_it->first, path_prefix);
++child_it)
{
using enum ListRequestType;
if (parentPath(child_it->first) == path)
response.names.emplace_back(baseName(child_it->first));
{
ListRequestType list_request_type = ALL;
if (const auto * filtered_list = dynamic_cast<const TestKeeperFilteredListRequest *>(this))
list_request_type = filtered_list->list_request_type;
const auto is_ephemeral = child_it->second.stat.ephemeralOwner != 0;
if (list_request_type == ALL || (is_ephemeral && list_request_type == EPHEMERAL_ONLY)
|| (!is_ephemeral && list_request_type == PERSISTENT_ONLY))
response.names.emplace_back(baseName(child_it->first));
}
}
response.stat = it->second.stat;
@ -768,11 +784,13 @@ void TestKeeper::set(
void TestKeeper::list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch)
{
TestKeeperListRequest request;
TestKeeperFilteredListRequest request;
request.path = path;
request.list_request_type = list_request_type;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperListRequest>(std::move(request));

View File

@ -71,6 +71,7 @@ public:
void list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) override;

View File

@ -9,6 +9,7 @@
#include <base/find_symbols.h>
#include <base/sort.h>
#include <base/getFQDNOrHostName.h>
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h>
#include <Common/isLocalAddress.h>
@ -312,9 +313,10 @@ static Coordination::WatchCallback callbackForEvent(const EventPtr & watch)
Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback)
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type)
{
auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback);
auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback, list_request_type);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{
@ -335,26 +337,28 @@ Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings
}
}
Strings ZooKeeper::getChildren(
const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
Strings ZooKeeper::getChildren(const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
{
Strings res;
check(tryGetChildren(path, res, stat, watch), path);
return res;
}
Strings ZooKeeper::getChildrenWatch(
const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
Strings ZooKeeper::getChildrenWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{
Strings res;
check(tryGetChildrenWatch(path, res, stat, watch_callback), path);
return res;
}
Coordination::Error ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat, const EventPtr & watch)
Coordination::Error ZooKeeper::tryGetChildren(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
const EventPtr & watch,
Coordination::ListRequestType list_request_type)
{
Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch));
Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch), list_request_type);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path);
@ -362,10 +366,14 @@ Coordination::Error ZooKeeper::tryGetChildren(const std::string & path, Strings
return code;
}
Coordination::Error ZooKeeper::tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
Coordination::Error ZooKeeper::tryGetChildrenWatch(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type)
{
Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback);
Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback, list_request_type);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path);
@ -1046,7 +1054,8 @@ std::future<Coordination::SetResponse> ZooKeeper::asyncTrySetNoThrow(const std::
return future;
}
std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback)
std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(
const std::string & path, Coordination::WatchCallback watch_callback, Coordination::ListRequestType list_request_type)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
@ -1059,11 +1068,12 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::s
promise->set_value(response);
};
impl->list(path, std::move(callback), watch_callback);
impl->list(path, list_request_type, std::move(callback), watch_callback);
return future;
}
std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback)
std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(
const std::string & path, Coordination::WatchCallback watch_callback, Coordination::ListRequestType list_request_type)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
@ -1073,7 +1083,7 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(co
promise->set_value(response);
};
impl->list(path, std::move(callback), watch_callback);
impl->list(path, list_request_type, std::move(callback), watch_callback);
return future;
}

View File

@ -194,11 +194,13 @@ public:
/// * The node doesn't exist.
Coordination::Error tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr);
const EventPtr & watch = nullptr,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::Error tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback);
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
/// Performs several operations in a transaction.
/// Throws on every error.
@ -279,9 +281,15 @@ public:
FutureExists asyncTryExistsNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {});
using FutureGetChildren = std::future<Coordination::ListResponse>;
FutureGetChildren asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback = {});
FutureGetChildren asyncGetChildren(
const std::string & path,
Coordination::WatchCallback watch_callback = {},
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
/// Like the previous one but don't throw any exceptions on future.get()
FutureGetChildren asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {});
FutureGetChildren asyncTryGetChildrenNoThrow(
const std::string & path,
Coordination::WatchCallback watch_callback = {},
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using FutureSet = std::future<Coordination::SetResponse>;
FutureSet asyncSet(const std::string & path, const std::string & data, int32_t version = -1);
@ -335,7 +343,11 @@ private:
const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
Coordination::Error setImpl(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat);
Coordination::Error getChildrenImpl(
const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type);
Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses);
Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);
Coordination::Error syncImpl(const std::string & path, std::string & returned_path);

View File

@ -1,3 +1,4 @@
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
@ -298,6 +299,32 @@ std::string ZooKeeperListRequest::toStringImpl() const
return fmt::format("path = {}", path);
}
void ZooKeeperFilteredListRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
Coordination::write(has_watch, out);
Coordination::write(static_cast<uint8_t>(list_request_type), out);
}
void ZooKeeperFilteredListRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
Coordination::read(has_watch, in);
uint8_t read_request_type{0};
Coordination::read(read_request_type, in);
list_request_type = static_cast<ListRequestType>(read_request_type);
}
std::string ZooKeeperFilteredListRequest::toStringImpl() const
{
return fmt::format(
"path = {}\n"
"list_request_type = {}",
path,
list_request_type);
}
void ZooKeeperListResponse::readImpl(ReadBuffer & in)
{
Coordination::read(names, in);

View File

@ -347,6 +347,18 @@ struct ZooKeeperSimpleListRequest final : ZooKeeperListRequest
OpNum getOpNum() const override { return OpNum::SimpleList; }
};
struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest
{
ListRequestType list_request_type{ListRequestType::ALL};
OpNum getOpNum() const override { return OpNum::FilteredList; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
size_t bytesSize() const override { return ZooKeeperListRequest::bytesSize() + sizeof(list_request_type); }
};
struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;

View File

@ -64,6 +64,8 @@ std::string toString(OpNum op_num)
return "SetACL";
case OpNum::GetACL:
return "GetACL";
case OpNum::FilteredList:
return "FilteredList";
}
int32_t raw_op = static_cast<int32_t>(op_num);
throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);

View File

@ -32,6 +32,10 @@ enum class OpNum : int32_t
Check = 13,
Multi = 14,
Auth = 100,
// CH Keeper specific operations
FilteredList = 500,
SessionID = 997, /// Special internal request
};

View File

@ -28,6 +28,11 @@ void write(int32_t x, WriteBuffer & out)
writeBinary(x, out);
}
void write(uint8_t x, WriteBuffer & out)
{
writeBinary(x, out);
}
void write(OpNum x, WriteBuffer & out)
{
write(static_cast<int32_t>(x), out);
@ -91,6 +96,11 @@ void read(int64_t & x, ReadBuffer & in)
x = __builtin_bswap64(x);
}
void read(uint8_t & x, ReadBuffer & in)
{
readBinary(x, in);
}
void read(int32_t & x, ReadBuffer & in)
{
readBinary(x, in);

View File

@ -22,6 +22,7 @@ void write(uint64_t x, WriteBuffer & out);
void write(int64_t x, WriteBuffer & out);
void write(int32_t x, WriteBuffer & out);
void write(uint8_t x, WriteBuffer & out);
void write(OpNum x, WriteBuffer & out);
void write(bool x, WriteBuffer & out);
void write(const std::string & s, WriteBuffer & out);
@ -50,6 +51,7 @@ void read(uint64_t & x, ReadBuffer & in);
#endif
void read(int64_t & x, ReadBuffer & in);
void read(int32_t & x, ReadBuffer & in);
void read(uint8_t & x, ReadBuffer & in);
void read(OpNum & x, ReadBuffer & in);
void read(bool & x, ReadBuffer & in);
void read(int8_t & x, ReadBuffer & in);

View File

@ -1168,11 +1168,13 @@ void ZooKeeper::set(
void ZooKeeper::list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch)
{
ZooKeeperListRequest request;
ZooKeeperFilteredListRequest request;
request.path = path;
request.list_request_type = list_request_type;
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperListRequest>(std::move(request));

View File

@ -164,6 +164,7 @@ public:
void list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) override;

View File

@ -289,6 +289,9 @@ void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)
if (!request_for_session.zxid)
request_for_session.zxid = log_idx;
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
std::lock_guard lock(storage_and_responses_lock);
storage->rollbackRequest(request_for_session.zxid);
}

View File

@ -6,6 +6,7 @@
#include <boost/algorithm/string.hpp>
#include <Poco/Base64Encoder.h>
#include <Poco/SHA1Engine.h>
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include <Common/SipHash.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/StringUtils/StringUtils.h>
@ -19,6 +20,7 @@
#include <mutex>
#include <functional>
#include <base/defines.h>
#include <filesystem>
namespace DB
{
@ -1161,6 +1163,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
}
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
@ -1178,8 +1181,31 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
const auto & children = node_it->value.getChildren();
response.names.reserve(children.size());
const auto add_child = [&](const auto child)
{
using enum Coordination::ListRequestType;
auto list_request_type = ALL;
if (auto * filtered_list = dynamic_cast<Coordination::ZooKeeperFilteredListRequest *>(&request))
list_request_type = filtered_list->list_request_type;
if (list_request_type == ALL)
return true;
auto child_path = (std::filesystem::path(request.path) / child.toView()).generic_string();
auto child_it = container.find(child_path);
if (child_it == container.end())
onStorageInconsistency();
const auto is_ephemeral = child_it->value.stat.ephemeralOwner != 0;
return (is_ephemeral && list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && list_request_type == PERSISTENT_ONLY);
};
for (const auto child : children)
response.names.push_back(child.toString());
{
if (add_child(child))
response.names.push_back(child.toString());
}
response.stat = node_it->value.stat;
response.error = Coordination::Error::ZOK;
@ -1623,7 +1649,7 @@ struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProc
void KeeperStorage::finalize()
{
if (finalized)
throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR);
throw DB::Exception("KeeperStorage already finalized", ErrorCodes::LOGICAL_ERROR);
finalized = true;
@ -1689,6 +1715,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
registerKeeperRequestProcessor<Coordination::OpNum::Set, KeeperStorageSetRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::List, KeeperStorageListRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SimpleList, KeeperStorageListRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::FilteredList, KeeperStorageListRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Check, KeeperStorageCheckRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Multi, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor>(*this);

View File

@ -27,6 +27,7 @@
#include <Coordination/Changelog.h>
#include <filesystem>
#include <Common/SipHash.h>
#include <Coordination/pathUtils.h>
#include <Coordination/SnapshotableHashTable.h>
@ -1956,6 +1957,84 @@ TEST_P(CoordinationTest, TestUncommittedStateBasicCrud)
ASSERT_FALSE(get_committed_data());
}
TEST_P(CoordinationTest, TestListRequestTypes)
{
using namespace DB;
using namespace Coordination;
KeeperStorage storage{500, "", true};
int64_t zxid = 0;
static constexpr std::string_view path = "/test";
const auto create_path = [&](bool is_ephemeral)
{
const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
int new_zxid = ++zxid;
create_request->path = path;
create_request->is_sequential = true;
create_request->is_ephemeral = is_ephemeral;
storage.preprocessRequest(create_request, 1, 0, new_zxid);
auto responses = storage.processRequest(create_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
const auto & create_response = dynamic_cast<ZooKeeperCreateResponse &>(*responses[0].response);
return create_response.path_created;
};
static constexpr size_t persistent_num = 5;
std::unordered_set<std::string> expected_persistent_children;
for (size_t i = 0; i < persistent_num; ++i)
{
expected_persistent_children.insert(getBaseName(create_path(false)).toString());
}
ASSERT_EQ(expected_persistent_children.size(), persistent_num);
static constexpr size_t ephemeral_num = 5;
std::unordered_set<std::string> expected_ephemeral_children;
for (size_t i = 0; i < ephemeral_num; ++i)
{
expected_ephemeral_children.insert(getBaseName(create_path(true)).toString());
}
ASSERT_EQ(expected_ephemeral_children.size(), ephemeral_num);
const auto get_children = [&](const auto list_request_type)
{
const auto list_request = std::make_shared<ZooKeeperFilteredListRequest>();
int new_zxid = ++zxid;
list_request->path = parentPath(StringRef{path}).toString();
list_request->list_request_type = list_request_type;
storage.preprocessRequest(list_request, 1, 0, new_zxid);
auto responses = storage.processRequest(list_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
const auto & list_response = dynamic_cast<ZooKeeperListResponse &>(*responses[0].response);
return list_response.names;
};
const auto persistent_children = get_children(ListRequestType::PERSISTENT_ONLY);
EXPECT_EQ(persistent_children.size(), persistent_num);
for (const auto & child : persistent_children)
{
EXPECT_TRUE(expected_persistent_children.contains(child)) << "Missing persistent child " << child;
}
const auto ephemeral_children = get_children(ListRequestType::EPHEMERAL_ONLY);
EXPECT_EQ(ephemeral_children.size(), ephemeral_num);
for (const auto & child : ephemeral_children)
{
EXPECT_TRUE(expected_ephemeral_children.contains(child)) << "Missing ephemeral child " << child;
}
const auto all_children = get_children(ListRequestType::ALL);
EXPECT_EQ(all_children.size(), ephemeral_num + persistent_num);
for (const auto & child : all_children)
{
EXPECT_TRUE(expected_ephemeral_children.contains(child) || expected_persistent_children.contains(child)) << "Missing child " << child;
}
}
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
CoordinationTest,
::testing::ValuesIn(std::initializer_list<CompressionParam>{

View File

@ -559,4 +559,31 @@ String toString(const Field & x)
x);
}
String fieldTypeToString(Field::Types::Which type)
{
switch (type)
{
case Field::Types::Which::Null: return "Null";
case Field::Types::Which::Array: return "Array";
case Field::Types::Which::Tuple: return "Tuple";
case Field::Types::Which::Map: return "Map";
case Field::Types::Which::Object: return "Object";
case Field::Types::Which::AggregateFunctionState: return "AggregateFunctionState";
case Field::Types::Which::Bool: return "Bool";
case Field::Types::Which::String: return "String";
case Field::Types::Which::Decimal32: return "Decimal32";
case Field::Types::Which::Decimal64: return "Decimal64";
case Field::Types::Which::Decimal128: return "Decimal128";
case Field::Types::Which::Decimal256: return "Decimal256";
case Field::Types::Which::Float64: return "Float64";
case Field::Types::Which::Int64: return "Int64";
case Field::Types::Which::Int128: return "Int128";
case Field::Types::Which::Int256: return "Int256";
case Field::Types::Which::UInt64: return "UInt64";
case Field::Types::Which::UInt128: return "UInt128";
case Field::Types::Which::UInt256: return "UInt256";
case Field::Types::Which::UUID: return "UUID";
}
}
}

View File

@ -1011,6 +1011,8 @@ void writeFieldText(const Field & x, WriteBuffer & buf);
String toString(const Field & x);
String fieldTypeToString(Field::Types::Which type);
}
template <>

View File

@ -73,6 +73,7 @@ void Connection::connect()
if (!connection || !connection->is_open())
updateConnection();
}
}
#endif

View File

@ -32,7 +32,10 @@ struct ConnectionInfo
class Connection : private boost::noncopyable
{
public:
explicit Connection(const ConnectionInfo & connection_info_, bool replication_ = false, size_t num_tries = 3);
explicit Connection(
const ConnectionInfo & connection_info_,
bool replication_ = false,
size_t num_tries = 3);
void execWithRetry(const std::function<void(pqxx::nontransaction &)> & exec);

View File

@ -20,11 +20,20 @@ class ConnectionHolder
{
public:
ConnectionHolder(PoolPtr pool_, ConnectionPtr connection_) : pool(pool_), connection(std::move(connection_)) {}
ConnectionHolder(PoolPtr pool_, ConnectionPtr connection_, bool auto_close_)
: pool(pool_)
, connection(std::move(connection_))
, auto_close(auto_close_)
{}
ConnectionHolder(const ConnectionHolder & other) = delete;
~ConnectionHolder() { pool->returnObject(std::move(connection)); }
~ConnectionHolder()
{
if (auto_close)
connection.reset();
pool->returnObject(std::move(connection));
}
pqxx::connection & get()
{
@ -39,6 +48,7 @@ public:
private:
PoolPtr pool;
ConnectionPtr connection;
bool auto_close;
};
using ConnectionHolderPtr = std::unique_ptr<ConnectionHolder>;

View File

@ -5,6 +5,7 @@
#include "Utils.h"
#include <Common/parseRemoteDescription.h>
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
@ -20,10 +21,14 @@ namespace postgres
{
PoolWithFailover::PoolWithFailover(
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size,
size_t pool_wait_timeout_,
size_t max_tries_,
bool auto_close_connection_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
, auto_close_connection(auto_close_connection_)
{
LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
pool_size, pool_wait_timeout, max_tries_);
@ -40,10 +45,14 @@ PoolWithFailover::PoolWithFailover(
}
PoolWithFailover::PoolWithFailover(
const DB::StoragePostgreSQLConfiguration & configuration,
size_t pool_size, size_t pool_wait_timeout_, size_t max_tries_)
const DB::StoragePostgreSQLConfiguration & configuration,
size_t pool_size,
size_t pool_wait_timeout_,
size_t max_tries_,
bool auto_close_connection_)
: pool_wait_timeout(pool_wait_timeout_)
, max_tries(max_tries_)
, auto_close_connection(auto_close_connection_)
{
LOG_TRACE(&Poco::Logger::get("PostgreSQLConnectionPool"), "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
pool_size, pool_wait_timeout, max_tries_);
@ -94,7 +103,9 @@ ConnectionHolderPtr PoolWithFailover::get()
catch (const pqxx::broken_connection & pqxx_error)
{
LOG_ERROR(log, "Connection error: {}", pqxx_error.what());
error_message << "Try " << try_idx + 1 << ". Connection to `" << replica.connection_info.host_port << "` failed: " << pqxx_error.what() << "\n";
error_message << fmt::format(
"Try {}. Connection to {} failed with error: {}\n",
try_idx + 1, DB::backQuote(replica.connection_info.host_port), pqxx_error.what());
replica.pool->returnObject(std::move(connection));
continue;
@ -105,7 +116,7 @@ ConnectionHolderPtr PoolWithFailover::get()
throw;
}
auto connection_holder = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection));
auto connection_holder = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection), auto_close_connection);
/// Move all traversed replicas to the end.
if (replicas.size() > 1)

View File

@ -12,6 +12,10 @@
#include <Storages/ExternalDataSourceConfiguration.h>
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
static constexpr inline auto POSTGRESQL_POOL_WAIT_TIMEOUT = 5000;
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
namespace postgres
{
@ -21,21 +25,19 @@ class PoolWithFailover
using RemoteDescription = std::vector<std::pair<String, uint16_t>>;
public:
static constexpr inline auto POSTGRESQL_POOL_DEFAULT_SIZE = 16;
static constexpr inline auto POSTGRESQL_POOL_WAIT_TIMEOUT = 5000;
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
explicit PoolWithFailover(
PoolWithFailover(
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
size_t pool_size,
size_t pool_wait_timeout,
size_t max_tries_,
bool auto_close_connection_);
explicit PoolWithFailover(
const DB::StoragePostgreSQLConfiguration & configuration,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
size_t pool_size,
size_t pool_wait_timeout,
size_t max_tries_,
bool auto_close_connection_);
PoolWithFailover(const PoolWithFailover & other) = delete;
@ -58,6 +60,7 @@ private:
ReplicasWithPriority replicas_with_priority;
size_t pool_wait_timeout;
size_t max_tries;
bool auto_close_connection;
std::mutex mutex;
Poco::Logger * log = &Poco::Logger::get("PostgreSQLConnectionPool");
};

View File

@ -96,7 +96,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ and FileLog engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \
\
M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
M(Milliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \
@ -428,6 +429,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(UInt64, postgresql_connection_pool_size, 16, "Connection pool size for PostgreSQL table engine and database engine.", 0) \
M(UInt64, postgresql_connection_pool_wait_timeout, 5000, "Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on empty pool.", 0) \
M(Bool, postgresql_connection_pool_auto_close_connection, false, "Close connection before returning connection to the pool.", 0) \
M(UInt64, glob_expansion_max_elements, 1000, "Maximum number of allowed addresses (For external storages, table functions, etc).", 0) \
M(UInt64, odbc_bridge_connection_pool_size, 16, "Connection pool size for each connection settings string in ODBC bridge.", 0) \
M(Bool, odbc_bridge_use_connection_pooling, true, "Use connection pooling in ODBC bridge. If set to false, a new connection is created every time", 0) \
@ -763,6 +765,12 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(String, input_format_mysql_dump_table_name, "", "Name of the table in MySQL dump from which to read data", 0) \
M(Bool, input_format_mysql_dump_map_column_names, true, "Match columns from table in MySQL dump and columns from ClickHouse table by names", 0) \
\
M(UInt64, output_format_sql_insert_max_batch_size, DEFAULT_BLOCK_SIZE, "The maximum number of rows in one INSERT statement.", 0) \
M(String, output_format_sql_insert_table_name, "table", "The name of table in the output INSERT query", 0) \
M(Bool, output_format_sql_insert_include_column_names, true, "Include column names in INSERT query", 0) \
M(Bool, output_format_sql_insert_use_replace, false, "Use REPLACE statement instead of INSERT", 0) \
M(Bool, output_format_sql_insert_quote_names, true, "Quote column names with '`' characters", 0) \
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.

View File

@ -30,12 +30,13 @@ IMPLEMENT_SETTING_ENUM(JoinStrictness, ErrorCodes::UNKNOWN_JOIN,
{"ANY", JoinStrictness::ANY}})
IMPLEMENT_SETTING_ENUM(JoinAlgorithm, ErrorCodes::UNKNOWN_JOIN,
IMPLEMENT_SETTING_MULTI_ENUM(JoinAlgorithm, ErrorCodes::UNKNOWN_JOIN,
{{"auto", JoinAlgorithm::AUTO},
{"hash", JoinAlgorithm::HASH},
{"partial_merge", JoinAlgorithm::PARTIAL_MERGE},
{"prefer_partial_merge", JoinAlgorithm::PREFER_PARTIAL_MERGE},
{"parallel_hash", JoinAlgorithm::PARALLEL_HASH}})
{"parallel_hash", JoinAlgorithm::PARALLEL_HASH},
{"direct", JoinAlgorithm::DIRECT}})
IMPLEMENT_SETTING_ENUM(TotalsMode, ErrorCodes::UNKNOWN_TOTALS_MODE,

View File

@ -43,9 +43,10 @@ enum class JoinAlgorithm
PARTIAL_MERGE,
PREFER_PARTIAL_MERGE,
PARALLEL_HASH,
DIRECT,
};
DECLARE_SETTING_ENUM(JoinAlgorithm)
DECLARE_SETTING_MULTI_ENUM(JoinAlgorithm)
/// Which rows should be included in TOTALS.

View File

@ -6,6 +6,7 @@
#cmakedefine01 USE_MYSQL
#cmakedefine01 USE_RDKAFKA
#cmakedefine01 USE_AMQPCPP
#cmakedefine01 USE_NATSIO
#cmakedefine01 USE_EMBEDDED_COMPILER
#cmakedefine01 USE_SSL
#cmakedefine01 USE_LDAP

View File

@ -344,9 +344,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
use_table_cache = safeGetLiteralValue<UInt8>(engine_args[5], engine_name);
}
auto pool = std::make_shared<postgres::PoolWithFailover>(configuration,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
const auto & settings = context->getSettingsRef();
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, configuration, pool, use_table_cache);

View File

@ -4,8 +4,7 @@ add_headers_and_sources(clickhouse_dictionaries .)
add_headers_and_sources(clickhouse_dictionaries "${CMAKE_CURRENT_BINARY_DIR}/generated/")
if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE" OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO" OR CMAKE_BUILD_TYPE_UC STREQUAL "MINSIZEREL")
if (OMIT_HEAVY_DEBUG_SYMBOLS)
# Won't generate debug info for files with heavy template instantiation to achieve faster linking and lower size.
set_source_files_properties(
FlatDictionary.cpp
@ -15,7 +14,7 @@ if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE" OR CMAKE_BUILD_TYPE_UC STREQUAL "RELW
RangeHashedDictionary.cpp
DirectDictionary.cpp
PROPERTIES COMPILE_FLAGS -g0)
endif ()
endif()
list(REMOVE_ITEM clickhouse_dictionaries_sources DictionaryFactory.cpp DictionarySourceFactory.cpp DictionaryStructure.cpp getDictionaryConfigurationFromAST.cpp)
list(REMOVE_ITEM clickhouse_dictionaries_headers DictionaryFactory.h DictionarySourceFactory.h DictionaryStructure.h getDictionaryConfigurationFromAST.h)

View File

@ -191,10 +191,13 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
const auto settings_config_prefix = config_prefix + ".postgresql";
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key) || key.starts_with("replica"); };
auto configuration = getExternalDataSourceConfigurationByPriority(config, settings_config_prefix, context, has_config_key);
const auto & settings = context->getSettingsRef();
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration.replicas_configurations,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
configuration.replicas_configurations,
settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout,
POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
settings.postgresql_connection_pool_auto_close_connection);
PostgreSQLDictionarySource::Configuration dictionary_configuration
{

View File

@ -158,6 +158,11 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.column_names_for_schema_inference = settings.column_names_for_schema_inference;
format_settings.mysql_dump.table_name = settings.input_format_mysql_dump_table_name;
format_settings.mysql_dump.map_column_names = settings.input_format_mysql_dump_map_column_names;
format_settings.sql_insert.max_batch_size = settings.output_format_sql_insert_max_batch_size;
format_settings.sql_insert.include_column_names = settings.output_format_sql_insert_include_column_names;
format_settings.sql_insert.table_name = settings.output_format_sql_insert_table_name;
format_settings.sql_insert.use_replace = settings.output_format_sql_insert_use_replace;
format_settings.sql_insert.quote_names = settings.output_format_sql_insert_quote_names;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (format_settings.schema.is_server)

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Names.h>
#include <Core/Defines.h>
#include <base/types.h>
@ -274,6 +275,15 @@ struct FormatSettings
String table_name;
bool map_column_names = true;
} mysql_dump;
struct
{
UInt64 max_batch_size = DEFAULT_BLOCK_SIZE;
String table_name = "table";
bool include_column_names = true;
bool use_replace = false;
bool quote_names = true;
} sql_insert;
};
}

View File

@ -82,6 +82,7 @@ void registerOutputFormatMySQLWire(FormatFactory & factory);
void registerOutputFormatMarkdown(FormatFactory & factory);
void registerOutputFormatPostgreSQLWire(FormatFactory & factory);
void registerOutputFormatPrometheus(FormatFactory & factory);
void registerOutputFormatSQLInsert(FormatFactory & factory);
/// Input only formats.
@ -205,6 +206,7 @@ void registerFormats()
registerOutputFormatPostgreSQLWire(factory);
registerOutputFormatCapnProto(factory);
registerOutputFormatPrometheus(factory);
registerOutputFormatSQLInsert(factory);
registerInputFormatRegexp(factory);
registerInputFormatJSONAsString(factory);

View File

@ -35,25 +35,8 @@ if (TARGET OpenSSL::Crypto)
target_link_libraries(clickhouse_functions PUBLIC OpenSSL::Crypto)
endif()
if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE"
OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO"
OR CMAKE_BUILD_TYPE_UC STREQUAL "MINSIZEREL")
set (STRIP_DSF_DEFAULT ON)
else()
set (STRIP_DSF_DEFAULT OFF)
endif()
# Provides faster linking and lower binary size.
# Tradeoff is the inability to debug some source files with e.g. gdb
# (empty stack frames and no local variables)."
option(STRIP_DEBUG_SYMBOLS_FUNCTIONS "Do not generate debugger info for ClickHouse functions" ${STRIP_DSF_DEFAULT})
if (STRIP_DEBUG_SYMBOLS_FUNCTIONS)
message(INFO "Not generating debugger info for ClickHouse functions")
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options(clickhouse_functions PRIVATE "-g0")
else()
message(STATUS "Generating debugger info for ClickHouse functions")
endif()
if (TARGET ch_contrib::icu)

View File

@ -12,7 +12,7 @@ if (HAS_SUGGEST_DESTRUCTOR_OVERRIDE)
target_compile_definitions(clickhouse_functions_gatherutils PUBLIC HAS_SUGGEST_DESTRUCTOR_OVERRIDE)
endif()
if (STRIP_DEBUG_SYMBOLS_FUNCTIONS)
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options(clickhouse_functions_gatherutils PRIVATE "-g0")
endif()

View File

@ -8,6 +8,6 @@ target_link_libraries(clickhouse_functions_jsonpath PRIVATE dbms)
target_link_libraries(clickhouse_functions_jsonpath PRIVATE clickhouse_parsers)
target_link_libraries(clickhouse_functions PRIVATE clickhouse_functions_jsonpath)
if (STRIP_DEBUG_SYMBOLS_FUNCTIONS)
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options(clickhouse_functions_jsonpath PRIVATE "-g0")
endif()

View File

@ -3,7 +3,7 @@ add_headers_and_sources(clickhouse_functions_url .)
add_library(clickhouse_functions_url ${clickhouse_functions_url_sources} ${clickhouse_functions_url_headers})
target_link_libraries(clickhouse_functions_url PRIVATE dbms)
if (STRIP_DEBUG_SYMBOLS_FUNCTIONS)
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options(clickhouse_functions_url PRIVATE "-g0")
endif()

View File

@ -3,6 +3,6 @@ add_headers_and_sources(clickhouse_functions_array .)
add_library(clickhouse_functions_array ${clickhouse_functions_array_sources} ${clickhouse_functions_array_headers})
target_link_libraries(clickhouse_functions_array PRIVATE dbms clickhouse_functions_gatherutils)
if (STRIP_DEBUG_SYMBOLS_FUNCTIONS)
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options(clickhouse_functions_array PRIVATE "-g0")
endif()

View File

@ -47,6 +47,7 @@ public:
off_t getPosition() override;
const ReadBufferFactory & getReadBufferFactory() const { return *reader_factory; }
ReadBufferFactory & getReadBufferFactory() { return *reader_factory; }
private:
/// Reader in progress with a list of read segments

View File

@ -33,6 +33,10 @@ size_t getFileSizeFromReadBuffer(ReadBuffer & in)
{
return getFileSize(compressed->getWrappedReadBuffer());
}
else if (auto * parallel = dynamic_cast<ParallelReadBuffer *>(&in))
{
return getFileSize(parallel->getReadBufferFactory());
}
return getFileSize(in);
}
@ -47,6 +51,10 @@ bool isBufferWithFileSize(const ReadBuffer & in)
{
return isBufferWithFileSize(compressed->getWrappedReadBuffer());
}
else if (const auto * parallel = dynamic_cast<const ParallelReadBuffer *>(&in))
{
return dynamic_cast<const WithFileSize *>(&parallel->getReadBufferFactory()) != nullptr;
}
return dynamic_cast<const WithFileSize *>(&in) != nullptr;
}

View File

@ -374,7 +374,15 @@ Block Aggregator::Params::getHeader(
if (only_merge)
{
res = header.cloneEmpty();
NameSet needed_columns(keys.begin(), keys.end());
for (const auto & aggregate : aggregates)
needed_columns.emplace(aggregate.column_name);
for (const auto & column : header)
{
if (needed_columns.contains(column.name))
res.insert(column.cloneEmpty());
}
if (final)
{

View File

@ -0,0 +1,130 @@
#include <Interpreters/DirectJoin.h>
#include <Columns/ColumnNullable.h>
#include <Interpreters/castColumn.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int UNSUPPORTED_JOIN_KEYS;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
}
static Block originalRightBlock(const Block & block, const TableJoin & table_join)
{
Block original_right_block;
for (const auto & col : block)
original_right_block.insert({col.column, col.type, table_join.getOriginalName(col.name)});
return original_right_block;
}
/// Converts `columns` from `source_sample_block` structure to `result_sample_block`.
/// Can select subset of columns and change types.
static MutableColumns convertBlockStructure(
const Block & source_sample_block, const Block & result_sample_block, MutableColumns && columns, const PaddedPODArray<UInt8> & null_map)
{
MutableColumns result_columns;
for (const auto & out_sample_col : result_sample_block)
{
auto i = source_sample_block.getPositionByName(out_sample_col.name);
if (columns[i] == nullptr)
{
throw DB::Exception(ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK, "Can't find column '{}'", out_sample_col.name);
}
ColumnWithTypeAndName col = source_sample_block.getByPosition(i);
if (!col.type->equals(*out_sample_col.type))
{
col.column = std::move(columns[i]);
result_columns.push_back(IColumn::mutate(castColumnAccurate(col, out_sample_col.type)));
}
else
{
result_columns.push_back(std::move(columns[i]));
}
columns[i] = nullptr;
if (result_columns.back()->isNullable())
{
assert_cast<ColumnNullable *>(result_columns.back().get())->applyNegatedNullMap(null_map);
}
}
return result_columns;
}
DirectKeyValueJoin::DirectKeyValueJoin(std::shared_ptr<TableJoin> table_join_,
const Block & right_sample_block_,
std::shared_ptr<IKeyValueStorage> storage_)
: table_join(table_join_)
, storage(storage_)
, right_sample_block(right_sample_block_)
, log(&Poco::Logger::get("DirectKeyValueJoin"))
{
if (!table_join->oneDisjunct()
|| table_join->getOnlyClause().key_names_left.size() != 1
|| table_join->getOnlyClause().key_names_right.size() != 1)
{
throw DB::Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Not supported by direct JOIN");
}
if (table_join->strictness() != ASTTableJoin::Strictness::All &&
table_join->strictness() != ASTTableJoin::Strictness::Any &&
table_join->strictness() != ASTTableJoin::Strictness::RightAny)
{
throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "Not supported by direct JOIN");
}
LOG_TRACE(log, "Using direct join");
}
bool DirectKeyValueJoin::addJoinedBlock(const Block &, bool)
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable code reached");
}
void DirectKeyValueJoin::checkTypesOfKeys(const Block & block) const
{
for (const auto & onexpr : table_join->getClauses())
{
JoinCommon::checkTypesOfKeys(block, onexpr.key_names_left, right_sample_block, onexpr.key_names_right);
}
}
void DirectKeyValueJoin::joinBlock(Block & block, std::shared_ptr<ExtraBlock> &)
{
const String & key_name = table_join->getOnlyClause().key_names_left[0];
const ColumnWithTypeAndName & key_col = block.getByName(key_name);
if (!key_col.column)
return;
NullMap null_map;
Chunk joined_chunk = storage->getByKeys({key_col}, null_map);
/// Expected right block may differ from structure in storage, because of `join_use_nulls` or we just select not all columns.
Block original_right_block = originalRightBlock(right_sample_block, *table_join);
Block sample_storage_block = storage->getInMemoryMetadataPtr()->getSampleBlock();
MutableColumns result_columns = convertBlockStructure(sample_storage_block, original_right_block, joined_chunk.mutateColumns(), null_map);
for (size_t i = 0; i < result_columns.size(); ++i)
{
ColumnWithTypeAndName col = right_sample_block.getByPosition(i);
col.column = std::move(result_columns[i]);
block.insert(std::move(col));
}
if (!isLeftOrFull(table_join->kind()))
{
MutableColumns dst_columns = block.mutateColumns();
for (auto & col : dst_columns)
{
col = IColumn::mutate(col->filter(null_map, -1));
}
block.setColumns(std::move(dst_columns));
}
}
}

View File

@ -0,0 +1,60 @@
#pragma once
#include <Common/logger_useful.h>
#include <Core/Block.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <QueryPipeline/SizeLimits.h>
#include <Storages/IKVStorage.h>
#include <Storages/IStorage_fwd.h>
namespace DB
{
class NotJoinedBlocks;
class DirectKeyValueJoin : public IJoin
{
public:
DirectKeyValueJoin(
std::shared_ptr<TableJoin> table_join_,
const Block & right_sample_block_,
std::shared_ptr<IKeyValueStorage> storage_);
virtual const TableJoin & getTableJoin() const override { return *table_join; }
virtual bool addJoinedBlock(const Block &, bool) override;
virtual void checkTypesOfKeys(const Block &) const override;
/// Join the block with data from left hand of JOIN to the right hand data (that was previously built by calls to addJoinedBlock).
/// Could be called from different threads in parallel.
virtual void joinBlock(Block & block, std::shared_ptr<ExtraBlock> &) override;
virtual size_t getTotalRowCount() const override { return 0; }
virtual size_t getTotalByteCount() const override { return 0; }
virtual bool alwaysReturnsEmptySet() const override { return false; }
virtual bool isFilled() const override { return true; }
virtual std::shared_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block &, const Block &, UInt64) const override
{
return nullptr;
}
private:
std::shared_ptr<TableJoin> table_join;
std::shared_ptr<IKeyValueStorage> storage;
Block right_sample_block;
Block sample_block_with_columns_to_add;
Poco::Logger * log;
};
}

View File

@ -26,6 +26,7 @@
#include <Interpreters/HashJoin.h>
#include <Interpreters/JoinSwitcher.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/DirectJoin.h>
#include <Interpreters/Set.h>
#include <Interpreters/TableJoin.h>
@ -43,9 +44,12 @@
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
#include <Core/SettingsEnums.h>
#include <Core/ColumnNumbers.h>
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Common/logger_useful.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFactory.h>
@ -81,6 +85,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_IDENTIFIER;
extern const int UNKNOWN_TYPE_OF_AST_NODE;
extern const int UNSUPPORTED_METHOD;
}
namespace
@ -1066,24 +1071,24 @@ static ActionsDAGPtr createJoinedBlockActions(ContextPtr context, const TableJoi
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}
static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> analyzed_join, const Block & sample_block, ContextPtr context)
static std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block, ContextPtr context)
{
/// HashJoin with Dictionary optimisation
if (analyzed_join->tryInitDictJoin(sample_block, context))
return std::make_shared<HashJoin>(analyzed_join, sample_block);
if (analyzed_join->tryInitDictJoin(right_sample_block, context))
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
bool allow_merge_join = analyzed_join->allowMergeJoin();
if (analyzed_join->forceHashJoin() || (analyzed_join->preferMergeJoin() && !allow_merge_join))
{
if (analyzed_join->allowParallelHashJoin())
{
return std::make_shared<ConcurrentHashJoin>(context, analyzed_join, context->getSettings().max_threads, sample_block);
return std::make_shared<ConcurrentHashJoin>(context, analyzed_join, context->getSettings().max_threads, right_sample_block);
}
return std::make_shared<HashJoin>(analyzed_join, sample_block);
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
}
else if (analyzed_join->forceMergeJoin() || (analyzed_join->preferMergeJoin() && allow_merge_join))
return std::make_shared<MergeJoin>(analyzed_join, sample_block);
return std::make_shared<JoinSwitcher>(analyzed_join, sample_block);
return std::make_shared<MergeJoin>(analyzed_join, right_sample_block);
return std::make_shared<JoinSwitcher>(analyzed_join, right_sample_block);
}
static std::unique_ptr<QueryPlan> buildJoinedPlan(
@ -1094,12 +1099,12 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
{
/// Actions which need to be calculated on joined block.
auto joined_block_actions = createJoinedBlockActions(context, analyzed_join);
Names original_right_columns;
NamesWithAliases required_columns_with_aliases = analyzed_join.getRequiredColumns(
Block(joined_block_actions->getResultColumns()), joined_block_actions->getRequiredColumns().getNames());
Names original_right_column_names;
for (auto & pr : required_columns_with_aliases)
original_right_columns.push_back(pr.first);
original_right_column_names.push_back(pr.first);
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
* - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
@ -1110,18 +1115,18 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
auto interpreter = interpretSubquery(
join_element.table_expression,
context,
original_right_columns,
original_right_column_names,
query_options.copy().setWithAllColumns().ignoreProjections(false).ignoreAlias(false));
auto joined_plan = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*joined_plan);
{
auto sample_block = interpreter->getSampleBlock();
auto rename_dag = std::make_unique<ActionsDAG>(sample_block.getColumnsWithTypeAndName());
Block original_right_columns = interpreter->getSampleBlock();
auto rename_dag = std::make_unique<ActionsDAG>(original_right_columns.getColumnsWithTypeAndName());
for (const auto & name_with_alias : required_columns_with_aliases)
{
if (sample_block.has(name_with_alias.first))
if (name_with_alias.first != name_with_alias.second && original_right_columns.has(name_with_alias.first))
{
auto pos = sample_block.getPositionByName(name_with_alias.first);
auto pos = original_right_columns.getPositionByName(name_with_alias.first);
const auto & alias = rename_dag->addAlias(*rename_dag->getInputs()[pos], name_with_alias.second);
rename_dag->getIndex()[pos] = &alias;
}
@ -1139,6 +1144,52 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
return joined_plan;
}
std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block)
{
auto error_or_null = [&](const String & msg)
{
if (analyzed_join->isForcedAlgorithm(JoinAlgorithm::DIRECT))
throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD, "Can't use '{}' join algorithm: {}", JoinAlgorithm::DIRECT, msg);
return nullptr;
};
if (!analyzed_join->isAllowedAlgorithm(JoinAlgorithm::DIRECT))
return nullptr;
auto storage = analyzed_join->getStorageKeyValue();
if (!storage)
return error_or_null("unsupported storage");
if (!isInnerOrLeft(analyzed_join->kind()))
return error_or_null("illegal kind");
if (analyzed_join->strictness() != ASTTableJoin::Strictness::All &&
analyzed_join->strictness() != ASTTableJoin::Strictness::Any &&
analyzed_join->strictness() != ASTTableJoin::Strictness::RightAny)
return error_or_null("illegal strictness");
const auto & clauses = analyzed_join->getClauses();
bool only_one_key = clauses.size() == 1 &&
clauses[0].key_names_left.size() == 1 &&
clauses[0].key_names_right.size() == 1 &&
!clauses[0].on_filter_condition_left &&
!clauses[0].on_filter_condition_right;
if (!only_one_key)
return error_or_null("multiple keys is not allowed");
String key_name = clauses[0].key_names_right[0];
String original_key_name = analyzed_join->getOriginalName(key_name);
const auto & storage_primary_key = storage->getPrimaryKey();
if (storage_primary_key.size() != 1 || storage_primary_key[0] != original_key_name)
{
return error_or_null(fmt::format("key '{}'{} doesn't match storage '{}'",
key_name, (key_name != original_key_name ? " (aka '" + original_key_name + "')" : ""), fmt::join(storage_primary_key, ",")));
}
return std::make_shared<DirectKeyValueJoin>(analyzed_join, right_sample_block, storage);
}
JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
const ASTTablesInSelectQueryElement & join_element,
const ColumnsWithTypeAndName & left_columns,
@ -1171,7 +1222,14 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
joined_plan->addStep(std::move(converting_step));
}
JoinPtr join = chooseJoinAlgorithm(analyzed_join, joined_plan->getCurrentDataStream().header, getContext());
const Block & right_sample_block = joined_plan->getCurrentDataStream().header;
if (JoinPtr kvjoin = tryKeyValueJoin(analyzed_join, right_sample_block))
{
joined_plan.reset();
return kvjoin;
}
JoinPtr join = chooseJoinAlgorithm(analyzed_join, right_sample_block, getContext());
/// Do not make subquery for join over dictionary.
if (analyzed_join->getDictionaryReader())
@ -1539,7 +1597,7 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChai
if (optimize_read_in_order)
{
for (auto & child : select_query->orderBy()->children)
for (const auto & child : select_query->orderBy()->children)
{
auto actions_dag = std::make_shared<ActionsDAG>(columns_after_join);
getRootActions(child, only_types, actions_dag);

View File

@ -22,9 +22,10 @@
#include <Interpreters/DictionaryReader.h>
#include <Storages/StorageDictionary.h>
#include <Storages/IStorage.h>
#include <Core/ColumnNumbers.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>

View File

@ -17,6 +17,7 @@
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/FixedHashMap.h>
#include <Storages/TableLockHolder.h>
#include <Common/logger_useful.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
@ -25,6 +26,9 @@
#include <Core/Block.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/IKVStorage.h>
namespace DB
{
@ -167,11 +171,6 @@ public:
/// Used by joinGet function that turns StorageJoin into a dictionary.
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add) const;
/** Keep "totals" (separate part of dataset, see WITH TOTALS) to use later.
*/
void setTotals(const Block & block) override { totals = block; }
const Block & getTotals() const override { return totals; }
bool isFilled() const override { return from_storage_join || data->type == Type::DICT; }
/** For RIGHT and FULL JOINs.
@ -390,8 +389,6 @@ private:
Poco::Logger * log;
Block totals;
/// Should be set via setLock to protect hash table from modification from StorageJoin
/// If set HashJoin instance is not available for modification (addJoinedBlock)
TableLockHolder storage_join_lock = nullptr;

View File

@ -4,12 +4,12 @@
#include <vector>
#include <Core/Names.h>
#include <Core/Block.h>
#include <Columns/IColumn.h>
namespace DB
{
class Block;
struct ExtraBlock;
using ExtraBlockPtr = std::shared_ptr<ExtraBlock>;
@ -33,12 +33,17 @@ public:
/// Could be called from different threads in parallel.
virtual void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) = 0;
/// Set/Get totals for right table
virtual void setTotals(const Block & block) = 0;
virtual const Block & getTotals() const = 0;
/** Set/Get totals for right table
* Keep "totals" (separate part of dataset, see WITH TOTALS) to use later.
*/
virtual void setTotals(const Block & block) { totals = block; }
virtual const Block & getTotals() const { return totals; }
/// Number of rows/bytes stored in memory
virtual size_t getTotalRowCount() const = 0;
virtual size_t getTotalByteCount() const = 0;
/// Returns true if no data to join with.
virtual bool alwaysReturnsEmptySet() const = 0;
/// StorageJoin/Dictionary is already filled. No need to call addJoinedBlock.
@ -50,6 +55,9 @@ public:
virtual std::shared_ptr<NotJoinedBlocks>
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const = 0;
private:
Block totals;
};
using JoinPtr = std::shared_ptr<IJoin>;

View File

@ -92,12 +92,12 @@ public:
BlockIO execute() override;
/// Builds QueryPlan for current query.
virtual void buildQueryPlan(QueryPlan & query_plan) override;
void buildQueryPlan(QueryPlan & query_plan) override;
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }
virtual void ignoreWithTotals() override;
void ignoreWithTotals() override;
ASTPtr getQuery() const { return query_ptr; }

View File

@ -31,7 +31,7 @@ public:
~InterpreterSelectWithUnionQuery() override;
/// Builds QueryPlan for current query.
virtual void buildQueryPlan(QueryPlan & query_plan) override;
void buildQueryPlan(QueryPlan & query_plan) override;
BlockIO execute() override;
@ -43,7 +43,7 @@ public:
ContextPtr context_,
bool is_subquery = false);
virtual void ignoreWithTotals() override;
void ignoreWithTotals() override;
bool supportsTransactions() const override { return true; }

View File

@ -1,5 +1,7 @@
#include <Interpreters/JoinedTables.h>
#include <Core/SettingsEnums.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/InJoinSubqueriesPreprocessor.h>
@ -292,6 +294,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
return {};
auto settings = context->getSettingsRef();
MultiEnum<JoinAlgorithm> join_algorithm = settings.join_algorithm;
auto table_join = std::make_shared<TableJoin>(settings, context->getTemporaryVolume());
const ASTTablesInSelectQueryElement * ast_join = select_query.join();
@ -305,9 +308,18 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
if (storage)
{
if (auto storage_join = std::dynamic_pointer_cast<StorageJoin>(storage); storage_join)
{
table_join->setStorageJoin(storage_join);
}
else if (auto storage_dict = std::dynamic_pointer_cast<StorageDictionary>(storage); storage_dict)
{
table_join->setStorageJoin(storage_dict);
}
else if (auto storage_kv = std::dynamic_pointer_cast<IKeyValueStorage>(storage);
storage_kv && join_algorithm.isSet(JoinAlgorithm::DIRECT))
{
table_join->setStorageJoin(storage_kv);
}
}
}

View File

@ -563,7 +563,7 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
/// Has to be called even if totals are empty
void MergeJoin::setTotals(const Block & totals_block)
{
totals = totals_block;
IJoin::setTotals(totals_block);
mergeRightBlocks();
if (is_right || is_full)

View File

@ -29,7 +29,6 @@ public:
void joinBlock(Block &, ExtraBlockPtr & not_processed) override;
void setTotals(const Block &) override;
const Block & getTotals() const override { return totals; }
size_t getTotalRowCount() const override { return right_blocks.row_count; }
size_t getTotalByteCount() const override { return right_blocks.bytes; }
@ -100,7 +99,6 @@ private:
std::unique_ptr<SortedBlocksWriter> disk_writer;
/// Set of files with sorted blocks
SortedBlocksWriter::SortedFiles flushed_right_blocks;
Block totals;
std::atomic<bool> is_in_memory{true};
const bool is_any_join;
const bool is_all_join;

View File

@ -12,6 +12,7 @@
#include <Common/hex.h>
#include <Common/CurrentThread.h>
#include <Core/Field.h>
namespace DB
@ -64,13 +65,7 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
// The user might add some ints values, and we will have Int Field, and the
// insert will fail because the column requires Strings. Convert the fields
// here, because it's hard to remember to convert them in all other places.
Map map(attribute_names.size());
for (size_t attr_idx = 0; attr_idx < map.size(); ++attr_idx)
{
map[attr_idx] = Tuple{attribute_names[attr_idx], toString(attribute_values[attr_idx])};
}
columns[i++]->insert(map);
columns[i++]->insert(attributes);
}
@ -158,8 +153,7 @@ void OpenTelemetrySpanHolder::addAttribute(const std::string& name, UInt64 value
if (trace_id == UUID())
return;
this->attribute_names.push_back(name);
this->attribute_values.push_back(std::to_string(value));
this->attributes.push_back(Tuple{name, toString(value)});
}
void OpenTelemetrySpanHolder::addAttribute(const std::string& name, const std::string& value)
@ -167,8 +161,7 @@ void OpenTelemetrySpanHolder::addAttribute(const std::string& name, const std::s
if (trace_id == UUID())
return;
this->attribute_names.push_back(name);
this->attribute_values.push_back(value);
this->attributes.push_back(Tuple{name, value});
}
void OpenTelemetrySpanHolder::addAttribute(const Exception & e)
@ -176,8 +169,7 @@ void OpenTelemetrySpanHolder::addAttribute(const Exception & e)
if (trace_id == UUID())
return;
this->attribute_names.push_back("clickhouse.exception");
this->attribute_values.push_back(getExceptionMessage(e, false));
this->attributes.push_back(Tuple{"clickhouse.exception", getExceptionMessage(e, false)});
}
void OpenTelemetrySpanHolder::addAttribute(std::exception_ptr e)
@ -185,8 +177,7 @@ void OpenTelemetrySpanHolder::addAttribute(std::exception_ptr e)
if (trace_id == UUID() || e == nullptr)
return;
this->attribute_names.push_back("clickhouse.exception");
this->attribute_values.push_back(getExceptionMessage(e, false));
this->attributes.push_back(Tuple{"clickhouse.exception", getExceptionMessage(e, false)});
}
bool OpenTelemetryTraceContext::parseTraceparentHeader(const std::string & traceparent,

View File

@ -15,8 +15,7 @@ struct OpenTelemetrySpan
std::string operation_name;
UInt64 start_time_us;
UInt64 finish_time_us;
Array attribute_names;
Array attribute_values;
Map attributes;
// I don't understand how Links work, namely, which direction should they
// point to, and how they are related with parent_span_id, so no Links for now.
};

View File

@ -136,7 +136,8 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
rows_in_block,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
}
@ -190,7 +191,8 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
rows_in_block,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
}
@ -222,7 +224,8 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
rows_in_block,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
}
@ -303,7 +306,8 @@ Block SortedBlocksBuffer::mergeBlocks(Blocks && blocks) const
builder.getHeader(),
builder.getNumStreams(),
sort_description,
num_rows);
num_rows,
SortingQueueStrategy::Default);
builder.addTransform(std::move(transform));
}

View File

@ -190,14 +190,22 @@ void TableJoin::deduplicateAndQualifyColumnNames(const NameSet & left_table_colu
columns_from_joined_table.swap(dedup_columns);
}
String TableJoin::getOriginalName(const String & column_name) const
{
auto it = original_names.find(column_name);
if (it != original_names.end())
return it->second;
return column_name;
}
NamesWithAliases TableJoin::getNamesWithAliases(const NameSet & required_columns) const
{
NamesWithAliases out;
for (const auto & column : required_columns)
out.reserve(required_columns.size());
for (const auto & name : required_columns)
{
auto it = original_names.find(column);
if (it != original_names.end())
out.emplace_back(it->second, it->first); /// {original_name, name}
auto original_name = getOriginalName(name);
out.emplace_back(original_name, name);
}
return out;
}
@ -513,6 +521,7 @@ TableJoin::createConvertingActions(const ColumnsWithTypeAndName & left_sample_co
{
if (dag)
{
/// Just debug message
std::vector<std::string> input_cols;
for (const auto & col : dag->getRequiredColumns())
input_cols.push_back(col.name + ": " + col.type->getName());
@ -591,15 +600,16 @@ void TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const Rig
catch (DB::Exception & ex)
{
throw DB::Exception(ErrorCodes::TYPE_MISMATCH,
"Can't infer common type for joined columns: {}: {} at left, {}: {} at right. {}",
"Can't infer common type for joined columns: {}: {} at left, {}: {} at right ({})",
left_key_name, ltype->second->getName(),
right_key_name, rtype->second->getName(),
ex.message());
}
if (!allow_right && !common_type->equals(*rtype->second))
bool right_side_changed = !common_type->equals(*rtype->second);
if (right_side_changed && !allow_right)
{
throw DB::Exception(ErrorCodes::TYPE_MISMATCH,
"Can't change type for right table: {}: {} -> {}.",
"Can't change type for right table: {}: {} -> {}",
right_key_name, rtype->second->getName(), common_type->getName());
}
left_type_map[left_key_name] = right_type_map[right_key_name] = common_type;
@ -626,7 +636,7 @@ static ActionsDAGPtr changeKeyTypes(const ColumnsWithTypeAndName & cols_src,
bool has_some_to_do = false;
for (auto & col : cols_dst)
{
if (auto it = type_mapping.find(col.name); it != type_mapping.end())
if (auto it = type_mapping.find(col.name); it != type_mapping.end() && col.type != it->second)
{
col.type = it->second;
col.column = nullptr;
@ -635,6 +645,7 @@ static ActionsDAGPtr changeKeyTypes(const ColumnsWithTypeAndName & cols_src,
}
if (!has_some_to_do)
return nullptr;
return ActionsDAG::makeConvertingActions(cols_src, cols_dst, ActionsDAG::MatchColumnsMode::Name, true, add_new_cols, &key_column_rename);
}
@ -685,6 +696,11 @@ ActionsDAGPtr TableJoin::applyKeyConvertToTable(
return dag_stage1;
}
void TableJoin::setStorageJoin(std::shared_ptr<IKeyValueStorage> storage)
{
right_kv_storage = storage;
}
void TableJoin::setStorageJoin(std::shared_ptr<StorageJoin> storage)
{
if (right_storage_dictionary)
@ -784,7 +800,7 @@ void TableJoin::resetToCross()
bool TableJoin::allowParallelHashJoin() const
{
if (dictionary_reader || join_algorithm != JoinAlgorithm::PARALLEL_HASH)
if (dictionary_reader || !join_algorithm.isSet(JoinAlgorithm::PARALLEL_HASH))
return false;
if (table_join.kind != ASTTableJoin::Kind::Left && table_join.kind != ASTTableJoin::Kind::Inner)
return false;

View File

@ -9,7 +9,8 @@
#include <Interpreters/asof.h>
#include <QueryPipeline/SizeLimits.h>
#include <DataTypes/getLeastSupertype.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/IKVStorage.h>
#include <Common/Exception.h>
#include <Parsers/IAST_fwd.h>
@ -31,6 +32,7 @@ class Block;
class DictionaryReader;
class StorageJoin;
class StorageDictionary;
class IKeyValueStorage;
struct ColumnWithTypeAndName;
using ColumnsWithTypeAndName = std::vector<ColumnWithTypeAndName>;
@ -55,7 +57,7 @@ public:
struct JoinOnClause
{
Names key_names_left;
Names key_names_right; /// Duplicating right key names are qualified.
Names key_names_right; /// Duplicating right key names are qualified
ASTPtr on_filter_condition_left;
ASTPtr on_filter_condition_right;
@ -107,7 +109,7 @@ private:
const size_t default_max_bytes = 0;
const bool join_use_nulls = false;
const size_t max_joined_block_rows = 0;
JoinAlgorithm join_algorithm = JoinAlgorithm::AUTO;
MultiEnum<JoinAlgorithm> join_algorithm = MultiEnum<JoinAlgorithm>(JoinAlgorithm::AUTO);
const size_t partial_merge_join_rows_in_right_blocks = 0;
const size_t partial_merge_join_left_table_buffer_bytes = 0;
const size_t max_files_to_merge = 0;
@ -148,6 +150,8 @@ private:
std::shared_ptr<StorageDictionary> right_storage_dictionary;
std::shared_ptr<DictionaryReader> dictionary_reader;
std::shared_ptr<IKeyValueStorage> right_kv_storage;
Names requiredJoinedNames() const;
/// Create converting actions and change key column names if required
@ -189,14 +193,20 @@ public:
const SizeLimits & sizeLimits() const { return size_limits; }
VolumePtr getTemporaryVolume() { return tmp_volume; }
bool allowMergeJoin() const;
bool preferMergeJoin() const { return join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE; }
bool forceMergeJoin() const { return join_algorithm == JoinAlgorithm::PARTIAL_MERGE; }
bool allowParallelHashJoin() const;
bool isAllowedAlgorithm(JoinAlgorithm val) const { return join_algorithm.isSet(val) || join_algorithm.isSet(JoinAlgorithm::AUTO); }
bool isForcedAlgorithm(JoinAlgorithm val) const { return join_algorithm == MultiEnum<JoinAlgorithm>(val); }
bool preferMergeJoin() const { return join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PREFER_PARTIAL_MERGE); }
bool forceMergeJoin() const { return join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PARTIAL_MERGE); }
bool forceHashJoin() const
{
/// HashJoin always used for DictJoin
return dictionary_reader || join_algorithm == JoinAlgorithm::HASH || join_algorithm == JoinAlgorithm::PARALLEL_HASH;
return dictionary_reader
|| join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::HASH)
|| join_algorithm == MultiEnum<JoinAlgorithm>(JoinAlgorithm::PARALLEL_HASH);
}
bool allowParallelHashJoin() const;
bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); }
bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); }
@ -243,6 +253,7 @@ public:
bool hasUsing() const { return table_join.using_expression_list != nullptr; }
bool hasOn() const { return table_join.on_expression != nullptr; }
String getOriginalName(const String & column_name) const;
NamesWithAliases getNamesWithAliases(const NameSet & required_columns) const;
NamesWithAliases getRequiredColumns(const Block & sample, const Names & action_required_columns) const;
@ -294,6 +305,7 @@ public:
std::unordered_map<String, String> leftToRightKeyRemap() const;
void setStorageJoin(std::shared_ptr<IKeyValueStorage> storage);
void setStorageJoin(std::shared_ptr<StorageJoin> storage);
void setStorageJoin(std::shared_ptr<StorageDictionary> storage);
@ -301,8 +313,10 @@ public:
bool tryInitDictJoin(const Block & sample_block, ContextPtr context);
bool isSpecialStorage() const { return right_storage_dictionary || right_storage_join; }
bool isSpecialStorage() const { return right_storage_dictionary || right_storage_join || right_kv_storage; }
const DictionaryReader * getDictionaryReader() const { return dictionary_reader.get(); }
std::shared_ptr<IKeyValueStorage> getStorageKeyValue() { return right_kv_storage; }
};
}

View File

@ -384,8 +384,7 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
span.finish_time_us =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
span.attribute_names.push_back("clickhouse.thread_id");
span.attribute_values.push_back(thread_id);
span.attributes.push_back(Tuple{"clickhouse.thread_id", toString(thread_id)});
opentelemetry_span_log->add(span);
}

View File

@ -301,28 +301,15 @@ static void onExceptionBeforeStart(const String & query_for_logging, ContextPtr
span.operation_name = "query";
span.start_time_us = current_time_us;
span.finish_time_us = time_in_microseconds(std::chrono::system_clock::now());
/// Keep values synchronized to type enum in QueryLogElement::createBlock.
span.attribute_names.push_back("clickhouse.query_status");
span.attribute_values.push_back("ExceptionBeforeStart");
span.attribute_names.push_back("db.statement");
span.attribute_values.push_back(elem.query);
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
span.attribute_names.push_back("clickhouse.exception");
span.attribute_values.push_back(elem.exception);
span.attribute_names.push_back("clickhouse.exception_code");
span.attribute_values.push_back(elem.exception_code);
span.attributes.reserve(6);
span.attributes.push_back(Tuple{"clickhouse.query_status", "ExceptionBeforeStart"});
span.attributes.push_back(Tuple{"db.statement", elem.query});
span.attributes.push_back(Tuple{"clickhouse.query_id", elem.client_info.current_query_id});
span.attributes.push_back(Tuple{"clickhouse.exception", elem.exception});
span.attributes.push_back(Tuple{"clickhouse.exception_code", toString(elem.exception_code)});
if (!context->query_trace_context.tracestate.empty())
{
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context->query_trace_context.tracestate);
span.attributes.push_back(Tuple{"clickhouse.tracestate", context->query_trace_context.tracestate});
}
opentelemetry_span_log->add(span);
@ -956,20 +943,13 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
span.start_time_us = elem.query_start_time_microseconds;
span.finish_time_us = time_in_microseconds(finish_time);
/// Keep values synchronized to type enum in QueryLogElement::createBlock.
span.attribute_names.push_back("clickhouse.query_status");
span.attribute_values.push_back("QueryFinish");
span.attribute_names.push_back("db.statement");
span.attribute_values.push_back(elem.query);
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
span.attributes.reserve(4);
span.attributes.push_back(Tuple{"clickhouse.query_status", "QueryFinish"});
span.attributes.push_back(Tuple{"db.statement", elem.query});
span.attributes.push_back(Tuple{"clickhouse.query_id", elem.client_info.current_query_id});
if (!context->query_trace_context.tracestate.empty())
{
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context->query_trace_context.tracestate);
span.attributes.push_back(Tuple{"clickhouse.tracestate", context->query_trace_context.tracestate});
}
opentelemetry_span_log->add(span);

View File

@ -2,6 +2,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Common/typeid_cast.h>
#include <Interpreters/getClusterName.h>
@ -22,7 +23,7 @@ std::string getClusterName(const IAST & node)
return ast_id->name();
if (const auto * ast_lit = node.as<ASTLiteral>())
return ast_lit->value.safeGet<String>();
return checkAndGetLiteralArgument<String>(*ast_lit, "cluster_name");
/// A hack to support hyphens in cluster names.
if (const auto * ast_func = node.as<ASTFunction>())

View File

@ -248,7 +248,7 @@ public:
return removeOnCluster<ASTAlterQuery>(clone(), params.default_database);
}
virtual QueryKind getQueryKind() const override { return QueryKind::Alter; }
QueryKind getQueryKind() const override { return QueryKind::Alter; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -120,7 +120,7 @@ public:
bool isView() const { return is_ordinary_view || is_materialized_view || is_live_view || is_window_view; }
virtual QueryKind getQueryKind() const override { return QueryKind::Create; }
QueryKind getQueryKind() const override { return QueryKind::Create; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -45,7 +45,7 @@ public:
return removeOnCluster<ASTDropQuery>(clone(), params.default_database);
}
virtual QueryKind getQueryKind() const override { return QueryKind::Drop; }
QueryKind getQueryKind() const override { return QueryKind::Drop; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override;

View File

@ -66,7 +66,7 @@ public:
return res;
}
virtual QueryKind getQueryKind() const override { return QueryKind::Insert; }
QueryKind getQueryKind() const override { return QueryKind::Insert; }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

Some files were not shown because too many files have changed in this diff Show More