Merge remote-tracking branch 'upstream/master' into improvement/diff-types-in-avg-weighted

This commit is contained in:
myrrc 2020-10-30 17:22:24 +03:00
commit 9ca35c0b44
266 changed files with 22533 additions and 3130 deletions

View File

@ -2362,7 +2362,7 @@ No changes compared to v20.4.3.16-stable.
* `Live View` table engine refactoring. [#8519](https://github.com/ClickHouse/ClickHouse/pull/8519) ([vzakaznikov](https://github.com/vzakaznikov))
* Add additional checks for external dictionaries created from DDL-queries. [#8127](https://github.com/ClickHouse/ClickHouse/pull/8127) ([alesapin](https://github.com/alesapin))
* Fix error `Column ... already exists` while using `FINAL` and `SAMPLE` together, e.g. `select count() from table final sample 1/2`. Fixes [#5186](https://github.com/ClickHouse/ClickHouse/issues/5186). [#7907](https://github.com/ClickHouse/ClickHouse/pull/7907) ([Nikolai Kochetov](https://github.com/KochetovNicolai))
* Now table the first argument of `joinGet` function can be table indentifier. [#7707](https://github.com/ClickHouse/ClickHouse/pull/7707) ([Amos Bird](https://github.com/amosbird))
* Now the first argument of the `joinGet` function can be a table identifier. [#7707](https://github.com/ClickHouse/ClickHouse/pull/7707) ([Amos Bird](https://github.com/amosbird))
* Allow using `MaterializedView` with subqueries above `Kafka` tables. [#8197](https://github.com/ClickHouse/ClickHouse/pull/8197) ([filimonov](https://github.com/filimonov))
* Now background moves between disks run in a separate thread pool. [#7670](https://github.com/ClickHouse/ClickHouse/pull/7670) ([Vladimir Chebotarev](https://github.com/excitoon))
* `SYSTEM RELOAD DICTIONARY` now executes synchronously. [#8240](https://github.com/ClickHouse/ClickHouse/pull/8240) ([Vitaly Baranov](https://github.com/vitlibar))

View File

@ -59,25 +59,6 @@ set(CMAKE_DEBUG_POSTFIX "d" CACHE STRING "Generate debug library name with a pos
# For more info see https://cmake.org/cmake/help/latest/prop_gbl/USE_FOLDERS.html
set_property(GLOBAL PROPERTY USE_FOLDERS ON)
# cmake 3.9+ needed.
# Usually impractical.
# See also ${ENABLE_THINLTO}
option(ENABLE_IPO "Full link time optimization")
if(ENABLE_IPO)
cmake_policy(SET CMP0069 NEW)
include(CheckIPOSupported)
check_ipo_supported(RESULT IPO_SUPPORTED OUTPUT IPO_NOT_SUPPORTED)
if(IPO_SUPPORTED)
message(STATUS "IPO/LTO is supported, enabling")
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)
else()
message (${RECONFIGURE_MESSAGE_LEVEL} "IPO/LTO is not supported: <${IPO_NOT_SUPPORTED}>")
endif()
else()
message(STATUS "IPO/LTO not enabled.")
endif()
# Check that submodules are present only if source was downloaded with git
if (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/.git" AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/boost/boost")
message (FATAL_ERROR "Submodules are not initialized. Run\n\tgit submodule update --init --recursive")

View File

@ -17,4 +17,6 @@ ClickHouse is an open-source column-oriented database management system that all
## Upcoming Events
* [ClickHouse virtual office hours](https://www.eventbrite.com/e/clickhouse-october-virtual-meetup-office-hours-tickets-123129500651) on October 22, 2020.
* [The Second ClickHouse Meetup East (online)](https://www.eventbrite.com/e/the-second-clickhouse-meetup-east-tickets-126787955187) on October 31, 2020.
* [ClickHouse for Enterprise Meetup (online in Russian)](https://arenadata-events.timepad.ru/event/1465249/) on November 10, 2020.

View File

@ -51,7 +51,7 @@ struct StringRef
};
/// Here constexpr doesn't imply inline, see https://www.viva64.com/en/w/v1043/
/// nullptr can't be used because the StringRef values are used in SipHash's pointer arithmetics
/// nullptr can't be used because the StringRef values are used in SipHash's pointer arithmetic
/// and the UBSan thinks that something like nullptr + 8 is UB.
constexpr const inline char empty_string_ref_addr{};
constexpr const inline StringRef EMPTY_STRING_REF{&empty_string_ref_addr, 0};

View File

@ -15,7 +15,7 @@ CFLAGS (GLOBAL -DVERSION_MAJOR=${VERSION_MAJOR})
CFLAGS (GLOBAL -DVERSION_MINOR=${VERSION_MINOR})
CFLAGS (GLOBAL -DVERSION_PATCH=${VERSION_PATCH})
# TODO: not supported yet, not sure if ya.make supports arithmetics.
# TODO: not supported yet, not sure if ya.make supports arithmetic.
CFLAGS (GLOBAL -DVERSION_INTEGER=0)
CFLAGS (GLOBAL -DVERSION_NAME=\"\\\"${VERSION_NAME}\\\"\")

View File

@ -192,7 +192,7 @@ set(SRCS
${HDFS3_SOURCE_DIR}/common/FileWrapper.h
)
# old kernels (< 3.17) doens't have SYS_getrandom. Always use POSIX implementation to have better compatibility
# old kernels (< 3.17) doesn't have SYS_getrandom. Always use POSIX implementation to have better compatibility
set_source_files_properties(${HDFS3_SOURCE_DIR}/rpc/RpcClient.cpp PROPERTIES COMPILE_FLAGS "-DBOOST_UUID_RANDOM_PROVIDER_FORCE_POSIX=1")
# target

@ -1 +1 @@
Subproject commit f5638e954a79f50bac7c7a5deaa5a241e0ce8b5f
Subproject commit 1485b0de3eaa1508dfe49a5ba1e4aa2a71fd8335

View File

@ -31,10 +31,6 @@ RUN curl -O https://clickhouse-builds.s3.yandex.net/utils/1/dpkg-deb \
&& chmod +x dpkg-deb \
&& cp dpkg-deb /usr/bin
RUN export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& wget -nv -O /tmp/arrow-keyring.deb "https://apache.bintray.com/arrow/ubuntu/apache-arrow-archive-keyring-latest-${CODENAME}.deb" \
&& dpkg -i /tmp/arrow-keyring.deb
# Libraries from OS are only needed to test the "unbundled" build (this is not used in production).
RUN apt-get update \
&& apt-get install \

View File

@ -1,6 +1,10 @@
# docker build -t yandex/clickhouse-unbundled-builder .
FROM yandex/clickhouse-deb-builder
RUN export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& wget -nv -O /tmp/arrow-keyring.deb "https://apache.bintray.com/arrow/ubuntu/apache-arrow-archive-keyring-latest-${CODENAME}.deb" \
&& dpkg -i /tmp/arrow-keyring.deb
# Libraries from OS are only needed to test the "unbundled" build (that is not used in production).
RUN apt-get update \
&& apt-get install \

View File

@ -0,0 +1,8 @@
# post / preinstall scripts (not needed, we do it in Dockerfile)
alpine-root/install/*
# docs (looks useless)
alpine-root/usr/share/doc/*
# packages, etc. (used by prepare.sh)
alpine-root/tgz-packages/*

docker/server/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
alpine-root/*

View File

@ -0,0 +1,26 @@
FROM alpine
ENV LANG=en_US.UTF-8 \
LANGUAGE=en_US:en \
LC_ALL=en_US.UTF-8 \
TZ=UTC \
CLICKHOUSE_CONFIG=/etc/clickhouse-server/config.xml
COPY alpine-root/ /
# from https://github.com/ClickHouse/ClickHouse/blob/master/debian/clickhouse-server.postinst
RUN addgroup clickhouse \
&& adduser -S -H -h /nonexistent -s /bin/false -G clickhouse -g "ClickHouse server" clickhouse \
&& chown clickhouse:clickhouse /var/lib/clickhouse \
&& chmod 700 /var/lib/clickhouse \
&& chown root:clickhouse /var/log/clickhouse-server \
&& chmod 775 /var/log/clickhouse-server \
&& chmod +x /entrypoint.sh \
&& apk add --no-cache su-exec
EXPOSE 9000 8123 9009
VOLUME /var/lib/clickhouse \
/var/log/clickhouse-server
ENTRYPOINT ["/entrypoint.sh"]
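A minimal usage sketch for the image this Dockerfile produces; the tag follows the build script below and the host path is an assumption, not part of the commit:
# Publish the exposed HTTP and native ports and keep data on the host.
docker run -d --name clickhouse-alpine \
    -p 8123:8123 -p 9000:9000 \
    -v "$PWD/clickhouse-data:/var/lib/clickhouse" \
    yandex/clickhouse-server:20.9.3.45-alpine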

docker/server/alpine-build.sh Executable file
View File

@ -0,0 +1,59 @@
#!/bin/bash
set -x
REPO_CHANNEL="${REPO_CHANNEL:-stable}" # lts / testing / prestable / etc
REPO_URL="${REPO_URL:-"https://repo.yandex.ru/clickhouse/tgz/${REPO_CHANNEL}"}"
VERSION="${VERSION:-20.9.3.45}"
# where original files live
DOCKER_BUILD_FOLDER="${BASH_SOURCE%/*}"
# we will create root for our image here
CONTAINER_ROOT_FOLDER="${DOCKER_BUILD_FOLDER}/alpine-root"
# where to put downloaded tgz
TGZ_PACKAGES_FOLDER="${CONTAINER_ROOT_FOLDER}/tgz-packages"
# clean up the root from old runs
rm -rf "$CONTAINER_ROOT_FOLDER"
mkdir -p "$TGZ_PACKAGES_FOLDER"
PACKAGES=( "clickhouse-client" "clickhouse-server" "clickhouse-common-static" )
# download tars from the repo
for package in "${PACKAGES[@]}"
do
wget -q --show-progress "${REPO_URL}/${package}-${VERSION}.tgz" -O "${TGZ_PACKAGES_FOLDER}/${package}-${VERSION}.tgz"
done
# unpack tars
for package in "${PACKAGES[@]}"
do
tar xvzf "${TGZ_PACKAGES_FOLDER}/${package}-${VERSION}.tgz" --strip-components=2 -C "$CONTAINER_ROOT_FOLDER"
done
# prepare few more folders
mkdir -p "${CONTAINER_ROOT_FOLDER}/etc/clickhouse-server/users.d" \
"${CONTAINER_ROOT_FOLDER}/etc/clickhouse-server/config.d" \
"${CONTAINER_ROOT_FOLDER}/var/log/clickhouse-server" \
"${CONTAINER_ROOT_FOLDER}/var/lib/clickhouse" \
"${CONTAINER_ROOT_FOLDER}/docker-entrypoint-initdb.d" \
"${CONTAINER_ROOT_FOLDER}/lib64"
cp "${DOCKER_BUILD_FOLDER}/docker_related_config.xml" "${CONTAINER_ROOT_FOLDER}/etc/clickhouse-server/config.d/"
cp "${DOCKER_BUILD_FOLDER}/entrypoint.alpine.sh" "${CONTAINER_ROOT_FOLDER}/entrypoint.sh"
## get glibc components from ubuntu 20.04 and put them to expected place
docker pull ubuntu:20.04
ubuntu20image=$(docker create --rm ubuntu:20.04)
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libc.so.6 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libdl.so.2 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libm.so.6 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libpthread.so.0 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/librt.so.1 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libnss_dns.so.2 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libresolv.so.2 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib64/ld-linux-x86-64.so.2 "${CONTAINER_ROOT_FOLDER}/lib64"
docker build "$DOCKER_BUILD_FOLDER" -f Dockerfile.alpine -t "yandex/clickhouse-server:${VERSION}-alpine" --pull
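A minimal invocation sketch, assuming Docker is available and the script is run from the repository root; REPO_CHANNEL and VERSION are the variables the script reads, with the values below as examples:
# Build the Alpine-based server image for a given release.
REPO_CHANNEL=stable VERSION=20.9.3.45 ./docker/server/alpine-build.sh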

View File

@ -0,0 +1,152 @@
#!/bin/sh
#set -x
DO_CHOWN=1
if [ "$CLICKHOUSE_DO_NOT_CHOWN" = 1 ]; then
DO_CHOWN=0
fi
CLICKHOUSE_UID="${CLICKHOUSE_UID:-"$(id -u clickhouse)"}"
CLICKHOUSE_GID="${CLICKHOUSE_GID:-"$(id -g clickhouse)"}"
# support --user
if [ "$(id -u)" = "0" ]; then
USER=$CLICKHOUSE_UID
GROUP=$CLICKHOUSE_GID
# busybox has setuidgid & chpst built in
gosu="su-exec $USER:$GROUP"
else
USER="$(id -u)"
GROUP="$(id -g)"
gosu=""
DO_CHOWN=0
fi
# set some vars
CLICKHOUSE_CONFIG="${CLICKHOUSE_CONFIG:-/etc/clickhouse-server/config.xml}"
# port is needed to check if clickhouse-server is ready for connections
HTTP_PORT="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=http_port)"
# get CH directories locations
DATA_DIR="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=path || true)"
TMP_DIR="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=tmp_path || true)"
USER_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=user_files_path || true)"
LOG_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=logger.log || true)"
LOG_DIR="$(dirname $LOG_PATH || true)"
ERROR_LOG_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=logger.errorlog || true)"
ERROR_LOG_DIR="$(dirname $ERROR_LOG_PATH || true)"
FORMAT_SCHEMA_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=format_schema_path || true)"
CLICKHOUSE_USER="${CLICKHOUSE_USER:-default}"
CLICKHOUSE_PASSWORD="${CLICKHOUSE_PASSWORD:-}"
CLICKHOUSE_DB="${CLICKHOUSE_DB:-}"
for dir in "$DATA_DIR" \
"$ERROR_LOG_DIR" \
"$LOG_DIR" \
"$TMP_DIR" \
"$USER_PATH" \
"$FORMAT_SCHEMA_PATH"
do
# check if variable not empty
[ -z "$dir" ] && continue
# ensure directories exist
if ! mkdir -p "$dir"; then
echo "Couldn't create necessary directory: $dir"
exit 1
fi
if [ "$DO_CHOWN" = "1" ]; then
# ensure proper directories permissions
chown -R "$USER:$GROUP" "$dir"
elif [ "$(stat -c %u "$dir")" != "$USER" ]; then
echo "Necessary directory '$dir' isn't owned by user with id '$USER'"
exit 1
fi
done
# if clickhouse user is defined - create it (user "default" already exists out of box)
if [ -n "$CLICKHOUSE_USER" ] && [ "$CLICKHOUSE_USER" != "default" ] || [ -n "$CLICKHOUSE_PASSWORD" ]; then
echo "$0: create new user '$CLICKHOUSE_USER' instead 'default'"
cat <<EOT > /etc/clickhouse-server/users.d/default-user.xml
<yandex>
<!-- Docs: <https://clickhouse.tech/docs/en/operations/settings/settings_users/> -->
<users>
<!-- Remove default user -->
<default remove="remove">
</default>
<${CLICKHOUSE_USER}>
<profile>default</profile>
<networks>
<ip>::/0</ip>
</networks>
<password>${CLICKHOUSE_PASSWORD}</password>
<quota>default</quota>
</${CLICKHOUSE_USER}>
</users>
</yandex>
EOT
fi
if [ -n "$(ls /docker-entrypoint-initdb.d/)" ] || [ -n "$CLICKHOUSE_DB" ]; then
# Listen only on localhost until the initialization is done
$gosu /usr/bin/clickhouse-server --config-file=$CLICKHOUSE_CONFIG -- --listen_host=127.0.0.1 &
pid="$!"
# check if clickhouse is ready to accept connections
# will try to send ping clickhouse via http_port (max 6 retries, with 1 sec timeout and 1 sec delay between retries)
tries=6
while ! wget --spider -T 1 -q "http://localhost:$HTTP_PORT/ping" 2>/dev/null; do
if [ "$tries" -le "0" ]; then
echo >&2 'ClickHouse init process failed.'
exit 1
fi
tries=$(( tries-1 ))
sleep 1
done
if [ ! -z "$CLICKHOUSE_PASSWORD" ]; then
printf -v WITH_PASSWORD '%s %q' "--password" "$CLICKHOUSE_PASSWORD"
fi
clickhouseclient="clickhouse-client --multiquery -u $CLICKHOUSE_USER $WITH_PASSWORD "
# create default database, if defined
if [ -n "$CLICKHOUSE_DB" ]; then
echo "$0: create database '$CLICKHOUSE_DB'"
"$clickhouseclient" -q "CREATE DATABASE IF NOT EXISTS $CLICKHOUSE_DB";
fi
for f in /docker-entrypoint-initdb.d/*; do
case "$f" in
*.sh)
if [ -x "$f" ]; then
echo "$0: running $f"
"$f"
else
echo "$0: sourcing $f"
. "$f"
fi
;;
*.sql) echo "$0: running $f"; cat "$f" | "$clickhouseclient" ; echo ;;
*.sql.gz) echo "$0: running $f"; gunzip -c "$f" | "$clickhouseclient"; echo ;;
*) echo "$0: ignoring $f" ;;
esac
echo
done
if ! kill -s TERM "$pid" || ! wait "$pid"; then
echo >&2 'Finishing of ClickHouse init process failed.'
exit 1
fi
fi
# if no args passed to `docker run` or first argument start with `--`, then the user is passing clickhouse-server arguments
if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
exec $gosu /usr/bin/clickhouse-server --config-file=$CLICKHOUSE_CONFIG "$@"
fi
# Otherwise, we assume the user wants to run their own process, for example a `bash` shell to explore this image
exec "$@"
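A minimal usage sketch for the initialization path above, assuming the Alpine image built earlier; the user, password, database and host directory are placeholders:
# The entrypoint creates the user and database, runs any *.sh / *.sql / *.sql.gz
# files from /docker-entrypoint-initdb.d, then starts the server normally.
docker run -d \
    -e CLICKHOUSE_USER=app -e CLICKHOUSE_PASSWORD=secret -e CLICKHOUSE_DB=app_db \
    -v "$PWD/initdb:/docker-entrypoint-initdb.d" \
    yandex/clickhouse-server:20.9.3.45-alpine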

View File

@ -271,7 +271,7 @@ TESTS_TO_SKIP=(
00974_query_profiler
# Look at DistributedFilesToInsert, so cannot run in parallel.
01457_DistributedFilesToInsert
01460_DistributedFilesToInsert
01541_max_memory_usage_for_user

View File

@ -77,12 +77,9 @@ function restart
while killall clickhouse-server; do echo . ; sleep 1 ; done
echo all killed
# Disable percpu arenas because they segfault when the process is bound to
# a particular NUMA node: https://github.com/jemalloc/jemalloc/pull/1939
#
# About the jemalloc settings:
# Change the jemalloc settings here.
# https://github.com/jemalloc/jemalloc/wiki/Getting-Started
export MALLOC_CONF="percpu_arena:disabled,confirm_conf:true"
export MALLOC_CONF="confirm_conf:true"
set -m # Spawn servers in their own process groups
@ -211,7 +208,7 @@ function run_tests
echo test "$test_name"
# Don't profile if we're past the time limit.
# Use awk because bash doesn't support floating point arithmetics.
# Use awk because bash doesn't support floating point arithmetic.
profile_seconds=$(awk "BEGIN { print ($profile_seconds_left > 0 ? 10 : 0) }")
TIMEFORMAT=$(printf "$test_name\t%%3R\t%%3U\t%%3S\n")

View File

@ -17,14 +17,24 @@ service clickhouse-server start && sleep 5
if grep -q -- "--use-skip-list" /usr/bin/clickhouse-test; then
SKIP_LIST_OPT="--use-skip-list"
fi
# We can have several additional options so we pass them as an array because it's
# more ideologically correct.
read -ra ADDITIONAL_OPTIONS <<< "${ADDITIONAL_OPTIONS:-}"
function run_tests()
{
# We can have several additional options so we pass them as an array because it's
# more ideologically correct.
read -ra ADDITIONAL_OPTIONS <<< "${ADDITIONAL_OPTIONS:-}"
# Skip these tests, because they fail when we rerun them multiple times
if [ "$NUM_TRIES" -gt "1" ]; then
ADDITIONAL_OPTIONS+=('--skip')
ADDITIONAL_OPTIONS+=('00000_no_tests_to_skip')
fi
for i in $(seq 1 $NUM_TRIES); do
clickhouse-test --testname --shard --zookeeper --hung-check --print-time "$SKIP_LIST_OPT" "${ADDITIONAL_OPTIONS[@]}" "$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee -a test_output/test_result.txt
clickhouse-test --testname --shard --zookeeper --hung-check --print-time "$SKIP_LIST_OPT" "${ADDITIONAL_OPTIONS[@]}" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee -a test_output/test_result.txt
if [ ${PIPESTATUS[0]} -ne "0" ]; then
break;
fi
done
}

View File

@ -35,7 +35,7 @@ RUN apt-get update \
ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN pip3 install urllib3 testflows==1.6.57 docker-compose docker dicttoxml kazoo tzlocal
RUN pip3 install urllib3 testflows==1.6.59 docker-compose docker dicttoxml kazoo tzlocal
ENV DOCKER_CHANNEL stable
ENV DOCKER_VERSION 17.09.1-ce

View File

@ -51,7 +51,7 @@ Optional parameters:
- `rabbitmq_row_delimiter` Delimiter character, which ends the message.
- `rabbitmq_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
- `rabbitmq_num_queues` Total number of queues. Default: `1`. Increasing this number can significantly improve performance.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `rabbitmq_persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
@ -148,4 +148,5 @@ Example:
- `_channel_id` - ChannelID, on which consumer, who received the message, was declared.
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
- `_redelivered` - `redelivered` flag of the message.
- `_message_id` - MessageID of the received message; non-empty if was set, when message was published.
- `_message_id` - messageID of the received message; non-empty if it was set when the message was published.
- `_timestamp` - timestamp of the received message; non-empty if it was set when the message was published.

View File

@ -0,0 +1,69 @@
---
toc_priority: 62
toc_title: OpenTelemetry Support
---
# [experimental] OpenTelemetry Support
[OpenTelemetry](https://opentelemetry.io/) is an open standard for collecting
traces and metrics from distributed applications. ClickHouse has some support
for OpenTelemetry.
!!! warning "Warning"
This is an experimental feature that will change in backwards-incompatible ways in future releases.
## Supplying Trace Context to ClickHouse
ClickHouse accepts trace context HTTP headers, as described by
the [W3C recommendation](https://www.w3.org/TR/trace-context/).
It also accepts trace context over the native protocol that is used for
communication between ClickHouse servers or between the client and server.
For manual testing, trace context headers conforming to the Trace Context
recommendation can be supplied to `clickhouse-client` using
`--opentelemetry-traceparent` and `--opentelemetry-tracestate` flags.
If no parent trace context is supplied, ClickHouse can start a new trace, with
probability controlled by the `opentelemetry_start_trace_probability` setting.
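A minimal sketch of supplying a trace context manually; the `traceparent` value below is the example from the W3C recommendation, not a real trace:
```bash
# Format: version-traceid-spanid-flags, as defined by the Trace Context recommendation.
clickhouse-client \
    --opentelemetry-traceparent "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01" \
    --query "SELECT 1"
```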
## Propagating the Trace Context
The trace context is propagated to downstream services in the following cases:
* Queries to remote ClickHouse servers, such as when using `Distributed` table
engine.
* `URL` table function. Trace context information is sent in HTTP headers.
## Tracing ClickHouse Itself
ClickHouse creates _trace spans_ for each query and some of the query execution
stages, such as query planning or distributed queries.
To be useful, the tracing information has to be exported to a monitoring system
that supports OpenTelemetry, such as Jaeger or Prometheus. ClickHouse avoids
a dependency on a particular monitoring system, instead only
providing the tracing data conforming to the standard. A natural way to do so
in an SQL RDBMS is a system table. OpenTelemetry trace span information
[required by the standard](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/overview.md#span)
is stored in the system table called `system.opentelemetry_span_log`.
The table must be enabled in the server configuration, see the `opentelemetry_span_log`
element in the default config file `config.xml`. It is enabled by default.
The table has the following columns:
- `trace_id`
- `span_id`
- `parent_span_id`
- `operation_name`
- `start_time`
- `finish_time`
- `finish_date`
- `attribute.name`
- `attribute.values`
The tags or attributes are saved as two parallel arrays, containing the keys
and values. Use `ARRAY JOIN` to work with them.
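A minimal query sketch, assuming the span log is enabled and uses the column names listed above:
```bash
# Unroll the parallel attribute arrays into one row per attribute.
clickhouse-client --query "
    SELECT trace_id, operation_name, attribute.name, attribute.values
    FROM system.opentelemetry_span_log
    ARRAY JOIN attribute.name, attribute.values
    LIMIT 10"
```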

View File

@ -80,4 +80,4 @@ Code: 43. DB::Exception: Received from localhost:9000. DB::Exception: Wrong argu
## See Also {#see-also}
- [INTERVAL](../../../sql-reference/operators/index.md#operator-interval) operator
- [toInterval](../../../sql-reference/functions/type-conversion-functions.md#function-tointerval) type convertion functions
- [toInterval](../../../sql-reference/functions/type-conversion-functions.md#function-tointerval) type conversion functions

View File

@ -626,7 +626,12 @@ neighbor(column, offset[, default_value])
```
The result of the function depends on the affected data blocks and the order of data in the block.
If you make a subquery with ORDER BY and call the function from outside the subquery, you can get the expected result.
!!! warning "Warning"
It can reach the neighbor rows only inside the currently processed data block.
The order of rows used during the calculation of `neighbor` can differ from the order of rows returned to the user.
To prevent that, you can make a subquery with ORDER BY and call the function from outside the subquery.
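A minimal sketch of that ORDER BY workaround, assuming a local server with the default user:
```bash
# Ordering inside the subquery makes the in-block row order deterministic here.
clickhouse-client --query "
    SELECT number, neighbor(number, 1) AS next
    FROM (SELECT number FROM numbers(5) ORDER BY number)"
```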
**Parameters**
@ -731,8 +736,13 @@ Result:
Calculates the difference between successive row values in the data block.
Returns 0 for the first row and the difference from the previous row for each subsequent row.
!!! warning "Warning"
It can reach the previous row only inside the currently processed data block.
The result of the function depends on the affected data blocks and the order of data in the block.
If you make a subquery with ORDER BY and call the function from outside the subquery, you can get the expected result.
The order of rows used during the calculation of `runningDifference` can differ from the order of rows returned to the user.
To prevent that, you can make a subquery with ORDER BY and call the function from outside the subquery.
Example:
@ -1584,7 +1594,7 @@ isDecimalOverflow(d, [p])
**Parameters**
- `d` — value. [Decimal](../../sql-reference/data-types/decimal.md).
- `p` — precision. Optional. If omitted, the initial presicion of the first argument is used. Using of this paratemer could be helpful for data extraction to another DBMS or file. [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
- `p` — precision. Optional. If omitted, the initial precision of the first argument is used. Using this parameter could be helpful for data extraction to another DBMS or file. [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
**Returned values**

View File

@ -169,7 +169,7 @@ SELECT now() AS current_date_time, current_date_time + INTERVAL 4 DAY + INTERVAL
**See Also**
- [Interval](../../sql-reference/data-types/special-data-types/interval.md) data type
- [toInterval](../../sql-reference/functions/type-conversion-functions.md#function-tointerval) type convertion functions
- [toInterval](../../sql-reference/functions/type-conversion-functions.md#function-tointerval) type conversion functions
## Logical Negation Operator {#logical-negation-operator}

View File

@ -138,7 +138,7 @@ ENGINE = <Engine>
...
```
The `Default` codec can be specified to reference default compression which may dependend on different settings (and properties of data) in runtime.
The `Default` codec can be specified to reference default compression which may depend on different settings (and properties of data) in runtime.
Example: `value UInt64 CODEC(Default)` — the same as lack of codec specification.
Also you can remove current CODEC from the column and use default compression from config.xml:

View File

@ -45,7 +45,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
- `rabbitmq_row_delimiter` символ-разделитель, который завершает сообщение.
- `rabbitmq_schema` опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Capn Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`.
- `rabbitmq_num_consumers` количество потребителей на таблицу. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна.
- `rabbitmq_num_queues` количество очередей на потребителя. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одной очереди на потребителя недостаточна.
- `rabbitmq_num_queues` количество очередей. По умолчанию: `1`. Большее число очередей может сильно увеличить пропускную способность.
- `rabbitmq_queue_base` - настройка для имен очередей. Сценарии использования описаны ниже.
- `rabbitmq_persistent` - флаг, от которого зависит настройка 'durable' для сообщений при запросах `INSERT`. По умолчанию: `0`.
- `rabbitmq_skip_broken_messages` максимальное количество некорректных сообщений в блоке. Если `rabbitmq_skip_broken_messages = N`, то движок отбрасывает `N` сообщений, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию 0.
@ -140,4 +140,5 @@ Example:
- `_channel_id` - идентификатор канала `ChannelID`, на котором было получено сообщение.
- `_delivery_tag` - значение `DeliveryTag` полученного сообщения. Уникально в рамках одного канала.
- `_redelivered` - флаг `redelivered`. (Не равно нулю, если есть возможность, что сообщение было получено более, чем одним каналом.)
- `_message_id` - значение `MessageID` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.
- `_message_id` - значение поля `messageID` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.
- `_timestamp` - значение поля `timestamp` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.

View File

@ -218,6 +218,8 @@ private:
QueryFuzzer fuzzer;
int query_fuzzer_runs = 0;
std::optional<Suggest> suggest;
/// We will format query_id in interactive mode in various ways, the default is just to print Query id: ...
std::vector<std::pair<String, String>> query_id_formats;
@ -577,10 +579,11 @@ private:
if (print_time_to_stderr)
throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
suggest.emplace();
if (server_revision >= Suggest::MIN_SERVER_REVISION && !config().getBool("disable_suggestion", false))
{
/// Load suggestion data from the server.
Suggest::instance().load(connection_parameters, config().getInt("suggestion_limit"));
suggest->load(connection_parameters, config().getInt("suggestion_limit"));
}
/// Load command history if present.
@ -607,7 +610,7 @@ private:
highlight_callback = highlight;
ReplxxLineReader lr(
Suggest::instance(),
*suggest,
history_file,
config().has("multiline"),
query_extenders,
@ -615,7 +618,7 @@ private:
highlight_callback);
#elif defined(USE_READLINE) && USE_READLINE
ReadlineLineReader lr(Suggest::instance(), history_file, config().has("multiline"), query_extenders, query_delimiters);
ReadlineLineReader lr(*suggest, history_file, config().has("multiline"), query_extenders, query_delimiters);
#else
LineReader lr(history_file, config().has("multiline"), query_extenders, query_delimiters);
#endif
@ -2324,6 +2327,8 @@ public:
("log-level", po::value<std::string>(), "client log level")
("server_logs_file", po::value<std::string>(), "put server logs into specified file")
("query-fuzzer-runs", po::value<int>()->default_value(0), "query fuzzer runs")
("opentelemetry-traceparent", po::value<std::string>(), "OpenTelemetry traceparent header as described by W3C Trace Context recommendation")
("opentelemetry-tracestate", po::value<std::string>(), "OpenTelemetry tracestate header as described by W3C Trace Context recommendation")
;
Settings cmd_settings;
@ -2492,6 +2497,25 @@ public:
ignore_error = true;
}
if (options.count("opentelemetry-traceparent"))
{
std::string traceparent = options["opentelemetry-traceparent"].as<std::string>();
std::string error;
if (!context.getClientInfo().parseTraceparentHeader(
traceparent, error))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot parse OpenTelemetry traceparent '{}': {}",
traceparent, error);
}
}
if (options.count("opentelemetry-tracestate"))
{
context.getClientInfo().opentelemetry_tracestate =
options["opentelemetry-tracestate"].as<std::string>();
}
argsToConfig(common_arguments, config(), 100);
clearPasswordFromCommandLine(argc, argv);

View File

@ -18,10 +18,11 @@ namespace ErrorCodes
class Suggest : public LineReader::Suggest, boost::noncopyable
{
public:
static Suggest & instance()
Suggest();
~Suggest()
{
static Suggest instance;
return instance;
if (loading_thread.joinable())
loading_thread.join();
}
void load(const ConnectionParameters & connection_parameters, size_t suggestion_limit);
@ -30,12 +31,6 @@ public:
static constexpr int MIN_SERVER_REVISION = 54406;
private:
Suggest();
~Suggest()
{
if (loading_thread.joinable())
loading_thread.join();
}
void loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit);
void fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query);

View File

@ -37,6 +37,7 @@
#include <boost/program_options.hpp>
#include <common/argsToConfig.h>
#include <Common/TerminalSize.h>
#include <Common/randomSeed.h>
#include <filesystem>
@ -47,9 +48,9 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int SYNTAX_ERROR;
extern const int CANNOT_LOAD_CONFIG;
extern const int FILE_ALREADY_EXISTS;
}
@ -121,31 +122,43 @@ void LocalServer::tryInitPath()
}
else
{
// Default unique path in the system temporary directory.
const auto tmp = std::filesystem::temp_directory_path();
const auto default_path = tmp
/ fmt::format("clickhouse-local-{}", getpid());
// The path is not provided explicitly - use a unique path in the system temporary directory
// (or in the current dir if the temporary directory doesn't exist)
Poco::Logger * log = &logger();
std::filesystem::path parent_folder;
std::filesystem::path default_path;
try
{
// try to guess a tmp folder name, and check if it's a directory (throw exception otherwise)
parent_folder = std::filesystem::temp_directory_path();
}
catch (const std::filesystem::filesystem_error& e)
{
// tmp folder doesn't exist? misconfiguration? chroot?
LOG_DEBUG(log, "Can not get temporary folder: {}", e.what());
parent_folder = std::filesystem::current_path();
std::filesystem::is_directory(parent_folder); // that will throw an exception if it's not a directory
LOG_DEBUG(log, "Will create working directory inside current directory: {}", parent_folder.string());
}
/// we can have another clickhouse-local running simultaneously, even with the same PID (for ex. - several dockers mounting the same folder)
/// or it can be some leftovers from other clickhouse-local runs
/// as we can't accurately distinguish those situations we don't touch any existing folders
/// we just try to pick some free name for our working folder
default_path = parent_folder / fmt::format("clickhouse-local-{}-{}-{}", getpid(), time(nullptr), randomSeed());
if (exists(default_path))
{
// This is a directory that is left by a previous run of
// clickhouse-local that had the same pid and did not complete
// correctly. Remove it, with an additional sanity check.
if (!std::filesystem::equivalent(default_path.parent_path(), tmp))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"The temporary directory of clickhouse-local '{}' is not"
" inside the system temporary directory '{}'. Will not delete"
" it", default_path.string(), tmp.string());
}
remove_all(default_path);
}
throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Unsuccessful attempt to create working directory: {} exist!", default_path.string());
create_directory(default_path);
temporary_directory_to_delete = default_path;
path = default_path.string();
LOG_DEBUG(log, "Working directory created: {}", path);
}
if (path.back() != '/')
@ -438,23 +451,12 @@ void LocalServer::setupUsers()
void LocalServer::cleanup()
{
// Delete the temporary directory if needed. Just in case, check that it is
// in the system temporary directory, not to delete user data if there is a
// bug.
// Delete the temporary directory if needed.
if (temporary_directory_to_delete)
{
const auto tmp = std::filesystem::temp_directory_path();
const auto dir = *temporary_directory_to_delete;
temporary_directory_to_delete.reset();
if (!std::filesystem::equivalent(dir.parent_path(), tmp))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"The temporary directory of clickhouse-local '{}' is not inside"
" the system temporary directory '{}'. Will not delete it",
dir.string(), tmp.string());
}
LOG_DEBUG(&logger(), "Removing temporary directory: {}", dir.string());
remove_all(dir);
}
}

View File

@ -270,7 +270,7 @@
This parameter is mandatory and cannot be empty.
roles - section with a list of locally defined roles that will be assigned to each user retrieved from the LDAP server.
If no roles are specified, user will not be able to perform any actions after authentication.
If any of the listed roles is not defined locally at the time of authentication, the authenthication attept
If any of the listed roles is not defined locally at the time of authentication, the authentication attempt
will fail as if the provided password was incorrect.
Example:
<ldap>
@ -628,6 +628,31 @@
<flush_interval_milliseconds>60000</flush_interval_milliseconds>
</asynchronous_metric_log>
<!--
OpenTelemetry log contains OpenTelemetry trace spans.
-->
<opentelemetry_span_log>
<!--
The default table creation code is insufficient, this <engine> spec
is a workaround. There is no 'event_time' for this log, but two times,
start and finish. It is sorted by finish time, to avoid inserting
data too far away in the past (probably we can sometimes insert a span
that is seconds earlier than the last span in the table, due to a race
between several spans inserted in parallel). This gives the spans a
global order that we can use to e.g. retry insertion into some external
system.
-->
<engine>
engine MergeTree
partition by toYYYYMM(finish_date)
order by (finish_date, finish_time_us, trace_id)
</engine>
<database>system</database>
<table>opentelemetry_span_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</opentelemetry_span_log>
<!-- Crash log. Stores stack traces for fatal errors.
This table is normally empty. -->
<crash_log>

View File

@ -585,7 +585,7 @@ void IAccessStorage::throwInvalidPassword()
void IAccessStorage::throwCannotAuthenticate(const String & user_name)
{
/// We use the same message for all authentification failures because we don't want to give away any unnecessary information for security reasons,
/// We use the same message for all authentication failures because we don't want to give away any unnecessary information for security reasons,
/// only the log will show the exact reason.
throw Exception(user_name + ": Authentication failed: password is incorrect or there is no user with such name", ErrorCodes::AUTHENTICATION_FAILED);
}

View File

@ -31,7 +31,7 @@ AggregateFunctionPtr createAggregateFunctionRate(const std::string & name, const
void registerAggregateFunctionRate(AggregateFunctionFactory & factory)
{
factory.registerFunction("boundingRatio", createAggregateFunctionRate, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("boundingRatio", createAggregateFunctionRate);
}
}

View File

@ -296,7 +296,7 @@ public:
{
typename ColumnVector<T>::Container & data_to = assert_cast<ColumnVector<T> &>(arr_to.getData()).getData();
if constexpr (is_big_int_v<T>)
// is data_to empty? we should probaly use std::vector::insert then
// is data_to empty? we should probably use std::vector::insert then
for (auto it = this->data(place).value.begin(); it != this->data(place).value.end(); it++)
data_to.push_back(*it);
else

View File

@ -45,7 +45,7 @@ AggregateFunctionPtr createAggregateFunctionRankCorrelation(const std::string &
void registerAggregateFunctionRankCorrelation(AggregateFunctionFactory & factory)
{
factory.registerFunction("rankCorr", createAggregateFunctionRankCorrelation, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("rankCorr", createAggregateFunctionRankCorrelation);
}
}

View File

@ -32,7 +32,7 @@ AggregateFunctionPtr createAggregateFunctionRetention(const std::string & name,
void registerAggregateFunctionRetention(AggregateFunctionFactory & factory)
{
factory.registerFunction("retention", createAggregateFunctionRetention, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("retention", createAggregateFunctionRetention);
}
}

View File

@ -47,6 +47,6 @@ AggregateFunctionPtr createAggregateFunctionStudentTTest(const std::string & nam
void registerAggregateFunctionStudentTTest(AggregateFunctionFactory & factory)
{
factory.registerFunction("studentTTest", createAggregateFunctionStudentTTest, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("studentTTest", createAggregateFunctionStudentTTest);
}
}

View File

@ -20,9 +20,17 @@
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int BAD_ARGUMENTS;
}
#if defined(OS_DARWIN)
extern "C"
{
double lgammal_r(double x, int * signgamp);
}
#endif
namespace DB
{
@ -150,7 +158,8 @@ struct AggregateFunctionStudentTTestData final
const Float64 t = getTStatisticSquared();
auto f = [&v] (double x) { return std::pow(x, v/2 - 1) / std::sqrt(1 - x); };
Float64 numenator = integrateSimpson(0, v / (t + v), f);
Float64 denominator = std::exp(std::lgammal(v/2) + std::lgammal(0.5) - std::lgammal(v/2 + 0.5));
int unused;
Float64 denominator = std::exp(lgammal_r(v / 2, &unused) + lgammal_r(0.5, &unused) - lgammal_r(v / 2 + 0.5, &unused));
return numenator / denominator;
}

View File

@ -28,8 +28,8 @@ namespace
void registerAggregateFunctionTimeSeriesGroupSum(AggregateFunctionFactory & factory)
{
factory.registerFunction("timeSeriesGroupSum", createAggregateFunctionTimeSeriesGroupSum<false>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("timeSeriesGroupRateSum", createAggregateFunctionTimeSeriesGroupSum<true>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("timeSeriesGroupSum", createAggregateFunctionTimeSeriesGroupSum<false>);
factory.registerFunction("timeSeriesGroupRateSum", createAggregateFunctionTimeSeriesGroupSum<true>);
}
}

View File

@ -44,6 +44,6 @@ AggregateFunctionPtr createAggregateFunctionWelchTTest(const std::string & name,
void registerAggregateFunctionWelchTTest(AggregateFunctionFactory & factory)
{
factory.registerFunction("welchTTest", createAggregateFunctionWelchTTest, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("welchTTest", createAggregateFunctionWelchTTest);
}
}

View File

@ -18,11 +18,20 @@
#include <type_traits>
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int BAD_ARGUMENTS;
}
#if defined(OS_DARWIN)
extern "C"
{
double lgammal_r(double x, int * signgamp);
}
#endif
namespace DB
{
@ -159,9 +168,10 @@ struct AggregateFunctionWelchTTestData final
{
const Float64 v = getDegreesOfFreedom();
const Float64 t = getTStatisticSquared();
auto f = [&v] (double x) { return std::pow(x, v/2 - 1) / std::sqrt(1 - x); };
auto f = [&v] (double x) { return std::pow(x, v / 2 - 1) / std::sqrt(1 - x); };
Float64 numenator = integrateSimpson(0, v / (t + v), f);
Float64 denominator = std::exp(std::lgammal(v/2) + std::lgammal(0.5) - std::lgammal(v/2 + 0.5));
int unused;
Float64 denominator = std::exp(lgammal_r(v / 2, &unused) + lgammal_r(0.5, &unused) - lgammal_r(v / 2 + 0.5, &unused));
return numenator / denominator;
}

View File

@ -58,7 +58,7 @@ AggregateFunctionPtr createAggregateFunctionWindowFunnel(const std::string & nam
void registerAggregateFunctionWindowFunnel(AggregateFunctionFactory & factory)
{
factory.registerFunction("windowFunnel", createAggregateFunctionWindowFunnel<AggregateFunctionWindowFunnelData>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("windowFunnel", createAggregateFunctionWindowFunnel<AggregateFunctionWindowFunnelData>);
}
}

View File

@ -39,8 +39,8 @@ namespace ErrorCodes
namespace detail
{
const size_t DEFAULT_SAMPLE_COUNT = 8192;
const auto MAX_SKIP_DEGREE = sizeof(UInt32) * 8;
const size_t DEFAULT_MAX_SAMPLE_SIZE = 8192;
const auto MAX_SKIP_DEGREE = sizeof(UInt32) * 8;
}
/// What if there is not a single value - throw an exception, or return 0 or NaN in the case of double?
@ -50,6 +50,7 @@ enum class ReservoirSamplerDeterministicOnEmpty
RETURN_NAN_OR_ZERO,
};
template <typename T,
ReservoirSamplerDeterministicOnEmpty OnEmpty = ReservoirSamplerDeterministicOnEmpty::THROW>
class ReservoirSamplerDeterministic
@ -60,8 +61,8 @@ class ReservoirSamplerDeterministic
}
public:
ReservoirSamplerDeterministic(const size_t sample_count_ = DEFAULT_SAMPLE_COUNT)
: sample_count{sample_count_}
ReservoirSamplerDeterministic(const size_t max_sample_size_ = detail::DEFAULT_MAX_SAMPLE_SIZE)
: max_sample_size{max_sample_size_}
{
}
@ -131,8 +132,8 @@ public:
void merge(const ReservoirSamplerDeterministic & b)
{
if (sample_count != b.sample_count)
throw Poco::Exception("Cannot merge ReservoirSamplerDeterministic's with different sample_count");
if (max_sample_size != b.max_sample_size)
throw Poco::Exception("Cannot merge ReservoirSamplerDeterministic's with different max sample size");
sorted = false;
if (b.skip_degree > skip_degree)
@ -150,11 +151,16 @@ public:
void read(DB::ReadBuffer & buf)
{
DB::readIntBinary<size_t>(sample_count, buf);
size_t size = 0;
DB::readIntBinary<size_t>(size, buf);
DB::readIntBinary<size_t>(total_values, buf);
samples.resize(std::min(total_values, sample_count));
for (size_t i = 0; i < samples.size(); ++i)
/// Compatibility with old versions.
if (size > total_values)
size = total_values;
samples.resize(size);
for (size_t i = 0; i < size; ++i)
DB::readPODBinary(samples[i], buf);
sorted = false;
@ -162,10 +168,11 @@ public:
void write(DB::WriteBuffer & buf) const
{
DB::writeIntBinary<size_t>(sample_count, buf);
size_t size = samples.size();
DB::writeIntBinary<size_t>(size, buf);
DB::writeIntBinary<size_t>(total_values, buf);
for (size_t i = 0; i < std::min(sample_count, total_values); ++i)
for (size_t i = 0; i < size; ++i)
DB::writePODBinary(samples[i], buf);
}
@ -174,18 +181,19 @@ private:
using Element = std::pair<T, UInt32>;
using Array = DB::PODArray<Element, 64>;
size_t sample_count;
size_t total_values{};
bool sorted{};
const size_t max_sample_size; /// Maximum amount of stored values.
size_t total_values = 0; /// How many values were inserted (regardless if they remain in sample or not).
bool sorted = false;
Array samples;
UInt8 skip_degree{};
UInt8 skip_degree = 0; /// The number N determining that we save only one per 2^N elements in average.
void insertImpl(const T & v, const UInt32 hash)
{
/// @todo why + 1? I don't quite recall
while (samples.size() + 1 >= sample_count)
/// Make a room for plus one element.
while (samples.size() >= max_sample_size)
{
if (++skip_degree > detail::MAX_SKIP_DEGREE)
++skip_degree;
if (skip_degree > detail::MAX_SKIP_DEGREE)
throw DB::Exception{"skip_degree exceeds maximum value", DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED};
thinOut();
}
@ -195,35 +203,17 @@ private:
void thinOut()
{
auto size = samples.size();
for (size_t i = 0; i < size;)
{
if (!good(samples[i].second))
{
/// swap current element with the last one
std::swap(samples[size - 1], samples[i]);
--size;
}
else
++i;
}
if (size != samples.size())
{
samples.resize(size);
samples.resize(std::distance(samples.begin(),
std::remove_if(samples.begin(), samples.end(), [this](const auto & elem){ return !good(elem.second); })));
sorted = false;
}
}
void sortIfNeeded()
{
if (sorted)
return;
std::sort(samples.begin(), samples.end(), [](const auto & lhs, const auto & rhs) { return lhs.first < rhs.first; });
sorted = true;
std::sort(samples.begin(), samples.end(), [] (const std::pair<T, UInt32> & lhs, const std::pair<T, UInt32> & rhs)
{
return lhs.first < rhs.first;
});
}
template <typename ResultType>

View File

@ -6,6 +6,8 @@
#include <Common/WeakHash.h>
#include <Common/HashTable/Hash.h>
#include <common/defines.h>
#if defined(MEMORY_SANITIZER)
#include <sanitizer/msan_interface.h>
#endif

View File

@ -82,7 +82,7 @@ public:
* @see DB::ColumnUnique
*
* The most common example uses https://clickhouse.tech/docs/en/sql-reference/data-types/lowcardinality/ columns.
* Consider data type @e LC(String). The inner type here is @e String which is more or less a contigous memory
* Consider data type @e LC(String). The inner type here is @e String which is more or less a contiguous memory
* region, so it can be easily represented as a @e StringRef. So we pass that ref to this function and get its
* index in the dictionary, which can be used to operate with the indices column.
*/

View File

@ -54,6 +54,7 @@
M(LocalThread, "Number of threads in local thread pools. Should be similar to GlobalThreadActive.") \
M(LocalThreadActive, "Number of threads in local thread pools running a task.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \
namespace CurrentMetrics
{

View File

@ -5,15 +5,15 @@
namespace DB
{
/// Helper class, that recieves file descriptor and does fsync for it in destructor.
/// Helper class, that receives file descriptor and does fsync for it in destructor.
/// It's used to keep descriptor open, while doing some operations with it, and do fsync at the end.
/// Guarantees of the sequence 'close-reopen-fsync' may depend on kernel version.
/// Source: linux-fsdevel mailing-list https://marc.info/?l=linux-fsdevel&m=152535409207496
class FileSyncGuard
{
public:
/// NOTE: If you have already opened descriptor, it's preffered to use
/// this constructor instead of construnctor with path.
/// NOTE: If you have already opened descriptor, it's preferred to use
/// this constructor instead of constructor with path.
FileSyncGuard(const DiskPtr & disk_, int fd_) : disk(disk_), fd(fd_) {}
FileSyncGuard(const DiskPtr & disk_, const String & path)

View File

@ -234,13 +234,13 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThreadActive : CurrentMetrics::LocalThreadActive);
job();
/// job should be reseted before decrementing scheduled_jobs to
/// job should be reset before decrementing scheduled_jobs to
/// ensure that the Job destroyed before wait() returns.
job = {};
}
catch (...)
{
/// job should be reseted before decrementing scheduled_jobs to
/// job should be reset before decrementing scheduled_jobs to
/// ensure that the Job destroyed before wait() returns.
job = {};

View File

@ -152,7 +152,7 @@ void TraceCollector::run()
if (trace_log)
{
// time and time_in_microseconds are both being constructed from the same timespec so that the
// times will be equal upto the precision of a second.
// times will be equal up to the precision of a second.
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);

View File

@ -1288,7 +1288,7 @@ void ZooKeeper::receiveEvent()
response->removeRootPath(root_path);
}
/// Instead of setting the watch in sendEvent, set it in receiveEvent becuase need to check the response.
/// Instead of setting the watch in sendEvent, set it in receiveEvent because need to check the response.
/// The watch shouldn't be set if the node does not exist and it will never exist like sequential ephemeral nodes.
/// By using getData() instead of exists(), a watch won't be set if the node doesn't exist.
if (request_info.watch)

View File

@ -185,9 +185,9 @@ void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, s
}
else
{
throw Exception("Data compressed with different methods, given method byte "
throw Exception("Data compressed with different methods, given method byte 0x"
+ getHexUIntLowercase(method)
+ ", previous method byte "
+ ", previous method byte 0x"
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
}

View File

@ -87,7 +87,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
else
throw Exception("Unexpected AST element for compression codec", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
/// Default codec replaced with current default codec which may dependend on different
/// Default codec replaced with current default codec which may depend on different
/// settings (and properties of data) in runtime.
CompressionCodecPtr result_codec;
if (codec_family_name == DEFAULT_CODEC_NAME)

View File

@ -26,7 +26,7 @@ void ICompressionCodec::setCodecDescription(const String & codec_name, const AST
std::shared_ptr<ASTFunction> result = std::make_shared<ASTFunction>();
result->name = "CODEC";
/// Special case for codec Multiple, which doens't have name. It's just list
/// Special case for codec Multiple, which doesn't have name. It's just list
/// of other codecs.
if (codec_name.empty())
{

View File

@ -67,11 +67,14 @@
/// Minimum revision supporting SettingsBinaryFormat::STRINGS.
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
/// Minimum revision supporting OpenTelemetry
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442
/// Minimum revision supporting interserver secret.
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
/// Version of ClickHouse TCP protocol. Increment it manually when you change the protocol.
#define DBMS_TCP_PROTOCOL_VERSION 54441
#define DBMS_TCP_PROTOCOL_VERSION 54442
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096

View File

@ -705,7 +705,7 @@ namespace MySQLReplication
break;
}
default:
throw ReplicationError("Position update with unsupport event", ErrorCodes::LOGICAL_ERROR);
throw ReplicationError("Position update with unsupported event", ErrorCodes::LOGICAL_ERROR);
}
}

View File

@ -222,6 +222,7 @@ class IColumn;
M(UInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(Bool, metrics_perf_events_enabled, false, "If enabled, some of the perf events will be measured throughout queries' execution.", 0) \
M(String, metrics_perf_events_list, "", "Comma separated list of perf metrics that will be measured throughout queries' execution. Empty means all events. See PerfEventInfo in sources for the available events.", 0) \
M(Float, opentelemetry_start_trace_probability, 0., "Probability to start an OpenTelemetry trace for an incoming query.", 0) \
\
\
/** Limits during query execution are part of the settings. \
@ -389,7 +390,7 @@ class IColumn;
M(Bool, alter_partition_verbose_result, false, "Output information about affected parts. Currently works only for FREEZE and ATTACH commands.", 0) \
M(Bool, allow_experimental_database_materialize_mysql, false, "Allow to create database with Engine=MaterializeMySQL(...).", 0) \
M(Bool, system_events_show_zero_values, false, "Include all metrics, even with zero values", 0) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal' or 'datetime64'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precison are seen as String on ClickHouse's side.", 0) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal' or 'datetime64'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precision are seen as String on ClickHouse's side.", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -29,7 +29,7 @@ constexpr size_t min(size_t x, size_t y)
}
/// @note There's no auto scale to larger big integer, only for integral ones.
/// It's cause of (U)Int64 backward compatibilty and very big performance penalties.
/// It's because of (U)Int64 backward compatibility and very big performance penalties.
constexpr size_t nextSize(size_t size)
{
if (size < 8)

View File

@ -116,7 +116,7 @@ void DatabaseAtomic::dropTable(const Context &, const String & table_name, bool
}
tryRemoveSymlink(table_name);
/// Remove the inner table (if any) to avoid deadlock
/// (due to attemp to execute DROP from the worker thread)
/// (due to attempt to execute DROP from the worker thread)
if (auto * mv = dynamic_cast<StorageMaterializedView *>(table.get()))
mv->dropInnerTable(no_delay);
/// Notify DatabaseCatalog that table was dropped. It will remove table data in background.
@ -261,21 +261,29 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
{
DetachedTables not_in_use;
auto table_data_path = getTableDataPath(query);
bool locked_uuid = false;
try
{
std::unique_lock lock{mutex};
if (query.database != database_name)
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed to `{}`, cannot create table in `{}`",
database_name, query.database);
/// Do some checks before renaming file from .tmp to .sql
not_in_use = cleanupDetachedTables();
assertDetachedTableNotInUse(query.uuid);
renameNoReplace(table_metadata_tmp_path, table_metadata_path);
/// We will get en exception if some table with the same UUID exists (even if it's detached table or table from another database)
DatabaseCatalog::instance().addUUIDMapping(query.uuid);
locked_uuid = true;
/// It throws if `table_metadata_path` already exists (it's possible if table was detached)
renameNoReplace(table_metadata_tmp_path, table_metadata_path); /// Commit point (a sort of)
attachTableUnlocked(query.table, table, lock); /// Should never throw
table_name_to_path.emplace(query.table, table_data_path);
}
catch (...)
{
Poco::File(table_metadata_tmp_path).remove();
if (locked_uuid)
DatabaseCatalog::instance().removeUUIDMappingFinally(query.uuid);
throw;
}
tryCreateSymlink(query.table, table_data_path);

View File

@ -329,10 +329,4 @@ const StoragePtr & DatabaseLazyIterator::table() const
return current_storage;
}
void DatabaseLazyIterator::reset()
{
if (current_storage)
current_storage.reset();
}
}

View File

@ -22,6 +22,10 @@ public:
String getEngineName() const override { return "Lazy"; }
bool canContainMergeTreeTables() const override { return false; }
bool canContainDistributedTables() const override { return false; }
void loadStoredObjects(
Context & context,
bool has_force_restore_data_flag, bool force_attach) override;
@ -122,7 +126,6 @@ public:
bool isValid() const override;
const String & name() const override;
const StoragePtr & table() const override;
void reset() override;
private:
const DatabaseLazy & database;

View File

@ -53,6 +53,9 @@ void DatabaseMemory::dropTable(
}
table->is_dropped = true;
create_queries.erase(table_name);
UUID table_uuid = table->getStorageID().uuid;
if (table_uuid != UUIDHelpers::Nil)
DatabaseCatalog::instance().removeUUIDMappingFinally(table_uuid);
}
ASTPtr DatabaseMemory::getCreateDatabaseQuery() const


@ -223,6 +223,10 @@ void DatabaseWithDictionaries::removeDictionary(const Context &, const String &
attachDictionary(dictionary_name, attach_info);
throw;
}
UUID dict_uuid = attach_info.create_query->as<ASTCreateQuery>()->uuid;
if (dict_uuid != UUIDHelpers::Nil)
DatabaseCatalog::instance().removeUUIDMappingFinally(dict_uuid);
}
DatabaseDictionariesIteratorPtr DatabaseWithDictionaries::getDictionariesIterator(const FilterByNameFunction & filter_by_dictionary_name)


@ -44,8 +44,6 @@ public:
/// (a database with support for lazy tables loading
/// - it maintains a list of tables but tables are loaded lazily).
virtual const StoragePtr & table() const = 0;
/// Reset reference counter to the StoragePtr.
virtual void reset() = 0;
virtual ~IDatabaseTablesIterator() = default;
@ -95,8 +93,6 @@ public:
const String & name() const override { return it->first; }
const StoragePtr & table() const override { return it->second; }
void reset() override { it->second.reset(); }
};
/// Copies list of dictionaries and iterates through such snapshot.
@ -151,6 +147,10 @@ public:
/// Get name of database engine.
virtual String getEngineName() const = 0;
virtual bool canContainMergeTreeTables() const { return true; }
virtual bool canContainDistributedTables() const { return true; }
/// Load a set of existing tables.
/// You can call only once, right after the object is created.
virtual void loadStoredObjects(Context & /*context*/, bool /*has_force_restore_data_flag*/, bool /*force_attach*/ = false) {}


@ -11,7 +11,7 @@ class Context;
class ASTStorage;
#define LIST_OF_CONNECTION_MYSQL_SETTINGS(M) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal' or 'datetime64'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precison are seen as String on ClickHouse's side.", 0) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal' or 'datetime64'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precision are seen as String on ClickHouse's side.", 0) \
/// Settings that should not change after the creation of a database.
#define APPLY_FOR_IMMUTABLE_CONNECTION_MYSQL_SETTINGS(M) \


@ -42,6 +42,12 @@ public:
String getEngineName() const override { return "MySQL"; }
bool canContainMergeTreeTables() const override { return false; }
bool canContainDistributedTables() const override { return false; }
bool shouldBeEmptyOnDetach() const override { return false; }
bool empty() const override;
DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;


@ -28,11 +28,6 @@ public:
return tables.emplace_back(storage);
}
void reset() override
{
tables.clear();
}
UUID uuid() const override { return nested_iterator->uuid(); }
DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, DatabaseMaterializeMySQL * database_)


@ -8,7 +8,6 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_SETTING;
}
@ -25,9 +24,8 @@ void MaterializeMySQLSettings::loadFromQuery(ASTStorage & storage_def)
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
throw Exception(e.message() + " for database " + storage_def.engine->name, ErrorCodes::BAD_ARGUMENTS);
else
e.rethrow();
e.addMessage("for database " + storage_def.engine->name);
throw;
}
}
else


@ -326,7 +326,7 @@ struct DecimalBinaryOperation
}
private:
/// there's implicit type convertion here
/// there's implicit type conversion here
static NativeResultType apply(NativeResultType a, NativeResultType b)
{
if constexpr (can_overflow && check_overflow)


@ -577,7 +577,7 @@ private:
auto input_value = input_column->getDataAt(r);
if constexpr (mode == CipherMode::RFC5116_AEAD_AES_GCM)
{
// empty plaintext results in empty ciphertext + tag, means there should be atleast tag_size bytes.
// empty plaintext results in empty ciphertext + tag, means there should be at least tag_size bytes.
if (input_value.size < tag_size)
throw Exception(fmt::format("Encrypted data is too short: only {} bytes, "
"should contain at least {} bytes of a tag.",


@ -131,7 +131,7 @@ public:
for (size_t i = 0; i < input_rows_count; ++i)
{
/// Virtual call is Ok (neglible comparing to the rest of calculations).
/// Virtual call is Ok (negligible comparing to the rest of calculations).
Float64 value = arguments[0].column->getFloat64(i);
bool is_negative = value < 0;


@ -22,7 +22,7 @@ namespace
{
/// Returns 1 if the Decimal value has more digits than its Precision allows, 0 otherwise.
/// Precision could be set as second argument or omitted. If ommited function uses Decimal presicion of the first argument.
/// Precision could be set as second argument or omitted. If omitted function uses Decimal precision of the first argument.
class FunctionIsDecimalOverflow : public IFunction
{
public:


@ -4,7 +4,6 @@
#if defined(OS_DARWIN)
extern "C"
{
/// Is defined in libglibc-compatibility.a
double lgamma_r(double x, int * signgamp);
}
#endif


@ -744,12 +744,12 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
{
function_builder = FunctionFactory::instance().get(node.name, data.context);
}
catch (DB::Exception & e)
catch (Exception & e)
{
auto hints = AggregateFunctionFactory::instance().getHints(node.name);
if (!hints.empty())
e.addMessage("Or unknown aggregate function " + node.name + ". Maybe you meant: " + toString(hints));
e.rethrow();
throw;
}
Names argument_names;


@ -233,8 +233,8 @@ void AsynchronousMetrics::update()
for (const auto & db : databases)
{
/// Lazy database can not contain MergeTree tables
if (db.second->getEngineName() == "Lazy")
/// Check if database can contain MergeTree tables
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{


@ -59,6 +59,26 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
writeVarUInt(client_version_patch, out);
}
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY)
{
if (opentelemetry_trace_id)
{
// Have OpenTelemetry header.
writeBinary(uint8_t(1), out);
// No point writing these numbers with variable length, because they
// are random and will probably require the full length anyway.
writeBinary(opentelemetry_trace_id, out);
writeBinary(opentelemetry_span_id, out);
writeBinary(opentelemetry_tracestate, out);
writeBinary(opentelemetry_trace_flags, out);
}
else
{
// Don't have OpenTelemetry header.
writeBinary(uint8_t(0), out);
}
}
}
@ -112,6 +132,19 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
else
client_version_patch = client_tcp_protocol_version;
}
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY)
{
uint8_t have_trace_id = 0;
readBinary(have_trace_id, in);
if (have_trace_id)
{
readBinary(opentelemetry_trace_id, in);
readBinary(opentelemetry_span_id, in);
readBinary(opentelemetry_tracestate, in);
readBinary(opentelemetry_trace_flags, in);
}
}
}
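Editorial summary, not part of the diff: for protocol revisions that include DBMS_MIN_REVISION_WITH_OPENTELEMETRY, write() and read() above add an optional block to the ClientInfo wire format that can be pictured as:
// UInt8        have_trace_id               -- 0 or 1; when 0, nothing else follows
// __uint128_t  opentelemetry_trace_id      -- 16 bytes, written as-is (not varint-encoded)
// UInt64       opentelemetry_span_id
// String       opentelemetry_tracestate
// UInt8        opentelemetry_trace_flags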
@ -122,6 +155,74 @@ void ClientInfo::setInitialQuery()
client_name = (DBMS_NAME " ") + client_name;
}
bool ClientInfo::parseTraceparentHeader(const std::string & traceparent,
std::string & error)
{
uint8_t version = -1;
uint64_t trace_id_high = 0;
uint64_t trace_id_low = 0;
uint64_t trace_parent = 0;
uint8_t trace_flags = 0;
// Version 00, which is the only one we can parse, is fixed width. Use this
// fact for an additional sanity check.
const int expected_length = 2 + 1 + 32 + 1 + 16 + 1 + 2;
if (traceparent.length() != expected_length)
{
error = fmt::format("unexpected length {}, expected {}",
traceparent.length(), expected_length);
return false;
}
// clang-tidy doesn't like sscanf:
// error: 'sscanf' used to convert a string to an unsigned integer value,
// but function will not report conversion errors; consider using 'strtoul'
// instead [cert-err34-c,-warnings-as-errors]
// There is no other ready solution, and hand-rolling a more complicated
// parser for an HTTP header in C++ sounds like RCE.
// NOLINTNEXTLINE(cert-err34-c)
int result = sscanf(&traceparent[0],
"%2" SCNx8 "-%16" SCNx64 "%16" SCNx64 "-%16" SCNx64 "-%2" SCNx8,
&version, &trace_id_high, &trace_id_low, &trace_parent, &trace_flags);
if (result == EOF)
{
error = "EOF";
return false;
}
// We read uint128 as two uint64, so 5 parts and not 4.
if (result != 5)
{
error = fmt::format("could only read {} parts instead of the expected 5",
result);
return false;
}
if (version != 0)
{
error = fmt::format("unexpected version {}, expected 00", version);
return false;
}
opentelemetry_trace_id = static_cast<__uint128_t>(trace_id_high) << 64
| trace_id_low;
opentelemetry_span_id = trace_parent;
opentelemetry_trace_flags = trace_flags;
return true;
}
std::string ClientInfo::composeTraceparentHeader() const
{
// This span is a parent for its children, so we specify this span_id as a
// parent id.
return fmt::format("00-{:032x}-{:016x}-{:02x}", opentelemetry_trace_id,
opentelemetry_span_id,
// This cast is needed because fmt is being weird and complaining that
// "mixing character types is not allowed".
static_cast<uint8_t>(opentelemetry_trace_flags));
}
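As a usage illustration (editorial sketch, not part of the diff), the parser above accepts the canonical W3C version-00 example value, and composeTraceparentHeader() reproduces the same four-field layout:
#include <Interpreters/ClientInfo.h>
void traceparentExample()
{
    DB::ClientInfo info;
    std::string error;
    // W3C example value: version 00, 32-hex trace id, 16-hex span id, 2-hex flags (01 = sampled).
    bool ok = info.parseTraceparentHeader(
        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", error);
    // On success (ok == true):
    //   info.opentelemetry_trace_id    == 0x4bf92f3577b34da6a3ce929d0e0e4736
    //   info.opentelemetry_span_id     == 0x00f067aa0ba902b7
    //   info.opentelemetry_trace_flags == 0x01
    // info.composeTraceparentHeader() then formats the current trace id, span id
    // and flags back into the "00-<trace>-<span>-<flags>" layout.
    (void)ok;
}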
void ClientInfo::fillOSUserHostNameAndVersionInfo()
{


@ -1,6 +1,7 @@
#pragma once
#include <Poco/Net/SocketAddress.h>
#include <Common/UInt128.h>
#include <common/types.h>
@ -58,6 +59,17 @@ public:
String initial_query_id;
Poco::Net::SocketAddress initial_address;
// OpenTelemetry trace information.
__uint128_t opentelemetry_trace_id = 0;
// The span id we get in the incoming client info becomes our parent span
// id, and the span id we send becomes the downstream parent span id.
UInt64 opentelemetry_span_id = 0;
UInt64 opentelemetry_parent_span_id = 0;
// The incoming tracestate header and the trace flags are just passed downstream.
// They are described at https://www.w3.org/TR/trace-context/
String opentelemetry_tracestate;
UInt8 opentelemetry_trace_flags = 0;
/// All below are parameters related to initial query.
Interface interface = Interface::TCP;
@ -90,6 +102,16 @@ public:
/// Initialize parameters on client initiating query.
void setInitialQuery();
// Parse/compose OpenTelemetry traceparent header.
// Note that these functions use span_id field, not parent_span_id, same as
// in native protocol. The incoming traceparent corresponds to the upstream
// trace span, and the outgoing traceparent corresponds to our current span.
// We use the same ClientInfo structure first for incoming span, and then
// for our span: when we switch, we use old span_id as parent_span_id, and
// generate a new span_id (currently this happens in Context::setQueryId()).
bool parseTraceparentHeader(const std::string & traceparent, std::string & error);
std::string composeTraceparentHeader() const;
private:
void fillOSUserHostNameAndVersionInfo();
};
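The parent/child switch described in the comment above can be sketched as follows (editorial, not part of the diff; continueTrace is a hypothetical helper, the real logic lives in Context::setCurrentQueryId):
#include <Interpreters/ClientInfo.h>
void continueTrace(DB::ClientInfo & info, UInt64 new_span_id)
{
    // The span we received from upstream becomes our parent...
    info.opentelemetry_parent_span_id = info.opentelemetry_span_id;
    // ...and the query we are about to execute gets a fresh span id.
    info.opentelemetry_span_id = new_span_id;
}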


@ -1100,15 +1100,10 @@ void Context::setCurrentDatabase(const String & name)
void Context::setCurrentQueryId(const String & query_id)
{
String query_id_to_set = query_id;
if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves.
{
/// Generate random UUID, but using lower quality RNG,
/// because Poco::UUIDGenerator::generateRandom method is using /dev/random, that is very expensive.
/// NOTE: Actually we don't need to use UUIDs for query identifiers.
/// We could use any suitable string instead.
union
{
char bytes[16];
@ -1117,11 +1112,41 @@ void Context::setCurrentQueryId(const String & query_id)
UInt64 a;
UInt64 b;
} words;
__uint128_t uuid;
} random;
random.words.a = thread_local_rng(); //-V656
random.words.b = thread_local_rng(); //-V656
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
&& client_info.opentelemetry_trace_id == 0)
{
// If this is an initial query without any parent OpenTelemetry trace, we
// might start the trace ourselves, with some configurable probability.
std::bernoulli_distribution should_start_trace{
settings.opentelemetry_start_trace_probability};
if (should_start_trace(thread_local_rng))
{
// Use the randomly generated default query id as the new trace id.
client_info.opentelemetry_trace_id = random.uuid;
client_info.opentelemetry_parent_span_id = 0;
client_info.opentelemetry_span_id = thread_local_rng();
// Mark this trace as sampled in the flags.
client_info.opentelemetry_trace_flags = 1;
}
}
else
{
// The incoming request has an OpenTelemetry trace context. Its span id
// becomes our parent span id.
client_info.opentelemetry_parent_span_id = client_info.opentelemetry_span_id;
client_info.opentelemetry_span_id = thread_local_rng();
}
String query_id_to_set = query_id;
if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves.
{
/// Use protected constructor.
struct QueryUUID : Poco::UUID
{
@ -1753,6 +1778,17 @@ std::shared_ptr<AsynchronousMetricLog> Context::getAsynchronousMetricLog()
}
std::shared_ptr<OpenTelemetrySpanLog> Context::getOpenTelemetrySpanLog()
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->opentelemetry_span_log;
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();
@ -2016,11 +2052,16 @@ void Context::reloadConfig() const
void Context::shutdown()
{
// Disk selector might not be initialized if there was some error during
// its initialization. Don't try to initialize it again on shutdown.
if (shared->merge_tree_disk_selector)
{
for (auto & [disk_name, disk] : getDisksMap())
{
LOG_INFO(shared->log, "Shutdown disk {}", disk_name);
disk->shutdown();
}
}
shared->shutdown();
}


@ -81,6 +81,7 @@ class TextLog;
class TraceLog;
class MetricLog;
class AsynchronousMetricLog;
class OpenTelemetrySpanLog;
struct MergeTreeSettings;
class StorageS3Settings;
class IDatabase;
@ -541,6 +542,7 @@ public:
std::shared_ptr<TextLog> getTextLog();
std::shared_ptr<MetricLog> getMetricLog();
std::shared_ptr<AsynchronousMetricLog> getAsynchronousMetricLog();
std::shared_ptr<OpenTelemetrySpanLog> getOpenTelemetrySpanLog();
/// Returns an object used to log operations with parts if it is possible.
/// Provide table name to make required checks.


@ -1246,7 +1246,6 @@ public:
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
"Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
"There are {} unfinished hosts ({} of them are currently active), they are going to execute the query in background",


@ -13,9 +13,16 @@
#include <IO/ReadHelpers.h>
#include <Poco/DirectoryIterator.h>
#include <Common/renameat2.h>
#include <Common/CurrentMetrics.h>
#include <filesystem>
namespace CurrentMetrics
{
extern const Metric TablesToDropQueueSize;
}
namespace DB
{
@ -155,7 +162,17 @@ void DatabaseCatalog::shutdownImpl()
tables_marked_dropped.clear();
std::lock_guard lock(databases_mutex);
assert(std::find_if_not(uuid_map.begin(), uuid_map.end(), [](const auto & elem) { return elem.map.empty(); }) == uuid_map.end());
assert(std::find_if(uuid_map.begin(), uuid_map.end(), [](const auto & elem)
{
/// Ensure that all UUID mappings are empty (i.e. all mappings contain nullptr instead of a pointer to storage)
const auto & not_empty_mapping = [] (const auto & mapping)
{
auto & table = mapping.second.second;
return table;
};
auto it = std::find_if(elem.map.begin(), elem.map.end(), not_empty_mapping);
return it != elem.map.end();
}) == uuid_map.end());
databases.clear();
db_uuid_map.clear();
view_dependencies.clear();
@ -411,36 +428,76 @@ DatabasePtr DatabaseCatalog::getSystemDatabase() const
return getDatabase(SYSTEM_DATABASE);
}
void DatabaseCatalog::addUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table)
void DatabaseCatalog::addUUIDMapping(const UUID & uuid)
{
addUUIDMapping(uuid, nullptr, nullptr);
}
void DatabaseCatalog::addUUIDMapping(const UUID & uuid, const DatabasePtr & database, const StoragePtr & table)
{
assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size());
assert((database && table) || (!database && !table));
UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)];
std::lock_guard lock{map_part.mutex};
auto [_, inserted] = map_part.map.try_emplace(uuid, std::move(database), std::move(table));
auto [it, inserted] = map_part.map.try_emplace(uuid, database, table);
if (inserted)
return;
auto & prev_database = it->second.first;
auto & prev_table = it->second.second;
assert((prev_database && prev_table) || (!prev_database && !prev_table));
if (!prev_table && table)
{
/// It's an empty mapping created to "lock" the UUID and prevent collisions. Just update it.
prev_database = database;
prev_table = table;
return;
}
/// We are trying to replace an existing mapping (prev_table != nullptr), which is a logical error
if (table)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mapping for table with UUID={} already exists", toString(uuid));
/// Normally this should never happen, but it's possible when the same UUIDs are explicitly specified in different CREATE queries,
/// so it's not LOGICAL_ERROR
if (!inserted)
throw Exception("Mapping for table with UUID=" + toString(uuid) + " already exists", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Mapping for table with UUID={} already exists. It happened due to UUID collision, "
"most likely because some not random UUIDs were manually specified in CREATE queries.", toString(uuid));
}
void DatabaseCatalog::removeUUIDMapping(const UUID & uuid)
{
assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size());
UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)];
std::lock_guard lock{map_part.mutex};
auto it = map_part.map.find(uuid);
if (it == map_part.map.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mapping for table with UUID={} doesn't exist", toString(uuid));
it->second = {};
}
void DatabaseCatalog::removeUUIDMappingFinally(const UUID & uuid)
{
assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size());
UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)];
std::lock_guard lock{map_part.mutex};
if (!map_part.map.erase(uuid))
throw Exception("Mapping for table with UUID=" + toString(uuid) + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mapping for table with UUID={} doesn't exist", toString(uuid));
}
void DatabaseCatalog::updateUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table)
{
assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size());
assert(database && table);
UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)];
std::lock_guard lock{map_part.mutex};
auto it = map_part.map.find(uuid);
if (it == map_part.map.end())
throw Exception("Mapping for table with UUID=" + toString(uuid) + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
it->second = std::make_pair(std::move(database), std::move(table));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mapping for table with UUID={} doesn't exist", toString(uuid));
auto & prev_database = it->second.first;
auto & prev_table = it->second.second;
assert(prev_database && prev_table);
prev_database = std::move(database);
prev_table = std::move(table);
}
std::unique_ptr<DatabaseCatalog> DatabaseCatalog::database_catalog;
@ -631,6 +688,8 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
dropped_metadata.emplace(std::move(full_path), std::move(dropped_id));
}
LOG_INFO(log, "Found {} partially dropped tables. Will load them and retry removal.", dropped_metadata.size());
ThreadPool pool;
for (const auto & elem : dropped_metadata)
{
@ -695,6 +754,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
LOG_WARNING(log, "Cannot parse metadata of partially dropped table {} from {}. Will remove metadata file and data directory. Garbage may be left in /store directory and ZooKeeper.", table_id.getNameForLogs(), dropped_metadata_path);
}
addUUIDMapping(table_id.uuid);
drop_time = Poco::File(dropped_metadata_path).getLastModified().epochTime();
}
@ -704,6 +764,8 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
else
tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time});
tables_marked_dropped_ids.insert(table_id.uuid);
CurrentMetrics::add(CurrentMetrics::TablesToDropQueueSize, 1);
/// If list of dropped tables was empty, start a drop task
if (drop_task && tables_marked_dropped.size() == 1)
(*drop_task)->schedule();
@ -732,6 +794,10 @@ void DatabaseCatalog::dropTableDataTask()
LOG_INFO(log, "Will try drop {}", table.table_id.getNameForLogs());
tables_marked_dropped.erase(it);
}
else
{
LOG_TRACE(log, "Not found any suitable tables to drop, still have {} tables in drop queue", tables_marked_dropped.size());
}
need_reschedule = !tables_marked_dropped.empty();
}
catch (...)
@ -770,7 +836,7 @@ void DatabaseCatalog::dropTableDataTask()
(*drop_task)->scheduleAfter(reschedule_time_ms);
}
void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) const
void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)
{
if (table.table)
{
@ -789,6 +855,9 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) const
LOG_INFO(log, "Removing metadata {} of dropped table {}", table.metadata_path, table.table_id.getNameForLogs());
Poco::File(table.metadata_path).remove();
removeUUIDMappingFinally(table.table_id.uuid);
CurrentMetrics::sub(CurrentMetrics::TablesToDropQueueSize, 1);
}
String DatabaseCatalog::getPathForUUID(const UUID & uuid)
@ -826,6 +895,8 @@ void DatabaseCatalog::waitTableFinallyDropped(const UUID & uuid)
{
if (uuid == UUIDHelpers::Nil)
return;
LOG_DEBUG(log, "Waiting for table {} to be finally dropped", toString(uuid));
std::unique_lock lock{tables_marked_dropped_mutex};
wait_table_finally_dropped.wait(lock, [&]()
{


@ -165,12 +165,21 @@ public:
void updateDependency(const StorageID & old_from, const StorageID & old_where,const StorageID & new_from, const StorageID & new_where);
/// If table has UUID, addUUIDMapping(...) must be called when table attached to some database
/// and removeUUIDMapping(...) must be called when it detached.
/// removeUUIDMapping(...) must be called when it detached,
/// and removeUUIDMappingFinally(...) must be called when table is dropped and its data removed from disk.
/// Such tables can be accessed by persistent UUID instead of database and table name.
void addUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table);
void addUUIDMapping(const UUID & uuid, const DatabasePtr & database, const StoragePtr & table);
void removeUUIDMapping(const UUID & uuid);
void removeUUIDMappingFinally(const UUID & uuid);
/// For moving table between databases
void updateUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table);
/// This method adds an empty mapping (with database and storage equal to nullptr).
/// It's required to "lock" some UUIDs and protect us from collisions.
/// Collisions of random 122-bit integers are very unlikely to happen,
/// but we allow a UUID to be explicitly specified in a CREATE query (in particular for testing).
/// If some UUID was already added and we are trying to add it again,
/// this method will throw an exception.
void addUUIDMapping(const UUID & uuid);
static String getPathForUUID(const UUID & uuid);
@ -222,7 +231,7 @@ private:
void loadMarkedAsDroppedTables();
void dropTableDataTask();
void dropTableFinally(const TableMarkedAsDropped & table) const;
void dropTableFinally(const TableMarkedAsDropped & table);
static constexpr size_t reschedule_time_ms = 100;
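Editorial summary, not part of the diff, of the call order described by the comments above for a table with a persistent UUID:
// auto & catalog = DatabaseCatalog::instance();
// catalog.addUUIDMapping(uuid);                   // CREATE: reserve ("lock") the UUID
// catalog.addUUIDMapping(uuid, database, table);  // ATTACH: fill in the real mapping
// catalog.removeUUIDMapping(uuid);                // DETACH: clear the mapping, keep the UUID reserved
// catalog.removeUUIDMappingFinally(uuid);         // after DROP removes the data: release the UUID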


@ -22,6 +22,15 @@
# include "config_core.h"
#endif
#include <common/defines.h>
#if defined(MEMORY_SANITIZER)
#include <sanitizer/msan_interface.h>
#endif
#if defined(ADDRESS_SANITIZER)
#include <sanitizer/asan_interface.h>
#endif
namespace ProfileEvents
{
@ -624,6 +633,22 @@ void ExpressionActions::execute(Block & block, bool dry_run) const
}
catch (Exception & e)
{
#if defined(MEMORY_SANITIZER)
const auto & msg = e.message();
if (__msan_test_shadow(msg.data(), msg.size()) != -1)
{
LOG_FATAL(&Poco::Logger::get("ExpressionActions"), "Poisoned exception message (msan): {}", e.getStackTraceString());
}
#endif
#if defined(ADDRESS_SANITIZER)
const auto & msg = e.message();
if (__asan_region_is_poisoned(const_cast<char *>(msg.data()), msg.size()))
{
LOG_FATAL(&Poco::Logger::get("ExpressionActions"), "Poisoned exception message (asan): {}", e.getStackTraceString());
}
#endif
e.addMessage(fmt::format("while executing '{}'", action.toString()));
throw;
}


@ -28,6 +28,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int DICTIONARIES_WAS_NOT_LOADED;
}
@ -1404,7 +1405,29 @@ void ExternalLoader::checkLoaded(const ExternalLoader::LoadResult & result,
if (result.status == ExternalLoader::Status::LOADING)
throw Exception(type_name + " '" + result.name + "' is still loading", ErrorCodes::BAD_ARGUMENTS);
if (result.exception)
{
// Exception is shared for multiple threads.
// Don't just rethrow it, because sharing the same exception object
// between multiple threads can lead to weird effects if they decide to
// modify it, for example, by adding some error context.
try
{
std::rethrow_exception(result.exception);
}
catch (const Poco::Exception & e)
{
/// This will create a copy for Poco::Exception and DB::Exception
e.rethrow();
}
catch (...)
{
throw DB::Exception(ErrorCodes::DICTIONARIES_WAS_NOT_LOADED,
"Failed to load dictionary '{}': {}",
result.name,
getCurrentExceptionMessage(true /*with stack trace*/,
true /*check embedded stack trace*/));
}
}
if (result.status == ExternalLoader::Status::NOT_EXIST)
throw Exception(type_name + " '" + result.name + "' not found", ErrorCodes::BAD_ARGUMENTS);
if (result.status == ExternalLoader::Status::NOT_LOADED)


@ -52,13 +52,37 @@ BlockIO InterpreterDropQuery::execute()
return executeToDictionary(drop.database, drop.table, drop.kind, drop.if_exists, drop.temporary, drop.no_ddl_lock);
}
else if (!drop.database.empty())
return executeToDatabase(drop.database, drop.kind, drop.if_exists, drop.no_delay);
return executeToDatabase(drop);
else
throw Exception("Nothing to drop, both names are empty", ErrorCodes::LOGICAL_ERROR);
}
void InterpreterDropQuery::waitForTableToBeActuallyDroppedOrDetached(const ASTDropQuery & query, const DatabasePtr & db, const UUID & uuid_to_wait)
{
if (uuid_to_wait == UUIDHelpers::Nil)
return;
if (query.kind == ASTDropQuery::Kind::Drop)
DatabaseCatalog::instance().waitTableFinallyDropped(uuid_to_wait);
else if (query.kind == ASTDropQuery::Kind::Detach)
{
if (auto * atomic = typeid_cast<DatabaseAtomic *>(db.get()))
atomic->waitDetachedTableNotInUse(uuid_to_wait);
}
}
BlockIO InterpreterDropQuery::executeToTable(const ASTDropQuery & query)
{
DatabasePtr database;
UUID table_to_wait_on = UUIDHelpers::Nil;
auto res = executeToTableImpl(query, database, table_to_wait_on);
if (query.no_delay)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_to_wait_on);
return res;
}
BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait)
{
/// NOTE: it does not contain UUID, we will resolve it with locked DDLGuard
auto table_id = StorageID(query);
@ -125,19 +149,9 @@ BlockIO InterpreterDropQuery::executeToTable(const ASTDropQuery & query)
database->dropTable(context, table_id.table_name, query.no_delay);
}
}
table.reset();
ddl_guard = {};
if (query.no_delay)
{
if (query.kind == ASTDropQuery::Kind::Drop)
DatabaseCatalog::instance().waitTableFinallyDropped(table_id.uuid);
else if (query.kind == ASTDropQuery::Kind::Detach)
{
if (auto * atomic = typeid_cast<DatabaseAtomic *>(database.get()))
atomic->waitDetachedTableNotInUse(table_id.uuid);
}
db = database;
uuid_to_wait = table_id.uuid;
}
return {};
@ -223,19 +237,48 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name,
}
BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, ASTDropQuery::Kind kind, bool if_exists, bool no_delay)
BlockIO InterpreterDropQuery::executeToDatabase(const ASTDropQuery & query)
{
DatabasePtr database;
std::vector<UUID> tables_to_wait;
BlockIO res;
try
{
res = executeToDatabaseImpl(query, database, tables_to_wait);
}
catch (...)
{
if (query.no_delay)
{
for (const auto & table_uuid : tables_to_wait)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid);
}
throw;
}
if (query.no_delay)
{
for (const auto & table_uuid : tables_to_wait)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid);
}
return res;
}
BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query, DatabasePtr & database, std::vector<UUID> & uuids_to_wait)
{
const auto & database_name = query.database;
auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, "");
if (auto database = tryGetDatabase(database_name, if_exists))
database = tryGetDatabase(database_name, query.if_exists);
if (database)
{
if (kind == ASTDropQuery::Kind::Truncate)
if (query.kind == ASTDropQuery::Kind::Truncate)
{
throw Exception("Unable to truncate database", ErrorCodes::SYNTAX_ERROR);
}
else if (kind == ASTDropQuery::Kind::Detach || kind == ASTDropQuery::Kind::Drop)
else if (query.kind == ASTDropQuery::Kind::Detach || query.kind == ASTDropQuery::Kind::Drop)
{
bool drop = kind == ASTDropQuery::Kind::Drop;
bool drop = query.kind == ASTDropQuery::Kind::Drop;
context.checkAccess(AccessType::DROP_DATABASE, database_name);
if (database->shouldBeEmptyOnDetach())
@ -246,21 +289,22 @@ BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, AS
for (auto iterator = database->getDictionariesIterator(); iterator->isValid(); iterator->next())
{
String current_dictionary = iterator->name();
executeToDictionary(database_name, current_dictionary, kind, false, false, false);
executeToDictionary(database_name, current_dictionary, query.kind, false, false, false);
}
ASTDropQuery query;
query.kind = kind;
query.if_exists = true;
query.database = database_name;
query.no_delay = no_delay;
ASTDropQuery query_for_table;
query_for_table.kind = query.kind;
query_for_table.if_exists = true;
query_for_table.database = database_name;
query_for_table.no_delay = query.no_delay;
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
/// Reset reference counter of the StoragePtr to allow synchronous drop.
iterator->reset();
query.table = iterator->name();
executeToTable(query);
DatabasePtr db;
UUID table_to_wait = UUIDHelpers::Nil;
query_for_table.table = iterator->name();
executeToTableImpl(query_for_table, db, table_to_wait);
uuids_to_wait.push_back(table_to_wait);
}
}


@ -29,9 +29,13 @@ private:
ASTPtr query_ptr;
Context & context;
BlockIO executeToDatabase(const String & database_name, ASTDropQuery::Kind kind, bool if_exists, bool no_delay);
BlockIO executeToDatabase(const ASTDropQuery & query);
BlockIO executeToDatabaseImpl(const ASTDropQuery & query, DatabasePtr & database, std::vector<UUID> & uuids_to_wait);
BlockIO executeToTable(const ASTDropQuery & query);
BlockIO executeToTableImpl(const ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait);
static void waitForTableToBeActuallyDroppedOrDetached(const ASTDropQuery & query, const DatabasePtr & db, const UUID & uuid_to_wait);
BlockIO executeToDictionary(const String & database_name, const String & dictionary_name, ASTDropQuery::Kind kind, bool if_exists, bool is_temporary, bool no_ddl_lock);


@ -21,6 +21,7 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Access/ContextAccess.h>
#include <Access/AllowedClientHosts.h>
#include <Databases/IDatabase.h>
@ -329,7 +330,8 @@ BlockIO InterpreterSystemQuery::execute()
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(true); },
[&] () { if (auto text_log = context.getTextLog()) text_log->flush(true); },
[&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(true); },
[&] () { if (auto asynchronous_metric_log = context.getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); }
[&] () { if (auto asynchronous_metric_log = context.getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); },
[&] () { if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }
);
break;
case Type::STOP_LISTEN_QUERIES:


@ -0,0 +1,47 @@
#include "OpenTelemetrySpanLog.h"
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeUUID.h>
namespace DB
{
Block OpenTelemetrySpanLogElement::createBlock()
{
return {
{std::make_shared<DataTypeUUID>(), "trace_id"},
{std::make_shared<DataTypeUInt64>(), "span_id"},
{std::make_shared<DataTypeUInt64>(), "parent_span_id"},
{std::make_shared<DataTypeString>(), "operation_name"},
{std::make_shared<DataTypeDateTime64>(6), "start_time_us"},
{std::make_shared<DataTypeDateTime64>(6), "finish_time_us"},
{std::make_shared<DataTypeDate>(), "finish_date"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
"attribute.names"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
"attribute.values"}
};
}
void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
columns[i++]->insert(UInt128(Int128(trace_id)));
columns[i++]->insert(span_id);
columns[i++]->insert(parent_span_id);
columns[i++]->insert(operation_name);
columns[i++]->insert(start_time_us);
columns[i++]->insert(finish_time_us);
columns[i++]->insert(DateLUT::instance().toDayNum(finish_time_us / 1000000));
columns[i++]->insert(attribute_names);
columns[i++]->insert(attribute_values);
}
}


@ -0,0 +1,39 @@
#pragma once
#include <Interpreters/SystemLog.h>
namespace DB
{
struct OpenTelemetrySpan
{
__uint128_t trace_id;
UInt64 span_id;
UInt64 parent_span_id;
std::string operation_name;
UInt64 start_time_us;
UInt64 finish_time_us;
UInt64 duration_ns;
Array attribute_names;
Array attribute_values;
// I don't understand how Links work, namely, which direction they should
// point to and how they relate to parent_span_id, so no Links for now.
};
struct OpenTelemetrySpanLogElement : public OpenTelemetrySpan
{
static std::string name() { return "OpenTelemetrySpanLog"; }
static Block createBlock();
void appendToBlock(MutableColumns & columns) const;
};
// OpenTelemetry standardizes some Log data as well, so this class is named
// OpenTelemetrySpanLog rather than just OpenTelemetryLog, to avoid confusion.
class OpenTelemetrySpanLog : public SystemLog<OpenTelemetrySpanLogElement>
{
public:
using SystemLog<OpenTelemetrySpanLogElement>::SystemLog;
};
}


@ -6,6 +6,7 @@
#include <unordered_map>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{


@ -7,6 +7,7 @@
#include <Interpreters/CrashLog.h>
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
@ -87,6 +88,9 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
asynchronous_metric_log = createSystemLog<AsynchronousMetricLog>(
global_context, "system", "asynchronous_metric_log", config,
"asynchronous_metric_log");
opentelemetry_span_log = createSystemLog<OpenTelemetrySpanLog>(
global_context, "system", "opentelemetry_span_log", config,
"opentelemetry_span_log");
if (query_log)
logs.emplace_back(query_log.get());
@ -104,6 +108,8 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
logs.emplace_back(metric_log.get());
if (asynchronous_metric_log)
logs.emplace_back(asynchronous_metric_log.get());
if (opentelemetry_span_log)
logs.emplace_back(opentelemetry_span_log.get());
try
{


@ -71,6 +71,7 @@ class TraceLog;
class CrashLog;
class MetricLog;
class AsynchronousMetricLog;
class OpenTelemetrySpanLog;
class ISystemLog
@ -105,6 +106,8 @@ struct SystemLogs
std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics.
/// Metrics from system.asynchronous_metrics.
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
std::vector<ISystemLog *> logs;
};


@ -166,7 +166,7 @@ void ThreadStatus::initPerformanceCounters()
memory_tracker.setDescription("(for thread)");
// query_start_time_{microseconds, nanoseconds} are all constructed from the same time point
// to ensure that they are all equal upto the precision of a second.
// to ensure that they are all equal up to the precision of a second.
const auto now = std::chrono::system_clock::now();
query_start_time_nanoseconds = time_in_nanoseconds(now);


@ -644,8 +644,13 @@ void TreeOptimizer::apply(ASTPtr & query, Aliases & aliases, const NameSet & sou
optimizeInjectiveFunctionsInsideUniq(query, context);
/// Eliminate min/max/any aggregators of functions of GROUP BY keys
if (settings.optimize_aggregators_of_group_by_keys)
if (settings.optimize_aggregators_of_group_by_keys
&& !select_query->group_by_with_totals
&& !select_query->group_by_with_rollup
&& !select_query->group_by_with_cube)
{
optimizeAggregateFunctionsOfGroupByKeys(select_query, query);
}
/// Remove duplicate items from ORDER BY.
optimizeDuplicatesInOrderBy(select_query);
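Editorial note, not part of the diff: the extra conditions matter because replacing an aggregator of a GROUP BY key with the key itself is only valid inside each group; for a query like `SELECT max(key) FROM t GROUP BY key WITH TOTALS`, the TOTALS row (and likewise ROLLUP/CUBE subtotal rows) aggregates across groups, where the key is not constant, so the rewrite must be skipped there.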


@ -31,6 +31,7 @@
#include <Access/EnabledQuota.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/ApplyWithGlobalVisitor.h>
@ -139,15 +140,26 @@ static void logQuery(const String & query, const Context & context, bool interna
}
else
{
const auto & current_query_id = context.getClientInfo().current_query_id;
const auto & initial_query_id = context.getClientInfo().initial_query_id;
const auto & current_user = context.getClientInfo().current_user;
const auto & client_info = context.getClientInfo();
const auto & current_query_id = client_info.current_query_id;
const auto & initial_query_id = client_info.initial_query_id;
const auto & current_user = client_info.current_user;
LOG_DEBUG(&Poco::Logger::get("executeQuery"), "(from {}{}{}) {}",
context.getClientInfo().current_address.toString(),
(current_user != "default" ? ", user: " + context.getClientInfo().current_user : ""),
client_info.current_address.toString(),
(current_user != "default" ? ", user: " + current_user : ""),
(!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()),
joinLines(query));
if (client_info.opentelemetry_trace_id)
{
LOG_TRACE(&Poco::Logger::get("executeQuery"),
"OpenTelemetry trace id {:x}, span id {}, parent span id {}",
client_info.opentelemetry_trace_id,
client_info.opentelemetry_span_id,
client_info.opentelemetry_parent_span_id);
}
}
}
@ -194,7 +206,7 @@ inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock>
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
static void onExceptionBeforeStart(const String & query_for_logging, Context & context, time_t current_time, UInt64 current_time_microseconds, ASTPtr ast)
static void onExceptionBeforeStart(const String & query_for_logging, Context & context, UInt64 current_time_us, ASTPtr ast)
{
/// Exception before the query execution.
if (auto quota = context.getQuota())
@ -209,11 +221,11 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
// all callers to onExceptionBeforeStart method construct the timespec for event_time and
// event_time_microseconds from the same time point. So, it can be assumed that both of these
// times are equal upto the precision of a second.
elem.event_time = current_time;
elem.event_time_microseconds = current_time_microseconds;
elem.query_start_time = current_time;
elem.query_start_time_microseconds = current_time_microseconds;
// times are equal up to the precision of a second.
elem.event_time = current_time_us / 1000000;
elem.event_time_microseconds = current_time_us;
elem.query_start_time = current_time_us / 1000000;
elem.query_start_time_microseconds = current_time_us;
elem.current_database = context.getCurrentDatabase();
elem.query = query_for_logging;
@ -233,6 +245,39 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
if (auto query_log = context.getQueryLog())
query_log->add(elem);
if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog();
context.getClientInfo().opentelemetry_trace_id
&& opentelemetry_span_log)
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().opentelemetry_trace_id;
span.span_id = context.getClientInfo().opentelemetry_span_id;
span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
span.operation_name = "query";
span.start_time_us = current_time_us;
span.finish_time_us = current_time_us;
span.duration_ns = 0;
// keep the values synchronized with the type enum in QueryLogElement::createBlock
span.attribute_names.push_back("clickhouse.query_status");
span.attribute_values.push_back("ExceptionBeforeStart");
span.attribute_names.push_back("db.statement");
span.attribute_values.push_back(elem.query);
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
if (!context.getClientInfo().opentelemetry_tracestate.empty())
{
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context.getClientInfo().opentelemetry_tracestate);
}
opentelemetry_span_log->add(span);
}
ProfileEvents::increment(ProfileEvents::FailedQuery);
if (ast)
@ -266,12 +311,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool has_query_tail,
ReadBuffer * istr)
{
// current_time and current_time_microseconds are both constructed from the same time point
// to ensure that both the times are equal upto the precision of a second.
const auto now = std::chrono::system_clock::now();
auto current_time = time_in_seconds(now);
auto current_time_microseconds = time_in_microseconds(now);
const auto current_time = std::chrono::system_clock::now();
/// If we are already executing a query and it requires executing an internal query, then
/// don't replace the thread context with the given one (it can be temporary). Otherwise, attach the context to the thread.
@ -322,7 +362,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (!internal)
{
onExceptionBeforeStart(query_for_logging, context, current_time, current_time_microseconds, ast);
onExceptionBeforeStart(query_for_logging, context, time_in_microseconds(current_time), ast);
}
throw;
@ -494,10 +534,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.type = QueryLogElementType::QUERY_START;
elem.event_time = current_time;
elem.event_time_microseconds = current_time_microseconds;
elem.query_start_time = current_time;
elem.query_start_time_microseconds = current_time_microseconds;
elem.event_time = time_in_seconds(current_time);
elem.event_time_microseconds = time_in_microseconds(current_time);
elem.query_start_time = time_in_seconds(current_time);
elem.query_start_time_microseconds = time_in_microseconds(current_time);
elem.current_database = context.getCurrentDatabase();
elem.query = query_for_logging;
@ -568,9 +608,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
// construct event_time and event_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
const auto time_now = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(time_now);
elem.event_time_microseconds = time_in_microseconds(time_now);
const auto finish_time = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(finish_time);
elem.event_time_microseconds = time_in_microseconds(finish_time);
status_info_to_query_log(elem, info, ast);
auto progress_callback = context.getProgressCallback();
@ -620,6 +660,38 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto query_log = context.getQueryLog())
query_log->add(elem);
}
if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog();
context.getClientInfo().opentelemetry_trace_id
&& opentelemetry_span_log)
{
OpenTelemetrySpanLogElement span;
span.trace_id = context.getClientInfo().opentelemetry_trace_id;
span.span_id = context.getClientInfo().opentelemetry_span_id;
span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
span.operation_name = "query";
span.start_time_us = elem.query_start_time_microseconds;
span.finish_time_us = time_in_microseconds(finish_time);
span.duration_ns = elapsed_seconds * 1000000000;
// keep the values synchronized with the type enum in QueryLogElement::createBlock
span.attribute_names.push_back("clickhouse.query_status");
span.attribute_values.push_back("QueryFinish");
span.attribute_names.push_back("db.statement");
span.attribute_values.push_back(elem.query);
span.attribute_names.push_back("clickhouse.query_id");
span.attribute_values.push_back(elem.client_info.current_query_id);
if (!context.getClientInfo().opentelemetry_tracestate.empty())
{
span.attribute_names.push_back("clickhouse.tracestate");
span.attribute_values.push_back(
context.getClientInfo().opentelemetry_tracestate);
}
opentelemetry_span_log->add(span);
}
};
auto exception_callback = [elem, &context, ast, log_queries, log_queries_min_type = settings.log_queries_min_type, quota(quota),
@ -631,7 +703,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.type = QueryLogElementType::EXCEPTION_WHILE_PROCESSING;
// event_time and event_time_microseconds are being constructed from the same time point
// to ensure that both the times will be equal upto the precision of a second.
// to ensure that both the times will be equal up to the precision of a second.
const auto time_now = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(time_now);
@ -694,7 +766,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (query_for_logging.empty())
query_for_logging = prepareQueryForLogging(query, context);
onExceptionBeforeStart(query_for_logging, context, current_time, current_time_microseconds, ast);
onExceptionBeforeStart(query_for_logging, context, time_in_microseconds(current_time), ast);
}
throw;


@ -119,6 +119,7 @@ SRCS(
MutationsInterpreter.cpp
MySQL/InterpretersMySQLDDLQuery.cpp
NullableUtils.cpp
OpenTelemetrySpanLog.cpp
OptimizeIfChains.cpp
OptimizeIfWithConstantConditionVisitor.cpp
PartLog.cpp


@ -927,7 +927,7 @@ void obfuscateQueries(
}
else
{
/// Everyting else is kept as is.
/// Everything else is kept as is.
result.write(token.begin, token.size());
}
}

View File

@ -36,7 +36,7 @@ public:
Status prepare() override;
void work() override;
/// Adds additional port fo totals.
/// Adds additional port for totals.
/// If added, totals will have been ready by the first generate() call (in totals chunk).
InputPort * addTotalsPort();


@ -1,7 +1,9 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Core/ColumnNumbers.h>
namespace DB
{


@ -95,6 +95,7 @@ namespace ErrorCodes
extern const int WRONG_PASSWORD;
extern const int REQUIRED_PASSWORD;
extern const int BAD_REQUEST_PARAMETER;
extern const int INVALID_SESSION_TIMEOUT;
extern const int HTTP_LENGTH_REQUIRED;
}
@ -279,9 +280,7 @@ void HTTPHandler::processQuery(
}
}
std::string query_id = params.get("query_id", "");
context.setUser(user, password, request.clientAddress());
context.setCurrentQueryId(query_id);
if (!quota_key.empty())
context.setQuotaKey(quota_key);
@ -311,6 +310,31 @@ void HTTPHandler::processQuery(
session->release();
});
// Parse the OpenTelemetry traceparent header.
// Disable in Arcadia -- it interferes with the
// test_clickhouse.TestTracing.test_tracing_via_http_proxy[traceparent] test.
#if !defined(ARCADIA_BUILD)
if (request.has("traceparent"))
{
std::string opentelemetry_traceparent = request.get("traceparent");
std::string error;
if (!context.getClientInfo().parseTraceparentHeader(
opentelemetry_traceparent, error))
{
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER,
"Failed to parse OpenTelemetry traceparent header '{}': {}",
opentelemetry_traceparent, error);
}
context.getClientInfo().opentelemetry_tracestate = request.get("tracestate", "");
}
#endif
// Set the query id supplied by the user, if any, and also update the
// OpenTelemetry fields.
context.setCurrentQueryId(params.get("query_id",
request.get("X-ClickHouse-Query-Id", "")));
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
CompressionMethod http_response_compression_method = CompressionMethod::None;


@ -43,8 +43,8 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
/// Iterate through all the replicated tables.
for (const auto & db : databases)
{
/// Lazy database can not contain replicated tables
if (db.second->getEngineName() == "Lazy")
/// Check if database can contain replicated tables
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())


@ -896,8 +896,6 @@ void TCPHandler::receiveQuery()
state.is_empty = false;
readStringBinary(state.query_id, *in);
query_context->setCurrentQueryId(state.query_id);
/// Client info
ClientInfo & client_info = query_context->getClientInfo();
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
@ -917,14 +915,6 @@ void TCPHandler::receiveQuery()
/// Set fields that are known a priori.
client_info.interface = ClientInfo::Interface::TCP;
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
/// 'Current' fields was set at receiveHello.
client_info.initial_user = client_info.current_user;
client_info.initial_query_id = client_info.current_query_id;
client_info.initial_address = client_info.current_address;
}
/// Per query settings are also passed via TCP.
/// We need to check them before applying due to they can violate the settings constraints.
auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsWriteFormat::STRINGS_WITH_FLAGS
@ -1001,11 +991,32 @@ void TCPHandler::receiveQuery()
query_context->clampToSettingsConstraints(settings_changes);
}
query_context->applySettingsChanges(settings_changes);
const Settings & settings = query_context->getSettingsRef();
// Use the received query id, or generate a random default. It is convenient
// to also generate the default OpenTelemetry trace id at the same time, and
// set the trace parent.
// Why is this done here and not earlier:
// 1) ClientInfo might contain upstream trace id, so we decide whether to use
// the default ids after we have received the ClientInfo.
// 2) There is the opentelemetry_start_trace_probability setting that
// controls when we start a new trace. It can be changed via Native protocol,
// so we have to apply the changes first.
query_context->setCurrentQueryId(state.query_id);
// Set parameters of initial query.
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
/// 'Current' fields were set at receiveHello.
client_info.initial_user = client_info.current_user;
client_info.initial_query_id = client_info.current_query_id;
client_info.initial_address = client_info.current_address;
}
/// Sync timeouts on client and server during current query to avoid dangling queries on server
/// NOTE: We use settings.send_timeout for the receive timeout and vice versa (change arguments ordering in TimeoutSetter),
/// because settings.send_timeout is client-side setting which has opposite meaning on the server side.
/// NOTE: these settings are applied only for current connection (not for distributed tables' connections)
const Settings & settings = query_context->getSettingsRef();
state.timeout_setter = std::make_unique<TimeoutSetter>(socket(), settings.receive_timeout, settings.send_timeout);
}


@ -1,3 +1,4 @@
add_subdirectory(MergeTree)
add_subdirectory(System)
if(ENABLE_TESTS)

Some files were not shown because too many files have changed in this diff Show More