Merge remote-tracking branch 'origin/master' into HEAD

Alexander Kuzmenkov 2020-10-30 13:34:20 +03:00
commit 15317fcbd1
82 changed files with 1150 additions and 410 deletions

View File

@ -59,25 +59,6 @@ set(CMAKE_DEBUG_POSTFIX "d" CACHE STRING "Generate debug library name with a pos
# For more info see https://cmake.org/cmake/help/latest/prop_gbl/USE_FOLDERS.html
set_property(GLOBAL PROPERTY USE_FOLDERS ON)
# cmake 3.9+ needed.
# Usually impractical.
# See also ${ENABLE_THINLTO}
option(ENABLE_IPO "Full link time optimization")
if(ENABLE_IPO)
cmake_policy(SET CMP0069 NEW)
include(CheckIPOSupported)
check_ipo_supported(RESULT IPO_SUPPORTED OUTPUT IPO_NOT_SUPPORTED)
if(IPO_SUPPORTED)
message(STATUS "IPO/LTO is supported, enabling")
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)
else()
message (${RECONFIGURE_MESSAGE_LEVEL} "IPO/LTO is not supported: <${IPO_NOT_SUPPORTED}>")
endif()
else()
message(STATUS "IPO/LTO not enabled.")
endif()
# Check that submodules are present only if source was downloaded with git
if (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/.git" AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/boost/boost")
message (FATAL_ERROR "Submodules are not initialized. Run\n\tgit submodule update --init --recursive")

View File

@ -0,0 +1,8 @@
# post / preinstall scripts (not needed, we do it in Dockerfile)
alpine-root/install/*
# docs (looks useless)
alpine-root/usr/share/doc/*
# packages, etc. (used by prepare.sh)
alpine-root/tgz-packages/*

1
docker/server/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
alpine-root/*

View File

@ -0,0 +1,26 @@
FROM alpine
ENV LANG=en_US.UTF-8 \
LANGUAGE=en_US:en \
LC_ALL=en_US.UTF-8 \
TZ=UTC \
CLICKHOUSE_CONFIG=/etc/clickhouse-server/config.xml
COPY alpine-root/ /
# from https://github.com/ClickHouse/ClickHouse/blob/master/debian/clickhouse-server.postinst
RUN addgroup clickhouse \
&& adduser -S -H -h /nonexistent -s /bin/false -G clickhouse -g "ClickHouse server" clickhouse \
&& chown clickhouse:clickhouse /var/lib/clickhouse \
&& chmod 700 /var/lib/clickhouse \
&& chown root:clickhouse /var/log/clickhouse-server \
&& chmod 775 /var/log/clickhouse-server \
&& chmod +x /entrypoint.sh \
&& apk add --no-cache su-exec
EXPOSE 9000 8123 9009
VOLUME /var/lib/clickhouse \
/var/log/clickhouse-server
ENTRYPOINT ["/entrypoint.sh"]

59
docker/server/alpine-build.sh Executable file
View File

@ -0,0 +1,59 @@
#!/bin/bash
set -x
REPO_CHANNEL="${REPO_CHANNEL:-stable}" # lts / testing / prestable / etc
REPO_URL="${REPO_URL:-"https://repo.yandex.ru/clickhouse/tgz/${REPO_CHANNEL}"}"
VERSION="${VERSION:-20.9.3.45}"
# where original files live
DOCKER_BUILD_FOLDER="${BASH_SOURCE%/*}"
# we will create root for our image here
CONTAINER_ROOT_FOLDER="${DOCKER_BUILD_FOLDER}/alpine-root"
# where to put downloaded tgz
TGZ_PACKAGES_FOLDER="${CONTAINER_ROOT_FOLDER}/tgz-packages"
# clean up the root from old runs
rm -rf "$CONTAINER_ROOT_FOLDER"
mkdir -p "$TGZ_PACKAGES_FOLDER"
PACKAGES=( "clickhouse-client" "clickhouse-server" "clickhouse-common-static" )
# download tars from the repo
for package in "${PACKAGES[@]}"
do
wget -q --show-progress "${REPO_URL}/${package}-${VERSION}.tgz" -O "${TGZ_PACKAGES_FOLDER}/${package}-${VERSION}.tgz"
done
# unpack tars
for package in "${PACKAGES[@]}"
do
tar xvzf "${TGZ_PACKAGES_FOLDER}/${package}-${VERSION}.tgz" --strip-components=2 -C "$CONTAINER_ROOT_FOLDER"
done
# prepare a few more folders
mkdir -p "${CONTAINER_ROOT_FOLDER}/etc/clickhouse-server/users.d" \
"${CONTAINER_ROOT_FOLDER}/etc/clickhouse-server/config.d" \
"${CONTAINER_ROOT_FOLDER}/var/log/clickhouse-server" \
"${CONTAINER_ROOT_FOLDER}/var/lib/clickhouse" \
"${CONTAINER_ROOT_FOLDER}/docker-entrypoint-initdb.d" \
"${CONTAINER_ROOT_FOLDER}/lib64"
cp "${DOCKER_BUILD_FOLDER}/docker_related_config.xml" "${CONTAINER_ROOT_FOLDER}/etc/clickhouse-server/config.d/"
cp "${DOCKER_BUILD_FOLDER}/entrypoint.alpine.sh" "${CONTAINER_ROOT_FOLDER}/entrypoint.sh"
## get glibc components from ubuntu 20.04 and put them in the expected place
docker pull ubuntu:20.04
ubuntu20image=$(docker create --rm ubuntu:20.04)
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libc.so.6 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libdl.so.2 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libm.so.6 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libpthread.so.0 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/librt.so.1 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libnss_dns.so.2 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib/x86_64-linux-gnu/libresolv.so.2 "${CONTAINER_ROOT_FOLDER}/lib"
docker cp -L ${ubuntu20image}:/lib64/ld-linux-x86-64.so.2 "${CONTAINER_ROOT_FOLDER}/lib64"
docker build "$DOCKER_BUILD_FOLDER" -f Dockerfile.alpine -t "yandex/clickhouse-server:${VERSION}-alpine" --pull

View File

@ -0,0 +1,152 @@
#!/bin/sh
#set -x
DO_CHOWN=1
if [ "$CLICKHOUSE_DO_NOT_CHOWN" = 1 ]; then
DO_CHOWN=0
fi
CLICKHOUSE_UID="${CLICKHOUSE_UID:-"$(id -u clickhouse)"}"
CLICKHOUSE_GID="${CLICKHOUSE_GID:-"$(id -g clickhouse)"}"
# support --user
if [ "$(id -u)" = "0" ]; then
USER=$CLICKHOUSE_UID
GROUP=$CLICKHOUSE_GID
# busybox has setuidgid & chpst built in
gosu="su-exec $USER:$GROUP"
else
USER="$(id -u)"
GROUP="$(id -g)"
gosu=""
DO_CHOWN=0
fi
# set some vars
CLICKHOUSE_CONFIG="${CLICKHOUSE_CONFIG:-/etc/clickhouse-server/config.xml}"
# port is needed to check if clickhouse-server is ready for connections
HTTP_PORT="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=http_port)"
# get CH directories locations
DATA_DIR="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=path || true)"
TMP_DIR="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=tmp_path || true)"
USER_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=user_files_path || true)"
LOG_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=logger.log || true)"
LOG_DIR="$(dirname $LOG_PATH || true)"
ERROR_LOG_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=logger.errorlog || true)"
ERROR_LOG_DIR="$(dirname $ERROR_LOG_PATH || true)"
FORMAT_SCHEMA_PATH="$(clickhouse extract-from-config --config-file $CLICKHOUSE_CONFIG --key=format_schema_path || true)"
CLICKHOUSE_USER="${CLICKHOUSE_USER:-default}"
CLICKHOUSE_PASSWORD="${CLICKHOUSE_PASSWORD:-}"
CLICKHOUSE_DB="${CLICKHOUSE_DB:-}"
for dir in "$DATA_DIR" \
"$ERROR_LOG_DIR" \
"$LOG_DIR" \
"$TMP_DIR" \
"$USER_PATH" \
"$FORMAT_SCHEMA_PATH"
do
# check if variable not empty
[ -z "$dir" ] && continue
# ensure directories exist
if ! mkdir -p "$dir"; then
echo "Couldn't create necessary directory: $dir"
exit 1
fi
if [ "$DO_CHOWN" = "1" ]; then
# ensure proper directories permissions
chown -R "$USER:$GROUP" "$dir"
elif [ "$(stat -c %u "$dir")" != "$USER" ]; then
echo "Necessary directory '$dir' isn't owned by user with id '$USER'"
exit 1
fi
done
# if clickhouse user is defined - create it (user "default" already exists out of the box)
if [ -n "$CLICKHOUSE_USER" ] && [ "$CLICKHOUSE_USER" != "default" ] || [ -n "$CLICKHOUSE_PASSWORD" ]; then
echo "$0: create new user '$CLICKHOUSE_USER' instead 'default'"
cat <<EOT > /etc/clickhouse-server/users.d/default-user.xml
<yandex>
<!-- Docs: <https://clickhouse.tech/docs/en/operations/settings/settings_users/> -->
<users>
<!-- Remove default user -->
<default remove="remove">
</default>
<${CLICKHOUSE_USER}>
<profile>default</profile>
<networks>
<ip>::/0</ip>
</networks>
<password>${CLICKHOUSE_PASSWORD}</password>
<quota>default</quota>
</${CLICKHOUSE_USER}>
</users>
</yandex>
EOT
fi
if [ -n "$(ls /docker-entrypoint-initdb.d/)" ] || [ -n "$CLICKHOUSE_DB" ]; then
# Listen only on localhost until the initialization is done
$gosu /usr/bin/clickhouse-server --config-file=$CLICKHOUSE_CONFIG -- --listen_host=127.0.0.1 &
pid="$!"
# check if clickhouse is ready to accept connections
# will try to ping clickhouse via http_port (max 6 retries, with 1 sec timeout and 1 sec delay between retries)
tries=6
while ! wget --spider -T 1 -q "http://localhost:$HTTP_PORT/ping" 2>/dev/null; do
if [ "$tries" -le "0" ]; then
echo >&2 'ClickHouse init process failed.'
exit 1
fi
tries=$(( tries-1 ))
sleep 1
done
if [ ! -z "$CLICKHOUSE_PASSWORD" ]; then
printf -v WITH_PASSWORD '%s %q' "--password" "$CLICKHOUSE_PASSWORD"
fi
clickhouseclient="clickhouse-client --multiquery -u $CLICKHOUSE_USER $WITH_PASSWORD "
# create default database, if defined
if [ -n "$CLICKHOUSE_DB" ]; then
echo "$0: create database '$CLICKHOUSE_DB'"
"$clickhouseclient" -q "CREATE DATABASE IF NOT EXISTS $CLICKHOUSE_DB";
fi
for f in /docker-entrypoint-initdb.d/*; do
case "$f" in
*.sh)
if [ -x "$f" ]; then
echo "$0: running $f"
"$f"
else
echo "$0: sourcing $f"
. "$f"
fi
;;
*.sql) echo "$0: running $f"; cat "$f" | "$clickhouseclient" ; echo ;;
*.sql.gz) echo "$0: running $f"; gunzip -c "$f" | "$clickhouseclient"; echo ;;
*) echo "$0: ignoring $f" ;;
esac
echo
done
if ! kill -s TERM "$pid" || ! wait "$pid"; then
echo >&2 'Finishing of ClickHouse init process failed.'
exit 1
fi
fi
# if no args passed to `docker run` or first argument start with `--`, then the user is passing clickhouse-server arguments
if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
exec $gosu /usr/bin/clickhouse-server --config-file=$CLICKHOUSE_CONFIG "$@"
fi
# Otherwise, we assume the user wants to run their own process, for example a `bash` shell to explore this image
exec "$@"

View File

@ -51,7 +51,7 @@ Optional parameters:
- `rabbitmq_row_delimiter` Delimiter character, which ends the message.
- `rabbitmq_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
- `rabbitmq_num_queues` Total number of queues. Default: `1`. Increasing this number can significantly improve performance.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `rabbitmq_persistent` - If set to 1 (true), the delivery mode of insert queries will be set to 2 (marks messages as 'persistent'). Default: `0`.
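For reference, a minimal sketch of how these settings can be passed when creating a `RabbitMQ` table (the host, exchange name, format, and values below are illustrative, not part of this changeset):

``` sql
CREATE TABLE queue_example (key UInt64, value String)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'localhost:5672',
         rabbitmq_exchange_name = 'exchange1',
         rabbitmq_format = 'JSONEachRow',
         rabbitmq_num_consumers = 2,
         rabbitmq_num_queues = 4;
```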
@ -148,4 +148,5 @@ Example:
- `_channel_id` - ChannelID, on which consumer, who received the message, was declared.
- `_delivery_tag` - DeliveryTag of the received message. Scoped per channel.
- `_redelivered` - `redelivered` flag of the message.
- `_message_id` - MessageID of the received message; non-empty if was set, when message was published.
- `_message_id` - messageID of the received message; non-empty if was set, when message was published.
- `_timestamp` - timestamp of the received message; non-empty if was set, when message was published.

View File

@ -626,7 +626,12 @@ neighbor(column, offset[, default_value])
```
The result of the function depends on the affected data blocks and the order of data in the block.
If you make a subquery with ORDER BY and call the function from outside the subquery, you can get the expected result.
!!! warning "Warning"
It can reach the neighbor rows only inside the currently processed data block.
The order of rows used during the calculation of `neighbor` can differ from the order of rows returned to the user.
To prevent that, you can make a subquery with ORDER BY and call the function from outside the subquery.
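A sketch of that workaround (the table `events` and its `ts`, `value` columns are hypothetical):

``` sql
SELECT ts, value, neighbor(value, -1) AS prev_value
FROM (SELECT ts, value FROM events ORDER BY ts);
```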
**Parameters**
@ -731,8 +736,13 @@ Result:
Calculates the difference between successive row values in the data block.
Returns 0 for the first row and the difference from the previous row for each subsequent row.
!!! warning "Warning"
It can reach the previous row only inside the currently processed data block.
The result of the function depends on the affected data blocks and the order of data in the block.
If you make a subquery with ORDER BY and call the function from outside the subquery, you can get the expected result.
The order of rows used during the calculation of `runningDifference` can differ from the order of rows returned to the user.
To prevent that, you can make a subquery with ORDER BY and call the function from outside the subquery.
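A sketch of the same workaround for `runningDifference`, again with a hypothetical `events` table ordered by `ts`:

``` sql
SELECT ts, runningDifference(ts) AS delta
FROM (SELECT ts FROM events ORDER BY ts);
```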
Example:

View File

@ -13,12 +13,61 @@ Basic query format:
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
The query can specify a list of columns to insert `[(c1, c2, c3)]`. In this case, the rest of the columns are filled with:
You can specify a list of columns to insert using the `(c1, c2, c3)` or `COLUMNS(c1,c2,c3)` syntax.
Instead of listing all the required columns, you can use the `(* EXCEPT(column_list))` syntax.
For example, consider the table:
``` sql
SHOW CREATE insert_select_testtable;
```
```
┌─statement────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE insert_select_testtable
(
`a` Int8,
`b` String,
`c` Int8
)
ENGINE = MergeTree()
ORDER BY a
SETTINGS index_granularity = 8192 │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
``` sql
INSERT INTO insert_select_testtable (*) VALUES (1, 'a', 1) ;
```
If you want to insert data into all the columns except `b`, you need to pass as many values as there are columns listed in parentheses:
``` sql
INSERT INTO insert_select_testtable (* EXCEPT(b)) Values (2, 2);
```
``` sql
SELECT * FROM insert_select_testtable;
```
```
┌─a─┬─b─┬─c─┐
│ 2 │ │ 2 │
└───┴───┴───┘
┌─a─┬─b─┬─c─┐
│ 1 │ a │ 1 │
└───┴───┴───┘
```
In this example, we see that the second inserted row has its `a` and `c` columns filled with the passed values, and `b` filled with its default value.
If a list of columns doesn't include all existing columns, the rest of the columns are filled with:
- The values calculated from the `DEFAULT` expressions specified in the table definition.
- Zeros and empty strings, if `DEFAULT` expressions are not defined.
If [strict_insert_defaults=1](../../operations/settings/settings.md), columns that do not have `DEFAULT` defined must be listed in the query.
If [strict\_insert\_defaults=1](../../operations/settings/settings.md), columns that do not have `DEFAULT` defined must be listed in the query.
Data can be passed to the INSERT in any [format](../../interfaces/formats.md#formats) supported by ClickHouse. The format must be specified explicitly in the query:
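For instance, a sketch using the table from above and the TabSeparated format (the data row is illustrative):

``` sql
INSERT INTO insert_select_testtable FORMAT TabSeparated
3	c	3
```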

View File

@ -4,13 +4,17 @@ toc_title: WITH
# WITH Clause {#with-clause}
This section provides support for Common Table Expressions ([CTE](https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL)), so the results of `WITH` clause can be used in the rest of `SELECT` query.
ClickHouse supports Common Table Expressions ([CTE](https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL)), that is, it allows using the results of the `WITH` clause in the rest of the `SELECT` query. Named subqueries can be included in the current and child query contexts in places where table objects are allowed. Recursion is prevented by hiding the CTEs of the current level from the `WITH` expression.
## Limitations {#limitations}
## Syntax {#syntax}
1. Recursive queries are not supported.
2. When subquery is used inside WITH section, its result should be scalar with exactly one row.
3. Expressions results are not available in subqueries.
``` sql
WITH <expression> AS <identifier>
```
or
``` sql
WITH <identifier> AS <subquery expression>
```
## Examples {#examples}
@ -22,10 +26,10 @@ SELECT *
FROM hits
WHERE
EventDate = toDate(ts_upper_bound) AND
EventTime <= ts_upper_bound
EventTime <= ts_upper_bound;
```
**Example 2:** Evicting sum(bytes) expression result from SELECT clause column list
**Example 2:** Evicting a sum(bytes) expression result from the SELECT clause column list
``` sql
WITH sum(bytes) as s
@ -34,10 +38,10 @@ SELECT
table
FROM system.parts
GROUP BY table
ORDER BY s
ORDER BY s;
```
**Example 3:** Using results of scalar subquery
**Example 3:** Using results of a scalar subquery
``` sql
/* this example would return TOP 10 of most huge tables */
@ -53,27 +57,14 @@ SELECT
FROM system.parts
GROUP BY table
ORDER BY table_disk_usage DESC
LIMIT 10
LIMIT 10;
```
**Example 4:** Re-using expression in subquery
As a workaround for current limitation for expression usage in subqueries, you may duplicate it.
**Example 4:** Reusing expression in a subquery
``` sql
WITH ['hello'] AS hello
SELECT
hello,
*
FROM
(
WITH ['hello'] AS hello
SELECT hello
)
WITH test1 AS (SELECT i + 1, j + 1 FROM test1)
SELECT * FROM test1;
```
``` text
┌─hello─────┬─hello─────┐
│ ['hello'] │ ['hello'] │
└───────────┴───────────┘
```
[Original article](https://clickhouse.tech/docs/en/sql-reference/statements/select/with/) <!--hide-->

View File

@ -45,7 +45,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
- `rabbitmq_row_delimiter` Delimiter character, which ends the message.
- `rabbitmq_schema` Optional parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
- `rabbitmq_num_queues` Total number of queues. Default: `1`. A larger number of queues can significantly increase throughput.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_persistent` - A flag that controls the 'durable' setting for messages in `INSERT` queries. Default: `0`.
- `rabbitmq_skip_broken_messages` The maximum number of broken messages per block. If `rabbitmq_skip_broken_messages = N`, the engine discards `N` messages that could not be parsed. One message corresponds to exactly one record (row). Default value: 0.
@ -140,4 +140,5 @@ Example:
- `_channel_id` - The `ChannelID` identifier of the channel on which the message was received.
- `_delivery_tag` - The `DeliveryTag` value of the received message. Unique within a single channel.
- `_redelivered` - The `redelivered` flag. (Non-zero if the message may have been received by more than one channel.)
- `_message_id` - The `MessageID` value of the received message. This field is non-empty if it was set in the parameters when the message was published.
- `_message_id` - The value of the `messageID` field of the received message. This field is non-empty if it was set in the parameters when the message was published.
- `_timestamp` - The value of the `timestamp` field of the received message. This field is non-empty if it was set in the parameters when the message was published.

View File

@ -13,7 +13,55 @@ toc_title: INSERT INTO
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
The query can specify a list of columns to insert `[(c1, c2, c3)]`. In this case, the rest of the columns are filled with:
You can specify a list of columns to insert using the following syntax: `(c1, c2, c3)` or `COLUMNS(c1,c2,c3)`.
Instead of listing all the required columns, you can use the `(* EXCEPT(column_list))` syntax.
As an example, consider the table:
``` sql
SHOW CREATE insert_select_testtable
```
```
┌─statement────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE insert_select_testtable
(
`a` Int8,
`b` String,
`c` Int8
)
ENGINE = MergeTree()
ORDER BY a
SETTINGS index_granularity = 8192 │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
``` sql
INSERT INTO insert_select_testtable (*) VALUES (1, 'a', 1)
```
If you want to insert data into all the columns except `b`, you need to pass as many values as there are columns listed in parentheses:
``` sql
INSERT INTO insert_select_testtable (* EXCEPT(b)) Values (2, 2)
```
``` sql
SELECT * FROM insert_select_testtable
```
```
┌─a─┬─b─┬─c─┐
│ 2 │ │ 2 │
└───┴───┴───┘
┌─a─┬─b─┬─c─┐
│ 1 │ a │ 1 │
└───┴───┴───┘
```
In this example, we see that the second inserted row has its `a` and `c` columns filled with the passed values, and `b` filled with the default value.
If the list of columns doesn't include all existing columns, the rest of the columns are filled as follows:
- The values calculated from the `DEFAULT` expressions specified in the table definition.
- Zeros and empty strings, if `DEFAULT` expressions are not defined.

View File

@ -2,18 +2,21 @@
toc_title: WITH
---
# WITH Clause {#sektsiia-with}
# WITH Clause {#with-clause}
This clause provides Common Table Expressions ([CTE](https://ru.wikipedia.org/wiki/Иерархические_и_рекурсивные_запросы_в_SQL)), that is, it allows using the results of expressions from the `WITH` clause in the rest of the `SELECT` query.
ClickHouse supports [Common Table Expressions](https://ru.wikipedia.org/wiki/Иерархические_и_рекурсивные_запросы_в_SQL), that is, it allows using the results of expressions from the `WITH` clause in the rest of the `SELECT` query. Named subqueries can be included in the current and child query contexts in places where table objects are allowed. Recursion is prevented by hiding the common table expressions of the current level from the `WITH` expression.
### Limitations
## Syntax
1. Recursive queries are not supported.
2. If a subquery is used as the expression, the result must contain exactly one row.
3. Expression results cannot be reused in subqueries.
The results of the expressions can then be used in the SELECT clause.
``` sql
WITH <expression> AS <identifier>
```
or
``` sql
WITH <identifier> AS <subquery expression>
```
### Examples
## Examples
**Example 1:** Using a constant expression as a "variable"
@ -23,7 +26,7 @@ SELECT *
FROM hits
WHERE
EventDate = toDate(ts_upper_bound) AND
EventTime <= ts_upper_bound
EventTime <= ts_upper_bound;
```
**Example 2:** Evicting the sum(bytes) expression from the column list in SELECT
@ -35,7 +38,7 @@ SELECT
table
FROM system.parts
GROUP BY table
ORDER BY s
ORDER BY s;
```
**Example 3:** Using the results of a scalar subquery
@ -54,27 +57,14 @@ SELECT
FROM system.parts
GROUP BY table
ORDER BY table_disk_usage DESC
LIMIT 10
LIMIT 10;
```
**Example 4:** Reusing an expression
Currently, reusing an expression from the WITH clause inside a subquery is possible only by duplicating it.
``` sql
WITH ['hello'] AS hello
SELECT
hello,
*
FROM
(
WITH ['hello'] AS hello
SELECT hello
)
WITH test1 AS (SELECT i + 1, j + 1 FROM test1)
SELECT * FROM test1;
```
``` text
┌─hello─────┬─hello─────┐
│ ['hello'] │ ['hello'] │
└───────────┴───────────┘
```
[Original article](https://clickhouse.tech/docs/en/sql-reference/statements/select/with/) <!--hide-->

View File

@ -218,6 +218,8 @@ private:
QueryFuzzer fuzzer;
int query_fuzzer_runs = 0;
std::optional<Suggest> suggest;
/// We will format query_id in interactive mode in various ways, the default is just to print Query id: ...
std::vector<std::pair<String, String>> query_id_formats;
@ -577,10 +579,11 @@ private:
if (print_time_to_stderr)
throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
suggest.emplace();
if (server_revision >= Suggest::MIN_SERVER_REVISION && !config().getBool("disable_suggestion", false))
{
/// Load suggestion data from the server.
Suggest::instance().load(connection_parameters, config().getInt("suggestion_limit"));
suggest->load(connection_parameters, config().getInt("suggestion_limit"));
}
/// Load command history if present.
@ -607,7 +610,7 @@ private:
highlight_callback = highlight;
ReplxxLineReader lr(
Suggest::instance(),
*suggest,
history_file,
config().has("multiline"),
query_extenders,
@ -615,7 +618,7 @@ private:
highlight_callback);
#elif defined(USE_READLINE) && USE_READLINE
ReadlineLineReader lr(Suggest::instance(), history_file, config().has("multiline"), query_extenders, query_delimiters);
ReadlineLineReader lr(*suggest, history_file, config().has("multiline"), query_extenders, query_delimiters);
#else
LineReader lr(history_file, config().has("multiline"), query_extenders, query_delimiters);
#endif

View File

@ -18,10 +18,11 @@ namespace ErrorCodes
class Suggest : public LineReader::Suggest, boost::noncopyable
{
public:
static Suggest & instance()
Suggest();
~Suggest()
{
static Suggest instance;
return instance;
if (loading_thread.joinable())
loading_thread.join();
}
void load(const ConnectionParameters & connection_parameters, size_t suggestion_limit);
@ -30,12 +31,6 @@ public:
static constexpr int MIN_SERVER_REVISION = 54406;
private:
Suggest();
~Suggest()
{
if (loading_thread.joinable())
loading_thread.join();
}
void loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit);
void fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query);

View File

@ -185,9 +185,9 @@ void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, s
}
else
{
throw Exception("Data compressed with different methods, given method byte "
throw Exception("Data compressed with different methods, given method byte 0x"
+ getHexUIntLowercase(method)
+ ", previous method byte "
+ ", previous method byte 0x"
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
}

View File

@ -169,6 +169,8 @@ class IColumn;
M(Milliseconds, read_backoff_min_interval_between_events_ms, 1000, "Settings to reduce the number of threads in case of slow reads. Do not pay attention to the event, if the previous one has passed less than a certain amount of time.", 0) \
M(UInt64, read_backoff_min_events, 2, "Settings to reduce the number of threads in case of slow reads. The number of events after which the number of threads will be reduced.", 0) \
\
M(UInt64, read_backoff_min_concurrency, 1, "Settings to try keeping the minimal number of threads in case of slow reads.", 0) \
\
M(Float, memory_tracker_fault_probability, 0., "For testing of `exception safety` - throw an exception every time you allocate memory with the specified probability.", 0) \
\
M(Bool, enable_http_compression, 0, "Compress the result if the client over HTTP said that it understands data compressed by gzip or deflate.", 0) \

View File

@ -329,10 +329,4 @@ const StoragePtr & DatabaseLazyIterator::table() const
return current_storage;
}
void DatabaseLazyIterator::reset()
{
if (current_storage)
current_storage.reset();
}
}

View File

@ -22,6 +22,10 @@ public:
String getEngineName() const override { return "Lazy"; }
bool canContainMergeTreeTables() const override { return false; }
bool canContainDistributedTables() const override { return false; }
void loadStoredObjects(
Context & context,
bool has_force_restore_data_flag, bool force_attach) override;
@ -122,7 +126,6 @@ public:
bool isValid() const override;
const String & name() const override;
const StoragePtr & table() const override;
void reset() override;
private:
const DatabaseLazy & database;

View File

@ -44,8 +44,6 @@ public:
/// (a database with support for lazy tables loading
/// - it maintains a list of tables but tables are loaded lazily).
virtual const StoragePtr & table() const = 0;
/// Reset reference counter to the StoragePtr.
virtual void reset() = 0;
virtual ~IDatabaseTablesIterator() = default;
@ -95,8 +93,6 @@ public:
const String & name() const override { return it->first; }
const StoragePtr & table() const override { return it->second; }
void reset() override { it->second.reset(); }
};
/// Copies list of dictionaries and iterates through such snapshot.
@ -151,6 +147,10 @@ public:
/// Get name of database engine.
virtual String getEngineName() const = 0;
virtual bool canContainMergeTreeTables() const { return true; }
virtual bool canContainDistributedTables() const { return true; }
/// Load a set of existing tables.
/// You can call only once, right after the object is created.
virtual void loadStoredObjects(Context & /*context*/, bool /*has_force_restore_data_flag*/, bool /*force_attach*/ = false) {}

View File

@ -42,6 +42,12 @@ public:
String getEngineName() const override { return "MySQL"; }
bool canContainMergeTreeTables() const override { return false; }
bool canContainDistributedTables() const override { return false; }
bool shouldBeEmptyOnDetach() const override { return false; }
bool empty() const override;
DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;

View File

@ -28,11 +28,6 @@ public:
return tables.emplace_back(storage);
}
void reset() override
{
tables.clear();
}
UUID uuid() const override { return nested_iterator->uuid(); }
DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, DatabaseMaterializeMySQL * database_)

View File

@ -31,6 +31,7 @@ namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
}
@ -84,6 +85,9 @@ enum class TieBreakingMode
Bankers, // use banker's rounding
};
/// For N, no more than the number of digits in the largest type.
using Scale = Int16;
/** Rounding functions for integer values.
*/
@ -416,7 +420,7 @@ private:
using Container = typename ColumnDecimal<T>::Container;
public:
static NO_INLINE void apply(const Container & in, Container & out, Int64 scale_arg)
static NO_INLINE void apply(const Container & in, Container & out, Scale scale_arg)
{
scale_arg = in.getScale() - scale_arg;
if (scale_arg > 0)
@ -458,7 +462,7 @@ class Dispatcher
FloatRoundingImpl<T, rounding_mode, scale_mode>,
IntegerRoundingImpl<T, rounding_mode, scale_mode, tie_breaking_mode>>;
static ColumnPtr apply(const ColumnVector<T> * col, Int64 scale_arg)
static ColumnPtr apply(const ColumnVector<T> * col, Scale scale_arg)
{
auto col_res = ColumnVector<T>::create();
@ -487,7 +491,7 @@ class Dispatcher
return col_res;
}
static ColumnPtr apply(const ColumnDecimal<T> * col, Int64 scale_arg)
static ColumnPtr apply(const ColumnDecimal<T> * col, Scale scale_arg)
{
const typename ColumnDecimal<T>::Container & vec_src = col->getData();
@ -501,7 +505,7 @@ class Dispatcher
}
public:
static ColumnPtr apply(const IColumn * column, Int64 scale_arg)
static ColumnPtr apply(const IColumn * column, Scale scale_arg)
{
if constexpr (IsNumber<T>)
return apply(checkAndGetColumn<ColumnVector<T>>(column), scale_arg);
@ -544,20 +548,25 @@ public:
return arguments[0];
}
static Int64 getScaleArg(ColumnsWithTypeAndName & arguments)
static Scale getScaleArg(ColumnsWithTypeAndName & arguments)
{
if (arguments.size() == 2)
{
const IColumn & scale_column = *arguments[1].column;
if (!isColumnConst(scale_column))
throw Exception("Scale argument for rounding functions must be constant.", ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Scale argument for rounding functions must be constant", ErrorCodes::ILLEGAL_COLUMN);
Field scale_field = assert_cast<const ColumnConst &>(scale_column).getField();
if (scale_field.getType() != Field::Types::UInt64
&& scale_field.getType() != Field::Types::Int64)
throw Exception("Scale argument for rounding functions must have integer type.", ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Scale argument for rounding functions must have integer type", ErrorCodes::ILLEGAL_COLUMN);
return scale_field.get<Int64>();
Int64 scale64 = scale_field.get<Int64>();
if (scale64 > std::numeric_limits<Scale>::max()
|| scale64 < std::numeric_limits<Scale>::min())
throw Exception("Scale argument for rounding function is too large", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
return scale64;
}
return 0;
}
@ -568,7 +577,7 @@ public:
ColumnPtr executeImpl(ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
const ColumnWithTypeAndName & column = arguments[0];
Int64 scale_arg = getScaleArg(arguments);
Scale scale_arg = getScaleArg(arguments);
ColumnPtr res;
auto call = [&](const auto & types) -> bool

View File

@ -480,7 +480,7 @@ void readEscapedString(String & s, ReadBuffer & buf)
}
template void readEscapedStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullSink>(NullSink & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf);
/** If enable_sql_style_quoting == true,
@ -566,7 +566,7 @@ void readQuotedStringWithSQLStyle(String & s, ReadBuffer & buf)
template void readQuotedStringInto<true>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readDoubleQuotedStringInto<false>(NullSink & s, ReadBuffer & buf);
template void readDoubleQuotedStringInto<false>(NullOutput & s, ReadBuffer & buf);
void readDoubleQuotedString(String & s, ReadBuffer & buf)
{
@ -746,7 +746,7 @@ void readJSONString(String & s, ReadBuffer & buf)
template void readJSONStringInto<PaddedPODArray<UInt8>, void>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template bool readJSONStringInto<PaddedPODArray<UInt8>, bool>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readJSONStringInto<NullSink>(NullSink & s, ReadBuffer & buf);
template void readJSONStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf);
template void readJSONStringInto<String>(String & s, ReadBuffer & buf);
@ -895,7 +895,7 @@ void skipJSONField(ReadBuffer & buf, const StringRef & name_of_field)
throw Exception("Unexpected EOF for key '" + name_of_field.toString() + "'", ErrorCodes::INCORRECT_DATA);
else if (*buf.position() == '"') /// skip double-quoted string
{
NullSink sink;
NullOutput sink;
readJSONStringInto(sink, buf);
}
else if (isNumericASCII(*buf.position()) || *buf.position() == '-' || *buf.position() == '+' || *buf.position() == '.') /// skip number
@ -959,7 +959,7 @@ void skipJSONField(ReadBuffer & buf, const StringRef & name_of_field)
// field name
if (*buf.position() == '"')
{
NullSink sink;
NullOutput sink;
readJSONStringInto(sink, buf);
}
else

View File

@ -527,7 +527,7 @@ bool tryReadJSONStringInto(Vector & s, ReadBuffer & buf)
}
/// This could be used as template parameter for functions above, if you want to just skip data.
struct NullSink
struct NullOutput
{
void append(const char *, size_t) {}
void push_back(char) {}

View File

@ -233,8 +233,8 @@ void AsynchronousMetrics::update()
for (const auto & db : databases)
{
/// Lazy database can not contain MergeTree tables
if (db.second->getEngineName() == "Lazy")
/// Check if database can contain MergeTree tables
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{

View File

@ -52,13 +52,37 @@ BlockIO InterpreterDropQuery::execute()
return executeToDictionary(drop.database, drop.table, drop.kind, drop.if_exists, drop.temporary, drop.no_ddl_lock);
}
else if (!drop.database.empty())
return executeToDatabase(drop.database, drop.kind, drop.if_exists, drop.no_delay);
return executeToDatabase(drop);
else
throw Exception("Nothing to drop, both names are empty", ErrorCodes::LOGICAL_ERROR);
}
void InterpreterDropQuery::waitForTableToBeActuallyDroppedOrDetached(const ASTDropQuery & query, const DatabasePtr & db, const UUID & uuid_to_wait)
{
if (uuid_to_wait == UUIDHelpers::Nil)
return;
if (query.kind == ASTDropQuery::Kind::Drop)
DatabaseCatalog::instance().waitTableFinallyDropped(uuid_to_wait);
else if (query.kind == ASTDropQuery::Kind::Detach)
{
if (auto * atomic = typeid_cast<DatabaseAtomic *>(db.get()))
atomic->waitDetachedTableNotInUse(uuid_to_wait);
}
}
BlockIO InterpreterDropQuery::executeToTable(const ASTDropQuery & query)
{
DatabasePtr database;
UUID table_to_wait_on = UUIDHelpers::Nil;
auto res = executeToTableImpl(query, database, table_to_wait_on);
if (query.no_delay)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_to_wait_on);
return res;
}
BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait)
{
/// NOTE: it does not contain UUID, we will resolve it with locked DDLGuard
auto table_id = StorageID(query);
@ -125,19 +149,9 @@ BlockIO InterpreterDropQuery::executeToTable(const ASTDropQuery & query)
database->dropTable(context, table_id.table_name, query.no_delay);
}
}
table.reset();
ddl_guard = {};
if (query.no_delay)
{
if (query.kind == ASTDropQuery::Kind::Drop)
DatabaseCatalog::instance().waitTableFinallyDropped(table_id.uuid);
else if (query.kind == ASTDropQuery::Kind::Detach)
{
if (auto * atomic = typeid_cast<DatabaseAtomic *>(database.get()))
atomic->waitDetachedTableNotInUse(table_id.uuid);
}
db = database;
uuid_to_wait = table_id.uuid;
}
return {};
@ -223,19 +237,48 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name,
}
BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, ASTDropQuery::Kind kind, bool if_exists, bool no_delay)
BlockIO InterpreterDropQuery::executeToDatabase(const ASTDropQuery & query)
{
DatabasePtr database;
std::vector<UUID> tables_to_wait;
BlockIO res;
try
{
res = executeToDatabaseImpl(query, database, tables_to_wait);
}
catch (...)
{
if (query.no_delay)
{
for (const auto & table_uuid : tables_to_wait)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid);
}
throw;
}
if (query.no_delay)
{
for (const auto & table_uuid : tables_to_wait)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid);
}
return res;
}
BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query, DatabasePtr & database, std::vector<UUID> & uuids_to_wait)
{
const auto & database_name = query.database;
auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, "");
if (auto database = tryGetDatabase(database_name, if_exists))
database = tryGetDatabase(database_name, query.if_exists);
if (database)
{
if (kind == ASTDropQuery::Kind::Truncate)
if (query.kind == ASTDropQuery::Kind::Truncate)
{
throw Exception("Unable to truncate database", ErrorCodes::SYNTAX_ERROR);
}
else if (kind == ASTDropQuery::Kind::Detach || kind == ASTDropQuery::Kind::Drop)
else if (query.kind == ASTDropQuery::Kind::Detach || query.kind == ASTDropQuery::Kind::Drop)
{
bool drop = kind == ASTDropQuery::Kind::Drop;
bool drop = query.kind == ASTDropQuery::Kind::Drop;
context.checkAccess(AccessType::DROP_DATABASE, database_name);
if (database->shouldBeEmptyOnDetach())
@ -246,21 +289,22 @@ BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, AS
for (auto iterator = database->getDictionariesIterator(); iterator->isValid(); iterator->next())
{
String current_dictionary = iterator->name();
executeToDictionary(database_name, current_dictionary, kind, false, false, false);
executeToDictionary(database_name, current_dictionary, query.kind, false, false, false);
}
ASTDropQuery query;
query.kind = kind;
query.if_exists = true;
query.database = database_name;
query.no_delay = no_delay;
ASTDropQuery query_for_table;
query_for_table.kind = query.kind;
query_for_table.if_exists = true;
query_for_table.database = database_name;
query_for_table.no_delay = query.no_delay;
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
/// Reset reference counter of the StoragePtr to allow synchronous drop.
iterator->reset();
query.table = iterator->name();
executeToTable(query);
DatabasePtr db;
UUID table_to_wait = UUIDHelpers::Nil;
query_for_table.table = iterator->name();
executeToTableImpl(query_for_table, db, table_to_wait);
uuids_to_wait.push_back(table_to_wait);
}
}

View File

@ -29,9 +29,13 @@ private:
ASTPtr query_ptr;
Context & context;
BlockIO executeToDatabase(const String & database_name, ASTDropQuery::Kind kind, bool if_exists, bool no_delay);
BlockIO executeToDatabase(const ASTDropQuery & query);
BlockIO executeToDatabaseImpl(const ASTDropQuery & query, DatabasePtr & database, std::vector<UUID> & uuids_to_wait);
BlockIO executeToTable(const ASTDropQuery & query);
BlockIO executeToTableImpl(const ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait);
static void waitForTableToBeActuallyDroppedOrDetached(const ASTDropQuery & query, const DatabasePtr & db, const UUID & uuid_to_wait);
BlockIO executeToDictionary(const String & database_name, const String & dictionary_name, ASTDropQuery::Kind kind, bool if_exists, bool is_temporary, bool no_ddl_lock);

View File

@ -1,4 +1,5 @@
#include "AvroRowInputFormat.h"
#include "DataTypes/DataTypeLowCardinality.h"
#if USE_AVRO
#include <numeric>
@ -174,7 +175,8 @@ static std::string nodeName(avro::NodePtr node)
AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type)
{
WhichDataType target(target_type);
const WhichDataType target = removeLowCardinality(target_type);
switch (root_node->type())
{
case avro::AVRO_STRING: [[fallthrough]];
@ -384,7 +386,8 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
}
throw Exception(
"Type " + target_type->getName() + " is not compatible with Avro " + avro::toString(root_node->type()) + ":\n" + nodeToJson(root_node),
"Type " + target_type->getName() + " is not compatible with Avro " + avro::toString(root_node->type()) + ":\n"
+ nodeToJson(root_node),
ErrorCodes::ILLEGAL_COLUMN);
}

View File

@ -130,7 +130,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
throw Exception("Unknown field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
/// If the key is not found, skip the value.
NullSink sink;
NullOutput sink;
readEscapedStringInto(sink, in);
}
else

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
static void skipTSVRow(ReadBuffer & in, const size_t num_columns)
{
NullSink null_sink;
NullOutput null_sink;
for (size_t i = 0; i < num_columns; ++i)
{
@ -196,7 +196,7 @@ bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
}
else
{
NullSink null_sink;
NullOutput null_sink;
readEscapedStringInto(null_sink, in);
}
@ -353,7 +353,7 @@ void TabSeparatedRowInputFormat::tryDeserializeField(const DataTypePtr & type, I
}
else
{
NullSink null_sink;
NullOutput null_sink;
readEscapedStringInto(null_sink, in);
}
}

View File

@ -43,8 +43,8 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
/// Iterate through all the replicated tables.
for (const auto & db : databases)
{
/// Lazy database can not contain replicated tables
if (db.second->getEngineName() == "Lazy")
/// Check if database can contain replicated tables
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())

View File

@ -1,3 +1,4 @@
add_subdirectory(MergeTree)
add_subdirectory(System)
if(ENABLE_TESTS)

View File

@ -0,0 +1,3 @@
if(ENABLE_TESTS)
add_subdirectory(tests)
endif()

View File

@ -68,7 +68,7 @@ MergeInfo MergeListElement::getInfo() const
res.memory_usage = memory_tracker.get();
res.thread_id = thread_id;
res.merge_type = toString(merge_type);
res.merge_algorithm = toString(merge_algorithm);
res.merge_algorithm = toString(merge_algorithm.load(std::memory_order_relaxed));
for (const auto & source_part_name : source_part_names)
res.source_part_names.emplace_back(source_part_name);

View File

@ -92,7 +92,8 @@ struct MergeListElement : boost::noncopyable
UInt64 thread_id;
MergeType merge_type;
MergeAlgorithm merge_algorithm;
/// Detected after merge already started
std::atomic<MergeAlgorithm> merge_algorithm;
MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part);

View File

@ -710,10 +710,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
size_t sum_input_rows_upper_bound = merge_entry->total_rows_count;
size_t sum_compressed_bytes_upper_bound = merge_entry->total_size_bytes_compressed;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);
merge_entry->merge_algorithm = merge_alg;
MergeAlgorithm chosen_merge_algorithm = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);
merge_entry->merge_algorithm.store(chosen_merge_algorithm, std::memory_order_relaxed);
LOG_DEBUG(log, "Selected MergeAlgorithm: {}", toString(merge_alg));
LOG_DEBUG(log, "Selected MergeAlgorithm: {}", toString(chosen_merge_algorithm));
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes())
@ -728,7 +728,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
std::unique_ptr<WriteBuffer> rows_sources_write_buf;
std::optional<ColumnSizeEstimator> column_sizes;
if (merge_alg == MergeAlgorithm::Vertical)
if (chosen_merge_algorithm == MergeAlgorithm::Vertical)
{
tmp_disk->createDirectories(new_part_tmp_path);
rows_sources_file_path = new_part_tmp_path + "rows_sources";
@ -818,7 +818,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
ProcessorPtr merged_transform;
/// If merge is vertical we cannot calculate it
bool blocks_are_granules_size = (merge_alg == MergeAlgorithm::Vertical);
bool blocks_are_granules_size = (chosen_merge_algorithm == MergeAlgorithm::Vertical);
UInt64 merge_block_size = data_settings->merge_max_block_size;
switch (data.merging_params.mode)
@ -917,7 +917,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
{
/// The same progress from merge_entry could be used for both algorithms (it should be more accurate)
/// But now we are using inaccurate row-based estimation in Horizontal case for backward compatibility
Float64 progress = (merge_alg == MergeAlgorithm::Horizontal)
Float64 progress = (chosen_merge_algorithm == MergeAlgorithm::Horizontal)
? std::min(1., 1. * rows_written / sum_input_rows_upper_bound)
: std::min(1., merge_entry->progress.load(std::memory_order_relaxed));
@ -938,7 +938,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergeTreeData::DataPart::Checksums checksums_gathered_columns;
/// Gather ordinary columns
if (merge_alg == MergeAlgorithm::Vertical)
if (chosen_merge_algorithm == MergeAlgorithm::Vertical)
{
size_t sum_input_rows_exact = merge_entry->rows_read;
merge_entry->columns_written = merging_column_names.size();
@ -1054,7 +1054,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
ReadableSize(merge_entry->bytes_read_uncompressed / elapsed_seconds));
}
if (merge_alg != MergeAlgorithm::Vertical)
if (chosen_merge_algorithm != MergeAlgorithm::Vertical)
to.writeSuffixAndFinalizePart(new_data_part, need_sync);
else
to.writeSuffixAndFinalizePart(new_data_part, need_sync, &storage_columns, &checksums_gathered_columns);

View File

@ -898,7 +898,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
num_streams,
sum_marks,
min_marks_for_concurrent_read,
parts,
std::move(parts),
data,
metadata_snapshot,
query_info.prewhere_info,

View File

@ -21,7 +21,7 @@ MergeTreeReadPool::MergeTreeReadPool(
const size_t threads_,
const size_t sum_marks_,
const size_t min_marks_for_concurrent_read_,
RangesInDataParts parts_,
RangesInDataParts && parts_,
const MergeTreeData & data_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -38,11 +38,11 @@ MergeTreeReadPool::MergeTreeReadPool(
, do_not_steal_tasks{do_not_steal_tasks_}
, predict_block_size_bytes{preferred_block_size_bytes_ > 0}
, prewhere_info{prewhere_info_}
, parts_ranges{parts_}
, parts_ranges{std::move(parts_)}
{
/// parts don't contain duplicate MergeTreeDataPart's.
const auto per_part_sum_marks = fillPerPartInfo(parts_, check_columns_);
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_, min_marks_for_concurrent_read_);
const auto per_part_sum_marks = fillPerPartInfo(parts_ranges, check_columns_);
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read_);
}
@ -62,7 +62,24 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
return nullptr;
/// Steal task if nothing to do and it's not prohibited
const auto thread_idx = tasks_remaining_for_this_thread ? thread : *std::begin(remaining_thread_tasks);
auto thread_idx = thread;
if (!tasks_remaining_for_this_thread)
{
auto it = remaining_thread_tasks.lower_bound(backoff_state.current_threads);
// Grab the entire tasks of a thread which is killed by backoff
if (it != remaining_thread_tasks.end())
{
threads_tasks[thread] = std::move(threads_tasks[*it]);
remaining_thread_tasks.erase(it);
}
else // Try steal tasks from the next thread
{
it = remaining_thread_tasks.upper_bound(thread);
if (it == remaining_thread_tasks.end())
it = remaining_thread_tasks.begin();
thread_idx = *it;
}
}
auto & thread_tasks = threads_tasks[thread_idx];
auto & thread_task = thread_tasks.parts_and_ranges.back();
@ -163,7 +180,7 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf
std::lock_guard lock(mutex);
if (backoff_state.current_threads <= 1)
if (backoff_state.current_threads <= backoff_settings.min_concurrency)
return;
size_t throughput = info.bytes_read * 1000000000 / info.nanoseconds;
@ -194,14 +211,14 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf
std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
RangesInDataParts & parts, const bool check_columns)
const RangesInDataParts & parts, const bool check_columns)
{
std::vector<size_t> per_part_sum_marks;
Block sample_block = metadata_snapshot->getSampleBlock();
for (const auto i : ext::range(0, parts.size()))
{
auto & part = parts[i];
const auto & part = parts[i];
/// Read marks for every data part.
size_t sum_marks = 0;
@ -238,21 +255,53 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
void MergeTreeReadPool::fillPerThreadInfo(
const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
RangesInDataParts & parts, const size_t min_marks_for_concurrent_read)
const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read)
{
threads_tasks.resize(threads);
if (parts.empty())
return;
struct PartInfo
{
RangesInDataPart part;
size_t sum_marks;
size_t part_idx;
};
using PartsInfo = std::vector<PartInfo>;
std::queue<PartsInfo> parts_queue;
{
/// Group parts by disk name.
/// We try to minimize the number of threads concurrently reading from the same disk.
/// It improves the performance for JBOD architecture.
std::map<String, std::vector<PartInfo>> parts_per_disk;
for (size_t i = 0; i < parts.size(); ++i)
{
PartInfo part_info{parts[i], per_part_sum_marks[i], i};
if (parts[i].data_part->isStoredOnDisk())
parts_per_disk[parts[i].data_part->volume->getDisk()->getName()].push_back(std::move(part_info));
else
parts_per_disk[""].push_back(std::move(part_info));
}
for (auto & info : parts_per_disk)
parts_queue.push(std::move(info.second));
}
const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;
for (size_t i = 0; i < threads && !parts.empty(); ++i)
for (size_t i = 0; i < threads && !parts_queue.empty(); ++i)
{
auto need_marks = min_marks_per_thread;
while (need_marks > 0 && !parts.empty())
while (need_marks > 0 && !parts_queue.empty())
{
const auto part_idx = parts.size() - 1;
RangesInDataPart & part = parts.back();
size_t & marks_in_part = per_part_sum_marks.back();
auto & current_parts = parts_queue.front();
RangesInDataPart & part = current_parts.back().part;
size_t & marks_in_part = current_parts.back().sum_marks;
const auto part_idx = current_parts.back().part_idx;
/// Do not get too few rows from part.
if (marks_in_part >= min_marks_for_concurrent_read &&
@ -274,8 +323,9 @@ void MergeTreeReadPool::fillPerThreadInfo(
marks_in_ranges = marks_in_part;
need_marks -= marks_in_part;
parts.pop_back();
per_part_sum_marks.pop_back();
current_parts.pop_back();
if (current_parts.empty())
parts_queue.pop();
}
else
{
@ -304,6 +354,17 @@ void MergeTreeReadPool::fillPerThreadInfo(
if (marks_in_ranges != 0)
remaining_thread_tasks.insert(i);
}
/// Before processing next thread, change disk if possible.
/// Different threads will likely start reading from different disks,
/// which may improve read parallelism for JBOD.
/// It also may be helpful in case we have backoff threads.
/// Backoff threads will likely reduce load for different disks, not the same one.
if (parts_queue.size() > 1)
{
parts_queue.push(std::move(parts_queue.front()));
parts_queue.pop();
}
}
}

View File

@ -36,13 +36,16 @@ public:
size_t min_interval_between_events_ms = 1000;
/// Number of events to do backoff - to lower number of threads in pool.
size_t min_events = 2;
/// Try keeping the minimal number of threads in pool.
size_t min_concurrency = 1;
/// Constants above is just an example.
BackoffSettings(const Settings & settings)
: min_read_latency_ms(settings.read_backoff_min_latency_ms.totalMilliseconds()),
max_throughput(settings.read_backoff_max_throughput),
min_interval_between_events_ms(settings.read_backoff_min_interval_between_events_ms.totalMilliseconds()),
min_events(settings.read_backoff_min_events)
min_events(settings.read_backoff_min_events),
min_concurrency(settings.read_backoff_min_concurrency)
{
}
@ -68,7 +71,7 @@ private:
public:
MergeTreeReadPool(
const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_,
RangesInDataParts parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_,
RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_,
const bool check_columns_, const Names & column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
const bool do_not_steal_tasks_ = false);
@ -88,11 +91,11 @@ public:
private:
std::vector<size_t> fillPerPartInfo(
RangesInDataParts & parts, const bool check_columns);
const RangesInDataParts & parts, const bool check_columns);
void fillPerThreadInfo(
const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
const MergeTreeData & data;
StorageMetadataPtr metadata_snapshot;

View File

@ -64,7 +64,7 @@ void MergeTreeWriteAheadLog::addPart(const Block & block, const String & part_na
min_block_number = std::min(min_block_number, part_info.min_block);
max_block_number = std::max(max_block_number, part_info.max_block);
writeIntBinary(static_cast<UInt8>(0), *out); /// version
writeIntBinary(WAL_VERSION, *out);
writeIntBinary(static_cast<UInt8>(ActionType::ADD_PART), *out);
writeStringBinary(part_name, *out);
block_out->write(block);
@ -80,7 +80,7 @@ void MergeTreeWriteAheadLog::dropPart(const String & part_name)
{
std::unique_lock lock(write_mutex);
writeIntBinary(static_cast<UInt8>(0), *out);
writeIntBinary(WAL_VERSION, *out);
writeIntBinary(static_cast<UInt8>(ActionType::DROP_PART), *out);
writeStringBinary(part_name, *out);
out->next();
@ -116,9 +116,13 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
try
{
ActionMetadata metadata;
readIntBinary(version, *in);
if (version != 0)
throw Exception("Unknown WAL format version: " + toString(version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
if (version > 0)
{
metadata.read(*in);
}
readIntBinary(action_type, *in);
readStringBinary(part_name, *in);
@ -233,4 +237,29 @@ MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(const String & filename)
return std::make_pair(min_block, max_block);
}
void MergeTreeWriteAheadLog::ActionMetadata::read(ReadBuffer & meta_in)
{
readIntBinary(min_compatible_version, meta_in);
if (min_compatible_version > WAL_VERSION)
throw Exception("WAL metadata version " + toString(min_compatible_version)
+ " is not compatible with this ClickHouse version", ErrorCodes::UNKNOWN_FORMAT_VERSION);
size_t metadata_size;
readVarUInt(metadata_size, meta_in);
UInt32 metadata_start = meta_in.offset();
/// For the future: read metadata here.
/// Skip extra fields, if any. If min_compatible_version is not greater than WAL_VERSION, it means
/// that the extra fields are not critical for correctness.
meta_in.ignore(metadata_size - (meta_in.offset() - metadata_start));
}
void MergeTreeWriteAheadLog::ActionMetadata::write(WriteBuffer & meta_out) const
{
writeIntBinary(min_compatible_version, meta_out);
writeVarUInt(static_cast<UInt32>(0), meta_out);
}
}
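The ActionMetadata format above is deliberately forward compatible: the version byte is followed by a length-prefixed block that readers may skip wholesale. Below is a standalone sketch of that skip logic, using a plain byte vector and a fixed-width length instead of ClickHouse's WriteBuffer/ReadBuffer and varint; all names are illustrative.

#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <vector>

int main()
{
    std::vector<uint8_t> buf;

    /// Writer side (a hypothetical newer version adds one extra field).
    const uint8_t min_compatible_version = 0;  /// old readers may still read this record
    buf.push_back(min_compatible_version);
    buf.push_back(1);                          /// size of the metadata block in bytes
    buf.push_back(42);                         /// a field unknown to old readers

    /// Reader side (an older version with WAL_VERSION == 0).
    const uint8_t READER_WAL_VERSION = 0;
    size_t pos = 0;
    const uint8_t version = buf[pos++];
    if (version > READER_WAL_VERSION)
        throw std::runtime_error("WAL metadata version is not compatible with this reader");
    const uint8_t metadata_size = buf[pos++];
    pos += metadata_size;                      /// skip fields that are not critical for correctness
    std::cout << "skipped " << int(metadata_size) << " byte(s), resumed reading at offset " << pos << '\n';
    return 0;
}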

View File

@ -28,6 +28,19 @@ public:
DROP_PART = 1,
};
struct ActionMetadata
{
/// The minimum version of WAL reader that can understand metadata written by current ClickHouse version.
/// This field must be increased when making backwards incompatible changes.
///
/// The same approach can be used recursively inside metadata.
UInt8 min_compatible_version = 0;
void write(WriteBuffer & meta_out) const;
void read(ReadBuffer & meta_in);
};
constexpr static UInt8 WAL_VERSION = 0;
constexpr static auto WAL_FILE_NAME = "wal";
constexpr static auto WAL_FILE_EXTENSION = ".bin";
constexpr static auto DEFAULT_WAL_FILE_NAME = "wal.bin";

View File

@ -57,6 +57,7 @@ bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr &
return virtual_parts.getContainingPart(data_part->info) != data_part->name;
}
bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
{
auto queue_path = replica_path + "/queue";
@ -68,6 +69,9 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
{
std::lock_guard pull_logs_lock(pull_logs_to_queue_mutex);
/// Reset the batch size on initialization to recover from possible errors caused by a too-large batch size.
current_multi_batch_size = 1;
String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
@ -486,20 +490,21 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
{
std::sort(log_entries.begin(), log_entries.end());
/// ZK contains a limit on the number or total size of operations in a multi-request.
/// If the limit is exceeded, the connection is simply closed.
/// The constant is selected with a margin. The default limit in ZK is 1 MB of data in total.
/// The average size of the node value in this case is less than 10 kilobytes.
static constexpr auto MAX_MULTI_OPS = 100;
for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries; entry_idx += MAX_MULTI_OPS)
for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries;)
{
auto begin = log_entries.begin() + entry_idx;
auto end = entry_idx + MAX_MULTI_OPS >= log_entries.size()
auto end = entry_idx + current_multi_batch_size >= log_entries.size()
? log_entries.end()
: (begin + MAX_MULTI_OPS);
: (begin + current_multi_batch_size);
auto last = end - 1;
/// Increment entry_idx before batch size increase (we copied at most current_multi_batch_size entries)
entry_idx += current_multi_batch_size;
/// Increase the batch size exponentially, so it will saturate to MAX_MULTI_OPS.
if (current_multi_batch_size < MAX_MULTI_OPS)
current_multi_batch_size = std::min<size_t>(MAX_MULTI_OPS, current_multi_batch_size * 2);
String last_entry = *last;
if (!startsWith(last_entry, "log-"))
throw Exception("Error in zookeeper data: unexpected node " + last_entry + " in " + zookeeper_path + "/log",

View File

@ -259,6 +259,19 @@ private:
~CurrentlyExecuting();
};
/// ZK contains a limit on the number or total size of operations in a multi-request.
/// If the limit is exceeded, the connection is simply closed.
/// The constant is selected with a margin. The default limit in ZK is 1 MB of data in total.
/// The average size of the node value in this case is less than 10 kilobytes.
static constexpr size_t MAX_MULTI_OPS = 100;
/// Very large queue entries may appear occasionally.
/// We cannot process MAX_MULTI_OPS at once because it will fail.
/// But we have to process more than one entry at once because otherwise lagged replicas catch up slowly.
/// Let's start with one entry per transaction and increase it exponentially towards MAX_MULTI_OPS.
/// This allows making some progress before failing and staying operational even in extreme cases.
size_t current_multi_batch_size = 1;
public:
ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeQueue();

View File

@ -0,0 +1,2 @@
add_executable (wal_action_metadata wal_action_metadata.cpp)
target_link_libraries (wal_action_metadata PRIVATE dbms)

View File

@ -0,0 +1,61 @@
#include <iostream>
#include <IO/MemoryReadWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT_VERSION;
}
}
int main(int, char **)
{
try
{
{
std::cout << "test: dummy test" << std::endl;
DB::MergeTreeWriteAheadLog::ActionMetadata metadata_out;
DB::MemoryWriteBuffer buf{};
metadata_out.write(buf);
buf.finalize();
metadata_out.read(*buf.tryGetReadBuffer());
}
{
std::cout << "test: min compatibility" << std::endl;
DB::MergeTreeWriteAheadLog::ActionMetadata metadata_out;
metadata_out.min_compatible_version = DB::MergeTreeWriteAheadLog::WAL_VERSION + 1;
DB::MemoryWriteBuffer buf{};
metadata_out.write(buf);
buf.finalize();
try
{
metadata_out.read(*buf.tryGetReadBuffer());
}
catch (const DB::Exception & e)
{
if (e.code() != DB::ErrorCodes::UNKNOWN_FORMAT_VERSION)
{
std::cerr << "Expected UNKNOWN_FORMAT_VERSION exception but got: "
<< e.what() << ", " << e.displayText() << std::endl;
}
}
}
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}
return 0;
}

View File

@ -27,7 +27,7 @@ RabbitMQBlockInputStream::RabbitMQBlockInputStream(
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, sample_block(non_virtual_header)
, virtual_header(metadata_snapshot->getSampleBlockForColumns(
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id"},
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id", "_timestamp"},
storage.getVirtuals(), storage.getStorageID()))
{
for (const auto & column : virtual_header)
@ -158,6 +158,7 @@ Block RabbitMQBlockInputStream::readImpl()
auto delivery_tag = buffer->getDeliveryTag();
auto redelivered = buffer->getRedelivered();
auto message_id = buffer->getMessageID();
auto timestamp = buffer->getTimestamp();
buffer->updateAckTracker({delivery_tag, channel_id});
@ -168,6 +169,7 @@ Block RabbitMQBlockInputStream::readImpl()
virtual_columns[2]->insert(delivery_tag);
virtual_columns[3]->insert(redelivered);
virtual_columns[4]->insert(message_id);
virtual_columns[5]->insert(timestamp);
}
total_rows = total_rows + new_rows;

View File

@ -30,6 +30,7 @@ public:
Block readImpl() override;
void readSuffixImpl() override;
bool queueEmpty() const { return !buffer || buffer->queueEmpty(); }
bool needChannelUpdate();
void updateChannel();
bool sendAck();

View File

@ -14,47 +14,27 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
}
ReadBufferFromRabbitMQConsumer::ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
ChannelPtr setup_channel_,
HandlerPtr event_handler_,
const String & exchange_name_,
std::vector<String> & queues_,
size_t channel_id_base_,
const String & channel_base_,
const String & queue_base_,
Poco::Logger * log_,
char row_delimiter_,
bool hash_exchange_,
size_t num_queues_,
const String & deadletter_exchange_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer_channel(std::move(consumer_channel_))
, setup_channel(setup_channel_)
, event_handler(event_handler_)
, exchange_name(exchange_name_)
, queues(queues_)
, channel_base(channel_base_)
, channel_id_base(channel_id_base_)
, queue_base(queue_base_)
, hash_exchange(hash_exchange_)
, num_queues(num_queues_)
, deadletter_exchange(deadletter_exchange_)
, log(log_)
, row_delimiter(row_delimiter_)
, queue_size(queue_size_)
, stopped(stopped_)
, received(queue_size * num_queues)
, received(queue_size_)
{
for (size_t queue_id = 0; queue_id < num_queues; ++queue_id)
bindQueue(queue_id);
setupChannel();
}
@ -65,67 +45,6 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
}
void ReadBufferFromRabbitMQConsumer::bindQueue(size_t queue_id)
{
std::atomic<bool> binding_created = false;
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
{
queues.emplace_back(queue_name);
LOG_DEBUG(log, "Queue {} is declared", queue_name);
if (msgcount)
LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
* fanout exchange it can be arbitrary
*/
setup_channel->bindQueue(exchange_name, queue_name, std::to_string(channel_id_base))
.onSuccess([&] { binding_created = true; })
.onError([&](const char * message)
{
throw Exception(
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
"Failed to create queue binding with queue {} for exchange {}. Reason: {}", std::string(message),
queue_name, exchange_name);
});
};
auto error_callback([&](const char * message)
{
/* This error most likely results from an attempt to declare a queue with settings different from those it was declared with
* before. For a given queue name, either the deadletter_exchange parameter or queue_size changed, i.e. the table was declared
* with a different max_block_size parameter. Solution: the client should specify a different queue_base parameter or manually
* delete the previously declared queues via any of the various cli tools.
*/
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
which were declared with the same names. ERROR reason: "
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
});
AMQP::Table queue_settings;
queue_settings["x-max-length"] = queue_size;
queue_settings["x-overflow"] = "reject-publish";
if (!deadletter_exchange.empty())
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
/* The first option not only simplifies queue_name, but also makes it possible to resume reading from one
* specific queue when its name is specified in the queue_base setting
*/
const String queue_name = !hash_exchange ? queue_base : std::to_string(channel_id_base) + "_" + std::to_string(queue_id) + "_" + queue_base;
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
while (!binding_created)
{
iterateEventLoop();
}
}
void ReadBufferFromRabbitMQConsumer::subscribe()
{
for (const auto & queue_name : queues)
@ -146,10 +65,9 @@ void ReadBufferFromRabbitMQConsumer::subscribe()
if (row_delimiter != '\0')
message_received += row_delimiter;
if (message.hasMessageID())
received.push({message_received, message.messageID(), redelivered, AckTracker(delivery_tag, channel_id)});
else
received.push({message_received, "", redelivered, AckTracker(delivery_tag, channel_id)});
received.push({message_received, message.hasMessageID() ? message.messageID() : "",
message.hasTimestamp() ? message.timestamp() : 0,
redelivered, AckTracker(delivery_tag, channel_id)});
}
})
.onError([&](const char * message)

View File

@ -24,17 +24,12 @@ class ReadBufferFromRabbitMQConsumer : public ReadBuffer
public:
ReadBufferFromRabbitMQConsumer(
ChannelPtr consumer_channel_,
ChannelPtr setup_channel_,
HandlerPtr event_handler_,
const String & exchange_name_,
std::vector<String> & queues_,
size_t channel_id_base_,
const String & channel_base_,
const String & queue_base_,
Poco::Logger * log_,
char row_delimiter_,
bool hash_exchange_,
size_t num_queues_,
const String & deadletter_exchange_,
uint32_t queue_size_,
const std::atomic<bool> & stopped_);
@ -53,6 +48,7 @@ public:
{
String message;
String message_id;
uint64_t timestamp;
bool redelivered;
AckTracker track;
};
@ -75,34 +71,26 @@ public:
auto getDeliveryTag() const { return current.track.delivery_tag; }
auto getRedelivered() const { return current.redelivered; }
auto getMessageID() const { return current.message_id; }
auto getTimestamp() const { return current.timestamp; }
private:
bool nextImpl() override;
void bindQueue(size_t queue_id);
void subscribe();
void iterateEventLoop();
ChannelPtr consumer_channel;
ChannelPtr setup_channel;
HandlerPtr event_handler;
const String exchange_name;
std::vector<String> queues;
const String channel_base;
const size_t channel_id_base;
const String queue_base;
const bool hash_exchange;
const size_t num_queues;
const String deadletter_exchange;
Poco::Logger * log;
char row_delimiter;
bool allowed = true;
uint32_t queue_size;
const std::atomic<bool> & stopped;
String channel_id;
std::atomic<bool> channel_error = true, wait_subscription = false;
std::vector<String> queues;
ConcurrentBoundedQueue<MessageData> received;
MessageData current;
size_t subscribed = 0;
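A standalone sketch of the MessageData flow above (AmqpMessageStub is hypothetical and the AckTracker field is omitted): optional AMQP properties fall back to defaults when absent, and the stored values later feed the _message_id and _timestamp virtual columns through the getters.

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>

struct AmqpMessageStub
{
    std::optional<std::string> message_id;
    std::optional<uint64_t> timestamp;
};

struct MessageData
{
    std::string message;
    std::string message_id;
    uint64_t timestamp;
    bool redelivered;
};

MessageData makeMessageData(const std::string & payload, const AmqpMessageStub & msg, bool redelivered)
{
    /// Mirror the ternaries in subscribe(): use the property when present, a default otherwise.
    return {payload,
            msg.message_id ? *msg.message_id : "",
            msg.timestamp ? *msg.timestamp : 0,
            redelivered};
}

int main()
{
    AmqpMessageStub msg{std::nullopt, 1604051660};
    const auto data = makeMessageData("{\"key\":1}", msg, /*redelivered=*/ false);
    std::cout << "_message_id='" << data.message_id << "' _timestamp=" << data.timestamp << '\n';
    return 0;
}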

View File

@ -38,8 +38,10 @@ namespace DB
static const auto CONNECT_SLEEP = 200;
static const auto RETRIES_MAX = 20;
static const auto HEARTBEAT_RESCHEDULE_MS = 3000;
static const uint32_t QUEUE_SIZE = 100000;
static const auto MAX_FAILED_READ_ATTEMPTS = 10;
static const auto RESCHEDULE_MS = 500;
static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
namespace ErrorCodes
{
@ -50,6 +52,7 @@ namespace ErrorCodes
extern const int CANNOT_BIND_RABBITMQ_EXCHANGE;
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
}
namespace ExchangeType
@ -122,9 +125,6 @@ StorageRabbitMQ::StorageRabbitMQ(
streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
streaming_task->deactivate();
heartbeat_task = global_context.getSchedulePool().createTask("RabbitMQHeartbeatTask", [this]{ heartbeatFunc(); });
heartbeat_task->deactivate();
if (queue_base.empty())
{
/* Make sure that local exchange name is unique for each table and is not the same as client's exchange name. It also needs to
@ -210,16 +210,6 @@ Context StorageRabbitMQ::addSettings(Context context) const
}
void StorageRabbitMQ::heartbeatFunc()
{
if (!stream_cancelled && event_handler->connectionRunning())
{
connection->heartbeat();
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
}
}
void StorageRabbitMQ::loopingFunc()
{
if (event_handler->connectionRunning())
@ -396,13 +386,73 @@ void StorageRabbitMQ::bindExchange()
}
void StorageRabbitMQ::bindQueue(size_t queue_id)
{
std::atomic<bool> binding_created = false;
auto success_callback = [&](const std::string & queue_name, int msgcount, int /* consumercount */)
{
queues.emplace_back(queue_name);
LOG_DEBUG(log, "Queue {} is declared", queue_name);
if (msgcount)
LOG_INFO(log, "Queue {} is non-empty. Non-consumed messaged will also be delivered", queue_name);
/* Here we bind either to sharding exchange (consistent-hash) or to bridge exchange (fanout). All bindings to routing keys are
* done between client's exchange and local bridge exchange. Binding key must be a string integer in case of hash exchange, for
* fanout exchange it can be arbitrary
*/
setup_channel->bindQueue(consumer_exchange, queue_name, std::to_string(queue_id))
.onSuccess([&] { binding_created = true; })
.onError([&](const char * message)
{
throw Exception(
ErrorCodes::CANNOT_CREATE_RABBITMQ_QUEUE_BINDING,
"Failed to create queue binding for exchange {}. Reason: {}", exchange_name, std::string(message));
});
};
auto error_callback([&](const char * message)
{
/* This error most likely results from an attempt to declare a queue with settings different from those it was declared with
* before. For a given queue name, either the deadletter_exchange parameter or queue_size changed, i.e. the table was declared
* with a different max_block_size parameter. Solution: the client should specify a different queue_base parameter or manually
* delete the previously declared queues via any of the various cli tools.
*/
throw Exception("Failed to declare queue. Probably queue settings are conflicting: max_block_size, deadletter_exchange. Attempt \
specifying differently those settings or use a different queue_base or manually delete previously declared queues, \
which were declared with the same names. ERROR reason: "
+ std::string(message), ErrorCodes::BAD_ARGUMENTS);
});
AMQP::Table queue_settings;
queue_settings["x-max-length"] = queue_size;
if (!deadletter_exchange.empty())
queue_settings["x-dead-letter-exchange"] = deadletter_exchange;
else
queue_settings["x-overflow"] = "reject-publish";
/* The first option not only simplifies queue_name, but also makes it possible to resume reading from one
* specific queue when its name is specified in the queue_base setting
*/
const String queue_name = !hash_exchange ? queue_base : std::to_string(queue_id) + "_" + queue_base;
setup_channel->declareQueue(queue_name, AMQP::durable, queue_settings).onSuccess(success_callback).onError(error_callback);
while (!binding_created)
{
event_handler->iterateLoop();
}
}
bool StorageRabbitMQ::restoreConnection(bool reconnecting)
{
size_t cnt_retries = 0;
if (reconnecting)
{
deactivateTask(heartbeat_task, false, false);
connection->close(); /// Connection might be unusable, but not closed
/* Connection is not closed immediately (firstly, all pending operations are completed, and then
@ -452,11 +502,11 @@ void StorageRabbitMQ::unbindExchange()
*/
std::call_once(flag, [&]()
{
heartbeat_task->deactivate();
streaming_task->deactivate();
event_handler->updateLoopState(Loop::STOP);
looping_task->deactivate();
setup_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
setup_channel->removeExchange(bridge_exchange)
.onSuccess([&]()
{
@ -471,6 +521,8 @@ void StorageRabbitMQ::unbindExchange()
{
event_handler->iterateLoop();
}
setup_channel->close();
});
}
@ -499,8 +551,6 @@ Pipe StorageRabbitMQ::read(
deactivateTask(looping_task, false, true);
update_channels = restoreConnection(true);
if (update_channels)
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS);
}
Pipes pipes;
@ -521,7 +571,6 @@ Pipe StorageRabbitMQ::read(
if (event_handler->loopRunning())
{
deactivateTask(looping_task, false, true);
deactivateTask(heartbeat_task, false, false);
}
rabbit_stream->updateChannel();
@ -552,6 +601,13 @@ void StorageRabbitMQ::startup()
initExchange();
bindExchange();
for (size_t i = 1; i <= num_queues; ++i)
{
bindQueue(i);
}
setup_channel->close();
for (size_t i = 0; i < num_consumers; ++i)
{
try
@ -568,7 +624,6 @@ void StorageRabbitMQ::startup()
event_handler->updateLoopState(Loop::RUN);
streaming_task->activateAndSchedule();
heartbeat_task->activateAndSchedule();
}
@ -579,7 +634,6 @@ void StorageRabbitMQ::shutdown()
deactivateTask(streaming_task, true, false);
deactivateTask(looping_task, true, true);
deactivateTask(heartbeat_task, true, false);
connection->close();
@ -635,9 +689,8 @@ ConsumerBufferPtr StorageRabbitMQ::createReadBuffer()
ChannelPtr consumer_channel = std::make_shared<AMQP::TcpChannel>(connection.get());
return std::make_shared<ReadBufferFromRabbitMQConsumer>(
consumer_channel, setup_channel, event_handler, consumer_exchange, ++consumer_id,
unique_strbase, queue_base, log, row_delimiter, hash_exchange, num_queues,
deadletter_exchange, queue_size, stream_cancelled);
consumer_channel, event_handler, queues, ++consumer_id,
unique_strbase, log, row_delimiter, queue_size, stream_cancelled);
}
@ -683,11 +736,14 @@ void StorageRabbitMQ::streamingToViewsFunc()
try
{
auto table_id = getStorageID();
// Check if at least one direct dependency is attached
size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
if (dependencies_count)
{
auto start_time = std::chrono::steady_clock::now();
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!stream_cancelled && num_created_consumers > 0)
{
@ -696,8 +752,17 @@ void StorageRabbitMQ::streamingToViewsFunc()
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
if (!streamToViews())
if (streamToViews())
break;
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
event_handler->updateLoopState(Loop::STOP);
LOG_TRACE(log, "Reschedule streaming. Thread work duration limit exceeded.");
break;
}
}
}
}
@ -708,7 +773,7 @@ void StorageRabbitMQ::streamingToViewsFunc()
/// Wait for attached views
if (!stream_cancelled)
streaming_task->schedule();
streaming_task->scheduleAfter(RESCHEDULE_MS);
}
@ -731,13 +796,6 @@ bool StorageRabbitMQ::streamToViews()
auto column_names = block_io.out->getHeader().getNames();
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
/* event_handler->connectionRunning() does not guarantee that connection is not closed in case loop was not running before, but
* need to anyway start the loop to activate error callbacks and update connection state, because even checking with
* connection->usable() will not give correct answer before callbacks are activated.
*/
if (!event_handler->loopRunning() && event_handler->connectionRunning())
looping_task->activateAndSchedule();
auto block_size = getMaxBlockSize();
// Create a stream for each consumer and join them in a union stream
@ -770,34 +828,45 @@ bool StorageRabbitMQ::streamToViews()
in = streams[0];
std::atomic<bool> stub = {false};
if (!event_handler->loopRunning())
{
event_handler->updateLoopState(Loop::RUN);
looping_task->activateAndSchedule();
}
copyData(*in, *block_io.out, &stub);
/* Need to stop loop even if connection is ok, because sending ack() with loop running in another thread will lead to a lot of data
* races inside the library, but only in case any error occurs or connection is lost while ack is being sent
/* Note: sending ack() with the loop running in another thread will lead to a lot of data races inside the library, but only in case
* an error occurs or the connection is lost while the ack is being sent
*/
if (event_handler->loopRunning())
deactivateTask(looping_task, false, true);
size_t queue_empty = 0;
if (!event_handler->connectionRunning())
{
if (!stream_cancelled && restoreConnection(true))
if (stream_cancelled)
return true;
if (restoreConnection(true))
{
for (auto & stream : streams)
stream->as<RabbitMQBlockInputStream>()->updateChannel();
}
else
{
/// Reschedule if unable to connect to rabbitmq or quit if cancelled
return false;
LOG_TRACE(log, "Reschedule streaming. Unable to restore connection.");
return true;
}
}
else
{
deactivateTask(heartbeat_task, false, false);
/// Commit
for (auto & stream : streams)
{
if (stream->as<RabbitMQBlockInputStream>()->queueEmpty())
++queue_empty;
/* false is returned by the sendAck function in only two cases:
* 1) if connection failed. In this case all channels will be closed and will be unable to send ack. Also ack is made based on
* delivery tags, which are unique to channels, so if channels fail, those delivery tags will become invalid and there is
@ -828,19 +897,25 @@ bool StorageRabbitMQ::streamToViews()
break;
}
}
event_handler->iterateLoop();
}
}
if ((queue_empty == num_created_consumers) && (++read_attempts == MAX_FAILED_READ_ATTEMPTS))
{
connection->heartbeat();
read_attempts = 0;
LOG_TRACE(log, "Reschedule streaming. Queues are empty.");
return true;
}
else
{
event_handler->updateLoopState(Loop::RUN);
looping_task->activateAndSchedule();
heartbeat_task->scheduleAfter(HEARTBEAT_RESCHEDULE_MS); /// It is also deactivated in restoreConnection(), so reschedule anyway
}
// Check whether the limits were applied during query execution
bool limits_applied = false;
const BlockStreamProfileInfo & info = in->getProfileInfo();
limits_applied = info.hasAppliedLimit();
return limits_applied;
return false;
}
@ -907,7 +982,8 @@ NamesAndTypesList StorageRabbitMQ::getVirtuals() const
{"_channel_id", std::make_shared<DataTypeString>()},
{"_delivery_tag", std::make_shared<DataTypeUInt64>()},
{"_redelivered", std::make_shared<DataTypeUInt8>()},
{"_message_id", std::make_shared<DataTypeString>()}
{"_message_id", std::make_shared<DataTypeString>()},
{"_timestamp", std::make_shared<DataTypeUInt64>()}
};
}

View File

@ -114,14 +114,15 @@ private:
std::atomic<bool> wait_confirm = true; /// needed to break waiting for confirmations for producer
std::atomic<bool> exchange_removed = false;
ChannelPtr setup_channel;
std::vector<String> queues;
std::once_flag flag; /// remove exchange only once
std::mutex task_mutex;
BackgroundSchedulePool::TaskHolder streaming_task;
BackgroundSchedulePool::TaskHolder heartbeat_task;
BackgroundSchedulePool::TaskHolder looping_task;
std::atomic<bool> stream_cancelled{false};
size_t read_attempts = 0;
ConsumerBufferPtr createReadBuffer();
@ -140,6 +141,7 @@ private:
void initExchange();
void bindExchange();
void bindQueue(size_t queue_id);
bool restoreConnection(bool reconnecting);
bool streamToViews();

View File

@ -80,7 +80,6 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
extern const int TOO_MANY_ROWS;
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS;
extern const int LOGICAL_ERROR;
}
namespace ActionLocks
@ -600,15 +599,22 @@ void StorageDistributed::shutdown()
monitors_blocker.cancelForever();
std::lock_guard lock(cluster_nodes_mutex);
LOG_DEBUG(log, "Joining background threads for async INSERT");
cluster_nodes_data.clear();
LOG_DEBUG(log, "Background threads for async INSERT joined");
}
void StorageDistributed::drop()
{
// shutdown() should be already called
// and by the same reason we cannot use truncate() here, since
// cluster_nodes_data already cleaned
if (!cluster_nodes_data.empty())
throw Exception("drop called before shutdown", ErrorCodes::LOGICAL_ERROR);
// An INSERT in-between shutdown() and drop() can call
// requireDirectoryMonitor() again, so call shutdown() once more to clear
// them; by the time drop() (this function) is executed, no INSERT can run
// in parallel.
//
// The second shutdown() should also be fast, since no DirectoryMonitor
// should do anything, because the ActionBlocker has been cancelled
// (in the first shutdown()).
shutdown();
// A Distributed table w/o sharding_key does not allow INSERTs
if (relative_data_path.empty())
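A simplified, hypothetical sketch of the shutdown()/drop() interplay described in the comment above; DistributedSketch only mimics the monitor bookkeeping, not the real StorageDistributed.

#include <iostream>
#include <map>
#include <string>

struct DistributedSketch
{
    bool sends_blocked = false;
    std::map<std::string, int> cluster_nodes_data;    /// node name -> pretend "monitor"

    void insertToNode(const std::string & node)
    {
        /// requireDirectoryMonitor(): may still register a monitor after the first
        /// shutdown(); the cancelled blocker only keeps it from sending anything.
        cluster_nodes_data.emplace(node, 0);
    }

    void shutdown()
    {
        sends_blocked = true;                          /// monitors_blocker.cancelForever()
        std::cout << "joining " << cluster_nodes_data.size() << " monitor(s)\n";
        cluster_nodes_data.clear();
    }

    void drop()
    {
        shutdown();                                    /// clear monitors created by racing INSERTs
        std::cout << "removing data\n";
    }
};

int main()
{
    DistributedSketch table;
    table.insertToNode("shard1");
    table.shutdown();                                  /// first shutdown joins the monitor
    table.insertToNode("shard2");                      /// a racing INSERT re-creates one
    table.drop();                                      /// second shutdown inside drop() clears it
    return 0;
}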

View File

@ -3481,8 +3481,10 @@ void StorageReplicatedMergeTree::startup()
{
queue.initialize(getDataParts());
data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this);
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint);
InterserverIOEndpointPtr data_parts_exchange_ptr = std::make_shared<DataPartsExchange::Service>(*this);
[[maybe_unused]] auto prev_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, data_parts_exchange_ptr);
assert(prev_ptr == nullptr);
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr);
/// In this thread replica will be activated.
restarting_thread.start();
@ -3549,15 +3551,15 @@ void StorageReplicatedMergeTree::shutdown()
global_context.getBackgroundMovePool().removeTask(move_parts_task_handle);
move_parts_task_handle.reset();
if (data_parts_exchange_endpoint)
auto data_parts_exchange_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, InterserverIOEndpointPtr{});
if (data_parts_exchange_ptr)
{
global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_endpoint->getId(replica_path));
global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_ptr->getId(replica_path));
/// Ask all parts exchange handlers to finish asap. New ones will fail to start
data_parts_exchange_endpoint->blocker.cancelForever();
data_parts_exchange_ptr->blocker.cancelForever();
/// Wait for all of them
std::unique_lock lock(data_parts_exchange_endpoint->rwlock);
std::unique_lock lock(data_parts_exchange_ptr->rwlock);
}
data_parts_exchange_endpoint.reset();
/// We clear all old parts after stopping all background operations. It's
/// important, because background operations can produce temporary parts
@ -5900,7 +5902,10 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti
return fetcher.blocker.cancel();
if (action_type == ActionLocks::PartsSend)
return data_parts_exchange_endpoint ? data_parts_exchange_endpoint->blocker.cancel() : ActionLock();
{
auto data_parts_exchange_ptr = std::atomic_load(&data_parts_exchange_endpoint);
return data_parts_exchange_ptr ? data_parts_exchange_ptr->blocker.cancel() : ActionLock();
}
if (action_type == ActionLocks::ReplicationQueue)
return queue.actions_blocker.cancel();
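The startup/shutdown changes above rely on the free-function atomic API for std::shared_ptr. A standalone sketch of the same pattern (Endpoint and the function bodies are illustrative): publish the endpoint with std::atomic_exchange, retire it the same way, and let concurrent readers observe either a fully constructed endpoint or nullptr, never a half-updated pointer.

#include <cassert>
#include <iostream>
#include <memory>

struct Endpoint { const char * id = "replica_path"; };

std::shared_ptr<Endpoint> endpoint;   /// shared state, accessed from several threads

void startup()
{
    auto fresh = std::make_shared<Endpoint>();
    [[maybe_unused]] auto prev = std::atomic_exchange(&endpoint, fresh);
    assert(prev == nullptr);          /// startup() must not be called twice
}

void shutdown()
{
    auto current = std::atomic_exchange(&endpoint, std::shared_ptr<Endpoint>{});
    if (current)
        std::cout << "stopping endpoint " << current->id << '\n';
}

void reader()
{
    if (auto current = std::atomic_load(&endpoint))
        std::cout << "using endpoint " << current->id << '\n';
}

int main()
{
    startup();
    reader();
    shutdown();
    reader();                         /// sees nullptr after shutdown, does nothing
    return 0;
}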

View File

@ -38,8 +38,8 @@ void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, cons
std::map<String, std::map<String, StoragePtr>> tables;
for (const auto & db : DatabaseCatalog::instance().getDatabases())
{
/// Lazy database can not contain distributed tables
if (db.second->getEngineName() == "Lazy")
/// Check if database can contain distributed tables
if (!db.second->canContainDistributedTables())
continue;
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);
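The IDatabase helpers used here and below (canContainDistributedTables / canContainMergeTreeTables) are not declared anywhere in this diff; the sketch below is only a guess at their shape, assuming they merely encapsulate the engine-name checks that were previously repeated at each call site.

#include <iostream>
#include <string>

class IDatabaseSketch
{
public:
    explicit IDatabaseSketch(std::string engine_name_) : engine_name(std::move(engine_name_)) {}

    /// Lazy databases keep tables unloaded and cannot host MergeTree tables.
    bool canContainMergeTreeTables() const { return engine_name != "Lazy"; }

    /// The same reasoning applies to Distributed tables.
    bool canContainDistributedTables() const { return engine_name != "Lazy"; }

private:
    std::string engine_name;
};

int main()
{
    for (const auto * engine : {"Ordinary", "Atomic", "Lazy"})
        std::cout << engine << ": can contain MergeTree tables = "
                  << IDatabaseSketch(engine).canContainMergeTreeTables() << '\n';
    return 0;
}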

View File

@ -32,8 +32,8 @@ static StorageSystemGraphite::Configs getConfigs(const Context & context)
for (const auto & db : databases)
{
/// Lazy database can not contain MergeTree tables
if (db.second->getEngineName() == "Lazy")
/// Check if database can contain MergeTree tables
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())

View File

@ -44,8 +44,8 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, const Contex
std::map<String, std::map<String, StoragePtr>> merge_tree_tables;
for (const auto & db : DatabaseCatalog::instance().getDatabases())
{
/// Lazy database can not contain MergeTree tables
if (db.second->getEngineName() == "Lazy")
/// Check if database can contain MergeTree tables
if (!db.second->canContainMergeTreeTables())
continue;
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);

View File

@ -83,9 +83,9 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const
MutableColumnPtr database_column_mut = ColumnString::create();
for (const auto & database : databases)
{
/// Lazy database can not contain MergeTree tables
/// and it's unnecessary to load all tables of Lazy database just to filter all of them.
if (database.second->getEngineName() != "Lazy")
/// Check if the database can contain MergeTree tables;
/// if not, it's unnecessary to load all tables of the database just to filter them all out.
if (database.second->canContainMergeTreeTables())
database_column_mut->insert(database.first);
}
block_to_filter.insert(ColumnWithTypeAndName(

View File

@ -74,8 +74,8 @@ Pipe StorageSystemReplicas::read(
std::map<String, std::map<String, StoragePtr>> replicated_tables;
for (const auto & db : DatabaseCatalog::instance().getDatabases())
{
/// Lazy database can not contain replicated tables
if (db.second->getEngineName() == "Lazy")
/// Check if database can contain replicated tables
if (!db.second->canContainMergeTreeTables())
continue;
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())

View File

@ -55,8 +55,8 @@ void StorageSystemReplicationQueue::fillData(MutableColumns & res_columns, const
std::map<String, std::map<String, StoragePtr>> replicated_tables;
for (const auto & db : DatabaseCatalog::instance().getDatabases())
{
/// Lazy database can not contain replicated tables
if (db.second->getEngineName() == "Lazy")
/// Check if database can contain replicated tables
if (!db.second->canContainMergeTreeTables())
continue;
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -5,8 +5,8 @@ from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/fast_background_pool.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/fast_background_pool.xml'], with_zookeeper=True)
node1 = cluster.add_instance('node1', main_configs=['configs/fast_background_pool.xml', 'configs/log_conf.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/fast_background_pool.xml', 'configs/log_conf.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")

View File

@ -0,0 +1,12 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,60 @@
import time
import contextlib
import pymysql.cursors
import pytest
import os
import subprocess
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, get_docker_compose_path
from helpers.network import PartitionManager
cluster = ClickHouseCluster(__file__)
clickhouse_node = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
class MySQLNodeInstance:
def __init__(self, user='root', password='clickhouse', hostname='127.0.0.1', port=3308):
self.user = user
self.port = port
self.hostname = hostname
self.password = password
self.mysql_connection = None # lazy init
def alloc_connection(self):
if self.mysql_connection is None:
self.mysql_connection = pymysql.connect(user=self.user, password=self.password, host=self.hostname,
port=self.port, autocommit=True)
return self.mysql_connection
def query(self, execution_query):
with self.alloc_connection().cursor() as cursor:
cursor.execute(execution_query)
def close(self):
if self.mysql_connection is not None:
self.mysql_connection.close()
def test_disabled_mysql_server(started_cluster):
with contextlib.closing(MySQLNodeInstance()) as mysql_node:
mysql_node.query("CREATE DATABASE test_db;")
mysql_node.query("CREATE TABLE test_db.test_table ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;")
with PartitionManager() as pm:
clickhouse_node.query("CREATE DATABASE test_db ENGINE = MySQL('mysql1:3306', 'test_db', 'root', 'clickhouse')")
pm._add_rule({'source': clickhouse_node.ip_address, 'destination_port': 3306, 'action': 'DROP'})
clickhouse_node.query("SELECT * FROM system.parts")
clickhouse_node.query("SELECT * FROM system.mutations")
clickhouse_node.query("SELECT * FROM system.graphite_retentions")
clickhouse_node.query("DROP DATABASE test_db")

View File

@ -537,14 +537,14 @@ def test_rabbitmq_big_message(rabbitmq_cluster):
@pytest.mark.timeout(420)
def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster):
NUM_CONSUMERS = 10
NUM_QUEUES = 2
NUM_QUEUES = 10
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'test_sharding',
rabbitmq_num_queues = 2,
rabbitmq_num_queues = 10,
rabbitmq_num_consumers = 10,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
@ -617,7 +617,7 @@ def test_rabbitmq_mv_combo(rabbitmq_cluster):
rabbitmq_exchange_name = 'combo',
rabbitmq_queue_base = 'combo',
rabbitmq_num_consumers = 2,
rabbitmq_num_queues = 2,
rabbitmq_num_queues = 5,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')
@ -879,7 +879,7 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster):
rabbitmq_queue_base = 'over',
rabbitmq_exchange_type = 'direct',
rabbitmq_num_consumers = 5,
rabbitmq_num_queues = 2,
rabbitmq_num_queues = 10,
rabbitmq_max_block_size = 10000,
rabbitmq_routing_key_list = 'over',
rabbitmq_format = 'TSV',
@ -1722,7 +1722,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'consumer_reconnect',
rabbitmq_num_consumers = 10,
rabbitmq_num_queues = 2,
rabbitmq_num_queues = 10,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
''')

View File

@ -107,7 +107,7 @@ def test_RELOAD_CONFIG_AND_MACROS(started_cluster):
assert TSV(instance.query("select * from system.macros")) == TSV("instance\tch1\nmac\tro\n")
def test_SYSTEM_FLUSH_LOGS(started_cluster):
def test_system_flush_logs(started_cluster):
instance = cluster.instances['ch1']
instance.query('''
SET log_queries = 0;

View File

@ -44,4 +44,4 @@ SELECT 12345.6789 AS x, floor(x, -1), floor(x, -2), floor(x, -3), floor(x, -4),
SELECT roundToExp2(100), roundToExp2(64), roundToExp2(3), roundToExp2(0), roundToExp2(-1);
SELECT roundToExp2(0.9), roundToExp2(0), roundToExp2(-0.5), roundToExp2(-0.6), roundToExp2(-0.2);
SELECT ceil(29375422, -54212) --{serverError 36}
SELECT ceil(29375422, -54212) --{serverError 69}

View File

@ -5,7 +5,6 @@ insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 1);
insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 2);
insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 3);
insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 4);
select sleep(0.7) format Null; -- wait if very fast merge happen
optimize table ttl_00933_2 final;
select a from ttl_00933_2 order by a;
@ -16,7 +15,6 @@ insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 1, 100);
insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 2, 200);
insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 3, 300);
insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 4, 400);
select sleep(0.7) format Null; -- wait if very fast merge happen
optimize table ttl_00933_2 final;
select a, b from ttl_00933_2 order by a;
@ -27,7 +25,6 @@ insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 1, 5);
insert into ttl_00933_2 values (toDateTime('2000-10-10 00:00:00'), 2, 10);
insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 3, 15);
insert into ttl_00933_2 values (toDateTime('2100-10-10 00:00:00'), 4, 20);
select sleep(0.7) format Null; -- wait if very fast merge happen
optimize table ttl_00933_2 final;
select a, b from ttl_00933_2 order by a;

View File

@ -49,4 +49,4 @@ $CLICKHOUSE_CLIENT -q "SELECT if(quantile(0.5)(query_duration_ms) < $max_time_ms
$CLICKHOUSE_CLIENT -q "SELECT count() * $count_multiplier, i, d, s, n.i, n.f FROM $db.table_merge GROUP BY i, d, s, n.i, n.f ORDER BY i"
$CLICKHOUSE_CLIENT -q "DROP DATABASE $db"
$CLICKHOUSE_CLIENT -q "DROP DATABASE $db" --database_atomic_wait_for_drop_and_detach_synchronously=0

View File

@ -0,0 +1,12 @@
#!/usr/bin/expect -f
log_user 1
set timeout 5
match_max 100000
if ![info exists env(CLICKHOUSE_PORT_TCP)] {set env(CLICKHOUSE_PORT_TCP) 9000}
spawn bash -c "clickhouse-client --port $env(CLICKHOUSE_PORT_TCP) && echo $?"
expect ":) "
send -- "\4"
expect eof

View File

@ -0,0 +1 @@
Loaded 10000 queries.

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
# Create a huge number of tables, so Suggest will take some time to load
${CLICKHOUSE_CLIENT} -q "SELECT 'CREATE TABLE test_' || hex(randomPrintableASCII(40)) || '(x UInt8) Engine=Memory;' FROM numbers(10000)" --format=TSVRaw | ${CLICKHOUSE_BENCHMARK} -c32 -i 10000 -d 0 2>&1 | grep -F 'Loaded 10000 queries'
function stress()
{
while true; do
"${CURDIR}"/01526_client_start_and_exit.expect | grep -v -P 'ClickHouse client|Connecting|Connected|:\) Bye\.|^\s*$|spawn bash|^0\s*$'
done
}
export CURDIR
export -f stress
for _ in {1..10}; do
timeout 3 bash -c stress &
done
wait

View File

@ -0,0 +1,4 @@
442d3ff4-842a-45bb-8b02-b616122c0dc6
05fe40cb-1d0c-45b0-8e60-8e311c2463f1
2fc89389-4728-4b30-9e51-b5bc3ad215f6
10000

View File

@ -0,0 +1,38 @@
DROP TABLE IF EXISTS bug_14144;
CREATE TABLE bug_14144
( meta_source_req_uuid Nullable(UUID),
a Int64,
meta_source_type String
)
ENGINE = MergeTree
ORDER BY a;
INSERT INTO bug_14144 SELECT cast(toUUID('442d3ff4-842a-45bb-8b02-b616122c0dc6'), 'Nullable(UUID)'), number, 'missing' FROM numbers(1000);
INSERT INTO bug_14144 SELECT cast(toUUIDOrZero('2fc89389-4728-4b30-9e51-b5bc3ad215f6'), 'Nullable(UUID)'), number, 'missing' FROM numbers(1000);
INSERT INTO bug_14144 SELECT cast(toUUIDOrNull('05fe40cb-1d0c-45b0-8e60-8e311c2463f1'), 'Nullable(UUID)'), number, 'missing' FROM numbers(1000);
SELECT DISTINCT meta_source_req_uuid
FROM bug_14144
WHERE meta_source_type = 'missing'
ORDER BY meta_source_req_uuid ASC;
TRUNCATE TABLE bug_14144;
INSERT INTO bug_14144 SELECT generateUUIDv4(), number, 'missing' FROM numbers(10000);
SELECT COUNT() FROM (
SELECT DISTINCT meta_source_req_uuid
FROM bug_14144
WHERE meta_source_type = 'missing'
ORDER BY meta_source_req_uuid ASC
LIMIT 100000
);
DROP TABLE bug_14144;

View File

@ -0,0 +1 @@
SELECT round(toDecimal32(1, 0), -9223372036854775806); -- { serverError 69 }

View File

@ -0,0 +1,12 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query "CREATE TABLE IF NOT EXISTS test_01543 (value LowCardinality(String)) ENGINE=Memory()"
$CLICKHOUSE_CLIENT --query "INSERT INTO test_01543 SELECT toString(number) FROM numbers(1000)"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_01543 FORMAT Avro" |
$CLICKHOUSE_CLIENT -q "INSERT INTO test_01543 FORMAT Avro";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test_01543"

View File

@ -1,6 +1,9 @@
v20.10.3.30-stable 2020-10-29
v20.10.2.20-stable 2020-10-23
v20.9.4.76-stable 2020-10-29
v20.9.3.45-stable 2020-10-09
v20.9.2.20-stable 2020-09-22
v20.8.5.45-lts 2020-10-29
v20.8.4.11-lts 2020-10-09
v20.8.3.18-stable 2020-09-18
v20.8.2.3-stable 2020-09-08
