diff --git a/CMakeLists.txt b/CMakeLists.txt index 21cc74bbd2b..783a9f80b66 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -59,25 +59,6 @@ set(CMAKE_DEBUG_POSTFIX "d" CACHE STRING "Generate debug library name with a pos # For more info see https://cmake.org/cmake/help/latest/prop_gbl/USE_FOLDERS.html set_property(GLOBAL PROPERTY USE_FOLDERS ON) -# cmake 3.9+ needed. -# Usually impractical. -# See also ${ENABLE_THINLTO} -option(ENABLE_IPO "Full link time optimization") - -if(ENABLE_IPO) - cmake_policy(SET CMP0069 NEW) - include(CheckIPOSupported) - check_ipo_supported(RESULT IPO_SUPPORTED OUTPUT IPO_NOT_SUPPORTED) - if(IPO_SUPPORTED) - message(STATUS "IPO/LTO is supported, enabling") - set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE) - else() - message (${RECONFIGURE_MESSAGE_LEVEL} "IPO/LTO is not supported: <${IPO_NOT_SUPPORTED}>") - endif() -else() - message(STATUS "IPO/LTO not enabled.") -endif() - # Check that submodules are present only if source was downloaded with git if (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/.git" AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/boost/boost") message (FATAL_ERROR "Submodules are not initialized. Run\n\tgit submodule update --init --recursive") diff --git a/docker/server/.dockerignore b/docker/server/.dockerignore new file mode 100644 index 00000000000..468a8cafb00 --- /dev/null +++ b/docker/server/.dockerignore @@ -0,0 +1,8 @@ +# post / preinstall scripts (not needed, we do it in Dockerfile) +alpine-root/install/* + +# docs (looks useless) +alpine-root/usr/share/doc/* + +# packages, etc. (used by prepare.sh) +alpine-root/tgz-packages/* \ No newline at end of file diff --git a/docker/server/.gitignore b/docker/server/.gitignore new file mode 100644 index 00000000000..4081b5f124c --- /dev/null +++ b/docker/server/.gitignore @@ -0,0 +1 @@ +alpine-root/* \ No newline at end of file diff --git a/docker/server/Dockerfile.alpine b/docker/server/Dockerfile.alpine new file mode 100644 index 00000000000..fc2756eac8c --- /dev/null +++ b/docker/server/Dockerfile.alpine @@ -0,0 +1,26 @@ +FROM alpine + +ENV LANG=en_US.UTF-8 \ + LANGUAGE=en_US:en \ + LC_ALL=en_US.UTF-8 \ + TZ=UTC \ + CLICKHOUSE_CONFIG=/etc/clickhouse-server/config.xml + +COPY alpine-root/ / + +# from https://github.com/ClickHouse/ClickHouse/blob/master/debian/clickhouse-server.postinst +RUN addgroup clickhouse \ + && adduser -S -H -h /nonexistent -s /bin/false -G clickhouse -g "ClickHouse server" clickhouse \ + && chown clickhouse:clickhouse /var/lib/clickhouse \ + && chmod 700 /var/lib/clickhouse \ + && chown root:clickhouse /var/log/clickhouse-server \ + && chmod 775 /var/log/clickhouse-server \ + && chmod +x /entrypoint.sh \ + && apk add --no-cache su-exec + +EXPOSE 9000 8123 9009 + +VOLUME /var/lib/clickhouse \ + /var/log/clickhouse-server + +ENTRYPOINT ["/entrypoint.sh"] diff --git a/docker/server/alpine-build.sh b/docker/server/alpine-build.sh new file mode 100755 index 00000000000..30101225b3e --- /dev/null +++ b/docker/server/alpine-build.sh @@ -0,0 +1,59 @@ +#!/bin/bash +set -x + +REPO_CHANNEL="${REPO_CHANNEL:-stable}" # lts / testing / prestable / etc +REPO_URL="${REPO_URL:-"https://repo.yandex.ru/clickhouse/tgz/${REPO_CHANNEL}"}" +VERSION="${VERSION:-20.9.3.45}" + +# where original files live +DOCKER_BUILD_FOLDER="${BASH_SOURCE%/*}" + +# we will create root for our image here +CONTAINER_ROOT_FOLDER="${DOCKER_BUILD_FOLDER}/alpine-root" + +# where to put downloaded tgz +TGZ_PACKAGES_FOLDER="${CONTAINER_ROOT_FOLDER}/tgz-packages" + +# clean up the root from old runs +rm -rf 
"$CONTAINER_ROOT_FOLDER" + +mkdir -p "$TGZ_PACKAGES_FOLDER" + +PACKAGES=( "clickhouse-client" "clickhouse-server" "clickhouse-common-static" ) + +# download tars from the repo +for package in "${PACKAGES[@]}" +do + wget -q --show-progress "${REPO_URL}/${package}-${VERSION}.tgz" -O "${TGZ_PACKAGES_FOLDER}/${package}-${VERSION}.tgz" +done + +# unpack tars +for package in "${PACKAGES[@]}" +do + tar xvzf "${TGZ_PACKAGES_FOLDER}/${package}-${VERSION}.tgz" --strip-components=2 -C "$CONTAINER_ROOT_FOLDER" +done + +# prepare few more folders +mkdir -p "${CONTAINER_ROOT_FOLDER}/etc/clickhouse-server/users.d" \ + "${CONTAINER_ROOT_FOLDER}/etc/clickhouse-server/config.d" \ + "${CONTAINER_ROOT_FOLDER}/var/log/clickhouse-server" \ + "${CONTAINER_ROOT_FOLDER}/var/lib/clickhouse" \ + "${CONTAINER_ROOT_FOLDER}/docker-entrypoint-initdb.d" \ + "${CONTAINER_ROOT_FOLDER}/lib64" + +cp "${DOCKER_BUILD_FOLDER}/docker_related_config.xml" "${CONTAINER_ROOT_FOLDER}/etc/clickhouse-server/config.d/" +cp "${DOCKER_BUILD_FOLDER}/entrypoint.alpine.sh" "${CONTAINER_ROOT_FOLDER}/entrypoint.sh" + +## get glibc components from ubuntu 20.04 and put them to expected place +docker pull ubuntu:20.04 +ubuntu20image=$(docker create --rm ubuntu:20.04) +docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libc.so.6 "${CONTAINER_ROOT_FOLDER}/lib" +docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libdl.so.2 "${CONTAINER_ROOT_FOLDER}/lib" +docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libm.so.6 "${CONTAINER_ROOT_FOLDER}/lib" +docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libpthread.so.0 "${CONTAINER_ROOT_FOLDER}/lib" +docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/librt.so.1 "${CONTAINER_ROOT_FOLDER}/lib" +docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libnss_dns.so.2 "${CONTAINER_ROOT_FOLDER}/lib" +docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libresolv.so.2 "${CONTAINER_ROOT_FOLDER}/lib" +docker cp -L ${ubuntu20image}:/lib64/ld-linux-x86-64.so.2 "${CONTAINER_ROOT_FOLDER}/lib64" + +docker build "$DOCKER_BUILD_FOLDER" -f Dockerfile.alpine -t "yandex/clickhouse-server:${VERSION}-alpine" --pull \ No newline at end of file diff --git a/docker/server/entrypoint.alpine.sh b/docker/server/entrypoint.alpine.sh new file mode 100755 index 00000000000..e2edda9ca26 --- /dev/null +++ b/docker/server/entrypoint.alpine.sh @@ -0,0 +1,152 @@ +#!/bin/sh +#set -x + +DO_CHOWN=1 +if [ "$CLICKHOUSE_DO_NOT_CHOWN" = 1 ]; then + DO_CHOWN=0 +fi + +CLICKHOUSE_UID="${CLICKHOUSE_UID:-"$(id -u clickhouse)"}" +CLICKHOUSE_GID="${CLICKHOUSE_GID:-"$(id -g clickhouse)"}" + +# support --user +if [ "$(id -u)" = "0" ]; then + USER=$CLICKHOUSE_UID + GROUP=$CLICKHOUSE_GID + # busybox has setuidgid & chpst buildin + gosu="su-exec $USER:$GROUP" +else + USER="$(id -u)" + GROUP="$(id -g)" + gosu="" + DO_CHOWN=0 +fi + +# set some vars +CLICKHOUSE_CONFIG="${CLICKHOUSE_CONFIG:-/etc/clickhouse-server/config.xml}" + +# port is needed to check if clickhouse-server is ready for connections +HTTP_PORT="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=http_port)" + +# get CH directories locations +DATA_DIR="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=path || true)" +TMP_DIR="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=tmp_path || true)" +USER_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=user_files_path || true)" +LOG_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=logger.log || true)" +LOG_DIR="$(dirname $LOG_PATH || 
true)" +ERROR_LOG_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=logger.errorlog || true)" +ERROR_LOG_DIR="$(dirname $ERROR_LOG_PATH || true)" +FORMAT_SCHEMA_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=format_schema_path || true)" + +CLICKHOUSE_USER="${CLICKHOUSE_USER:-default}" +CLICKHOUSE_PASSWORD="${CLICKHOUSE_PASSWORD:-}" +CLICKHOUSE_DB="${CLICKHOUSE_DB:-}" + +for dir in "$DATA_DIR" \ + "$ERROR_LOG_DIR" \ + "$LOG_DIR" \ + "$TMP_DIR" \ + "$USER_PATH" \ + "$FORMAT_SCHEMA_PATH" +do + # check if variable not empty + [ -z "$dir" ] && continue + # ensure directories exist + if ! mkdir -p "$dir"; then + echo "Couldn't create necessary directory: $dir" + exit 1 + fi + + if [ "$DO_CHOWN" = "1" ]; then + # ensure proper directories permissions + chown -R "$USER:$GROUP" "$dir" + elif [ "$(stat -c %u "$dir")" != "$USER" ]; then + echo "Necessary directory '$dir' isn't owned by user with id '$USER'" + exit 1 + fi +done + +# if clickhouse user is defined - create it (user "default" already exists out of box) +if [ -n "$CLICKHOUSE_USER" ] && [ "$CLICKHOUSE_USER" != "default" ] || [ -n "$CLICKHOUSE_PASSWORD" ]; then + echo "$0: create new user '$CLICKHOUSE_USER' instead 'default'" + cat < /etc/clickhouse-server/users.d/default-user.xml + + + + + + + + <${CLICKHOUSE_USER}> + default + + ::/0 + + ${CLICKHOUSE_PASSWORD} + default + + + +EOT +fi + +if [ -n "$(ls /docker-entrypoint-initdb.d/)" ] || [ -n "$CLICKHOUSE_DB" ]; then + # Listen only on localhost until the initialization is done + $gosu /usr/bin/clickhouse-server --config-file=$CLICKHOUSE_CONFIG -- --listen_host=127.0.0.1 & + pid="$!" + + # check if clickhouse is ready to accept connections + # will try to send ping clickhouse via http_port (max 6 retries, with 1 sec timeout and 1 sec delay between retries) + tries=6 + while ! wget --spider -T 1 -q "http://localhost:$HTTP_PORT/ping" 2>/dev/null; do + if [ "$tries" -le "0" ]; then + echo >&2 'ClickHouse init process failed.' + exit 1 + fi + tries=$(( tries-1 )) + sleep 1 + done + + if [ ! -z "$CLICKHOUSE_PASSWORD" ]; then + printf -v WITH_PASSWORD '%s %q' "--password" "$CLICKHOUSE_PASSWORD" + fi + + clickhouseclient="clickhouse-client --multiquery -u $CLICKHOUSE_USER $WITH_PASSWORD " + + # create default database, if defined + if [ -n "$CLICKHOUSE_DB" ]; then + echo "$0: create database '$CLICKHOUSE_DB'" + "$clickhouseclient" -q "CREATE DATABASE IF NOT EXISTS $CLICKHOUSE_DB"; + fi + + for f in /docker-entrypoint-initdb.d/*; do + case "$f" in + *.sh) + if [ -x "$f" ]; then + echo "$0: running $f" + "$f" + else + echo "$0: sourcing $f" + . "$f" + fi + ;; + *.sql) echo "$0: running $f"; cat "$f" | "$clickhouseclient" ; echo ;; + *.sql.gz) echo "$0: running $f"; gunzip -c "$f" | "$clickhouseclient"; echo ;; + *) echo "$0: ignoring $f" ;; + esac + echo + done + + if ! kill -s TERM "$pid" || ! wait "$pid"; then + echo >&2 'Finishing of ClickHouse init process failed.' 
+ exit 1 + fi +fi + +# if no args passed to `docker run` or first argument start with `--`, then the user is passing clickhouse-server arguments +if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then + exec $gosu /usr/bin/clickhouse-server --config-file=$CLICKHOUSE_CONFIG "$@" +fi + +# Otherwise, we assume the user want to run his own process, for example a `bash` shell to explore this image +exec "$@" diff --git a/docs/en/engines/table-engines/integrations/rabbitmq.md b/docs/en/engines/table-engines/integrations/rabbitmq.md index dd14ee3b4b1..b0901ee6f6e 100644 --- a/docs/en/engines/table-engines/integrations/rabbitmq.md +++ b/docs/en/engines/table-engines/integrations/rabbitmq.md @@ -51,7 +51,7 @@ Optional parameters: - `rabbitmq_row_delimiter` – Delimiter character, which ends the message. - `rabbitmq_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap’n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object. - `rabbitmq_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. -- `rabbitmq_num_queues` – The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient. +- `rabbitmq_num_queues` – Total number of queues. Default: `1`. Increasing this number can significantly improve performance. - `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below. - `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified. - `rabbitmq_persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`. @@ -148,4 +148,5 @@ Example: - `_channel_id` - ChannelID, on which consumer, who received the message, was declared. - `_delivery_tag` - DeliveryTag of the received message. Scoped per channel. - `_redelivered` - `redelivered` flag of the message. -- `_message_id` - MessageID of the received message; non-empty if was set, when message was published. +- `_message_id` - messageID of the received message; non-empty if was set, when message was published. +- `_timestamp` - timestamp of the received message; non-empty if was set, when message was published. diff --git a/docs/en/sql-reference/functions/other-functions.md b/docs/en/sql-reference/functions/other-functions.md index 518479fb728..2cc80dcffc1 100644 --- a/docs/en/sql-reference/functions/other-functions.md +++ b/docs/en/sql-reference/functions/other-functions.md @@ -626,7 +626,12 @@ neighbor(column, offset[, default_value]) ``` The result of the function depends on the affected data blocks and the order of data in the block. -If you make a subquery with ORDER BY and call the function from outside the subquery, you can get the expected result. + +!!! warning "Warning" + It can reach the neighbor rows only inside the currently processed data block. + +The rows order used during the calculation of `neighbor` can differ from the order of rows returned to the user. +To prevent that you can make a subquery with ORDER BY and call the function from outside the subquery. 
**Parameters** @@ -731,8 +736,13 @@ Result: Calculates the difference between successive row values ​​in the data block. Returns 0 for the first row and the difference from the previous row for each subsequent row. +!!! warning "Warning" + It can reach the previos row only inside the currently processed data block. + The result of the function depends on the affected data blocks and the order of data in the block. -If you make a subquery with ORDER BY and call the function from outside the subquery, you can get the expected result. + +The rows order used during the calculation of `runningDifference` can differ from the order of rows returned to the user. +To prevent that you can make a subquery with ORDER BY and call the function from outside the subquery. Example: diff --git a/docs/en/sql-reference/statements/insert-into.md b/docs/en/sql-reference/statements/insert-into.md index b49314a1785..ae5e074fd15 100644 --- a/docs/en/sql-reference/statements/insert-into.md +++ b/docs/en/sql-reference/statements/insert-into.md @@ -13,12 +13,61 @@ Basic query format: INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ... ``` -The query can specify a list of columns to insert `[(c1, c2, c3)]`. In this case, the rest of the columns are filled with: +You can specify a list of columns to insert using the `(c1, c2, c3)` or `COLUMNS(c1,c2,c3)` syntax. + +Instead of listing all the required columns you can use the `(* EXCEPT(column_list))` syntax. + +For example, consider the table: + +``` sql +SHOW CREATE insert_select_testtable; +``` + +``` +┌─statement────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ +│ CREATE TABLE insert_select_testtable +( + `a` Int8, + `b` String, + `c` Int8 +) +ENGINE = MergeTree() +ORDER BY a +SETTINGS index_granularity = 8192 │ +└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ +``` + +``` sql +INSERT INTO insert_select_testtable (*) VALUES (1, 'a', 1) ; +``` + +If you want to insert data in all the columns, except 'b', you need to pass so many values how many columns you chose in parenthesis then: + +``` sql +INSERT INTO insert_select_testtable (* EXCEPT(b)) Values (2, 2); +``` + +``` sql +SELECT * FROM insert_select_testtable; +``` + +``` +┌─a─┬─b─┬─c─┐ +│ 2 │ │ 2 │ +└───┴───┴───┘ +┌─a─┬─b─┬─c─┐ +│ 1 │ a │ 1 │ +└───┴───┴───┘ +``` + +In this example, we see that the second inserted row has `a` and `c` columns filled by the passed values, and `b` filled with value by default. + +If a list of columns doesn't include all existing columns, the rest of the columns are filled with: - The values calculated from the `DEFAULT` expressions specified in the table definition. - Zeros and empty strings, if `DEFAULT` expressions are not defined. -If [strict_insert_defaults=1](../../operations/settings/settings.md), columns that do not have `DEFAULT` defined must be listed in the query. +If [strict\_insert\_defaults=1](../../operations/settings/settings.md), columns that do not have `DEFAULT` defined must be listed in the query. Data can be passed to the INSERT in any [format](../../interfaces/formats.md#formats) supported by ClickHouse. 
The format must be specified explicitly in the query: diff --git a/docs/en/sql-reference/statements/select/with.md b/docs/en/sql-reference/statements/select/with.md index a507d5224aa..6a0564a8ede 100644 --- a/docs/en/sql-reference/statements/select/with.md +++ b/docs/en/sql-reference/statements/select/with.md @@ -4,13 +4,17 @@ toc_title: WITH # WITH Clause {#with-clause} -This section provides support for Common Table Expressions ([CTE](https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL)), so the results of `WITH` clause can be used in the rest of `SELECT` query. +Clickhouse supports Common Table Expressions ([CTE](https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL)), that is provides to use results of `WITH` clause in the rest of `SELECT` query. Named subqueries can be included to the current and child query context in places where table objects are allowed. Recursion is prevented by hiding the current level CTEs from the WITH expression. -## Limitations {#limitations} +## Syntax -1. Recursive queries are not supported. -2. When subquery is used inside WITH section, it’s result should be scalar with exactly one row. -3. Expression’s results are not available in subqueries. +``` sql +WITH AS +``` +or +``` sql +WITH AS +``` ## Examples {#examples} @@ -22,10 +26,10 @@ SELECT * FROM hits WHERE EventDate = toDate(ts_upper_bound) AND - EventTime <= ts_upper_bound + EventTime <= ts_upper_bound; ``` -**Example 2:** Evicting sum(bytes) expression result from SELECT clause column list +**Example 2:** Evicting a sum(bytes) expression result from the SELECT clause column list ``` sql WITH sum(bytes) as s @@ -34,10 +38,10 @@ SELECT table FROM system.parts GROUP BY table -ORDER BY s +ORDER BY s; ``` -**Example 3:** Using results of scalar subquery +**Example 3:** Using results of a scalar subquery ``` sql /* this example would return TOP 10 of most huge tables */ @@ -53,27 +57,14 @@ SELECT FROM system.parts GROUP BY table ORDER BY table_disk_usage DESC -LIMIT 10 +LIMIT 10; ``` -**Example 4:** Re-using expression in subquery - -As a workaround for current limitation for expression usage in subqueries, you may duplicate it. +**Example 4:** Reusing expression in a subquery ``` sql -WITH ['hello'] AS hello -SELECT - hello, - * -FROM -( - WITH ['hello'] AS hello - SELECT hello -) +WITH test1 AS (SELECT i + 1, j + 1 FROM test1) +SELECT * FROM test1; ``` -``` text -┌─hello─────┬─hello─────┐ -│ ['hello'] │ ['hello'] │ -└───────────┴───────────┘ -``` +[Original article](https://clickhouse.tech/docs/en/sql-reference/statements/select/with/) diff --git a/docs/ru/engines/table-engines/integrations/rabbitmq.md b/docs/ru/engines/table-engines/integrations/rabbitmq.md index ef7b811e295..dedb5842d68 100644 --- a/docs/ru/engines/table-engines/integrations/rabbitmq.md +++ b/docs/ru/engines/table-engines/integrations/rabbitmq.md @@ -45,7 +45,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] - `rabbitmq_row_delimiter` – символ-разделитель, который завершает сообщение. - `rabbitmq_schema` – опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Cap’n Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`. - `rabbitmq_num_consumers` – количество потребителей на таблицу. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. -- `rabbitmq_num_queues` – количество очередей на потребителя. По умолчанию: `1`. 
Укажите больше потребителей, если пропускная способность одной очереди на потребителя недостаточна. +- `rabbitmq_num_queues` – количество очередей. По умолчанию: `1`. Большее число очередей может сильно увеличить пропускную способность. - `rabbitmq_queue_base` - настройка для имен очередей. Сценарии использования описаны ниже. - `rabbitmq_persistent` - флаг, от которого зависит настройка 'durable' для сообщений при запросах `INSERT`. По умолчанию: `0`. - `rabbitmq_skip_broken_messages` – максимальное количество некорректных сообщений в блоке. Если `rabbitmq_skip_broken_messages = N`, то движок отбрасывает `N` сообщений, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию – 0. @@ -140,4 +140,5 @@ Example: - `_channel_id` - идентификатор канала `ChannelID`, на котором было получено сообщение. - `_delivery_tag` - значение `DeliveryTag` полученного сообщения. Уникально в рамках одного канала. - `_redelivered` - флаг `redelivered`. (Не равно нулю, если есть возможность, что сообщение было получено более, чем одним каналом.) -- `_message_id` - значение `MessageID` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения. +- `_message_id` - значение поля `messageID` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения. +- `_timestamp` - значение поля `timestamp` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения. diff --git a/docs/ru/sql-reference/statements/insert-into.md b/docs/ru/sql-reference/statements/insert-into.md index 8ea7c83bec8..0d38be81ac6 100644 --- a/docs/ru/sql-reference/statements/insert-into.md +++ b/docs/ru/sql-reference/statements/insert-into.md @@ -13,7 +13,55 @@ toc_title: INSERT INTO INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ... ``` -В запросе можно указать список столбцов для вставки `[(c1, c2, c3)]`. В этом случае, в остальные столбцы записываются: +Вы можете указать список столбцов для вставки, используя следующий синтаксис: `(c1, c2, c3)` или `COLUMNS(c1,c2,c3)`. + +Можно не перечислять все необходимые столбцы, а использовать синтаксис `(* EXCEPT(column_list))`. + +В качестве примера рассмотрим таблицу: + +``` sql +SHOW CREATE insert_select_testtable +``` + +``` +┌─statement────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ +│ CREATE TABLE insert_select_testtable +( + `a` Int8, + `b` String, + `c` Int8 +) +ENGINE = MergeTree() +ORDER BY a +SETTINGS index_granularity = 8192 │ +└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ +``` + +``` sql +INSERT INTO insert_select_testtable (*) VALUES (1, 'a', 1) +``` + +Если вы хотите вставить данные во все столбцы, кроме 'b', вам нужно передать столько значений, сколько столбцов вы указали в скобках: + +``` sql +INSERT INTO insert_select_testtable (* EXCEPT(b)) Values (2, 2) +``` + +``` sql +SELECT * FROM insert_select_testtable +``` + +``` +┌─a─┬─b─┬─c─┐ +│ 2 │ │ 2 │ +└───┴───┴───┘ +┌─a─┬─b─┬─c─┐ +│ 1 │ a │ 1 │ +└───┴───┴───┘ +``` + +В этом примере мы видим, что вторая строка содержит столбцы `a` и `c`, заполненные переданными значениями и `b`, заполненный значением по умолчанию. 
+Если список столбцов не включает все существующие столбцы, то все остальные столбцы заполняются следующим образом: - Значения, вычисляемые из `DEFAULT` выражений, указанных в определении таблицы. - Нули и пустые строки, если `DEFAULT` не определены. diff --git a/docs/ru/sql-reference/statements/select/with.md b/docs/ru/sql-reference/statements/select/with.md index 4feae232bd7..328b28c27ef 100644 --- a/docs/ru/sql-reference/statements/select/with.md +++ b/docs/ru/sql-reference/statements/select/with.md @@ -2,18 +2,21 @@ toc_title: WITH --- -# Секция WITH {#sektsiia-with} +# Секция WITH {#with-clause} -Данная секция представляет собой [Common Table Expressions](https://ru.wikipedia.org/wiki/Иерархические_и_рекурсивные_запросы_в_SQL), то есть позволяет использовать результаты выражений из секции `WITH` в остальной части `SELECT` запроса. +Clickhouse поддерживает [Общие табличные выражения](https://ru.wikipedia.org/wiki/Иерархические_и_рекурсивные_запросы_в_SQL), то есть позволяет использовать результаты выражений из секции `WITH` в остальной части `SELECT` запроса. Именованные подзапросы могут быть включены в текущий и дочерний контекст запроса в тех местах, где разрешены табличные объекты. Рекурсия предотвращается путем скрытия общего табличного выражения текущего уровня из выражения `WITH`. + +## Синтаксис + +``` sql +WITH AS +``` +или +``` sql +WITH AS +``` -### Ограничения - -1. Рекурсивные запросы не поддерживаются -2. Если в качестве выражения используется подзапрос, то результат должен содержать ровно одну строку -3. Результаты выражений нельзя переиспользовать во вложенных запросах -В дальнейшем, результаты выражений можно использовать в секции SELECT. - -### Примеры +## Примеры **Пример 1:** Использование константного выражения как «переменной» @@ -23,7 +26,7 @@ SELECT * FROM hits WHERE EventDate = toDate(ts_upper_bound) AND - EventTime <= ts_upper_bound + EventTime <= ts_upper_bound; ``` **Пример 2:** Выкидывание выражения sum(bytes) из списка колонок в SELECT @@ -35,7 +38,7 @@ SELECT table FROM system.parts GROUP BY table -ORDER BY s +ORDER BY s; ``` **Пример 3:** Использование результатов скалярного подзапроса @@ -54,27 +57,14 @@ SELECT FROM system.parts GROUP BY table ORDER BY table_disk_usage DESC -LIMIT 10 +LIMIT 10; ``` **Пример 4:** Переиспользование выражения -В настоящий момент, переиспользование выражения из секции WITH внутри подзапроса возможно только через дублирование. - ``` sql -WITH ['hello'] AS hello -SELECT - hello, - * -FROM -( - WITH ['hello'] AS hello - SELECT hello -) +WITH test1 AS (SELECT i + 1, j + 1 FROM test1) +SELECT * FROM test1; ``` -``` text -┌─hello─────┬─hello─────┐ -│ ['hello'] │ ['hello'] │ -└───────────┴───────────┘ -``` +[Оригинальная статья](https://clickhouse.tech/docs/en/sql-reference/statements/select/with/) diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 808a505b5e4..ace509d6691 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -218,6 +218,8 @@ private: QueryFuzzer fuzzer; int query_fuzzer_runs = 0; + std::optional suggest; + /// We will format query_id in interactive mode in various ways, the default is just to print Query id: ... std::vector> query_id_formats; @@ -577,10 +579,11 @@ private: if (print_time_to_stderr) throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS); + suggest.emplace(); if (server_revision >= Suggest::MIN_SERVER_REVISION && !config().getBool("disable_suggestion", false)) { /// Load suggestion data from the server. 
- Suggest::instance().load(connection_parameters, config().getInt("suggestion_limit")); + suggest->load(connection_parameters, config().getInt("suggestion_limit")); } /// Load command history if present. @@ -607,7 +610,7 @@ private: highlight_callback = highlight; ReplxxLineReader lr( - Suggest::instance(), + *suggest, history_file, config().has("multiline"), query_extenders, @@ -615,7 +618,7 @@ private: highlight_callback); #elif defined(USE_READLINE) && USE_READLINE - ReadlineLineReader lr(Suggest::instance(), history_file, config().has("multiline"), query_extenders, query_delimiters); + ReadlineLineReader lr(*suggest, history_file, config().has("multiline"), query_extenders, query_delimiters); #else LineReader lr(history_file, config().has("multiline"), query_extenders, query_delimiters); #endif diff --git a/programs/client/Suggest.h b/programs/client/Suggest.h index b13289ac322..03332088cbe 100644 --- a/programs/client/Suggest.h +++ b/programs/client/Suggest.h @@ -18,10 +18,11 @@ namespace ErrorCodes class Suggest : public LineReader::Suggest, boost::noncopyable { public: - static Suggest & instance() + Suggest(); + ~Suggest() { - static Suggest instance; - return instance; + if (loading_thread.joinable()) + loading_thread.join(); } void load(const ConnectionParameters & connection_parameters, size_t suggestion_limit); @@ -30,12 +31,6 @@ public: static constexpr int MIN_SERVER_REVISION = 54406; private: - Suggest(); - ~Suggest() - { - if (loading_thread.joinable()) - loading_thread.join(); - } void loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit); void fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query); diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index ef0b82666dd..3d6a2d6f99c 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -54,6 +54,7 @@ M(LocalThread, "Number of threads in local thread pools. Should be similar to GlobalThreadActive.") \ M(LocalThreadActive, "Number of threads in local thread pools running a task.") \ M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \ + M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \ namespace CurrentMetrics { diff --git a/src/Compression/CompressedReadBufferBase.cpp b/src/Compression/CompressedReadBufferBase.cpp index be2f697e1b3..7a6b605d015 100644 --- a/src/Compression/CompressedReadBufferBase.cpp +++ b/src/Compression/CompressedReadBufferBase.cpp @@ -185,9 +185,9 @@ void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, s } else { - throw Exception("Data compressed with different methods, given method byte " + throw Exception("Data compressed with different methods, given method byte 0x" + getHexUIntLowercase(method) - + ", previous method byte " + + ", previous method byte 0x" + getHexUIntLowercase(codec->getMethodByte()), ErrorCodes::CANNOT_DECOMPRESS); } diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 4e95d1ab395..df6cf5fc85d 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -169,6 +169,8 @@ class IColumn; M(Milliseconds, read_backoff_min_interval_between_events_ms, 1000, "Settings to reduce the number of threads in case of slow reads. 
Do not pay attention to the event, if the previous one has passed less than a certain amount of time.", 0) \ M(UInt64, read_backoff_min_events, 2, "Settings to reduce the number of threads in case of slow reads. The number of events after which the number of threads will be reduced.", 0) \ \ + M(UInt64, read_backoff_min_concurrency, 1, "Settings to try keeping the minimal number of threads in case of slow reads.", 0) \ + \ M(Float, memory_tracker_fault_probability, 0., "For testing of `exception safety` - throw an exception every time you allocate memory with the specified probability.", 0) \ \ M(Bool, enable_http_compression, 0, "Compress the result if the client over HTTP said that it understands data compressed by gzip or deflate.", 0) \ diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index 3784ae0961b..4fcd9f12276 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -261,21 +261,29 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora { DetachedTables not_in_use; auto table_data_path = getTableDataPath(query); + bool locked_uuid = false; try { std::unique_lock lock{mutex}; if (query.database != database_name) throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed to `{}`, cannot create table in `{}`", database_name, query.database); + /// Do some checks before renaming file from .tmp to .sql not_in_use = cleanupDetachedTables(); assertDetachedTableNotInUse(query.uuid); - renameNoReplace(table_metadata_tmp_path, table_metadata_path); + /// We will get en exception if some table with the same UUID exists (even if it's detached table or table from another database) + DatabaseCatalog::instance().addUUIDMapping(query.uuid); + locked_uuid = true; + /// It throws if `table_metadata_path` already exists (it's possible if table was detached) + renameNoReplace(table_metadata_tmp_path, table_metadata_path); /// Commit point (a sort of) attachTableUnlocked(query.table, table, lock); /// Should never throw table_name_to_path.emplace(query.table, table_data_path); } catch (...) 
{ Poco::File(table_metadata_tmp_path).remove(); + if (locked_uuid) + DatabaseCatalog::instance().removeUUIDMappingFinally(query.uuid); throw; } tryCreateSymlink(query.table, table_data_path); diff --git a/src/Databases/DatabaseLazy.cpp b/src/Databases/DatabaseLazy.cpp index 81414902a33..0119f17f843 100644 --- a/src/Databases/DatabaseLazy.cpp +++ b/src/Databases/DatabaseLazy.cpp @@ -329,10 +329,4 @@ const StoragePtr & DatabaseLazyIterator::table() const return current_storage; } -void DatabaseLazyIterator::reset() -{ - if (current_storage) - current_storage.reset(); -} - } diff --git a/src/Databases/DatabaseLazy.h b/src/Databases/DatabaseLazy.h index 58e5e465eef..2d091297c91 100644 --- a/src/Databases/DatabaseLazy.h +++ b/src/Databases/DatabaseLazy.h @@ -22,6 +22,10 @@ public: String getEngineName() const override { return "Lazy"; } + bool canContainMergeTreeTables() const override { return false; } + + bool canContainDistributedTables() const override { return false; } + void loadStoredObjects( Context & context, bool has_force_restore_data_flag, bool force_attach) override; @@ -122,7 +126,6 @@ public: bool isValid() const override; const String & name() const override; const StoragePtr & table() const override; - void reset() override; private: const DatabaseLazy & database; diff --git a/src/Databases/DatabaseMemory.cpp b/src/Databases/DatabaseMemory.cpp index 5eacb846d52..357acb32371 100644 --- a/src/Databases/DatabaseMemory.cpp +++ b/src/Databases/DatabaseMemory.cpp @@ -53,6 +53,9 @@ void DatabaseMemory::dropTable( } table->is_dropped = true; create_queries.erase(table_name); + UUID table_uuid = table->getStorageID().uuid; + if (table_uuid != UUIDHelpers::Nil) + DatabaseCatalog::instance().removeUUIDMappingFinally(table_uuid); } ASTPtr DatabaseMemory::getCreateDatabaseQuery() const diff --git a/src/Databases/DatabaseWithDictionaries.cpp b/src/Databases/DatabaseWithDictionaries.cpp index ed85028d04d..6c5173c986f 100644 --- a/src/Databases/DatabaseWithDictionaries.cpp +++ b/src/Databases/DatabaseWithDictionaries.cpp @@ -223,6 +223,10 @@ void DatabaseWithDictionaries::removeDictionary(const Context &, const String & attachDictionary(dictionary_name, attach_info); throw; } + + UUID dict_uuid = attach_info.create_query->as()->uuid; + if (dict_uuid != UUIDHelpers::Nil) + DatabaseCatalog::instance().removeUUIDMappingFinally(dict_uuid); } DatabaseDictionariesIteratorPtr DatabaseWithDictionaries::getDictionariesIterator(const FilterByNameFunction & filter_by_dictionary_name) diff --git a/src/Databases/IDatabase.h b/src/Databases/IDatabase.h index 9b744259406..fadec5fe7a9 100644 --- a/src/Databases/IDatabase.h +++ b/src/Databases/IDatabase.h @@ -44,8 +44,6 @@ public: /// (a database with support for lazy tables loading /// - it maintains a list of tables but tables are loaded lazily). virtual const StoragePtr & table() const = 0; - /// Reset reference counter to the StoragePtr. - virtual void reset() = 0; virtual ~IDatabaseTablesIterator() = default; @@ -95,8 +93,6 @@ public: const String & name() const override { return it->first; } const StoragePtr & table() const override { return it->second; } - - void reset() override { it->second.reset(); } }; /// Copies list of dictionaries and iterates through such snapshot. @@ -151,6 +147,10 @@ public: /// Get name of database engine. virtual String getEngineName() const = 0; + virtual bool canContainMergeTreeTables() const { return true; } + + virtual bool canContainDistributedTables() const { return true; } + /// Load a set of existing tables. 
/// You can call only once, right after the object is created. virtual void loadStoredObjects(Context & /*context*/, bool /*has_force_restore_data_flag*/, bool /*force_attach*/ = false) {} diff --git a/src/Databases/MySQL/DatabaseConnectionMySQL.h b/src/Databases/MySQL/DatabaseConnectionMySQL.h index 7bf5e8c1d88..d8694e71db2 100644 --- a/src/Databases/MySQL/DatabaseConnectionMySQL.h +++ b/src/Databases/MySQL/DatabaseConnectionMySQL.h @@ -42,6 +42,12 @@ public: String getEngineName() const override { return "MySQL"; } + bool canContainMergeTreeTables() const override { return false; } + + bool canContainDistributedTables() const override { return false; } + + bool shouldBeEmptyOnDetach() const override { return false; } + bool empty() const override; DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override; diff --git a/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h b/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h index 5a0ec242c2f..86a5cbf8206 100644 --- a/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h +++ b/src/Databases/MySQL/DatabaseMaterializeTablesIterator.h @@ -28,11 +28,6 @@ public: return tables.emplace_back(storage); } - void reset() override - { - tables.clear(); - } - UUID uuid() const override { return nested_iterator->uuid(); } DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, DatabaseMaterializeMySQL * database_) diff --git a/src/Functions/FunctionsRound.h b/src/Functions/FunctionsRound.h index 7a8304dbfa9..542463255d3 100644 --- a/src/Functions/FunctionsRound.h +++ b/src/Functions/FunctionsRound.h @@ -31,6 +31,7 @@ namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int ARGUMENT_OUT_OF_BOUND; extern const int ILLEGAL_COLUMN; extern const int BAD_ARGUMENTS; } @@ -84,6 +85,9 @@ enum class TieBreakingMode Bankers, // use banker's rounding }; +/// For N, no more than the number of digits in the largest type. +using Scale = Int16; + /** Rounding functions for integer values. 
*/ @@ -416,7 +420,7 @@ private: using Container = typename ColumnDecimal::Container; public: - static NO_INLINE void apply(const Container & in, Container & out, Int64 scale_arg) + static NO_INLINE void apply(const Container & in, Container & out, Scale scale_arg) { scale_arg = in.getScale() - scale_arg; if (scale_arg > 0) @@ -458,7 +462,7 @@ class Dispatcher FloatRoundingImpl, IntegerRoundingImpl>; - static ColumnPtr apply(const ColumnVector * col, Int64 scale_arg) + static ColumnPtr apply(const ColumnVector * col, Scale scale_arg) { auto col_res = ColumnVector::create(); @@ -487,7 +491,7 @@ class Dispatcher return col_res; } - static ColumnPtr apply(const ColumnDecimal * col, Int64 scale_arg) + static ColumnPtr apply(const ColumnDecimal * col, Scale scale_arg) { const typename ColumnDecimal::Container & vec_src = col->getData(); @@ -501,7 +505,7 @@ class Dispatcher } public: - static ColumnPtr apply(const IColumn * column, Int64 scale_arg) + static ColumnPtr apply(const IColumn * column, Scale scale_arg) { if constexpr (IsNumber) return apply(checkAndGetColumn>(column), scale_arg); @@ -544,20 +548,25 @@ public: return arguments[0]; } - static Int64 getScaleArg(ColumnsWithTypeAndName & arguments) + static Scale getScaleArg(ColumnsWithTypeAndName & arguments) { if (arguments.size() == 2) { const IColumn & scale_column = *arguments[1].column; if (!isColumnConst(scale_column)) - throw Exception("Scale argument for rounding functions must be constant.", ErrorCodes::ILLEGAL_COLUMN); + throw Exception("Scale argument for rounding functions must be constant", ErrorCodes::ILLEGAL_COLUMN); Field scale_field = assert_cast(scale_column).getField(); if (scale_field.getType() != Field::Types::UInt64 && scale_field.getType() != Field::Types::Int64) - throw Exception("Scale argument for rounding functions must have integer type.", ErrorCodes::ILLEGAL_COLUMN); + throw Exception("Scale argument for rounding functions must have integer type", ErrorCodes::ILLEGAL_COLUMN); - return scale_field.get(); + Int64 scale64 = scale_field.get(); + if (scale64 > std::numeric_limits::max() + || scale64 < std::numeric_limits::min()) + throw Exception("Scale argument for rounding function is too large", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + + return scale64; } return 0; } @@ -568,7 +577,7 @@ public: ColumnPtr executeImpl(ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override { const ColumnWithTypeAndName & column = arguments[0]; - Int64 scale_arg = getScaleArg(arguments); + Scale scale_arg = getScaleArg(arguments); ColumnPtr res; auto call = [&](const auto & types) -> bool diff --git a/src/IO/ReadHelpers.cpp b/src/IO/ReadHelpers.cpp index 900e9c7b535..bf41de3959a 100644 --- a/src/IO/ReadHelpers.cpp +++ b/src/IO/ReadHelpers.cpp @@ -480,7 +480,7 @@ void readEscapedString(String & s, ReadBuffer & buf) } template void readEscapedStringInto>(PaddedPODArray & s, ReadBuffer & buf); -template void readEscapedStringInto(NullSink & s, ReadBuffer & buf); +template void readEscapedStringInto(NullOutput & s, ReadBuffer & buf); /** If enable_sql_style_quoting == true, @@ -562,7 +562,7 @@ void readQuotedStringWithSQLStyle(String & s, ReadBuffer & buf) template void readQuotedStringInto(PaddedPODArray & s, ReadBuffer & buf); -template void readDoubleQuotedStringInto(NullSink & s, ReadBuffer & buf); +template void readDoubleQuotedStringInto(NullOutput & s, ReadBuffer & buf); void readDoubleQuotedString(String & s, ReadBuffer & buf) { @@ -742,7 +742,7 @@ void readJSONString(String & s, 
ReadBuffer & buf) template void readJSONStringInto, void>(PaddedPODArray & s, ReadBuffer & buf); template bool readJSONStringInto, bool>(PaddedPODArray & s, ReadBuffer & buf); -template void readJSONStringInto(NullSink & s, ReadBuffer & buf); +template void readJSONStringInto(NullOutput & s, ReadBuffer & buf); template void readJSONStringInto(String & s, ReadBuffer & buf); @@ -891,7 +891,7 @@ void skipJSONField(ReadBuffer & buf, const StringRef & name_of_field) throw Exception("Unexpected EOF for key '" + name_of_field.toString() + "'", ErrorCodes::INCORRECT_DATA); else if (*buf.position() == '"') /// skip double-quoted string { - NullSink sink; + NullOutput sink; readJSONStringInto(sink, buf); } else if (isNumericASCII(*buf.position()) || *buf.position() == '-' || *buf.position() == '+' || *buf.position() == '.') /// skip number @@ -955,7 +955,7 @@ void skipJSONField(ReadBuffer & buf, const StringRef & name_of_field) // field name if (*buf.position() == '"') { - NullSink sink; + NullOutput sink; readJSONStringInto(sink, buf); } else diff --git a/src/IO/ReadHelpers.h b/src/IO/ReadHelpers.h index d79328889f1..9ff1858c723 100644 --- a/src/IO/ReadHelpers.h +++ b/src/IO/ReadHelpers.h @@ -527,7 +527,7 @@ bool tryReadJSONStringInto(Vector & s, ReadBuffer & buf) } /// This could be used as template parameter for functions above, if you want to just skip data. -struct NullSink +struct NullOutput { void append(const char *, size_t) {} void push_back(char) {} diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index feb2036a0d6..e1a9a820ebb 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -233,8 +233,8 @@ void AsynchronousMetrics::update() for (const auto & db : databases) { - /// Lazy database can not contain MergeTree tables - if (db.second->getEngineName() == "Lazy") + /// Check if database can contain MergeTree tables + if (!db.second->canContainMergeTreeTables()) continue; for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) { diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index cfe1046aceb..906863f3f44 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -13,9 +13,16 @@ #include #include #include +#include #include + +namespace CurrentMetrics +{ + extern const Metric TablesToDropQueueSize; +} + namespace DB { @@ -155,7 +162,17 @@ void DatabaseCatalog::shutdownImpl() tables_marked_dropped.clear(); std::lock_guard lock(databases_mutex); - assert(std::find_if_not(uuid_map.begin(), uuid_map.end(), [](const auto & elem) { return elem.map.empty(); }) == uuid_map.end()); + assert(std::find_if(uuid_map.begin(), uuid_map.end(), [](const auto & elem) + { + /// Ensure that all UUID mappings are emtpy (i.e. 
all mappings contain nullptr instead of a pointer to storage) + const auto & not_empty_mapping = [] (const auto & mapping) + { + auto & table = mapping.second.second; + return table; + }; + auto it = std::find_if(elem.map.begin(), elem.map.end(), not_empty_mapping); + return it != elem.map.end(); + }) == uuid_map.end()); databases.clear(); db_uuid_map.clear(); view_dependencies.clear(); @@ -411,36 +428,76 @@ DatabasePtr DatabaseCatalog::getSystemDatabase() const return getDatabase(SYSTEM_DATABASE); } -void DatabaseCatalog::addUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table) +void DatabaseCatalog::addUUIDMapping(const UUID & uuid) +{ + addUUIDMapping(uuid, nullptr, nullptr); +} + +void DatabaseCatalog::addUUIDMapping(const UUID & uuid, const DatabasePtr & database, const StoragePtr & table) { assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size()); + assert((database && table) || (!database && !table)); UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)]; std::lock_guard lock{map_part.mutex}; - auto [_, inserted] = map_part.map.try_emplace(uuid, std::move(database), std::move(table)); + auto [it, inserted] = map_part.map.try_emplace(uuid, database, table); + if (inserted) + return; + + auto & prev_database = it->second.first; + auto & prev_table = it->second.second; + assert((prev_database && prev_table) || (!prev_database && !prev_table)); + + if (!prev_table && table) + { + /// It's empty mapping, it was created to "lock" UUID and prevent collision. Just update it. + prev_database = database; + prev_table = table; + return; + } + + /// We are trying to replace existing mapping (prev_table != nullptr), it's logical error + if (table) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Mapping for table with UUID={} already exists", toString(uuid)); /// Normally this should never happen, but it's possible when the same UUIDs are explicitly specified in different CREATE queries, /// so it's not LOGICAL_ERROR - if (!inserted) - throw Exception("Mapping for table with UUID=" + toString(uuid) + " already exists", ErrorCodes::TABLE_ALREADY_EXISTS); + throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Mapping for table with UUID={} already exists. 
It happened due to UUID collision, " + "most likely because some not random UUIDs were manually specified in CREATE queries.", toString(uuid)); } void DatabaseCatalog::removeUUIDMapping(const UUID & uuid) +{ + assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size()); + UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)]; + std::lock_guard lock{map_part.mutex}; + auto it = map_part.map.find(uuid); + if (it == map_part.map.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Mapping for table with UUID={} doesn't exist", toString(uuid)); + it->second = {}; +} + +void DatabaseCatalog::removeUUIDMappingFinally(const UUID & uuid) { assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size()); UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)]; std::lock_guard lock{map_part.mutex}; if (!map_part.map.erase(uuid)) - throw Exception("Mapping for table with UUID=" + toString(uuid) + " doesn't exist", ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Mapping for table with UUID={} doesn't exist", toString(uuid)); } void DatabaseCatalog::updateUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table) { assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size()); + assert(database && table); UUIDToStorageMapPart & map_part = uuid_map[getFirstLevelIdx(uuid)]; std::lock_guard lock{map_part.mutex}; auto it = map_part.map.find(uuid); if (it == map_part.map.end()) - throw Exception("Mapping for table with UUID=" + toString(uuid) + " doesn't exist", ErrorCodes::LOGICAL_ERROR); - it->second = std::make_pair(std::move(database), std::move(table)); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Mapping for table with UUID={} doesn't exist", toString(uuid)); + auto & prev_database = it->second.first; + auto & prev_table = it->second.second; + assert(prev_database && prev_table); + prev_database = std::move(database); + prev_table = std::move(table); } std::unique_ptr DatabaseCatalog::database_catalog; @@ -631,6 +688,8 @@ void DatabaseCatalog::loadMarkedAsDroppedTables() dropped_metadata.emplace(std::move(full_path), std::move(dropped_id)); } + LOG_INFO(log, "Found {} partially dropped tables. Will load them and retry removal.", dropped_metadata.size()); + ThreadPool pool; for (const auto & elem : dropped_metadata) { @@ -695,6 +754,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr LOG_WARNING(log, "Cannot parse metadata of partially dropped table {} from {}. Will remove metadata file and data directory. 
Garbage may be left in /store directory and ZooKeeper.", table_id.getNameForLogs(), dropped_metadata_path); } + addUUIDMapping(table_id.uuid); drop_time = Poco::File(dropped_metadata_path).getLastModified().epochTime(); } @@ -704,6 +764,8 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr else tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time}); tables_marked_dropped_ids.insert(table_id.uuid); + CurrentMetrics::add(CurrentMetrics::TablesToDropQueueSize, 1); + /// If list of dropped tables was empty, start a drop task if (drop_task && tables_marked_dropped.size() == 1) (*drop_task)->schedule(); @@ -732,6 +794,10 @@ void DatabaseCatalog::dropTableDataTask() LOG_INFO(log, "Will try drop {}", table.table_id.getNameForLogs()); tables_marked_dropped.erase(it); } + else + { + LOG_TRACE(log, "Not found any suitable tables to drop, still have {} tables in drop queue", tables_marked_dropped.size()); + } need_reschedule = !tables_marked_dropped.empty(); } catch (...) @@ -770,7 +836,7 @@ void DatabaseCatalog::dropTableDataTask() (*drop_task)->scheduleAfter(reschedule_time_ms); } -void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) const +void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) { if (table.table) { @@ -789,6 +855,9 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) const LOG_INFO(log, "Removing metadata {} of dropped table {}", table.metadata_path, table.table_id.getNameForLogs()); Poco::File(table.metadata_path).remove(); + + removeUUIDMappingFinally(table.table_id.uuid); + CurrentMetrics::sub(CurrentMetrics::TablesToDropQueueSize, 1); } String DatabaseCatalog::getPathForUUID(const UUID & uuid) @@ -826,6 +895,8 @@ void DatabaseCatalog::waitTableFinallyDropped(const UUID & uuid) { if (uuid == UUIDHelpers::Nil) return; + + LOG_DEBUG(log, "Waiting for table {} to be finally dropped", toString(uuid)); std::unique_lock lock{tables_marked_dropped_mutex}; wait_table_finally_dropped.wait(lock, [&]() { diff --git a/src/Interpreters/DatabaseCatalog.h b/src/Interpreters/DatabaseCatalog.h index c6f50117564..d26307a3bc3 100644 --- a/src/Interpreters/DatabaseCatalog.h +++ b/src/Interpreters/DatabaseCatalog.h @@ -165,12 +165,21 @@ public: void updateDependency(const StorageID & old_from, const StorageID & old_where,const StorageID & new_from, const StorageID & new_where); /// If table has UUID, addUUIDMapping(...) must be called when table attached to some database - /// and removeUUIDMapping(...) must be called when it detached. + /// removeUUIDMapping(...) must be called when it detached, + /// and removeUUIDMappingFinally(...) must be called when table is dropped and its data removed from disk. /// Such tables can be accessed by persistent UUID instead of database and table name. - void addUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table); + void addUUIDMapping(const UUID & uuid, const DatabasePtr & database, const StoragePtr & table); void removeUUIDMapping(const UUID & uuid); + void removeUUIDMappingFinally(const UUID & uuid); /// For moving table between databases void updateUUIDMapping(const UUID & uuid, DatabasePtr database, StoragePtr table); + /// This method adds empty mapping (with database and storage equal to nullptr). + /// It's required to "lock" some UUIDs and protect us from collision. 
+ /// Collisions of random 122-bit integers are very unlikely to happen, + /// but we allow to explicitly specify UUID in CREATE query (in particular for testing). + /// If some UUID was already added and we are trying to add it again, + /// this method will throw an exception. + void addUUIDMapping(const UUID & uuid); static String getPathForUUID(const UUID & uuid); @@ -222,7 +231,7 @@ private: void loadMarkedAsDroppedTables(); void dropTableDataTask(); - void dropTableFinally(const TableMarkedAsDropped & table) const; + void dropTableFinally(const TableMarkedAsDropped & table); static constexpr size_t reschedule_time_ms = 100; diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index c4ebe596649..144e045ecee 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -52,13 +52,37 @@ BlockIO InterpreterDropQuery::execute() return executeToDictionary(drop.database, drop.table, drop.kind, drop.if_exists, drop.temporary, drop.no_ddl_lock); } else if (!drop.database.empty()) - return executeToDatabase(drop.database, drop.kind, drop.if_exists, drop.no_delay); + return executeToDatabase(drop); else throw Exception("Nothing to drop, both names are empty", ErrorCodes::LOGICAL_ERROR); } +void InterpreterDropQuery::waitForTableToBeActuallyDroppedOrDetached(const ASTDropQuery & query, const DatabasePtr & db, const UUID & uuid_to_wait) +{ + if (uuid_to_wait == UUIDHelpers::Nil) + return; + + if (query.kind == ASTDropQuery::Kind::Drop) + DatabaseCatalog::instance().waitTableFinallyDropped(uuid_to_wait); + else if (query.kind == ASTDropQuery::Kind::Detach) + { + if (auto * atomic = typeid_cast(db.get())) + atomic->waitDetachedTableNotInUse(uuid_to_wait); + } +} + BlockIO InterpreterDropQuery::executeToTable(const ASTDropQuery & query) +{ + DatabasePtr database; + UUID table_to_wait_on = UUIDHelpers::Nil; + auto res = executeToTableImpl(query, database, table_to_wait_on); + if (query.no_delay) + waitForTableToBeActuallyDroppedOrDetached(query, database, table_to_wait_on); + return res; +} + +BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait) { /// NOTE: it does not contain UUID, we will resolve it with locked DDLGuard auto table_id = StorageID(query); @@ -125,19 +149,9 @@ BlockIO InterpreterDropQuery::executeToTable(const ASTDropQuery & query) database->dropTable(context, table_id.table_name, query.no_delay); } - } - table.reset(); - ddl_guard = {}; - if (query.no_delay) - { - if (query.kind == ASTDropQuery::Kind::Drop) - DatabaseCatalog::instance().waitTableFinallyDropped(table_id.uuid); - else if (query.kind == ASTDropQuery::Kind::Detach) - { - if (auto * atomic = typeid_cast(database.get())) - atomic->waitDetachedTableNotInUse(table_id.uuid); - } + db = database; + uuid_to_wait = table_id.uuid; } return {}; @@ -223,19 +237,48 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name, } -BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, ASTDropQuery::Kind kind, bool if_exists, bool no_delay) +BlockIO InterpreterDropQuery::executeToDatabase(const ASTDropQuery & query) { + DatabasePtr database; + std::vector tables_to_wait; + BlockIO res; + try + { + res = executeToDatabaseImpl(query, database, tables_to_wait); + } + catch (...) 
+ { + if (query.no_delay) + { + for (const auto & table_uuid : tables_to_wait) + waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid); + } + throw; + } + + if (query.no_delay) + { + for (const auto & table_uuid : tables_to_wait) + waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid); + } + return res; +} + +BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query, DatabasePtr & database, std::vector & uuids_to_wait) +{ + const auto & database_name = query.database; auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, ""); - if (auto database = tryGetDatabase(database_name, if_exists)) + database = tryGetDatabase(database_name, query.if_exists); + if (database) { - if (kind == ASTDropQuery::Kind::Truncate) + if (query.kind == ASTDropQuery::Kind::Truncate) { throw Exception("Unable to truncate database", ErrorCodes::SYNTAX_ERROR); } - else if (kind == ASTDropQuery::Kind::Detach || kind == ASTDropQuery::Kind::Drop) + else if (query.kind == ASTDropQuery::Kind::Detach || query.kind == ASTDropQuery::Kind::Drop) { - bool drop = kind == ASTDropQuery::Kind::Drop; + bool drop = query.kind == ASTDropQuery::Kind::Drop; context.checkAccess(AccessType::DROP_DATABASE, database_name); if (database->shouldBeEmptyOnDetach()) @@ -246,21 +289,22 @@ BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, AS for (auto iterator = database->getDictionariesIterator(); iterator->isValid(); iterator->next()) { String current_dictionary = iterator->name(); - executeToDictionary(database_name, current_dictionary, kind, false, false, false); + executeToDictionary(database_name, current_dictionary, query.kind, false, false, false); } - ASTDropQuery query; - query.kind = kind; - query.if_exists = true; - query.database = database_name; - query.no_delay = no_delay; + ASTDropQuery query_for_table; + query_for_table.kind = query.kind; + query_for_table.if_exists = true; + query_for_table.database = database_name; + query_for_table.no_delay = query.no_delay; for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next()) { - /// Reset reference counter of the StoragePtr to allow synchronous drop. 
- iterator->reset(); - query.table = iterator->name(); - executeToTable(query); + DatabasePtr db; + UUID table_to_wait = UUIDHelpers::Nil; + query_for_table.table = iterator->name(); + executeToTableImpl(query_for_table, db, table_to_wait); + uuids_to_wait.push_back(table_to_wait); } } diff --git a/src/Interpreters/InterpreterDropQuery.h b/src/Interpreters/InterpreterDropQuery.h index 0e1fd47b079..fe5362985de 100644 --- a/src/Interpreters/InterpreterDropQuery.h +++ b/src/Interpreters/InterpreterDropQuery.h @@ -29,9 +29,13 @@ private: ASTPtr query_ptr; Context & context; - BlockIO executeToDatabase(const String & database_name, ASTDropQuery::Kind kind, bool if_exists, bool no_delay); + BlockIO executeToDatabase(const ASTDropQuery & query); + BlockIO executeToDatabaseImpl(const ASTDropQuery & query, DatabasePtr & database, std::vector & uuids_to_wait); BlockIO executeToTable(const ASTDropQuery & query); + BlockIO executeToTableImpl(const ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait); + + static void waitForTableToBeActuallyDroppedOrDetached(const ASTDropQuery & query, const DatabasePtr & db, const UUID & uuid_to_wait); BlockIO executeToDictionary(const String & database_name, const String & dictionary_name, ASTDropQuery::Kind kind, bool if_exists, bool is_temporary, bool no_ddl_lock); diff --git a/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp b/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp index 93cd0a623c7..abb468741c5 100644 --- a/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp @@ -130,7 +130,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex throw Exception("Unknown field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); /// If the key is not found, skip the value. - NullSink sink; + NullOutput sink; readEscapedStringInto(sink, in); } else diff --git a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp index c7da0e7383e..529b70e4e09 100644 --- a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp @@ -20,7 +20,7 @@ namespace ErrorCodes static void skipTSVRow(ReadBuffer & in, const size_t num_columns) { - NullSink null_sink; + NullOutput null_sink; for (size_t i = 0; i < num_columns; ++i) { @@ -196,7 +196,7 @@ bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtens } else { - NullSink null_sink; + NullOutput null_sink; readEscapedStringInto(null_sink, in); } @@ -353,7 +353,7 @@ void TabSeparatedRowInputFormat::tryDeserializeField(const DataTypePtr & type, I } else { - NullSink null_sink; + NullOutput null_sink; readEscapedStringInto(null_sink, in); } } diff --git a/src/Server/ReplicasStatusHandler.cpp b/src/Server/ReplicasStatusHandler.cpp index bc5436f00ee..1aa5c10afd7 100644 --- a/src/Server/ReplicasStatusHandler.cpp +++ b/src/Server/ReplicasStatusHandler.cpp @@ -43,8 +43,8 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request /// Iterate through all the replicated tables. 
for (const auto & db : databases) { - /// Lazy database can not contain replicated tables - if (db.second->getEngineName() == "Lazy") + /// Check if database can contain replicated tables + if (!db.second->canContainMergeTreeTables()) continue; for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) diff --git a/src/Storages/CMakeLists.txt b/src/Storages/CMakeLists.txt index ae47fba063a..deb1c9f6716 100644 --- a/src/Storages/CMakeLists.txt +++ b/src/Storages/CMakeLists.txt @@ -1,3 +1,4 @@ +add_subdirectory(MergeTree) add_subdirectory(System) if(ENABLE_TESTS) diff --git a/src/Storages/MergeTree/CMakeLists.txt b/src/Storages/MergeTree/CMakeLists.txt new file mode 100644 index 00000000000..36cab0b3590 --- /dev/null +++ b/src/Storages/MergeTree/CMakeLists.txt @@ -0,0 +1,3 @@ +if(ENABLE_TESTS) + add_subdirectory(tests) +endif() diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 5b044622b36..ba6c2a3d462 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -68,7 +68,7 @@ MergeInfo MergeListElement::getInfo() const res.memory_usage = memory_tracker.get(); res.thread_id = thread_id; res.merge_type = toString(merge_type); - res.merge_algorithm = toString(merge_algorithm); + res.merge_algorithm = toString(merge_algorithm.load(std::memory_order_relaxed)); for (const auto & source_part_name : source_part_names) res.source_part_names.emplace_back(source_part_name); diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index 8d97b899aac..65e873ed102 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -91,7 +91,8 @@ struct MergeListElement : boost::noncopyable UInt64 thread_id; MergeType merge_type; - MergeAlgorithm merge_algorithm; + /// Detected after merge already started + std::atomic merge_algorithm; MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 141dd8002b9..bc44df5c293 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -710,10 +710,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor size_t sum_input_rows_upper_bound = merge_entry->total_rows_count; size_t sum_compressed_bytes_upper_bound = merge_entry->total_size_bytes_compressed; - MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values); - merge_entry->merge_algorithm = merge_alg; + MergeAlgorithm chosen_merge_algorithm = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values); + merge_entry->merge_algorithm.store(chosen_merge_algorithm, std::memory_order_relaxed); - LOG_DEBUG(log, "Selected MergeAlgorithm: {}", toString(merge_alg)); + LOG_DEBUG(log, "Selected MergeAlgorithm: {}", toString(chosen_merge_algorithm)); /// Note: this is done before creating input streams, because otherwise data.data_parts_mutex /// (which is locked in data.getTotalActiveSizeInBytes()) @@ -728,7 +728,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor std::unique_ptr rows_sources_write_buf; std::optional column_sizes; - if (merge_alg == MergeAlgorithm::Vertical) + if 
(chosen_merge_algorithm == MergeAlgorithm::Vertical) { tmp_disk->createDirectories(new_part_tmp_path); rows_sources_file_path = new_part_tmp_path + "rows_sources"; @@ -818,7 +818,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor ProcessorPtr merged_transform; /// If merge is vertical we cannot calculate it - bool blocks_are_granules_size = (merge_alg == MergeAlgorithm::Vertical); + bool blocks_are_granules_size = (chosen_merge_algorithm == MergeAlgorithm::Vertical); UInt64 merge_block_size = data_settings->merge_max_block_size; switch (data.merging_params.mode) @@ -917,7 +917,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor { /// The same progress from merge_entry could be used for both algorithms (it should be more accurate) /// But now we are using inaccurate row-based estimation in Horizontal case for backward compatibility - Float64 progress = (merge_alg == MergeAlgorithm::Horizontal) + Float64 progress = (chosen_merge_algorithm == MergeAlgorithm::Horizontal) ? std::min(1., 1. * rows_written / sum_input_rows_upper_bound) : std::min(1., merge_entry->progress.load(std::memory_order_relaxed)); @@ -938,7 +938,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor MergeTreeData::DataPart::Checksums checksums_gathered_columns; /// Gather ordinary columns - if (merge_alg == MergeAlgorithm::Vertical) + if (chosen_merge_algorithm == MergeAlgorithm::Vertical) { size_t sum_input_rows_exact = merge_entry->rows_read; merge_entry->columns_written = merging_column_names.size(); @@ -1054,7 +1054,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor ReadableSize(merge_entry->bytes_read_uncompressed / elapsed_seconds)); } - if (merge_alg != MergeAlgorithm::Vertical) + if (chosen_merge_algorithm != MergeAlgorithm::Vertical) to.writeSuffixAndFinalizePart(new_data_part, need_sync); else to.writeSuffixAndFinalizePart(new_data_part, need_sync, &storage_columns, &checksums_gathered_columns); diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 44d0788901f..4239a2bedc0 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -898,7 +898,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( num_streams, sum_marks, min_marks_for_concurrent_read, - parts, + std::move(parts), data, metadata_snapshot, query_info.prewhere_info, diff --git a/src/Storages/MergeTree/MergeTreeReadPool.cpp b/src/Storages/MergeTree/MergeTreeReadPool.cpp index d78f72d1dd0..e44ff500c88 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -21,7 +21,7 @@ MergeTreeReadPool::MergeTreeReadPool( const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_, - RangesInDataParts parts_, + RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_, @@ -38,11 +38,11 @@ MergeTreeReadPool::MergeTreeReadPool( , do_not_steal_tasks{do_not_steal_tasks_} , predict_block_size_bytes{preferred_block_size_bytes_ > 0} , prewhere_info{prewhere_info_} - , parts_ranges{parts_} + , parts_ranges{std::move(parts_)} { /// parts don't contain duplicate MergeTreeDataPart's. 
- const auto per_part_sum_marks = fillPerPartInfo(parts_, check_columns_); - fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_, min_marks_for_concurrent_read_); + const auto per_part_sum_marks = fillPerPartInfo(parts_ranges, check_columns_); + fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read_); } @@ -62,7 +62,24 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, return nullptr; /// Steal task if nothing to do and it's not prohibited - const auto thread_idx = tasks_remaining_for_this_thread ? thread : *std::begin(remaining_thread_tasks); + auto thread_idx = thread; + if (!tasks_remaining_for_this_thread) + { + auto it = remaining_thread_tasks.lower_bound(backoff_state.current_threads); + // Grab all remaining tasks of a thread that was stopped by backoff + if (it != remaining_thread_tasks.end()) + { + threads_tasks[thread] = std::move(threads_tasks[*it]); + remaining_thread_tasks.erase(it); + } + else // Try to steal tasks from the next thread + { + it = remaining_thread_tasks.upper_bound(thread); + if (it == remaining_thread_tasks.end()) + it = remaining_thread_tasks.begin(); + thread_idx = *it; + } + } auto & thread_tasks = threads_tasks[thread_idx]; auto & thread_task = thread_tasks.parts_and_ranges.back(); @@ -163,7 +180,7 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf std::lock_guard lock(mutex); - if (backoff_state.current_threads <= 1) + if (backoff_state.current_threads <= backoff_settings.min_concurrency) return; size_t throughput = info.bytes_read * 1000000000 / info.nanoseconds; @@ -194,14 +211,14 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf std::vector MergeTreeReadPool::fillPerPartInfo( - RangesInDataParts & parts, const bool check_columns) + const RangesInDataParts & parts, const bool check_columns) { std::vector per_part_sum_marks; Block sample_block = metadata_snapshot->getSampleBlock(); for (const auto i : ext::range(0, parts.size())) { - auto & part = parts[i]; + const auto & part = parts[i]; /// Read marks for every data part. size_t sum_marks = 0; @@ -238,21 +255,53 @@ std::vector MergeTreeReadPool::fillPerPartInfo( void MergeTreeReadPool::fillPerThreadInfo( const size_t threads, const size_t sum_marks, std::vector per_part_sum_marks, - RangesInDataParts & parts, const size_t min_marks_for_concurrent_read) + const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read) { threads_tasks.resize(threads); + if (parts.empty()) + return; + + struct PartInfo + { + RangesInDataPart part; + size_t sum_marks; + size_t part_idx; + }; + + using PartsInfo = std::vector; + std::queue parts_queue; + + { + /// Group parts by disk name. + /// We try to minimize the number of threads concurrently reading from the same disk. + /// It improves the performance for JBOD architecture.
+ std::map> parts_per_disk; + + for (size_t i = 0; i < parts.size(); ++i) + { + PartInfo part_info{parts[i], per_part_sum_marks[i], i}; + if (parts[i].data_part->isStoredOnDisk()) + parts_per_disk[parts[i].data_part->volume->getDisk()->getName()].push_back(std::move(part_info)); + else + parts_per_disk[""].push_back(std::move(part_info)); + } + + for (auto & info : parts_per_disk) + parts_queue.push(std::move(info.second)); + } const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1; - for (size_t i = 0; i < threads && !parts.empty(); ++i) + for (size_t i = 0; i < threads && !parts_queue.empty(); ++i) { auto need_marks = min_marks_per_thread; - while (need_marks > 0 && !parts.empty()) + while (need_marks > 0 && !parts_queue.empty()) { - const auto part_idx = parts.size() - 1; - RangesInDataPart & part = parts.back(); - size_t & marks_in_part = per_part_sum_marks.back(); + auto & current_parts = parts_queue.front(); + RangesInDataPart & part = current_parts.back().part; + size_t & marks_in_part = current_parts.back().sum_marks; + const auto part_idx = current_parts.back().part_idx; /// Do not get too few rows from part. if (marks_in_part >= min_marks_for_concurrent_read && @@ -274,8 +323,9 @@ void MergeTreeReadPool::fillPerThreadInfo( marks_in_ranges = marks_in_part; need_marks -= marks_in_part; - parts.pop_back(); - per_part_sum_marks.pop_back(); + current_parts.pop_back(); + if (current_parts.empty()) + parts_queue.pop(); } else { @@ -304,6 +354,17 @@ void MergeTreeReadPool::fillPerThreadInfo( if (marks_in_ranges != 0) remaining_thread_tasks.insert(i); } + + /// Before processing the next thread, change the disk if possible. + /// Different threads will then likely start reading from different disks, + /// which may improve read parallelism for JBOD. + /// It also may be helpful in case we have backoff threads: + /// backoff will then likely reduce load on different disks, not the same one. + if (parts_queue.size() > 1) + { + parts_queue.push(std::move(parts_queue.front())); + parts_queue.pop(); + } } } diff --git a/src/Storages/MergeTree/MergeTreeReadPool.h b/src/Storages/MergeTree/MergeTreeReadPool.h index c0b04c6a228..aa6811661e6 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.h +++ b/src/Storages/MergeTree/MergeTreeReadPool.h @@ -36,13 +36,16 @@ public: size_t min_interval_between_events_ms = 1000; /// Number of events to do backoff - to lower number of threads in pool. size_t min_events = 2; + /// Lower limit for the number of threads to keep in the pool after backoff. + size_t min_concurrency = 1; /// Constants above is just an example.
BackoffSettings(const Settings & settings) : min_read_latency_ms(settings.read_backoff_min_latency_ms.totalMilliseconds()), max_throughput(settings.read_backoff_max_throughput), min_interval_between_events_ms(settings.read_backoff_min_interval_between_events_ms.totalMilliseconds()), - min_events(settings.read_backoff_min_events) + min_events(settings.read_backoff_min_events), + min_concurrency(settings.read_backoff_min_concurrency) { } @@ -68,7 +71,7 @@ private: public: MergeTreeReadPool( const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_, - RangesInDataParts parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_, + RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_, const bool check_columns_, const Names & column_names_, const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_, const bool do_not_steal_tasks_ = false); @@ -88,11 +91,11 @@ public: private: std::vector fillPerPartInfo( - RangesInDataParts & parts, const bool check_columns); + const RangesInDataParts & parts, const bool check_columns); void fillPerThreadInfo( const size_t threads, const size_t sum_marks, std::vector per_part_sum_marks, - RangesInDataParts & parts, const size_t min_marks_for_concurrent_read); + const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read); const MergeTreeData & data; StorageMetadataPtr metadata_snapshot; diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp index 1d133f73a7b..4b5ae580257 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp @@ -64,7 +64,7 @@ void MergeTreeWriteAheadLog::addPart(const Block & block, const String & part_na min_block_number = std::min(min_block_number, part_info.min_block); max_block_number = std::max(max_block_number, part_info.max_block); - writeIntBinary(static_cast(0), *out); /// version + writeIntBinary(WAL_VERSION, *out); writeIntBinary(static_cast(ActionType::ADD_PART), *out); writeStringBinary(part_name, *out); block_out->write(block); @@ -80,7 +80,7 @@ void MergeTreeWriteAheadLog::dropPart(const String & part_name) { std::unique_lock lock(write_mutex); - writeIntBinary(static_cast(0), *out); + writeIntBinary(WAL_VERSION, *out); writeIntBinary(static_cast(ActionType::DROP_PART), *out); writeStringBinary(part_name, *out); out->next(); @@ -116,9 +116,13 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor try { + ActionMetadata metadata; + readIntBinary(version, *in); - if (version != 0) - throw Exception("Unknown WAL format version: " + toString(version), ErrorCodes::UNKNOWN_FORMAT_VERSION); + if (version > 0) + { + metadata.read(*in); + } readIntBinary(action_type, *in); readStringBinary(part_name, *in); @@ -233,4 +237,29 @@ MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(const String & filename) return std::make_pair(min_block, max_block); } +void MergeTreeWriteAheadLog::ActionMetadata::read(ReadBuffer & meta_in) +{ + readIntBinary(min_compatible_version, meta_in); + if (min_compatible_version > WAL_VERSION) + throw Exception("WAL metadata version " + toString(min_compatible_version) + + " is not compatible with this ClickHouse version", ErrorCodes::UNKNOWN_FORMAT_VERSION); + + size_t metadata_size; + readVarUInt(metadata_size, meta_in); + + UInt32 metadata_start = 
meta_in.offset(); + + /// For the future: read metadata here. + + + /// Skip extra fields if any. If min_compatible_version is lower than WAL_VERSION it means + /// that the fields are not critical for the correctness. + meta_in.ignore(metadata_size - (meta_in.offset() - metadata_start)); +} + +void MergeTreeWriteAheadLog::ActionMetadata::write(WriteBuffer & meta_out) const +{ + writeIntBinary(min_compatible_version, meta_out); + writeVarUInt(static_cast(0), meta_out); +} } diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h index 77c7c7e11e7..f4cf8ddc315 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h @@ -28,6 +28,19 @@ public: DROP_PART = 1, }; + struct ActionMetadata + { + /// The minimum version of WAL reader that can understand metadata written by current ClickHouse version. + /// This field must be increased when making backwards incompatible changes. + /// + /// The same approach can be used recursively inside metadata. + UInt8 min_compatible_version = 0; + + void write(WriteBuffer & meta_out) const; + void read(ReadBuffer & meta_in); + }; + + constexpr static UInt8 WAL_VERSION = 0; constexpr static auto WAL_FILE_NAME = "wal"; constexpr static auto WAL_FILE_EXTENSION = ".bin"; constexpr static auto DEFAULT_WAL_FILE_NAME = "wal.bin"; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 45e16e81208..880ad4dd0d3 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -57,6 +57,7 @@ bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & return virtual_parts.getContainingPart(data_part->info) != data_part->name; } + bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) { auto queue_path = replica_path + "/queue"; @@ -68,6 +69,9 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) { std::lock_guard pull_logs_lock(pull_logs_to_queue_mutex); + /// Reset batch size on initialization to recover from possible errors of too large batch size. + current_multi_batch_size = 1; + String log_pointer_str = zookeeper->get(replica_path + "/log_pointer"); log_pointer = log_pointer_str.empty() ? 0 : parse(log_pointer_str); @@ -486,20 +490,21 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper { std::sort(log_entries.begin(), log_entries.end()); - /// ZK contains a limit on the number or total size of operations in a multi-request. - /// If the limit is exceeded, the connection is simply closed. - /// The constant is selected with a margin. The default limit in ZK is 1 MB of data in total. - /// The average size of the node value in this case is less than 10 kilobytes. - static constexpr auto MAX_MULTI_OPS = 100; - - for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries; entry_idx += MAX_MULTI_OPS) + for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries;) { auto begin = log_entries.begin() + entry_idx; - auto end = entry_idx + MAX_MULTI_OPS >= log_entries.size() + auto end = entry_idx + current_multi_batch_size >= log_entries.size() ? 
log_entries.end() - : (begin + MAX_MULTI_OPS); + : (begin + current_multi_batch_size); auto last = end - 1; + /// Increment entry_idx before the batch size increase (we copied at most current_multi_batch_size entries) + entry_idx += current_multi_batch_size; + + /// Increase the batch size exponentially, so it will saturate at MAX_MULTI_OPS. + if (current_multi_batch_size < MAX_MULTI_OPS) + current_multi_batch_size = std::min(MAX_MULTI_OPS, current_multi_batch_size * 2); + String last_entry = *last; if (!startsWith(last_entry, "log-")) throw Exception("Error in zookeeper data: unexpected node " + last_entry + " in " + zookeeper_path + "/log", diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 88a61f50225..93b79c8336c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -259,6 +259,19 @@ private: ~CurrentlyExecuting(); }; + /// ZK contains a limit on the number or total size of operations in a multi-request. + /// If the limit is exceeded, the connection is simply closed. + /// The constant is selected with a margin. The default limit in ZK is 1 MB of data in total. + /// The average size of the node value in this case is less than 10 kilobytes. + static constexpr size_t MAX_MULTI_OPS = 100; + + /// Very large queue entries may appear occasionally. + /// We cannot process MAX_MULTI_OPS at once because it will fail. + /// But we have to process more than one entry at once because otherwise lagging replicas would catch up too slowly. + /// Let's start with one entry per transaction and increase it exponentially towards MAX_MULTI_OPS. + /// It will allow us to make some progress before failing and to remain operational even in extreme cases.
+ size_t current_multi_batch_size = 1; + public: ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_); ~ReplicatedMergeTreeQueue(); diff --git a/src/Storages/MergeTree/tests/CMakeLists.txt b/src/Storages/MergeTree/tests/CMakeLists.txt new file mode 100644 index 00000000000..777c75f191a --- /dev/null +++ b/src/Storages/MergeTree/tests/CMakeLists.txt @@ -0,0 +1,2 @@ +add_executable (wal_action_metadata wal_action_metadata.cpp) +target_link_libraries (wal_action_metadata PRIVATE dbms) diff --git a/src/Storages/MergeTree/tests/wal_action_metadata.cpp b/src/Storages/MergeTree/tests/wal_action_metadata.cpp new file mode 100644 index 00000000000..03c38c7a186 --- /dev/null +++ b/src/Storages/MergeTree/tests/wal_action_metadata.cpp @@ -0,0 +1,61 @@ +#include + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int UNKNOWN_FORMAT_VERSION; +} +} + +int main(int, char **) +{ + try + { + { + std::cout << "test: dummy test" << std::endl; + + DB::MergeTreeWriteAheadLog::ActionMetadata metadata_out; + DB::MemoryWriteBuffer buf{}; + + metadata_out.write(buf); + buf.finalize(); + + metadata_out.read(*buf.tryGetReadBuffer()); + } + + { + std::cout << "test: min compatibility" << std::endl; + + DB::MergeTreeWriteAheadLog::ActionMetadata metadata_out; + metadata_out.min_compatible_version = DB::MergeTreeWriteAheadLog::WAL_VERSION + 1; + DB::MemoryWriteBuffer buf{}; + + metadata_out.write(buf); + buf.finalize(); + + try + { + metadata_out.read(*buf.tryGetReadBuffer()); + } + catch (const DB::Exception & e) + { + if (e.code() != DB::ErrorCodes::UNKNOWN_FORMAT_VERSION) + { + std::cerr << "Expected UNKNOWN_FORMAT_VERSION exception but got: " + << e.what() << ", " << e.displayText() << std::endl; + } + } + } + } + catch (const DB::Exception & e) + { + std::cerr << e.what() << ", " << e.displayText() << std::endl; + return 1; + } + + return 0; +} diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index c74081d8802..830c6224b9e 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -27,7 +27,7 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream( , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) , sample_block(non_virtual_header) , virtual_header(metadata_snapshot->getSampleBlockForColumns( - {"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id"}, + {"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id", "_timestamp"}, storage.getVirtuals(), storage.getStorageID())) { for (const auto & column : virtual_header) @@ -158,6 +158,7 @@ Block RabbitMQBlockInputStream::readImpl() auto delivery_tag = buffer->getDeliveryTag(); auto redelivered = buffer->getRedelivered(); auto message_id = buffer->getMessageID(); + auto timestamp = buffer->getTimestamp(); buffer->updateAckTracker({delivery_tag, channel_id}); @@ -168,6 +169,7 @@ Block RabbitMQBlockInputStream::readImpl() virtual_columns[2]->insert(delivery_tag); virtual_columns[3]->insert(redelivered); virtual_columns[4]->insert(message_id); + virtual_columns[5]->insert(timestamp); } total_rows = total_rows + new_rows; diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h index f68b79275f6..5f2c2a62018 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h @@ -30,6 +30,7 @@ public: Block readImpl() override; void 
readSuffixImpl() override; + bool queueEmpty() const { return !buffer || buffer->queueEmpty(); } bool needChannelUpdate(); void updateChannel(); bool sendAck(); diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp index 685a55027ce..404ba27ccde 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.cpp @@ -14,47 +14,27 @@ namespace DB { -namespace ErrorCodes -{ - extern const int BAD_ARGUMENTS; - extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING; -} - ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, - ChannelPtr setup_channel_, HandlerPtr event_handler_, - const String & exchange_name_, + std::vector & queues_, size_t channel_id_base_, const String & channel_base_, - const String & queue_base_, Poco::Logger * log_, char row_delimiter_, - bool hash_exchange_, - size_t num_queues_, - const String & deadletter_exchange_, uint32_t queue_size_, const std::atomic & stopped_) : ReadBuffer(nullptr, 0) , consumer_channel(std::move(consumer_channel_)) - , setup_channel(setup_channel_) , event_handler(event_handler_) - , exchange_name(exchange_name_) + , queues(queues_) , channel_base(channel_base_) , channel_id_base(channel_id_base_) - , queue_base(queue_base_) - , hash_exchange(hash_exchange_) - , num_queues(num_queues_) - , deadletter_exchange(deadletter_exchange_) , log(log_) , row_delimiter(row_delimiter_) - , queue_size(queue_size_) , stopped(stopped_) - , received(queue_size * num_queues) + , received(queue_size_) { - for (size_t queue_id = 0; queue_id < num_queues; ++queue_id) - bindQueue(queue_id); - setupChannel(); } @@ -65,67 +45,6 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer() } -void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id) -{ - std::atomic binding_created = false; - - auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */) - { - queues.emplace_back(queue_name); - LOG_DEBUG(log, "Queue {} is declared", queue_name); - - if (msgcount) - LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name); - - /* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are - * done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for - * fanout exchange it can be arbitrary - */ - setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id_base)) - .onSuccess([&] { binding_created = true; }) - .onError([&](const char * message) - { - throw Exception( - ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING, - "Failed to create queue binding with queue {} for exchange {}. Reason: {}", std::string(message), - queue_name, exchange_name); - }); - }; - - auto error_callback([&](const char * message) - { - /* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a - * given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different - * max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously - * declared queues via any of the various cli tools. - */ - throw Exception("Failed to declare queue. 
Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \ - specifying differently those settings or use a different queue_base or manually delete previously declared queues, \ - which were declared with the same names. ERROR reason: " - + std::string(message), ErrorCodes::BAD_ARGUMENTS); - }); - - AMQP::Table queue_settings; - - queue_settings["x-max-length"] = queue_size; - queue_settings["x-overflow"] = "reject-publish"; - - if (!deadletter_exchange.empty()) - queue_settings["x-dead-letter-exchange"] = deadletter_exchange; - - /* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one - * specific queue when its name is specified in queue_base setting - */ - const String queue_name = !hash_exchange ? queue_base : std::to_string(channel_id_base) + "_" + std::to_string(queue_id) + "_" + queue_base; - setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback); - - while (!binding_created) - { - iterateEventLoop(); - } -} - - void ReadBufferFromRabbitMQConsumer::subscribe() { for (const auto & queue_name : queues) @@ -146,10 +65,9 @@ void ReadBufferFromRabbitMQConsumer::subscribe() if (row_delimiter != '\0') message_received += row_delimiter; - if (message.hasMessageID()) - received.push({message_received, message.messageID(), redelivered, AckTracker(delivery_tag, channel_id)}); - else - received.push({message_received, "", redelivered, AckTracker(delivery_tag, channel_id)}); + received.push({message_received, message.hasMessageID() ? message.messageID() : "", + message.hasTimestamp() ? message.timestamp() : 0, + redelivered, AckTracker(delivery_tag, channel_id)}); } }) .onError([&](const char * message) diff --git a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h index 109770c77e9..476db3f5e94 100644 --- a/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h +++ b/src/Storages/RabbitMQ/ReadBufferFromRabbitMQConsumer.h @@ -24,17 +24,12 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer public: ReadBufferFromRabbitMQConsumer( ChannelPtr consumer_channel_, - ChannelPtr setup_channel_, HandlerPtr event_handler_, - const String & exchange_name_, + std::vector & queues_, size_t channel_id_base_, const String & channel_base_, - const String & queue_base_, Poco::Logger * log_, char row_delimiter_, - bool hash_exchange_, - size_t num_queues_, - const String & deadletter_exchange_, uint32_t queue_size_, const std::atomic & stopped_); @@ -53,6 +48,7 @@ public: { String message; String message_id; + uint64_t timestamp; bool redelivered; AckTracker track; }; @@ -75,34 +71,26 @@ public: auto getDeliveryTag() const { return current.track.delivery_tag; } auto getRedelivered() const { return current.redelivered; } auto getMessageID() const { return current.message_id; } + auto getTimestamp() const { return current.timestamp; } private: bool nextImpl() override; - void bindQueue(size_t queue_id); void subscribe(); void iterateEventLoop(); ChannelPtr consumer_channel; - ChannelPtr setup_channel; HandlerPtr event_handler; - - const String exchange_name; + std::vector queues; const String channel_base; const size_t channel_id_base; - const String queue_base; - const bool hash_exchange; - const size_t num_queues; - const String deadletter_exchange; Poco::Logger * log; char row_delimiter; bool allowed = true; - uint32_t queue_size; const std::atomic & stopped; String channel_id; 
std::atomic channel_error = true, wait_subscription = false; - std::vector queues; ConcurrentBoundedQueue received; MessageData current; size_t subscribed = 0; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index d213251e366..9735c4d7fd3 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -38,8 +38,10 @@ namespace DB static const auto CONNECT_SLEEP = 200; static const auto RETRIES_MAX = 20; -static const auto HEARTBEAT_RESCHEDULE_MS = 3000; static const uint32_t QUEUE_SIZE = 100000; +static const auto MAX_FAILED_READ_ATTEMPTS = 10; +static const auto RESCHEDULE_MS = 500; +static const auto MAX_THREAD_WORK_DURATION_MS = 60000; namespace ErrorCodes { @@ -50,6 +52,7 @@ namespace ErrorCodes extern const int CANNOT_BIND_RABBITMQ_EXCHANGE; extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE; extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE; + extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING; } namespace ExchangeType @@ -122,9 +125,6 @@ StorageRabbitMQ::StorageRabbitMQ( streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); }); streaming_task->deactivate(); - heartbeat_task = global_context.getSchedulePool().createTask("RabbitMQHeartbeatTask", [this]{ heartbeatFunc(); }); - heartbeat_task->deactivate(); - if (queue_base.empty()) { /* Make sure that local exchange name is unique for each table and is not the same as client's exchange name. It also needs to @@ -210,16 +210,6 @@ Context StorageRabbitMQ::addSettings(Context context) const } -void StorageRabbitMQ::heartbeatFunc() -{ - if (!stream_cancelled && event_handler->connectionRunning()) - { - connection->heartbeat(); - heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS); - } -} - - void StorageRabbitMQ::loopingFunc() { if (event_handler->connectionRunning()) @@ -396,13 +386,73 @@ void StorageRabbitMQ::bindExchange() } +void StorageRabbitMQ::bindQueue(size_t queue_id) +{ + std::atomic binding_created = false; + + auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */) + { + queues.emplace_back(queue_name); + LOG_DEBUG(log, "Queue {} is declared", queue_name); + + if (msgcount) + LOG_INFO(log, "Queue {} is non-empty. Non-consumed messages will also be delivered", queue_name); + + /* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are + * done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for + * fanout exchange it can be arbitrary + */ + setup_channel->bindQueue(consumer_exchange, queue_name, std::to_string(queue_id)) + .onSuccess([&] { binding_created = true; }) + .onError([&](const char * message) + { + throw Exception( + ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING, + "Failed to create queue binding for exchange {}. Reason: {}", exchange_name, std::string(message)); + }); + }; + + auto error_callback([&](const char * message) + { + /* This error is most likely a result of an attempt to declare queue with different settings if it was declared before. So for a + * given queue name either deadletter_exchange parameter changed or queue_size changed, i.e. table was declared with different + * max_block_size parameter. Solution: client should specify a different queue_base parameter or manually delete previously + * declared queues via any of the various cli tools.
+ */ + throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \ + specifying differently those settings or use a different queue_base or manually delete previously declared queues, \ + which were declared with the same names. ERROR reason: " + + std::string(message), ErrorCodes::BAD_ARGUMENTS); + }); + + AMQP::Table queue_settings; + + queue_settings["x-max-length"] = queue_size; + + if (!deadletter_exchange.empty()) + queue_settings["x-dead-letter-exchange"] = deadletter_exchange; + else + queue_settings["x-overflow"] = "reject-publish"; + + /* The first option not just simplifies queue_name, but also implements the possibility to be able to resume reading from one + * specific queue when its name is specified in queue_base setting + */ + const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base; + setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback); + + while (!binding_created) + { + event_handler->iterateLoop(); + } +} + + bool StorageRabbitMQ::restoreConnection(bool reconnecting) { size_t cnt_retries = 0; if (reconnecting) { - deactivateTask(heartbeat_task, false, false); connection->close(); /// Connection might be unusable, but not closed /* Connection is not closed immediately (firstly, all pending operations are completed, and then @@ -452,11 +502,11 @@ void StorageRabbitMQ::unbindExchange() */ std::call_once(flag, [&]() { - heartbeat_task->deactivate(); streaming_task->deactivate(); event_handler->updateLoopState(Loop::STOP); looping_task->deactivate(); + setup_channel = std::make_shared(connection.get()); setup_channel->removeExchange(bridge_exchange) .onSuccess([&]() { @@ -471,6 +521,8 @@ void StorageRabbitMQ::unbindExchange() { event_handler->iterateLoop(); } + + setup_channel->close(); }); } @@ -499,8 +551,6 @@ Pipe StorageRabbitMQ::read( deactivateTask(looping_task, false, true); update_channels = restoreConnection(true); - if (update_channels) - heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS); } Pipes pipes; @@ -521,7 +571,6 @@ Pipe StorageRabbitMQ::read( if (event_handler->loopRunning()) { deactivateTask(looping_task, false, true); - deactivateTask(heartbeat_task, false, false); } rabbit_stream->updateChannel(); @@ -552,6 +601,13 @@ void StorageRabbitMQ::startup() initExchange(); bindExchange(); + for (size_t i = 1; i <= num_queues; ++i) + { + bindQueue(i); + } + + setup_channel->close(); + for (size_t i = 0; i < num_consumers; ++i) { try @@ -568,7 +624,6 @@ void StorageRabbitMQ::startup() event_handler->updateLoopState(Loop::RUN); streaming_task->activateAndSchedule(); - heartbeat_task->activateAndSchedule(); } @@ -579,7 +634,6 @@ void StorageRabbitMQ::shutdown() deactivateTask(streaming_task, true, false); deactivateTask(looping_task, true, true); - deactivateTask(heartbeat_task, true, false); connection->close(); @@ -635,9 +689,8 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer() ChannelPtr consumer_channel = std::make_shared(connection.get()); return std::make_shared( - consumer_channel, setup_channel, event_handler, consumer_exchange, ++consumer_id, - unique_strbase, queue_base, log, row_delimiter, hash_exchange, num_queues, - deadletter_exchange, queue_size, stream_cancelled); + consumer_channel, event_handler, queues, ++consumer_id, + unique_strbase, log, row_delimiter, queue_size, stream_cancelled); } @@ -683,11 +736,14 @@ void StorageRabbitMQ::streamingToViewsFunc() try 
{ auto table_id = getStorageID(); + // Check if at least one direct dependency is attached size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size(); if (dependencies_count) { + auto start_time = std::chrono::steady_clock::now(); + // Keep streaming as long as there are attached views and streaming is not cancelled while (!stream_cancelled && num_created_consumers > 0) { @@ -696,8 +752,17 @@ void StorageRabbitMQ::streamingToViewsFunc() LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count); - if (!streamToViews()) + if (streamToViews()) break; + + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end_time - start_time); + if (duration.count() > MAX_THREAD_WORK_DURATION_MS) + { + event_handler->updateLoopState(Loop::STOP); + LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded."); + break; + } } } } @@ -708,7 +773,7 @@ void StorageRabbitMQ::streamingToViewsFunc() /// Wait for attached views if (!stream_cancelled) - streaming_task->schedule(); + streaming_task->scheduleAfter(RESCHEDULE_MS); } @@ -731,13 +796,6 @@ bool StorageRabbitMQ::streamToViews() auto column_names = block_io.out->getHeader().getNames(); auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()); - /* event_handler->connectionRunning() does not guarantee that connection is not closed in case loop was not running before, but - * need to anyway start the loop to activate error callbacks and update connection state, because even checking with - * connection->usable() will not give correct answer before callbacks are activated. - */ - if (!event_handler->loopRunning() && event_handler->connectionRunning()) - looping_task->activateAndSchedule(); - auto block_size = getMaxBlockSize(); // Create a stream for each consumer and join them in a union stream @@ -770,34 +828,45 @@ bool StorageRabbitMQ::streamToViews() in = streams[0]; std::atomic stub = {false}; + + if (!event_handler->loopRunning()) + { + event_handler->updateLoopState(Loop::RUN); + looping_task->activateAndSchedule(); + } + copyData(*in, *block_io.out, &stub); - /* Need to stop loop even if connection is ok, because sending ack() with loop running in another thread will lead to a lot of data - * races inside the library, but only in case any error occurs or connection is lost while ack is being sent + /* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case + * error occurs or connection is lost while ack is being sent */ - if (event_handler->loopRunning()) - deactivateTask(looping_task, false, true); + deactivateTask(looping_task, false, true); + size_t queue_empty = 0; if (!event_handler->connectionRunning()) { - if (!stream_cancelled && restoreConnection(true)) + if (stream_cancelled) + return true; + + if (restoreConnection(true)) { for (auto & stream : streams) stream->as()->updateChannel(); } else { - /// Reschedule if unable to connect to rabbitmq or quit if cancelled - return false; + LOG_TRACE(log, "Reschedule streaming. Unable to restore connection."); + return true; } } else { - deactivateTask(heartbeat_task, false, false); - /// Commit for (auto & stream : streams) { + if (stream->as()->queueEmpty()) + ++queue_empty; + /* false is returned by the sendAck function in only two cases: * 1) if connection failed. In this case all channels will be closed and will be unable to send ack. 
Also ack is made based on * delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is @@ -828,19 +897,25 @@ bool StorageRabbitMQ::streamToViews() break; } } + + event_handler->iterateLoop(); } } - event_handler->updateLoopState(Loop::RUN); - looping_task->activateAndSchedule(); - heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS); /// It is also deactivated in restoreConnection(), so reschedule anyway + if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS)) + { + connection->heartbeat(); + read_attempts = 0; + LOG_TRACE(log, "Reschedule streaming. Queues are empty."); + return true; + } + else + { + event_handler->updateLoopState(Loop::RUN); + looping_task->activateAndSchedule(); + } - // Check whether the limits were applied during query execution - bool limits_applied = false; - const BlockStreamProfileInfo & info = in->getProfileInfo(); - limits_applied = info.hasAppliedLimit(); - - return limits_applied; + return false; } @@ -907,7 +982,8 @@ NamesAndTypesList StorageRabbitMQ::getVirtuals() const {"_channel_id", std::make_shared()}, {"_delivery_tag", std::make_shared()}, {"_redelivered", std::make_shared()}, - {"_message_id", std::make_shared()} + {"_message_id", std::make_shared()}, + {"_timestamp", std::make_shared()} }; } diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 8d9a20f9e34..d7891aed0a7 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -114,14 +114,15 @@ private: std::atomic wait_confirm = true; /// needed to break waiting for confirmations for producer std::atomic exchange_removed = false; ChannelPtr setup_channel; + std::vector queues; std::once_flag flag; /// remove exchange only once std::mutex task_mutex; BackgroundSchedulePool::TaskHolder streaming_task; - BackgroundSchedulePool::TaskHolder heartbeat_task; BackgroundSchedulePool::TaskHolder looping_task; std::atomic stream_cancelled{false}; + size_t read_attempts = 0; ConsumerBufferPtr createReadBuffer(); @@ -140,6 +141,7 @@ private: void initExchange(); void bindExchange(); + void bindQueue(size_t queue_id); bool restoreConnection(bool reconnecting); bool streamToViews(); diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 9046940b3f7..0c1561fca9b 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -80,7 +80,6 @@ namespace ErrorCodes extern const int TYPE_MISMATCH; extern const int TOO_MANY_ROWS; extern const int UNABLE_TO_SKIP_UNUSED_SHARDS; - extern const int LOGICAL_ERROR; } namespace ActionLocks @@ -600,15 +599,22 @@ void StorageDistributed::shutdown() monitors_blocker.cancelForever(); std::lock_guard lock(cluster_nodes_mutex); + + LOG_DEBUG(log, "Joining background threads for async INSERT"); cluster_nodes_data.clear(); + LOG_DEBUG(log, "Background threads for async INSERT joined"); } void StorageDistributed::drop() { - // shutdown() should be already called - // and by the same reason we cannot use truncate() here, since - // cluster_nodes_data already cleaned - if (!cluster_nodes_data.empty()) - throw Exception("drop called before shutdown", ErrorCodes::LOGICAL_ERROR); + // Some INSERT in-between shutdown() and drop() can call + // requireDirectoryMonitor() again, so call shutdown() to clear them, but + // when the drop() (this function) executed none of INSERT is allowed in + // parallel. 
+ // + // And second time shutdown() should be fast, since none of + // DirectoryMonitor should do anything, because ActionBlocker is canceled + // (in shutdown()). + shutdown(); // Distributed table w/o sharding_key does not allows INSERTs if (relative_data_path.empty()) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index edc8a9df911..4d65fe61dc1 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3481,8 +3481,10 @@ void StorageReplicatedMergeTree::startup() { queue.initialize(getDataParts()); - data_parts_exchange_endpoint = std::make_shared(*this); - global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint); + InterserverIOEndpointPtr data_parts_exchange_ptr = std::make_shared(*this); + [[maybe_unused]] auto prev_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, data_parts_exchange_ptr); + assert(prev_ptr == nullptr); + global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr); /// In this thread replica will be activated. restarting_thread.start(); @@ -3549,15 +3551,15 @@ void StorageReplicatedMergeTree::shutdown() global_context.getBackgroundMovePool().removeTask(move_parts_task_handle); move_parts_task_handle.reset(); - if (data_parts_exchange_endpoint) + auto data_parts_exchange_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, InterserverIOEndpointPtr{}); + if (data_parts_exchange_ptr) { - global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_endpoint->getId(replica_path)); + global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_ptr->getId(replica_path)); /// Ask all parts exchange handlers to finish asap. New ones will fail to start - data_parts_exchange_endpoint->blocker.cancelForever(); + data_parts_exchange_ptr->blocker.cancelForever(); /// Wait for all of them - std::unique_lock lock(data_parts_exchange_endpoint->rwlock); + std::unique_lock lock(data_parts_exchange_ptr->rwlock); } - data_parts_exchange_endpoint.reset(); /// We clear all old parts after stopping all background operations. It's /// important, because background operations can produce temporary parts @@ -5900,7 +5902,10 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti return fetcher.blocker.cancel(); if (action_type == ActionLocks::PartsSend) - return data_parts_exchange_endpoint ? data_parts_exchange_endpoint->blocker.cancel() : ActionLock(); + { + auto data_parts_exchange_ptr = std::atomic_load(&data_parts_exchange_endpoint); + return data_parts_exchange_ptr ? 
data_parts_exchange_ptr->blocker.cancel() : ActionLock(); + } if (action_type == ActionLocks::ReplicationQueue) return queue.actions_blocker.cancel(); diff --git a/src/Storages/System/StorageSystemDistributionQueue.cpp b/src/Storages/System/StorageSystemDistributionQueue.cpp index 2459be0ba71..39ccea64e26 100644 --- a/src/Storages/System/StorageSystemDistributionQueue.cpp +++ b/src/Storages/System/StorageSystemDistributionQueue.cpp @@ -38,8 +38,8 @@ void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, cons std::map> tables; for (const auto & db : DatabaseCatalog::instance().getDatabases()) { - /// Lazy database can not contain distributed tables - if (db.second->getEngineName() == "Lazy") + /// Check if database can contain distributed tables + if (!db.second->canContainDistributedTables()) continue; const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); diff --git a/src/Storages/System/StorageSystemGraphite.cpp b/src/Storages/System/StorageSystemGraphite.cpp index ffa789a4751..93bc16785b2 100644 --- a/src/Storages/System/StorageSystemGraphite.cpp +++ b/src/Storages/System/StorageSystemGraphite.cpp @@ -32,8 +32,8 @@ static StorageSystemGraphite::Configs getConfigs(const Context & context) for (const auto & db : databases) { - /// Lazy database can not contain MergeTree tables - if (db.second->getEngineName() == "Lazy") + /// Check if database can contain MergeTree tables + if (!db.second->canContainMergeTreeTables()) continue; for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) diff --git a/src/Storages/System/StorageSystemMutations.cpp b/src/Storages/System/StorageSystemMutations.cpp index 32f672b8401..f66f57ef5d1 100644 --- a/src/Storages/System/StorageSystemMutations.cpp +++ b/src/Storages/System/StorageSystemMutations.cpp @@ -44,8 +44,8 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, const Contex std::map> merge_tree_tables; for (const auto & db : DatabaseCatalog::instance().getDatabases()) { - /// Lazy database can not contain MergeTree tables - if (db.second->getEngineName() == "Lazy") + /// Check if database can contain MergeTree tables + if (!db.second->canContainMergeTreeTables()) continue; const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); diff --git a/src/Storages/System/StorageSystemPartsBase.cpp b/src/Storages/System/StorageSystemPartsBase.cpp index faa2ec0e1c3..d10346af89f 100644 --- a/src/Storages/System/StorageSystemPartsBase.cpp +++ b/src/Storages/System/StorageSystemPartsBase.cpp @@ -83,9 +83,9 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const MutableColumnPtr database_column_mut = ColumnString::create(); for (const auto & database : databases) { - /// Lazy database can not contain MergeTree tables - /// and it's unnecessary to load all tables of Lazy database just to filter all of them. - if (database.second->canContainMergeTreeTables()) hold - /// Check if database can contain MergeTree tables; + /// if not, it's unnecessary to load all tables of the database just to filter all of them.
+ if (database.second->canContainMergeTreeTables()) database_column_mut->insert(database.first); } block_to_filter.insert(ColumnWithTypeAndName( diff --git a/src/Storages/System/StorageSystemReplicas.cpp b/src/Storages/System/StorageSystemReplicas.cpp index 7ab6e939815..ab54d760873 100644 --- a/src/Storages/System/StorageSystemReplicas.cpp +++ b/src/Storages/System/StorageSystemReplicas.cpp @@ -74,8 +74,8 @@ Pipe StorageSystemReplicas::read( std::map> replicated_tables; for (const auto & db : DatabaseCatalog::instance().getDatabases()) { - /// Lazy database can not contain replicated tables - if (db.second->getEngineName() == "Lazy") + /// Check if database can contain replicated tables + if (!db.second->canContainMergeTreeTables()) continue; const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) diff --git a/src/Storages/System/StorageSystemReplicationQueue.cpp b/src/Storages/System/StorageSystemReplicationQueue.cpp index f04d8759507..9cd5e8b8ff3 100644 --- a/src/Storages/System/StorageSystemReplicationQueue.cpp +++ b/src/Storages/System/StorageSystemReplicationQueue.cpp @@ -55,8 +55,8 @@ void StorageSystemReplicationQueue::fillData(MutableColumns & res_columns, const std::map> replicated_tables; for (const auto & db : DatabaseCatalog::instance().getDatabases()) { - /// Lazy database can not contain replicated tables - if (db.second->getEngineName() == "Lazy") + /// Check if database can contain replicated tables + if (!db.second->canContainMergeTreeTables()) continue; const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); diff --git a/tests/integration/test_concurrent_ttl_merges/configs/log_conf.xml b/tests/integration/test_concurrent_ttl_merges/configs/log_conf.xml new file mode 100644 index 00000000000..318a6bca95d --- /dev/null +++ b/tests/integration/test_concurrent_ttl_merges/configs/log_conf.xml @@ -0,0 +1,12 @@ + + 3 + + trace + /var/log/clickhouse-server/log.log + /var/log/clickhouse-server/log.err.log + 1000M + 10 + /var/log/clickhouse-server/stderr.log + /var/log/clickhouse-server/stdout.log + + diff --git a/tests/integration/test_concurrent_ttl_merges/test.py b/tests/integration/test_concurrent_ttl_merges/test.py index f067e65f58a..65bc3828b38 100644 --- a/tests/integration/test_concurrent_ttl_merges/test.py +++ b/tests/integration/test_concurrent_ttl_merges/test.py @@ -5,8 +5,8 @@ from helpers.cluster import ClickHouseCluster from helpers.test_tools import assert_eq_with_retry cluster = ClickHouseCluster(__file__) -node1 = cluster.add_instance('node1', main_configs=['configs/fast_background_pool.xml'], with_zookeeper=True) -node2 = cluster.add_instance('node2', main_configs=['configs/fast_background_pool.xml'], with_zookeeper=True) +node1 = cluster.add_instance('node1', main_configs=['configs/fast_background_pool.xml', 'configs/log_conf.xml'], with_zookeeper=True) +node2 = cluster.add_instance('node2', main_configs=['configs/fast_background_pool.xml', 'configs/log_conf.xml'], with_zookeeper=True) @pytest.fixture(scope="module") diff --git a/tests/integration/test_disabled_mysql_server/__init__.py b/tests/integration/test_disabled_mysql_server/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_disabled_mysql_server/configs/remote_servers.xml 
b/tests/integration/test_disabled_mysql_server/configs/remote_servers.xml new file mode 100644 index 00000000000..de8e5865f12 --- /dev/null +++ b/tests/integration/test_disabled_mysql_server/configs/remote_servers.xml @@ -0,0 +1,12 @@ + + + + + + node1 + 9000 + + + + + diff --git a/tests/integration/test_disabled_mysql_server/test.py b/tests/integration/test_disabled_mysql_server/test.py new file mode 100644 index 00000000000..a2cbcb17534 --- /dev/null +++ b/tests/integration/test_disabled_mysql_server/test.py @@ -0,0 +1,60 @@ +import time +import contextlib +import pymysql.cursors +import pytest +import os +import subprocess + +from helpers.client import QueryRuntimeException +from helpers.cluster import ClickHouseCluster, get_docker_compose_path +from helpers.network import PartitionManager + +cluster = ClickHouseCluster(__file__) +clickhouse_node = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True) + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +class MySQLNodeInstance: + def __init__(self, user='root', password='clickhouse', hostname='127.0.0.1', port=3308): + self.user = user + self.port = port + self.hostname = hostname + self.password = password + self.mysql_connection = None # lazy init + + def alloc_connection(self): + if self.mysql_connection is None: + self.mysql_connection = pymysql.connect(user=self.user, password=self.password, host=self.hostname, + port=self.port, autocommit=True) + return self.mysql_connection + + def query(self, execution_query): + with self.alloc_connection().cursor() as cursor: + cursor.execute(execution_query) + + def close(self): + if self.mysql_connection is not None: + self.mysql_connection.close() + + +def test_disabled_mysql_server(started_cluster): + with contextlib.closing(MySQLNodeInstance()) as mysql_node: + mysql_node.query("CREATE DATABASE test_db;") + mysql_node.query("CREATE TABLE test_db.test_table ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;") + + with PartitionManager() as pm: + clickhouse_node.query("CREATE DATABASE test_db ENGINE = MySQL('mysql1:3306', 'test_db', 'root', 'clickhouse')") + + pm._add_rule({'source': clickhouse_node.ip_address, 'destination_port': 3306, 'action': 'DROP'}) + clickhouse_node.query("SELECT * FROM system.parts") + clickhouse_node.query("SELECT * FROM system.mutations") + clickhouse_node.query("SELECT * FROM system.graphite_retentions") + + clickhouse_node.query("DROP DATABASE test_db") diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index ab44d0ebea0..d7f98d5cb77 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -537,14 +537,14 @@ def test_rabbitmq_big_message(rabbitmq_cluster): @pytest.mark.timeout(420) def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster): NUM_CONSUMERS = 10 - NUM_QUEUES = 2 + NUM_QUEUES = 10 instance.query(''' CREATE TABLE test.rabbitmq (key UInt64, value UInt64) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'test_sharding', - rabbitmq_num_queues = 2, + rabbitmq_num_queues = 10, rabbitmq_num_consumers = 10, rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; @@ -617,7 +617,7 @@ def test_rabbitmq_mv_combo(rabbitmq_cluster): rabbitmq_exchange_name = 'combo', rabbitmq_queue_base = 'combo', rabbitmq_num_consumers = 2, - rabbitmq_num_queues = 2, + 
rabbitmq_num_queues = 5, rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; ''') @@ -879,7 +879,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): rabbitmq_queue_base = 'over', rabbitmq_exchange_type = 'direct', rabbitmq_num_consumers = 5, - rabbitmq_num_queues = 2, + rabbitmq_num_queues = 10, rabbitmq_max_block_size = 10000, rabbitmq_routing_key_list = 'over', rabbitmq_format = 'TSV', @@ -1722,7 +1722,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster): SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'consumer_reconnect', rabbitmq_num_consumers = 10, - rabbitmq_num_queues = 2, + rabbitmq_num_queues = 10, rabbitmq_format = 'JSONEachRow', rabbitmq_row_delimiter = '\\n'; ''') diff --git a/tests/integration/test_system_flush_logs/__init__.py b/tests/integration/test_system_flush_logs/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_SYSTEM_FLUSH_LOGS/test.py b/tests/integration/test_system_flush_logs/test.py similarity index 100% rename from tests/integration/test_SYSTEM_FLUSH_LOGS/test.py rename to tests/integration/test_system_flush_logs/test.py diff --git a/tests/integration/test_system_queries/test.py b/tests/integration/test_system_queries/test.py index 7f5bce97805..b159e8b4cf3 100644 --- a/tests/integration/test_system_queries/test.py +++ b/tests/integration/test_system_queries/test.py @@ -107,7 +107,7 @@ def test_RELOAD_CONFIG_AND_MACROS(started_cluster): assert TSV(instance.query("select * from system.macros")) == TSV("instance\tch1\nmac\tro\n") -def test_SYSTEM_FLUSH_LOGS(started_cluster): +def test_system_flush_logs(started_cluster): instance = cluster.instances['ch1'] instance.query(''' SET log_queries = 0; diff --git a/tests/queries/0_stateless/00161_rounding_functions.sql b/tests/queries/0_stateless/00161_rounding_functions.sql index 460129d2e9d..cc3542338bb 100644 --- a/tests/queries/0_stateless/00161_rounding_functions.sql +++ b/tests/queries/0_stateless/00161_rounding_functions.sql @@ -44,4 +44,4 @@ SELECT 12345.6789 AS x, floor(x, -1), floor(x, -2), floor(x, -3), floor(x, -4), SELECT roundToExp2(100), roundToExp2(64), roundToExp2(3), roundToExp2(0), roundToExp2(-1); SELECT roundToExp2(0.9), roundToExp2(0), roundToExp2(-0.5), roundToExp2(-0.6), roundToExp2(-0.2); -SELECT ceil(29375422, -54212) --{serverError 36} +SELECT ceil(29375422, -54212) --{serverError 69} diff --git a/tests/queries/0_stateless/00933_ttl_with_default.sql b/tests/queries/0_stateless/00933_ttl_with_default.sql index d3f3b62126c..e6c0a6e700c 100644 --- a/tests/queries/0_stateless/00933_ttl_with_default.sql +++ b/tests/queries/0_stateless/00933_ttl_with_default.sql @@ -5,7 +5,6 @@ insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 1); insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 2); insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 3); insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 4); -select sleep(0.7) format Null; -- wait if very fast merge happen optimize table ttl_00933_2 final; select a from ttl_00933_2 order by a; @@ -16,7 +15,6 @@ insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 1, 100); insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 2, 200); insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 3, 300); insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 4, 400); -select sleep(0.7) format Null; -- wait if very fast merge happen 
optimize table ttl_00933_2 final; select a, b from ttl_00933_2 order by a; @@ -27,7 +25,6 @@ insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 1, 5); insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 2, 10); insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 3, 15); insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 4, 20); -select sleep(0.7) format Null; -- wait if very fast merge happen optimize table ttl_00933_2 final; select a, b from ttl_00933_2 order by a; diff --git a/tests/queries/0_stateless/01193_metadata_loading.sh b/tests/queries/0_stateless/01193_metadata_loading.sh index 0ee583a7265..319b537e84b 100755 --- a/tests/queries/0_stateless/01193_metadata_loading.sh +++ b/tests/queries/0_stateless/01193_metadata_loading.sh @@ -49,4 +49,4 @@ $CLICKHOUSE_CLIENT -q "SELECT if(quantile(0.5)(query_duration_ms) < $max_time_ms $CLICKHOUSE_CLIENT -q "SELECT count() * $count_multiplier, i, d, s, n.i, n.f FROM $db.table_merge GROUP BY i, d, s, n.i, n.f ORDER BY i" -$CLICKHOUSE_CLIENT -q "DROP DATABASE $db" +$CLICKHOUSE_CLIENT -q "DROP DATABASE $db" --database_atomic_wait_for_drop_and_detach_synchronously=0 diff --git a/tests/queries/0_stateless/01526_client_start_and_exit.expect b/tests/queries/0_stateless/01526_client_start_and_exit.expect new file mode 100755 index 00000000000..003439ffa54 --- /dev/null +++ b/tests/queries/0_stateless/01526_client_start_and_exit.expect @@ -0,0 +1,12 @@ +#!/usr/bin/expect -f + +log_user 1 +set timeout 5 +match_max 100000 + +if ![info exists env(CLICKHOUSE_PORT_TCP)] {set env(CLICKHOUSE_PORT_TCP) 9000} + +spawn bash -c "clickhouse-client --port $env(CLICKHOUSE_PORT_TCP) && echo $?" +expect ":) " +send -- "\4" +expect eof diff --git a/tests/queries/0_stateless/01526_client_start_and_exit.reference b/tests/queries/0_stateless/01526_client_start_and_exit.reference new file mode 100644 index 00000000000..e3e2e7b22af --- /dev/null +++ b/tests/queries/0_stateless/01526_client_start_and_exit.reference @@ -0,0 +1 @@ +Loaded 10000 queries. diff --git a/tests/queries/0_stateless/01526_client_start_and_exit.sh b/tests/queries/0_stateless/01526_client_start_and_exit.sh new file mode 100755 index 00000000000..c179be79d03 --- /dev/null +++ b/tests/queries/0_stateless/01526_client_start_and_exit.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. 
"$CURDIR"/../shell_config.sh + +# Create a huge amount of tables, so Suggest will take a time to load +${CLICKHOUSE_CLIENT} -q "SELECT 'CREATE TABLE test_' || hex(randomPrintableASCII(40)) || '(x UInt8) Engine=Memory;' FROM numbers(10000)" --format=TSVRaw | ${CLICKHOUSE_BENCHMARK} -c32 -i 10000 -d 0 2>&1 | grep -F 'Loaded 10000 queries' + +function stress() +{ + while true; do + "${CURDIR}"/01526_client_start_and_exit.expect | grep -v -P 'ClickHouse client|Connecting|Connected|:\) Bye\.|^\s*$|spawn bash|^0\s*$' + done +} + +export CURDIR +export -f stress + +for _ in {1..10}; do + timeout 3 bash -c stress & +done + +wait diff --git a/tests/queries/0_stateless/01533_distinct_nullable_uuid.reference b/tests/queries/0_stateless/01533_distinct_nullable_uuid.reference new file mode 100644 index 00000000000..e02acad09d6 --- /dev/null +++ b/tests/queries/0_stateless/01533_distinct_nullable_uuid.reference @@ -0,0 +1,4 @@ +442d3ff4-842a-45bb-8b02-b616122c0dc6 +05fe40cb-1d0c-45b0-8e60-8e311c2463f1 +2fc89389-4728-4b30-9e51-b5bc3ad215f6 +10000 diff --git a/tests/queries/0_stateless/01533_distinct_nullable_uuid.sql b/tests/queries/0_stateless/01533_distinct_nullable_uuid.sql new file mode 100644 index 00000000000..926739d3f58 --- /dev/null +++ b/tests/queries/0_stateless/01533_distinct_nullable_uuid.sql @@ -0,0 +1,38 @@ +DROP TABLE IF EXISTS bug_14144; + +CREATE TABLE bug_14144 +( meta_source_req_uuid Nullable(UUID), + a Int64, + meta_source_type String +) +ENGINE = MergeTree +ORDER BY a; + +INSERT INTO bug_14144 SELECT cast(toUUID('442d3ff4-842a-45bb-8b02-b616122c0dc6'), 'Nullable(UUID)'), number, 'missing' FROM numbers(1000); + +INSERT INTO bug_14144 SELECT cast(toUUIDOrZero('2fc89389-4728-4b30-9e51-b5bc3ad215f6'), 'Nullable(UUID)'), number, 'missing' FROM numbers(1000); + +INSERT INTO bug_14144 SELECT cast(toUUIDOrNull('05fe40cb-1d0c-45b0-8e60-8e311c2463f1'), 'Nullable(UUID)'), number, 'missing' FROM numbers(1000); + +SELECT DISTINCT meta_source_req_uuid +FROM bug_14144 +WHERE meta_source_type = 'missing' +ORDER BY meta_source_req_uuid ASC; + +TRUNCATE TABLE bug_14144; + +INSERT INTO bug_14144 SELECT generateUUIDv4(), number, 'missing' FROM numbers(10000); + +SELECT COUNT() FROM ( + SELECT DISTINCT meta_source_req_uuid + FROM bug_14144 + WHERE meta_source_type = 'missing' + ORDER BY meta_source_req_uuid ASC + LIMIT 100000 +); + +DROP TABLE bug_14144; + + + + diff --git a/tests/queries/0_stateless/01535_decimal_round_scale_overflow_check.reference b/tests/queries/0_stateless/01535_decimal_round_scale_overflow_check.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01535_decimal_round_scale_overflow_check.sql b/tests/queries/0_stateless/01535_decimal_round_scale_overflow_check.sql new file mode 100644 index 00000000000..18509221203 --- /dev/null +++ b/tests/queries/0_stateless/01535_decimal_round_scale_overflow_check.sql @@ -0,0 +1 @@ +SELECT round(toDecimal32(1, 0), -9223372036854775806); -- { serverError 69 } diff --git a/utils/list-versions/version_date.tsv b/utils/list-versions/version_date.tsv index f7d6536a890..37e3e412a63 100644 --- a/utils/list-versions/version_date.tsv +++ b/utils/list-versions/version_date.tsv @@ -1,6 +1,9 @@ +v20.10.3.30-stable 2020-10-29 v20.10.2.20-stable 2020-10-23 +v20.9.4.76-stable 2020-10-29 v20.9.3.45-stable 2020-10-09 v20.9.2.20-stable 2020-09-22 +v20.8.5.45-lts 2020-10-29 v20.8.4.11-lts 2020-10-09 v20.8.3.18-stable 2020-09-18 v20.8.2.3-stable 2020-09-08