Merge branch 'master' into implicit_select

Alexey Milovidov 2024-11-09 18:08:53 +01:00
commit 6e1b2709d2
43 changed files with 793 additions and 132 deletions


@ -12,7 +12,7 @@ tests/ci/cancel_and_rerun_workflow_lambda/app.py
- Backward Incompatible Change
- Build/Testing/Packaging Improvement
- Documentation (changelog entry is not required)
- Critical Bug Fix (crash, LOGICAL_ERROR, data loss, RBAC)
- Critical Bug Fix (crash, data loss, RBAC)
- Bug Fix (user-visible misbehavior in an official stable release)
- CI Fix or Improvement (changelog entry is not required)
- Not for changelog (changelog entry is not required)


@ -1,21 +1,31 @@
#!/bin/bash
set +x
set -eo pipefail
shopt -s nullglob
DO_CHOWN=1
if [ "${CLICKHOUSE_DO_NOT_CHOWN:-0}" = "1" ]; then
if [[ "${CLICKHOUSE_RUN_AS_ROOT:=0}" = "1" || "${CLICKHOUSE_DO_NOT_CHOWN:-0}" = "1" ]]; then
DO_CHOWN=0
fi
CLICKHOUSE_UID="${CLICKHOUSE_UID:-"$(id -u clickhouse)"}"
CLICKHOUSE_GID="${CLICKHOUSE_GID:-"$(id -g clickhouse)"}"
# CLICKHOUSE_UID and CLICKHOUSE_GID are kept for backward compatibility, but deprecated
# One must use either "docker run --user" or CLICKHOUSE_RUN_AS_ROOT=1 to run the process as a specific user
# FIXME: Remove ALL CLICKHOUSE_UID CLICKHOUSE_GID before 25.3
if [[ "${CLICKHOUSE_UID:-}" || "${CLICKHOUSE_GID:-}" ]]; then
echo 'WARNING: Support for CLICKHOUSE_UID/CLICKHOUSE_GID will be removed in a couple of releases.' >&2
echo 'WARNING: Either use a proper "docker run --user=xxx:xxxx" argument instead of CLICKHOUSE_UID/CLICKHOUSE_GID' >&2
echo 'WARNING: or set "CLICKHOUSE_RUN_AS_ROOT=1" ENV to run the clickhouse-server as root:root' >&2
fi
# support --user
if [ "$(id -u)" = "0" ]; then
USER=$CLICKHOUSE_UID
GROUP=$CLICKHOUSE_GID
# support `docker run --user=xxx:xxxx`
if [[ "$(id -u)" = "0" ]]; then
if [[ "$CLICKHOUSE_RUN_AS_ROOT" = 1 ]]; then
USER=0
GROUP=0
else
USER="${CLICKHOUSE_UID:-"$(id -u clickhouse)"}"
GROUP="${CLICKHOUSE_GID:-"$(id -g clickhouse)"}"
fi
if command -v gosu &> /dev/null; then
gosu="gosu $USER:$GROUP"
elif command -v su-exec &> /dev/null; then
@ -82,11 +92,11 @@ if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
# There is a config file. It is already tested with gosu (if it is readable by the keeper user)
if [ -f "$KEEPER_CONFIG" ]; then
exec $gosu /usr/bin/clickhouse-keeper --config-file="$KEEPER_CONFIG" "$@"
exec $gosu clickhouse-keeper --config-file="$KEEPER_CONFIG" "$@"
fi
# There is no config file. Will use embedded one
exec $gosu /usr/bin/clickhouse-keeper --log-file="$LOG_PATH" --errorlog-file="$ERROR_LOG_PATH" "$@"
exec $gosu clickhouse-keeper --log-file="$LOG_PATH" --errorlog-file="$ERROR_LOG_PATH" "$@"
fi
# Otherwise, we assume the user wants to run their own process, for example a `bash` shell to explore this image
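Taken together, the new contract looks like this from the user's side (image name shown for illustration):

```bash
# Deprecated: still honored, but prints the warnings above
docker run -e CLICKHOUSE_UID=101 -e CLICKHOUSE_GID=101 clickhouse/clickhouse-keeper

# Preferred: let Docker set the uid/gid of the process
docker run --user "$(id -u):$(id -g)" clickhouse/clickhouse-keeper

# Or explicitly opt in to running as root:root
docker run -e CLICKHOUSE_RUN_AS_ROOT=1 clickhouse/clickhouse-keeper
```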


@ -88,34 +88,32 @@ RUN if [ -n "${single_binary_location_url}" ]; then \
#docker-official-library:on
# A fallback to installation from ClickHouse repository
RUN if ! clickhouse local -q "SELECT ''" > /dev/null 2>&1; then \
apt-get update \
&& apt-get install --yes --no-install-recommends \
apt-transport-https \
dirmngr \
gnupg2 \
&& mkdir -p /etc/apt/sources.list.d \
&& GNUPGHOME=$(mktemp -d) \
&& GNUPGHOME="$GNUPGHOME" gpg --batch --no-default-keyring \
--keyring /usr/share/keyrings/clickhouse-keyring.gpg \
--keyserver hkp://keyserver.ubuntu.com:80 \
--recv-keys 3a9ea1193a97b548be1457d48919f6bd2b48d754 \
&& rm -rf "$GNUPGHOME" \
&& chmod +r /usr/share/keyrings/clickhouse-keyring.gpg \
&& echo "${REPOSITORY}" > /etc/apt/sources.list.d/clickhouse.list \
&& echo "installing from repository: ${REPOSITORY}" \
&& apt-get update \
&& for package in ${PACKAGES}; do \
packages="${packages} ${package}=${VERSION}" \
; done \
&& apt-get install --allow-unauthenticated --yes --no-install-recommends ${packages} || exit 1 \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
/tmp/* \
&& apt-get autoremove --purge -yq libksba8 \
&& apt-get autoremove -yq \
; fi
# The fallback below runs only if the clickhouse binary is not already present and working
RUN clickhouse local -q 'SELECT 1' >/dev/null 2>&1 && exit 0 || : \
; apt-get update \
&& apt-get install --yes --no-install-recommends \
dirmngr \
gnupg2 \
&& mkdir -p /etc/apt/sources.list.d \
&& GNUPGHOME=$(mktemp -d) \
&& GNUPGHOME="$GNUPGHOME" gpg --batch --no-default-keyring \
--keyring /usr/share/keyrings/clickhouse-keyring.gpg \
--keyserver hkp://keyserver.ubuntu.com:80 \
--recv-keys 3a9ea1193a97b548be1457d48919f6bd2b48d754 \
&& rm -rf "$GNUPGHOME" \
&& chmod +r /usr/share/keyrings/clickhouse-keyring.gpg \
&& echo "${REPOSITORY}" > /etc/apt/sources.list.d/clickhouse.list \
&& echo "installing from repository: ${REPOSITORY}" \
&& apt-get update \
&& for package in ${PACKAGES}; do \
packages="${packages} ${package}=${VERSION}" \
; done \
&& apt-get install --yes --no-install-recommends ${packages} || exit 1 \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
/tmp/* \
&& apt-get autoremove --purge -yq dirmngr gnupg2
# post install
# we need to allow "others" access to clickhouse folder, because docker container
@ -126,8 +124,6 @@ RUN clickhouse-local -q 'SELECT * FROM system.build_options' \
RUN locale-gen en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8
ENV TZ UTC
RUN mkdir /docker-entrypoint-initdb.d
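A minimal smoke test of the resulting image, assuming it is built from the `docker/server` context with an illustrative tag:

```bash
docker build -t clickhouse-server:local docker/server
# The Dockerfile's own check uses the same probe: a trivial clickhouse-local query
docker run --rm clickhouse-server:local clickhouse local -q "SELECT version()"
```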


@ -1,3 +1,11 @@
<!---
The README.md is generated by README.sh from the following sources:
- README.src/content.md
- README.src/license.md
If you want to change it, edit these files
-->
# ClickHouse Server Docker Image
## What is ClickHouse?
@ -8,6 +16,7 @@ ClickHouse works 100-1000x faster than traditional database management systems,
For more information and documentation see https://clickhouse.com/.
<!-- This is not related to the docker official library, remove it before commit to https://github.com/docker-library/docs -->
## Versions
- The `latest` tag points to the latest release of the latest stable branch.
@ -16,11 +25,12 @@ For more information and documentation see https://clickhouse.com/.
- The tag `head` is built from the latest commit to the default branch.
- Each tag has optional `-alpine` suffix to reflect that it's built on top of `alpine`.
<!-- REMOVE UNTIL HERE -->
### Compatibility
- The amd64 image requires support for [SSE3 instructions](https://en.wikipedia.org/wiki/SSE3). Virtually all x86 CPUs after 2005 support SSE3.
- The arm64 image requires support for the [ARMv8.2-A architecture](https://en.wikipedia.org/wiki/AArch64#ARMv8.2-A) and additionally the Load-Acquire RCpc register. The register is optional in version ARMv8.2-A and mandatory in [ARMv8.3-A](https://en.wikipedia.org/wiki/AArch64#ARMv8.3-A). Supported in Graviton >=2, Azure and GCP instances. Examples for unsupported devices are Raspberry Pi 4 (ARMv8.0-A) and Jetson AGX Xavier/Orin (ARMv8.2-A).
- Since ClickHouse 24.11, the Ubuntu images use `ubuntu:22.04` as their base image. It requires Docker version >= `20.10.10`, which contains this [patch](https://github.com/moby/moby/commit/977283509f75303bc6612665a04abf76ff1d2468). As a workaround, you could use `docker run [--privileged | --security-opt seccomp=unconfined]` instead; however, that has security implications.
- Since ClickHouse 24.11, the Ubuntu images use `ubuntu:22.04` as their base image. It requires Docker version >= `20.10.10`, which contains this [patch](https://github.com/moby/moby/commit/977283509f75303bc6612665a04abf76ff1d2468). As a workaround, you could use `docker run --security-opt seccomp=unconfined` instead; however, that has security implications.
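A quick host-side check for both requirements (a sketch: Linux reports SSE3 as the `pni` flag, and the Docker server must be at least `20.10.10`):

```bash
grep -qm1 -E '\b(sse3|pni)\b' /proc/cpuinfo && echo 'SSE3: ok'
docker version --format 'Docker server: {{.Server.Version}}'
```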
## How to use this image
@ -30,7 +40,7 @@ For more information and documentation see https://clickhouse.com/.
docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
```
By default, ClickHouse will be accessible only via the Docker network. See the [networking section below](#networking).
By default, ClickHouse will be accessible only via the Docker network. See the **networking** section below.
By default, the server instance started above runs as the `default` user without a password.
@ -47,7 +57,7 @@ More information about the [ClickHouse client](https://clickhouse.com/docs/en/in
### connect to it using curl
```bash
echo "SELECT 'Hello, ClickHouse!'" | docker run -i --rm --link some-clickhouse-server:clickhouse-server curlimages/curl 'http://clickhouse-server:8123/?query=' -s --data-binary @-
echo "SELECT 'Hello, ClickHouse!'" | docker run -i --rm --link some-clickhouse-server:clickhouse-server buildpack-deps:curl curl 'http://clickhouse-server:8123/?query=' -s --data-binary @-
```
More information about the [ClickHouse HTTP Interface](https://clickhouse.com/docs/en/interfaces/http/).
@ -70,7 +80,7 @@ echo 'SELECT version()' | curl 'http://localhost:18123/' --data-binary @-
`22.6.3.35`
or by allowing the container to use [host ports directly](https://docs.docker.com/network/host/) using `--network=host` (also allows achieving better network performance):
Or by allowing the container to use [host ports directly](https://docs.docker.com/network/host/) using `--network=host` (also allows achieving better network performance):
```bash
docker run -d --network=host --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
@ -88,8 +98,8 @@ Typically you may want to mount the following folders inside your container to a
```bash
docker run -d \
-v $(realpath ./ch_data):/var/lib/clickhouse/ \
-v $(realpath ./ch_logs):/var/log/clickhouse-server/ \
-v "$PWD/ch_data:/var/lib/clickhouse/" \
-v "$PWD/ch_logs:/var/log/clickhouse-server/" \
--name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
```
@ -111,6 +121,8 @@ docker run -d \
--name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
```
Read more in [knowledge base](https://clickhouse.com/docs/knowledgebase/configure_cap_ipc_lock_and_cap_sys_nice_in_docker).
## Configuration
The container exposes port 8123 for the [HTTP interface](https://clickhouse.com/docs/en/interfaces/http_interface/) and port 9000 for the [native client](https://clickhouse.com/docs/en/interfaces/tcp/).
@ -126,8 +138,8 @@ docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 -v /pa
### Start server as custom user
```bash
# $(pwd)/data/clickhouse should exist and be owned by current user
docker run --rm --user ${UID}:${GID} --name some-clickhouse-server --ulimit nofile=262144:262144 -v "$(pwd)/logs/clickhouse:/var/log/clickhouse-server" -v "$(pwd)/data/clickhouse:/var/lib/clickhouse" clickhouse/clickhouse-server
# $PWD/data/clickhouse should exist and be owned by current user
docker run --rm --user "${UID}:${GID}" --name some-clickhouse-server --ulimit nofile=262144:262144 -v "$PWD/logs/clickhouse:/var/log/clickhouse-server" -v "$PWD/data/clickhouse:/var/lib/clickhouse" clickhouse/clickhouse-server
```
When you use the image with local directories mounted, you probably want to specify the user to maintain the proper file ownership. Use the `--user` argument and mount `/var/lib/clickhouse` and `/var/log/clickhouse-server` inside the container. Otherwise, the image will complain and not start.
@ -135,7 +147,7 @@ When you use the image with local directories mounted, you probably want to spec
### Start server from root (useful in case of enabled user namespace)
```bash
docker run --rm -e CLICKHOUSE_UID=0 -e CLICKHOUSE_GID=0 --name clickhouse-server-userns -v "$(pwd)/logs/clickhouse:/var/log/clickhouse-server" -v "$(pwd)/data/clickhouse:/var/lib/clickhouse" clickhouse/clickhouse-server
docker run --rm -e CLICKHOUSE_RUN_AS_ROOT=1 --name clickhouse-server-userns -v "$PWD/logs/clickhouse:/var/log/clickhouse-server" -v "$PWD/data/clickhouse:/var/lib/clickhouse" clickhouse/clickhouse-server
```
### How to create default database and user on starting

docker/server/README.sh Executable file

@ -0,0 +1,38 @@
#!/usr/bin/env bash
set -ueo pipefail
# A script to generate README.md, close to how it is done in https://github.com/docker-library/docs
WORKDIR=$(dirname "$0")
SCRIPT_NAME=$(basename "$0")
CONTENT=README.src/content.md
LICENSE=README.src/license.md
cd "$WORKDIR"
R=README.md
cat > "$R" <<EOD
<!---
The $R is generated by $SCRIPT_NAME from the following sources:
- $CONTENT
- $LICENSE
If you want to change it, edit these files
-->
EOD
cat "$CONTENT" >> "$R"
cat >> "$R" <<EOD
## License
$(cat $LICENSE)
EOD
# Remove the %%LOGO%% line together with the line that follows it
sed -i '/^%%LOGO%%/,+1d' "$R"
# Replace each %%IMAGE%% with our `clickhouse/clickhouse-server`
sed -i '/%%IMAGE%%/s:%%IMAGE%%:clickhouse/clickhouse-server:g' $R
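Regenerating and verifying the file is then a two-liner (paths assume the repository root):

```bash
docker/server/README.sh
git diff --exit-code docker/server/README.md  # non-zero if README.md is stale
```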


@ -0,0 +1 @@
ClickHouse is the fastest and most resource-efficient OSS database for real-time apps and analytics.


@ -0,0 +1,170 @@
# ClickHouse Server Docker Image
## What is ClickHouse?
%%LOGO%%
ClickHouse is an open-source column-oriented DBMS (columnar database management system) for online analytical processing (OLAP) that allows users to generate analytical reports using SQL queries in real-time.
ClickHouse works 100-1000x faster than traditional database management systems, and processes hundreds of millions to over a billion rows and tens of gigabytes of data per server per second. With a widespread user base around the globe, the technology has received praise for its reliability, ease of use, and fault tolerance.
For more information and documentation see https://clickhouse.com/.
<!-- This is not related to the docker official library, remove it before commit to https://github.com/docker-library/docs -->
## Versions
- The `latest` tag points to the latest release of the latest stable branch.
- Branch tags like `22.2` point to the latest release of the corresponding branch.
- Full version tags like `22.2.3.5` point to the corresponding release.
- The tag `head` is built from the latest commit to the default branch.
- Each tag has optional `-alpine` suffix to reflect that it's built on top of `alpine`.
<!-- REMOVE UNTIL HERE -->
### Compatibility
- The amd64 image requires support for [SSE3 instructions](https://en.wikipedia.org/wiki/SSE3). Virtually all x86 CPUs after 2005 support SSE3.
- The arm64 image requires support for the [ARMv8.2-A architecture](https://en.wikipedia.org/wiki/AArch64#ARMv8.2-A) and additionally the Load-Acquire RCpc register. The register is optional in version ARMv8.2-A and mandatory in [ARMv8.3-A](https://en.wikipedia.org/wiki/AArch64#ARMv8.3-A). Supported in Graviton >=2, Azure and GCP instances. Examples for unsupported devices are Raspberry Pi 4 (ARMv8.0-A) and Jetson AGX Xavier/Orin (ARMv8.2-A).
- Since ClickHouse 24.11, the Ubuntu images use `ubuntu:22.04` as their base image. It requires Docker version >= `20.10.10`, which contains this [patch](https://github.com/moby/moby/commit/977283509f75303bc6612665a04abf76ff1d2468). As a workaround, you could use `docker run --security-opt seccomp=unconfined` instead; however, that has security implications.
## How to use this image
### start server instance
```bash
docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 %%IMAGE%%
```
By default, ClickHouse will be accessible only via the Docker network. See the **networking** section below.
By default, the server instance started above runs as the `default` user without a password.
### connect to it from a native client
```bash
docker run -it --rm --link some-clickhouse-server:clickhouse-server --entrypoint clickhouse-client %%IMAGE%% --host clickhouse-server
# OR
docker exec -it some-clickhouse-server clickhouse-client
```
More information about the [ClickHouse client](https://clickhouse.com/docs/en/interfaces/cli/).
### connect to it using curl
```bash
echo "SELECT 'Hello, ClickHouse!'" | docker run -i --rm --link some-clickhouse-server:clickhouse-server buildpack-deps:curl curl 'http://clickhouse-server:8123/?query=' -s --data-binary @-
```
More information about the [ClickHouse HTTP Interface](https://clickhouse.com/docs/en/interfaces/http/).
### stopping / removing the container
```bash
docker stop some-clickhouse-server
docker rm some-clickhouse-server
```
### networking
You can expose ClickHouse running in Docker by [mapping a particular port](https://docs.docker.com/config/containers/container-networking/) from inside the container to host ports:
```bash
docker run -d -p 18123:8123 -p19000:9000 --name some-clickhouse-server --ulimit nofile=262144:262144 %%IMAGE%%
echo 'SELECT version()' | curl 'http://localhost:18123/' --data-binary @-
```
`22.6.3.35`
Or by allowing the container to use [host ports directly](https://docs.docker.com/network/host/) using `--network=host` (also allows achieving better network performance):
```bash
docker run -d --network=host --name some-clickhouse-server --ulimit nofile=262144:262144 %%IMAGE%%
echo 'SELECT version()' | curl 'http://localhost:8123/' --data-binary @-
```
`22.6.3.35`
### Volumes
Typically you may want to mount the following folders inside your container to achieve persistence:
- `/var/lib/clickhouse/` - main folder where ClickHouse stores the data
- `/var/log/clickhouse-server/` - logs
```bash
docker run -d \
-v "$PWD/ch_data:/var/lib/clickhouse/" \
-v "$PWD/ch_logs:/var/log/clickhouse-server/" \
--name some-clickhouse-server --ulimit nofile=262144:262144 %%IMAGE%%
```
You may also want to mount:
- `/etc/clickhouse-server/config.d/*.xml` - files with server configuration adjustments
- `/etc/clickhouse-server/users.d/*.xml` - files with user settings adjustments
- `/docker-entrypoint-initdb.d/` - folder with database initialization scripts (see below).
### Linux capabilities
ClickHouse has some advanced functionality, which requires enabling several [Linux capabilities](https://man7.org/linux/man-pages/man7/capabilities.7.html).
They are optional and can be enabled using the following [docker command-line arguments](https://docs.docker.com/engine/reference/run/#runtime-privilege-and-linux-capabilities):
```bash
docker run -d \
--cap-add=SYS_NICE --cap-add=NET_ADMIN --cap-add=IPC_LOCK \
--name some-clickhouse-server --ulimit nofile=262144:262144 %%IMAGE%%
```
Read more in [knowledge base](https://clickhouse.com/docs/knowledgebase/configure_cap_ipc_lock_and_cap_sys_nice_in_docker).
## Configuration
The container exposes port 8123 for the [HTTP interface](https://clickhouse.com/docs/en/interfaces/http_interface/) and port 9000 for the [native client](https://clickhouse.com/docs/en/interfaces/tcp/).
ClickHouse configuration is represented with a file "config.xml" ([documentation](https://clickhouse.com/docs/en/operations/configuration_files/))
### Start server instance with custom configuration
```bash
docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 -v /path/to/your/config.xml:/etc/clickhouse-server/config.xml %%IMAGE%%
```
### Start server as custom user
```bash
# $PWD/data/clickhouse should exist and be owned by current user
docker run --rm --user "${UID}:${GID}" --name some-clickhouse-server --ulimit nofile=262144:262144 -v "$PWD/logs/clickhouse:/var/log/clickhouse-server" -v "$PWD/data/clickhouse:/var/lib/clickhouse" %%IMAGE%%
```
When you use the image with local directories mounted, you probably want to specify the user to maintain the proper file ownership. Use the `--user` argument and mount `/var/lib/clickhouse` and `/var/log/clickhouse-server` inside the container. Otherwise, the image will complain and not start.
### Start server from root (useful in case of enabled user namespace)
```bash
docker run --rm -e CLICKHOUSE_RUN_AS_ROOT=1 --name clickhouse-server-userns -v "$PWD/logs/clickhouse:/var/log/clickhouse-server" -v "$PWD/data/clickhouse:/var/lib/clickhouse" %%IMAGE%%
```
### How to create default database and user on starting
Sometimes you may want to create a user and database when the container starts (the user named `default` is used by default). You can do it using the environment variables `CLICKHOUSE_DB`, `CLICKHOUSE_USER`, `CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT`, and `CLICKHOUSE_PASSWORD`:
```bash
docker run --rm -e CLICKHOUSE_DB=my_database -e CLICKHOUSE_USER=username -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 -e CLICKHOUSE_PASSWORD=password -p 9000:9000/tcp %%IMAGE%%
```
## How to extend this image
To perform additional initialization in an image derived from this one, add one or more `*.sql`, `*.sql.gz`, or `*.sh` scripts under `/docker-entrypoint-initdb.d`. After the entrypoint calls `initdb`, it will run any `*.sql` files, run any executable `*.sh` scripts, and source any non-executable `*.sh` scripts found in that directory to do further initialization before starting the service.
Also, you can provide environment variables `CLICKHOUSE_USER` & `CLICKHOUSE_PASSWORD` that will be used for clickhouse-client during initialization.
For example, to add an additional user and database, add the following to `/docker-entrypoint-initdb.d/init-db.sh`:
```bash
#!/bin/bash
set -e
clickhouse client -n <<-EOSQL
CREATE DATABASE docker;
CREATE TABLE docker.docker (x Int32) ENGINE = Log;
EOSQL
```


@ -0,0 +1 @@
https://github.com/ClickHouse/ClickHouse


@ -0,0 +1 @@
View [license information](https://github.com/ClickHouse/ClickHouse/blob/master/LICENSE) for the software contained in this image.


@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 616 616">
<defs>
<style>
.cls-1 {
clip-path: url(#clippath);
}
.cls-2 {
fill: none;
}
.cls-2, .cls-3, .cls-4 {
stroke-width: 0px;
}
.cls-3 {
fill: #1e1e1e;
}
.cls-4 {
fill: #faff69;
}
</style>
<clipPath id="clippath">
<rect class="cls-2" x="83.23" y="71.73" width="472.55" height="472.55"/>
</clipPath>
</defs>
<g id="Layer_2" data-name="Layer 2">
<rect class="cls-4" width="616" height="616"/>
</g>
<g id="Layer_1" data-name="Layer 1">
<g class="cls-1">
<g>
<path class="cls-3" d="m120.14,113.3c0-2.57,2.09-4.66,4.66-4.66h34.98c2.57,0,4.66,2.09,4.66,4.66v389.38c0,2.57-2.09,4.66-4.66,4.66h-34.98c-2.57,0-4.66-2.09-4.66-4.66V113.3Z"/>
<path class="cls-3" d="m208.75,113.3c0-2.57,2.09-4.66,4.66-4.66h34.98c2.57,0,4.66,2.09,4.66,4.66v389.38c0,2.57-2.09,4.66-4.66,4.66h-34.98c-2.57,0-4.66-2.09-4.66-4.66V113.3Z"/>
<path class="cls-3" d="m297.35,113.3c0-2.57,2.09-4.66,4.66-4.66h34.98c2.57,0,4.66,2.09,4.66,4.66v389.38c0,2.57-2.09,4.66-4.66,4.66h-34.98c-2.57,0-4.66-2.09-4.66-4.66V113.3Z"/>
<path class="cls-3" d="m385.94,113.3c0-2.57,2.09-4.66,4.66-4.66h34.98c2.57,0,4.66,2.09,4.66,4.66v389.38c0,2.57-2.09,4.66-4.66,4.66h-34.98c-2.57,0-4.66-2.09-4.66-4.66V113.3Z"/>
<path class="cls-3" d="m474.56,268.36c0-2.57,2.09-4.66,4.66-4.66h34.98c2.57,0,4.65,2.09,4.65,4.66v79.28c0,2.57-2.09,4.66-4.65,4.66h-34.98c-2.57,0-4.66-2.09-4.66-4.66v-79.28Z"/>
</g>
</g>
</g>
</svg>



@ -0,0 +1 @@
[ClickHouse Inc.](%%GITHUB-REPO%%)


@ -0,0 +1,7 @@
{
"hub": {
"categories": [
"databases-and-storage"
]
}
}


@ -4,17 +4,28 @@ set -eo pipefail
shopt -s nullglob
DO_CHOWN=1
if [ "${CLICKHOUSE_DO_NOT_CHOWN:-0}" = "1" ]; then
if [[ "${CLICKHOUSE_RUN_AS_ROOT:=0}" = "1" || "${CLICKHOUSE_DO_NOT_CHOWN:-0}" = "1" ]]; then
DO_CHOWN=0
fi
CLICKHOUSE_UID="${CLICKHOUSE_UID:-"$(id -u clickhouse)"}"
CLICKHOUSE_GID="${CLICKHOUSE_GID:-"$(id -g clickhouse)"}"
# CLICKHOUSE_UID and CLICKHOUSE_GID are kept for backward compatibility, but deprecated
# One must use either "docker run --user" or CLICKHOUSE_RUN_AS_ROOT=1 to run the process as a specific user
# FIXME: Remove ALL CLICKHOUSE_UID CLICKHOUSE_GID before 25.3
if [[ "${CLICKHOUSE_UID:-}" || "${CLICKHOUSE_GID:-}" ]]; then
echo 'WARNING: Support for CLICKHOUSE_UID/CLICKHOUSE_GID will be removed in a couple of releases.' >&2
echo 'WARNING: Either use a proper "docker run --user=xxx:xxxx" argument instead of CLICKHOUSE_UID/CLICKHOUSE_GID' >&2
echo 'WARNING: or set "CLICKHOUSE_RUN_AS_ROOT=1" ENV to run the clickhouse-server as root:root' >&2
fi
# support --user
if [ "$(id -u)" = "0" ]; then
USER=$CLICKHOUSE_UID
GROUP=$CLICKHOUSE_GID
# support `docker run --user=xxx:xxxx`
if [[ "$(id -u)" = "0" ]]; then
if [[ "$CLICKHOUSE_RUN_AS_ROOT" = 1 ]]; then
USER=0
GROUP=0
else
USER="${CLICKHOUSE_UID:-"$(id -u clickhouse)"}"
GROUP="${CLICKHOUSE_GID:-"$(id -g clickhouse)"}"
fi
else
USER="$(id -u)"
GROUP="$(id -g)"
@ -55,14 +66,14 @@ function create_directory_and_do_chown() {
[ -z "$dir" ] && return
# ensure directories exist
if [ "$DO_CHOWN" = "1" ]; then
mkdir="mkdir"
mkdir=( mkdir )
else
# if DO_CHOWN=0 it means that the system does not map root user to "admin" permissions
# it mainly happens on NFS mounts where root==nobody for security reasons
# thus mkdir MUST run with user id/gid and not from nobody that has zero permissions
mkdir="/usr/bin/clickhouse su "${USER}:${GROUP}" mkdir"
mkdir=( clickhouse su "${USER}:${GROUP}" mkdir )
fi
if ! $mkdir -p "$dir"; then
if ! "${mkdir[@]}" -p "$dir"; then
echo "Couldn't create necessary directory: $dir"
exit 1
fi
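The switch from a string to an array avoids both the broken nested quoting in the old line and re-splitting of the command on every expansion; a standalone sketch (variable name `mkdir_cmd` is illustrative):

```bash
USER=101 GROUP=101
# Old form: the inner quotes terminate the string early; it only worked because
# $USER and $GROUP never contain spaces, and the command was then word-split.
mkdir_str="/usr/bin/clickhouse su "${USER}:${GROUP}" mkdir"
echo "$mkdir_str"
# Array form: each element stays one argument, so "$USER:$GROUP" is passed intact.
mkdir_cmd=( clickhouse su "${USER}:${GROUP}" mkdir )
printf '[%s]\n' "${mkdir_cmd[@]}"   # prints each argument on its own line
# Invoked in the script as: "${mkdir_cmd[@]}" -p "$dir"
```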
@ -143,7 +154,7 @@ if [ -n "${RUN_INITDB_SCRIPTS}" ]; then
fi
# Listen only on localhost until the initialization is done
/usr/bin/clickhouse su "${USER}:${GROUP}" /usr/bin/clickhouse-server --config-file="$CLICKHOUSE_CONFIG" -- --listen_host=127.0.0.1 &
clickhouse su "${USER}:${GROUP}" clickhouse-server --config-file="$CLICKHOUSE_CONFIG" -- --listen_host=127.0.0.1 &
pid="$!"
# check if clickhouse is ready to accept connections
@ -203,18 +214,8 @@ if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
CLICKHOUSE_WATCHDOG_ENABLE=${CLICKHOUSE_WATCHDOG_ENABLE:-0}
export CLICKHOUSE_WATCHDOG_ENABLE
# An option for easy restarting and replacing clickhouse-server in a container, especially in Kubernetes.
# For example, you can replace the clickhouse-server binary to another and restart it while keeping the container running.
if [[ "${CLICKHOUSE_DOCKER_RESTART_ON_EXIT:-0}" -eq "1" ]]; then
while true; do
# This runs the server as a child process of the shell script:
/usr/bin/clickhouse su "${USER}:${GROUP}" /usr/bin/clickhouse-server --config-file="$CLICKHOUSE_CONFIG" "$@" ||:
echo >&2 'ClickHouse Server exited, and the environment variable CLICKHOUSE_DOCKER_RESTART_ON_EXIT is set to 1. Restarting the server.'
done
else
# This replaces the shell script with the server:
exec /usr/bin/clickhouse su "${USER}:${GROUP}" /usr/bin/clickhouse-server --config-file="$CLICKHOUSE_CONFIG" "$@"
fi
# This replaces the shell script with the server:
exec clickhouse su "${USER}:${GROUP}" clickhouse-server --config-file="$CLICKHOUSE_CONFIG" "$@"
fi
# Otherwise, we assume the user wants to run their own process, for example a `bash` shell to explore this image


@ -431,7 +431,7 @@ catch (const Exception & e)
bool need_print_stack_trace = config().getBool("stacktrace", false) && e.code() != ErrorCodes::NETWORK_ERROR;
std::cerr << getExceptionMessage(e, need_print_stack_trace, true) << std::endl << std::endl;
/// If exception code isn't zero, we should return non-zero return code anyway.
return e.code() ? e.code() : -1;
return static_cast<UInt8>(e.code()) ? e.code() : -1;
}
catch (...)
{
@ -1390,7 +1390,8 @@ int mainEntryClickHouseClient(int argc, char ** argv)
catch (const DB::Exception & e)
{
std::cerr << DB::getExceptionMessage(e, false) << std::endl;
return 1;
auto code = DB::getCurrentExceptionCode();
return static_cast<UInt8>(code) ? code : 1;
}
catch (const boost::program_options::error & e)
{
@ -1399,7 +1400,8 @@ int mainEntryClickHouseClient(int argc, char ** argv)
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << std::endl;
return 1;
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
auto code = DB::getCurrentExceptionCode();
return static_cast<UInt8>(code) ? code : 1;
}
}
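The `static_cast<UInt8>` here and in the files below is not cosmetic: the OS keeps only the low 8 bits of an exit status, so an error code that is a multiple of 256 would otherwise be reported as success. The effect is easy to reproduce:

```bash
bash -c 'exit 394'; echo $?   # prints 138 (394 % 256)
bash -c 'exit 256'; echo $?   # prints 0: a nonzero error code that looks like success
```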


@ -9,6 +9,7 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/ParallelCompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
@ -17,6 +18,8 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Compression/CompressionFactory.h>
#include <Common/TerminalSize.h>
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>
#include <Core/Defines.h>
@ -29,6 +32,13 @@ namespace DB
}
}
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
extern const Metric LocalThreadScheduled;
}
namespace
{
@ -77,11 +87,12 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("decompress,d", "decompress")
("offset-in-compressed-file", po::value<size_t>()->default_value(0ULL), "offset to the compressed block (i.e. physical file offset)")
("offset-in-decompressed-block", po::value<size_t>()->default_value(0ULL), "offset to the decompressed block (i.e. virtual offset)")
("block-size,b", po::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("block-size,b", po::value<size_t>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("codec", po::value<std::vector<std::string>>()->multitoken(), "use codecs combination instead of LZ4")
("level", po::value<int>(), "compression level for codecs specified via flags")
("threads", po::value<size_t>()->default_value(1), "number of threads for parallel compression")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
("stacktrace", "print stacktrace of exception")
@ -109,7 +120,8 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
print_stacktrace = options.count("stacktrace");
unsigned block_size = options["block-size"].as<unsigned>();
size_t block_size = options["block-size"].as<size_t>();
size_t num_threads = options["threads"].as<size_t>();
std::vector<std::string> codecs;
if (options.count("codec"))
codecs = options["codec"].as<std::vector<std::string>>();
@ -117,6 +129,12 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
if ((use_lz4hc || use_zstd || use_none) && !codecs.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong options, codec flags like --zstd and --codec options are mutually exclusive");
if (num_threads < 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid value of `threads` parameter");
if (num_threads > 1 && decompress)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel mode is only implemented for compression (not for decompression)");
if (!codecs.empty() && options.count("level"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong options, --level is not compatible with --codec list");
@ -145,7 +163,6 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
else
codec = CompressionCodecFactory::instance().get(method_family, level);
std::unique_ptr<ReadBufferFromFileBase> rb;
std::unique_ptr<WriteBufferFromFileBase> wb;
@ -186,9 +203,20 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
else
{
/// Compression
CompressedWriteBuffer to(*wb, codec, block_size);
copyData(*rb, to);
to.finalize();
if (num_threads == 1)
{
CompressedWriteBuffer to(*wb, codec, block_size);
copyData(*rb, to);
to.finalize();
}
else
{
ThreadPool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, num_threads);
ParallelCompressedWriteBuffer to(*wb, codec, block_size, num_threads, pool);
copyData(*rb, to);
to.finalize();
}
}
}
catch (...)
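With the new option, parallel compression can be exercised directly from the CLI; file names below are placeholders:

```bash
# Compress with 4 threads; ZSTD level 3 via the flags parsed above
clickhouse-compressor --zstd --level 3 --threads 4 < table.bin > table.bin.compressed
# Decompression remains single-threaded, as enforced by the new check
clickhouse-compressor --decompress < table.bin.compressed > table.bin.out
cmp table.bin table.bin.out
```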


@ -546,16 +546,18 @@ int mainEntryClickHouseDisks(int argc, char ** argv)
catch (const DB::Exception & e)
{
std::cerr << DB::getExceptionMessage(e, false) << std::endl;
return 0;
auto code = DB::getCurrentExceptionCode();
return static_cast<UInt8>(code) ? code : 1;
}
catch (const boost::program_options::error & e)
{
std::cerr << "Bad arguments: " << e.what() << std::endl;
return 0;
return DB::ErrorCodes::BAD_ARGUMENTS;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << std::endl;
return 0;
auto code = DB::getCurrentExceptionCode();
return static_cast<UInt8>(code) ? code : 1;
}
}


@ -448,7 +448,8 @@ int mainEntryClickHouseKeeperClient(int argc, char ** argv)
catch (const DB::Exception & e)
{
std::cerr << DB::getExceptionMessage(e, false) << std::endl;
return 1;
auto code = DB::getCurrentExceptionCode();
return static_cast<UInt8>(code) ? code : 1;
}
catch (const boost::program_options::error & e)
{
@ -458,6 +459,7 @@ int mainEntryClickHouseKeeperClient(int argc, char ** argv)
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << std::endl;
return 1;
auto code = DB::getCurrentExceptionCode();
return static_cast<UInt8>(code) ? code : 1;
}
}


@ -81,7 +81,7 @@ int mainEntryClickHouseKeeper(int argc, char ** argv)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
return static_cast<UInt8>(code) ? code : 1;
}
}
@ -672,7 +672,7 @@ catch (...)
/// Poco does not provide stacktrace.
tryLogCurrentException("Application");
auto code = getCurrentExceptionCode();
return code ? code : -1;
return static_cast<UInt8>(code) ? code : -1;
}


@ -13,7 +13,7 @@ int mainEntryClickHouseLibraryBridge(int argc, char ** argv)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
return static_cast<UInt8>(code) ? code : 1;
}
}


@ -615,12 +615,14 @@ catch (const DB::Exception & e)
{
bool need_print_stack_trace = getClientConfiguration().getBool("stacktrace", false);
std::cerr << getExceptionMessage(e, need_print_stack_trace, true) << std::endl;
return e.code() ? e.code() : -1;
auto code = DB::getCurrentExceptionCode();
return static_cast<UInt8>(code) ? code : 1;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(false) << std::endl;
return getCurrentExceptionCode();
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
auto code = DB::getCurrentExceptionCode();
return static_cast<UInt8>(code) ? code : 1;
}
void LocalServer::updateLoggerLevel(const String & logs_level)
@ -1029,7 +1031,7 @@ int mainEntryClickHouseLocal(int argc, char ** argv)
{
std::cerr << DB::getExceptionMessage(e, false) << std::endl;
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
return static_cast<UInt8>(code) ? code : 1;
}
catch (const boost::program_options::error & e)
{
@ -1040,6 +1042,6 @@ int mainEntryClickHouseLocal(int argc, char ** argv)
{
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
return static_cast<UInt8>(code) ? code : 1;
}
}


@ -1480,5 +1480,5 @@ catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
return static_cast<UInt8>(code) ? code : 1;
}


@ -13,7 +13,7 @@ int mainEntryClickHouseODBCBridge(int argc, char ** argv)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
return static_cast<UInt8>(code) ? code : 1;
}
}


@ -343,7 +343,7 @@ int mainEntryClickHouseServer(int argc, char ** argv)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
return static_cast<UInt8>(code) ? code : 1;
}
}
@ -2537,7 +2537,7 @@ catch (...)
/// Poco does not provide stacktrace.
tryLogCurrentException("Application");
auto code = getCurrentExceptionCode();
return code ? code : -1;
return static_cast<UInt8>(code) ? code : -1;
}
std::unique_ptr<TCPProtocolStackFactory> Server::buildProtocolStackFromConfig(


@ -1650,6 +1650,11 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
if (!parsed_insert_query)
return;
/// If it's clickhouse-local, and the input data reading is already baked into the query pipeline,
/// don't read the data again here. This happens in some cases (e.g. input() table function) but not others (e.g. INFILE).
if (!connection->isSendDataNeeded())
return;
bool have_data_in_stdin = !is_interactive && !stdin_is_a_tty && isStdinNotEmptyAndValid(std_in);
if (need_render_progress)


@ -148,7 +148,21 @@ void highlight(const String & query, std::vector<replxx::Replxx::Color> & colors
try
{
parse_res = parser.parse(token_iterator, ast, expected);
while (!token_iterator->isEnd())
{
parse_res = parser.parse(token_iterator, ast, expected);
if (!parse_res)
break;
if (!token_iterator->isEnd() && token_iterator->type != TokenType::Semicolon)
{
parse_res = false;
break;
}
while (token_iterator->type == TokenType::Semicolon)
++token_iterator;
}
}
catch (...)
{
@ -182,7 +196,7 @@ void highlight(const String & query, std::vector<replxx::Replxx::Color> & colors
/// Highlight the last error in red. If the parser failed or the lexer found an invalid token,
/// or if it didn't parse all the data (except, the data for INSERT query, which is legitimately unparsed)
if ((!parse_res || last_token.isError() || (!token_iterator->isEnd() && token_iterator->type != TokenType::Semicolon))
if ((!parse_res || last_token.isError())
&& !(insert_data && expected.max_parsed_pos >= insert_data)
&& expected.max_parsed_pos >= prev)
{


@ -109,6 +109,10 @@ public:
/// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
virtual void sendData(const Block & block, const String & name, bool scalar) = 0;
/// Whether the client needs to read and send the data for the INSERT.
/// False if the server will read the data through other means (in particular if clickhouse-local added an input reading step directly into the query pipeline).
virtual bool isSendDataNeeded() const { return true; }
/// Send all contents of external (temporary) tables.
virtual void sendExternalTablesData(ExternalTablesData & data) = 0;
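The situation this guards against is clickhouse-local with the `input()` table function, where reading stdin is already baked into the pipeline; a minimal reproduction mirroring the test changes further below (`out.txt` is a scratch file):

```bash
# Without the check, the client would also try to read and send stdin itself.
clickhouse-local --engine_file_truncate_on_insert=1 -q "
    INSERT INTO FUNCTION file('out.txt', 'LineAsString', 'x String')
    SELECT * FROM input('x String') FORMAT LineAsString" <<< foo
cat out.txt  # foo
```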


@ -328,6 +328,11 @@ void LocalConnection::sendData(const Block & block, const String &, bool)
sendProfileEvents();
}
bool LocalConnection::isSendDataNeeded() const
{
return !state || state->input_pipeline == nullptr;
}
void LocalConnection::sendCancel()
{
state->is_cancelled = true;


@ -120,6 +120,8 @@ public:
void sendData(const Block & block, const String & name/* = "" */, bool scalar/* = false */) override;
bool isSendDataNeeded() const override;
void sendExternalTablesData(ExternalTablesData &) override;
void sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) override;


@ -41,6 +41,10 @@
M(PostgreSQLConnection, "Number of client connections using PostgreSQL protocol") \
M(OpenFileForRead, "Number of files open for reading") \
M(OpenFileForWrite, "Number of files open for writing") \
M(Compressing, "Number of compress operations using internal compression codecs") \
M(Decompressing, "Number of decompress operations using internal compression codecs") \
M(ParallelCompressedWriteBufferThreads, "Number of threads in all instances of ParallelCompressedWriteBuffer - these threads are doing parallel compression and writing") \
M(ParallelCompressedWriteBufferWait, "Number of threads in all instances of ParallelCompressedWriteBuffer that are currently waiting for buffer to become available for writing") \
M(TotalTemporaryFiles, "Number of temporary files created") \
M(TemporaryFilesForSort, "Number of temporary files created for external sorting") \
M(TemporaryFilesForAggregation, "Number of temporary files created for external aggregation") \
@ -99,6 +103,9 @@
M(IOThreads, "Number of threads in the IO thread pool.") \
M(IOThreadsActive, "Number of threads in the IO thread pool running a task.") \
M(IOThreadsScheduled, "Number of queued or active jobs in the IO thread pool.") \
M(CompressionThread, "Number of threads in compression thread pools.") \
M(CompressionThreadActive, "Number of threads in compression thread pools running a task.") \
M(CompressionThreadScheduled, "Number of queued or active jobs in compression thread pools.") \
M(ThreadPoolRemoteFSReaderThreads, "Number of threads in the thread pool for remote_filesystem_read_method=threadpool.") \
M(ThreadPoolRemoteFSReaderThreadsActive, "Number of threads in the thread pool for remote_filesystem_read_method=threadpool running a task.") \
M(ThreadPoolRemoteFSReaderThreadsScheduled, "Number of queued or active jobs in the thread pool for remote_filesystem_read_method=threadpool.") \
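Like any other CurrentMetrics entries, the new counters are visible in `system.metrics`, e.g.:

```bash
clickhouse-client -q "
    SELECT metric, value FROM system.metrics
    WHERE metric IN ('Compressing', 'Decompressing',
                     'ParallelCompressedWriteBufferThreads', 'ParallelCompressedWriteBufferWait',
                     'CompressionThread', 'CompressionThreadActive', 'CompressionThreadScheduled')"
```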


@ -122,7 +122,7 @@ public:
void scheduleOrThrowOnError(Job job, Priority priority = {});
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, Priority priority = {}, uint64_t wait_microseconds = 0) noexcept;
[[nodiscard]] bool trySchedule(Job job, Priority priority = {}, uint64_t wait_microseconds = 0) noexcept;
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
void scheduleOrThrow(Job job, Priority priority = {}, uint64_t wait_microseconds = 0, bool propagate_opentelemetry_tracing_context = true);
@ -142,7 +142,7 @@ public:
/// Returns true if the pool already terminated
/// (and any further scheduling will produce CANNOT_SCHEDULE_TASK exception)
bool finished() const;
[[nodiscard]] bool finished() const;
void setMaxThreads(size_t value);
void setMaxFreeThreads(size_t value);


@ -2,7 +2,6 @@
#include <cstring>
#include <base/types.h>
#include <base/unaligned.h>
#include <base/defines.h>
#include <IO/WriteHelpers.h>


@ -5,11 +5,18 @@
#include <Parsers/ASTFunction.h>
#include <base/unaligned.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTIdentifier.h>
#include <Compression/CompressionCodecMultiple.h>
namespace CurrentMetrics
{
extern const Metric Compressing;
extern const Metric Decompressing;
}
namespace DB
{
@ -80,6 +87,8 @@ UInt32 ICompressionCodec::compress(const char * source, UInt32 source_size, char
{
assert(source != nullptr && dest != nullptr);
CurrentMetrics::Increment metric_increment(CurrentMetrics::Compressing);
dest[0] = getMethodByte();
UInt8 header_size = getHeaderSize();
/// Write data from header_size
@ -93,6 +102,8 @@ UInt32 ICompressionCodec::decompress(const char * source, UInt32 source_size, ch
{
assert(source != nullptr && dest != nullptr);
CurrentMetrics::Increment metric_increment(CurrentMetrics::Decompressing);
UInt8 header_size = getHeaderSize();
if (source_size < header_size)
throw Exception(decompression_error_code,


@ -0,0 +1,166 @@
#include <city.h>
#include <base/types.h>
#include <base/defines.h>
#include <IO/WriteHelpers.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Compression/ParallelCompressedWriteBuffer.h>
namespace CurrentMetrics
{
extern const Metric ParallelCompressedWriteBufferThreads;
extern const Metric ParallelCompressedWriteBufferWait;
}
namespace DB
{
ParallelCompressedWriteBuffer::ParallelCompressedWriteBuffer(
WriteBuffer & out_,
CompressionCodecPtr codec_,
size_t buf_size_,
size_t num_threads_,
ThreadPool & pool_)
: WriteBuffer(nullptr, 0), out(out_), codec(codec_), buf_size(buf_size_), num_threads(num_threads_), pool(pool_)
{
buffers.emplace_back(buf_size);
current_buffer = buffers.begin();
BufferBase::set(current_buffer->uncompressed.data(), buf_size, 0);
}
void ParallelCompressedWriteBuffer::nextImpl()
{
if (!offset())
return;
std::unique_lock lock(mutex);
/// The buffer will be compressed and processed in the thread.
current_buffer->busy = true;
current_buffer->sequence_num = current_sequence_num;
++current_sequence_num;
current_buffer->uncompressed_size = offset();
pool.scheduleOrThrowOnError([this, my_current_buffer = current_buffer, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("ParallelCompres");
compress(my_current_buffer);
});
BufferPair * previous_buffer = &*current_buffer;
++current_buffer;
if (current_buffer == buffers.end())
{
if (buffers.size() < num_threads)
{
/// If we didn't use all num_threads buffers yet, create a new one.
current_buffer = buffers.emplace(current_buffer, buf_size);
}
else
{
/// Otherwise, wrap around to the first buffer in the list.
current_buffer = buffers.begin();
}
}
/// Wait while the buffer becomes not busy
if (current_buffer->busy)
{
CurrentMetrics::Increment metric_increment(CurrentMetrics::ParallelCompressedWriteBufferWait);
cond.wait(lock, [&]{ return !current_buffer->busy; });
}
/// Now this buffer can be used.
current_buffer->previous = previous_buffer;
BufferBase::set(current_buffer->uncompressed.data(), buf_size, 0);
}
void ParallelCompressedWriteBuffer::finalizeImpl()
{
next();
pool.wait();
}
void ParallelCompressedWriteBuffer::compress(Iterator buffer)
{
CurrentMetrics::Increment metric_increment(CurrentMetrics::ParallelCompressedWriteBufferThreads);
chassert(buffer->uncompressed_size <= INT_MAX);
UInt32 uncompressed_size = static_cast<UInt32>(buffer->uncompressed_size);
UInt32 compressed_reserve_size = codec->getCompressedReserveSize(uncompressed_size);
/// If all previous buffers have been written,
/// and if the output buffer has the required capacity,
/// we can compress data directly into the output buffer.
size_t required_out_capacity = compressed_reserve_size + sizeof(CityHash_v1_0_2::uint128);
bool can_write_directly = false;
if (!buffer->previous)
{
can_write_directly = out.available() >= required_out_capacity;
}
else
{
std::unique_lock lock(mutex);
can_write_directly = (!buffer->previous->busy || buffer->previous->sequence_num > buffer->sequence_num)
&& out.available() >= required_out_capacity;
}
if (can_write_directly)
{
char * out_compressed_ptr = out.position() + sizeof(CityHash_v1_0_2::uint128);
UInt32 compressed_size = codec->compress(buffer->uncompressed.data(), uncompressed_size, out_compressed_ptr);
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(out_compressed_ptr, compressed_size);
writeBinaryLittleEndian(checksum.low64, out);
writeBinaryLittleEndian(checksum.high64, out);
out.position() += compressed_size;
}
else
{
buffer->compressed.resize(compressed_reserve_size);
UInt32 compressed_size = codec->compress(buffer->uncompressed.data(), uncompressed_size, buffer->compressed.data());
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(buffer->compressed.data(), compressed_size);
/// Wait while all previous buffers have been written.
if (buffer->previous)
{
CurrentMetrics::Increment metric_wait_increment(CurrentMetrics::ParallelCompressedWriteBufferWait);
std::unique_lock lock(mutex);
cond.wait(lock, [&]{ return !buffer->previous->busy || buffer->previous->sequence_num > buffer->sequence_num; });
}
writeBinaryLittleEndian(checksum.low64, out);
writeBinaryLittleEndian(checksum.high64, out);
out.write(buffer->compressed.data(), compressed_size);
}
std::unique_lock lock(mutex);
buffer->busy = false;
cond.notify_all();
}
ParallelCompressedWriteBuffer::~ParallelCompressedWriteBuffer()
{
if (!canceled)
finalize();
}
}


@ -0,0 +1,70 @@
#pragma once
#include <list>
#include <memory>
#include <Common/PODArray.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionFactory.h>
#include <Common/ThreadPool.h>
namespace DB
{
/** Uses multi-buffering for parallel compression.
* When the buffer is filled, it will be compressed in the background,
* and a new buffer is created for the next input data.
*/
class ParallelCompressedWriteBuffer final : public WriteBuffer
{
public:
explicit ParallelCompressedWriteBuffer(
WriteBuffer & out_,
CompressionCodecPtr codec_,
size_t buf_size_,
size_t num_threads_,
ThreadPool & pool_);
~ParallelCompressedWriteBuffer() override;
private:
void nextImpl() override;
void finalizeImpl() override;
WriteBuffer & out;
CompressionCodecPtr codec;
size_t buf_size;
size_t num_threads;
ThreadPool & pool;
struct BufferPair
{
explicit BufferPair(size_t input_size)
: uncompressed(input_size)
{
}
Memory<> uncompressed;
size_t uncompressed_size = 0;
PODArray<char> compressed;
BufferPair * previous = nullptr;
size_t sequence_num = 0;
bool busy = false;
};
std::mutex mutex;
std::condition_variable cond;
std::list<BufferPair> buffers;
using Iterator = std::list<BufferPair>::iterator;
Iterator current_buffer;
size_t current_sequence_num = 0;
void compress(Iterator buffer);
};
}


@ -334,22 +334,26 @@ HashedDictionary<dictionary_key_type, sparse, sharded>::~HashedDictionary()
if (container.empty())
return;
pool.trySchedule([&container, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (!pool.trySchedule([&container, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
/// Do not account memory that was occupied by the dictionaries for the query/user context.
MemoryTrackerBlockerInThread memory_blocker;
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("HashedDictDtor");
/// Do not account memory that was occupied by the dictionaries for the query/user context.
clearContainer(container);
}))
{
MemoryTrackerBlockerInThread memory_blocker;
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("HashedDictDtor");
clearContainer(container);
});
}
++hash_tables_count;
};


@ -108,27 +108,29 @@ public:
{
size_t arguments_size = arguments.size();
const auto * lhs_array = assert_cast<const ColumnArray *>(arguments.at(1).column.get());
ColumnPtr first_array_materialized = arguments[1].column->convertToFullColumnIfConst();
const ColumnArray & first_array = assert_cast<const ColumnArray &>(*first_array_materialized);
Columns data_columns;
data_columns.reserve(arguments_size);
data_columns.push_back(lhs_array->getDataPtr());
data_columns.push_back(first_array.getDataPtr());
for (size_t i = 2; i < arguments_size; ++i)
{
const auto * rhs_array = assert_cast<const ColumnArray *>(arguments[i].column.get());
ColumnPtr other_array_materialized = arguments[i].column->convertToFullColumnIfConst();
const ColumnArray & other_array = assert_cast<const ColumnArray &>(*other_array_materialized);
if (!lhs_array->hasEqualOffsets(*rhs_array))
if (!first_array.hasEqualOffsets(other_array))
throw Exception(ErrorCodes::SIZES_OF_ARRAYS_DONT_MATCH,
"The argument 2 and argument {} of function {} have different array offsets",
i + 1,
getName());
data_columns.push_back(rhs_array->getDataPtr());
data_columns.push_back(other_array.getDataPtr());
}
auto tuple_column = ColumnTuple::create(std::move(data_columns));
auto array_column = ColumnArray::create(std::move(tuple_column), lhs_array->getOffsetsPtr());
auto array_column = ColumnArray::create(std::move(tuple_column), first_array.getOffsetsPtr());
return array_column;
}
@ -168,7 +170,12 @@ REGISTER_FUNCTION(Nested)
{
factory.registerFunction<FunctionNested>(FunctionDocumentation{
.description=R"(
This is a function used internally by the ClickHouse engine and not meant to be used directly.
Returns the array of tuples from multiple arrays.
The first argument must be a constant array of Strings determining the names of the resulting Tuple.
The other arguments must be arrays of the same size.
)",
.examples{{"nested", "SELECT nested(['keys', 'values'], ['key_1', 'key_2'], ['value_1','value_2'])", ""}},
.categories{"OtherFunctions"}
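The documented behavior in one line (output consistent with the new test reference further below):

```bash
clickhouse-local -q "SELECT nested(['keys', 'values'], ['key_1', 'key_2'], ['value_1', 'value_2'])"
# [('key_1','value_1'),('key_2','value_2')]
```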


@ -645,7 +645,7 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
try
{
/// S3 does retries network errors actually.
/// But it is matter when errors occur.
/// But it does matter when errors occur.
/// This code retries a specific case when
/// network error happens when XML document is being read from the response body.
/// Hence, the response body is a stream, network errors are possible at reading.
@ -656,8 +656,9 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
/// Requests that expose the response stream as an answer are not retried with that code. E.g. GetObject.
return request_fn_(request_);
}
catch (Poco::Net::ConnectionResetException &)
catch (Poco::Net::NetException &)
{
/// This includes "connection reset", "malformed message", and possibly other exceptions.
if constexpr (IsReadMethod)
{


@ -121,8 +121,7 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
if (current_context->getSettingsRef()[Setting::allow_experimental_analyzer])
{
InterpreterSelectQueryAnalyzer interpreter_select(query.select, current_context, select_query_options);
header_block = interpreter_select.getSampleBlock();
header_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query.select, current_context, select_query_options);
}
else
{


@ -107,7 +107,7 @@ struct ManyAggregatedData
if (variant->aggregator)
{
// variant is moved here and will be destroyed in the destructor of the lambda function.
pool->trySchedule(
pool->scheduleOrThrowOnError(
[my_variant = std::move(variant), thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE(


@ -1,7 +1,15 @@
# foo
# foo (pipe)
foo
# foo (file)
foo
# !foo
# bar
bar
# defaults
bam
# inferred destination table structure
foo
# direct
foo
# infile
foo


@ -4,17 +4,40 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
tmp_file="$CUR_DIR/$CLICKHOUSE_DATABASE.txt"
echo '# foo'
tmp_file="$CUR_DIR/03031_$CLICKHOUSE_DATABASE.txt"
tmp_input="$CUR_DIR/03031_${CLICKHOUSE_DATABASE}_in.txt"
echo '# foo (pipe)'
$CLICKHOUSE_LOCAL --engine_file_truncate_on_insert=1 -q "insert into function file('$tmp_file', 'LineAsString', 'x String') select * from input('x String') format LineAsString" <<<foo
cat "$tmp_file"
echo '# foo (file)'
echo 'foo' > "$tmp_input"
$CLICKHOUSE_LOCAL --engine_file_truncate_on_insert=1 -q "insert into function file('$tmp_file', 'LineAsString', 'x String') select * from input('x String') format LineAsString" <"$tmp_input"
cat "$tmp_file"
echo '# !foo'
$CLICKHOUSE_LOCAL --engine_file_truncate_on_insert=1 -q "insert into function file('$tmp_file', 'LineAsString', 'x String') select * from input('x String') where x != 'foo' format LineAsString" <<<foo
cat "$tmp_file"
echo '# bar'
$CLICKHOUSE_LOCAL --engine_file_truncate_on_insert=1 -q "insert into function file('$tmp_file', 'LineAsString', 'x String') select y from input('x String, y String') format TSV" <<<$'foo\tbar'
cat "$tmp_file"
echo '# defaults'
$CLICKHOUSE_LOCAL --input_format_tsv_empty_as_default=1 --engine_file_truncate_on_insert=1 -q "insert into function file('$tmp_file', 'LineAsString', 'x String') select y from input('x String, y String DEFAULT \\'bam\\'') format TSV" <<<$'foo\t'
cat "$tmp_file"
rm -f "${tmp_file:?}"
echo '# inferred destination table structure'
$CLICKHOUSE_LOCAL --engine_file_truncate_on_insert=1 -q "insert into function file('$tmp_file', 'TSV') select * from input('x String') format LineAsString" <"$tmp_input"
cat "$tmp_file"
echo '# direct'
$CLICKHOUSE_LOCAL --engine_file_truncate_on_insert=1 -q "insert into function file('$tmp_file', 'LineAsString', 'x String') format LineAsString" <"$tmp_input"
cat "$tmp_file"
echo '# infile'
$CLICKHOUSE_LOCAL --engine_file_truncate_on_insert=1 -q "insert into function file('$tmp_file', 'LineAsString', 'x String') from infile '$tmp_input' format LineAsString"
cat "$tmp_file"
rm -f "${tmp_file:?}" "${tmp_input:?}"


@ -0,0 +1,3 @@
[(1,3),(2,4)]
0 0
0 0 1


@ -0,0 +1,16 @@
SELECT nested(['a', 'b'], [1, 2], materialize([3, 4]));
DROP TABLE IF EXISTS test;
CREATE TABLE test
(
x UInt8,
struct.x DEFAULT [0],
struct.y ALIAS [1],
)
ENGINE = Memory;
insert into test (x) values (0);
select * from test array join struct;
select x, struct.x, struct.y from test array join struct;
DROP TABLE test;