Rename directory monitor concept into background INSERT (#55978)

* Limit log frequency for the "Skipping send data over distributed table" message

Without this limit, after SYSTEM STOP DISTRIBUTED SENDS the server
would print this message constantly.
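
The fix wraps the logger into LogFrequencyLimiter (see the
DistributedAsyncInsertDirectoryQueue hunk below), so the message is
logged at most once per 30 seconds:

    LOG_TEST(LogFrequencyLimiter(log, 30), "Skipping send data over distributed table.");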

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Rename directory monitor concept into async INSERT

Rename the following query settings (preserving backward
compatibility by keeping the old name as an alias):
- distributed_directory_monitor_sleep_time_ms -> distributed_async_insert_sleep_time_ms
- distributed_directory_monitor_max_sleep_time_ms -> distributed_async_insert_max_sleep_time_ms
- distributed_directory_monitor_batch_inserts -> distributed_async_insert_batch
- distributed_directory_monitor_split_batch_on_failure -> distributed_async_insert_split_batch_on_failure

Rename the following table settings (preserving backward
compatibility by keeping the old name as an alias):
- monitor_batch_inserts -> async_insert_batch
- monitor_split_batch_on_failure -> async_insert_split_batch_on_failure
- directory_monitor_sleep_time_ms -> async_insert_sleep_time_ms
- directory_monitor_max_sleep_time_ms -> async_insert_max_sleep_time_ms
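
Old names keep working; for instance, a table created with the previous
spelling is still accepted (hypothetical table and cluster names):

    CREATE TABLE dist_table AS local_table
    ENGINE = Distributed(test_cluster, default, local_table, rand())
    SETTINGS monitor_batch_inserts = 1; -- alias for async_insert_batch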

Also update all the references:

    $ gg -e directory_monitor_ -e monitor_ tests docs | cut -d: -f1 | sort -u | xargs sed -e 's/distributed_directory_monitor_sleep_time_ms/distributed_async_insert_sleep_time_ms/g' -e 's/distributed_directory_monitor_max_sleep_time_ms/distributed_async_insert_max_sleep_time_ms/g' -e 's/distributed_directory_monitor_batch_inserts/distributed_async_insert_batch/g' -e 's/distributed_directory_monitor_split_batch_on_failure/distributed_async_insert_split_batch_on_failure/g' -e 's/monitor_batch_inserts/async_insert_batch/g' -e 's/monitor_split_batch_on_failure/async_insert_split_batch_on_failure/g' -e 's/monitor_sleep_time_ms/async_insert_sleep_time_ms/g' -e 's/monitor_max_sleep_time_ms/async_insert_max_sleep_time_ms/g' -i

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Rename async_insert for Distributed into background_insert

This avoids ambiguity between general async INSERTs and INSERTs into
Distributed tables, which are indeed performed in the background, so the
new term expresses it even better.
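
For example, in a hypothetical session the two features now read as
clearly unrelated (async_insert is the general-purpose asynchronous
INSERT setting):

    SET async_insert = 1;                        -- general asynchronous INSERTs
    SET distributed_background_insert_batch = 1; -- background INSERT batching for Distributed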

Mostly done with:

    $ git di HEAD^ --name-only | xargs sed -i -e 's/distributed_async_insert/distributed_background_insert/g' -e 's/async_insert_batch/background_insert_batch/g' -e 's/async_insert_split_batch_on_failure/background_insert_split_batch_on_failure/g' -e 's/async_insert_sleep_time_ms/background_insert_sleep_time_ms/g' -e 's/async_insert_max_sleep_time_ms/background_insert_max_sleep_time_ms/g'
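
Query-level settings follow the same pattern, e.g. (hypothetical
session):

    SET distributed_background_insert_sleep_time_ms = 100;
    -- the old name still works via the ALIAS:
    SET distributed_directory_monitor_sleep_time_ms = 100;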

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Mark 02417_opentelemetry_insert_on_distributed_table as long

CI: https://s3.amazonaws.com/clickhouse-test-reports/55978/7a6abb03a0b507e29e999cb7e04f246a119c6f28/stateless_tests_flaky_check__asan_.html

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

---------

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Azat Khuzhin 2023-11-01 15:09:39 +01:00 committed by GitHub
parent 65f3cf81dc
commit c25d6cd624
76 changed files with 281 additions and 243 deletions

View File

@ -189,6 +189,7 @@ rg -Fav -e "Code: 236. DB::Exception: Cancelled merging parts" \
-e "ZooKeeperClient" \
-e "KEEPER_EXCEPTION" \
-e "DirectoryMonitor" \
-e "DistributedInsertQueue" \
-e "TABLE_IS_READ_ONLY" \
-e "Code: 1000, e.code() = 111, Connection refused" \
-e "UNFINISHED" \

View File

@ -48,61 +48,61 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2
#### policy_name
`policy_name` - (optionally) policy name, it will be used to store temporary files for async send
`policy_name` - (optionally) policy name, it will be used to store temporary files for background send
**See Also**
- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
- [distributed_foreground_insert](../../../operations/settings/settings.md#distributed_foreground_insert) setting
- [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes) for the examples
### Distributed Settings
#### fsync_after_insert
`fsync_after_insert` - do the `fsync` for the file data after asynchronous insert to Distributed. Guarantees that the OS flushed the whole inserted data to a file **on the initiator node** disk.
`fsync_after_insert` - do the `fsync` for the file data after background insert to Distributed. Guarantees that the OS flushed the whole inserted data to a file **on the initiator node** disk.
#### fsync_directories
`fsync_directories` - do the `fsync` for directories. Guarantees that the OS refreshed directory metadata after operations related to asynchronous inserts on Distributed table (after insert, after sending the data to shard, etc).
`fsync_directories` - do the `fsync` for directories. Guarantees that the OS refreshed directory metadata after operations related to background inserts on Distributed table (after insert, after sending the data to shard, etc).
#### bytes_to_throw_insert
`bytes_to_throw_insert` - if more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw. Default 0.
`bytes_to_throw_insert` - if more than this number of compressed bytes will be pending for background INSERT, an exception will be thrown. 0 - do not throw. Default 0.
#### bytes_to_delay_insert
`bytes_to_delay_insert` - if more than this number of compressed bytes will be pending for async INSERT, the query will be delayed. 0 - do not delay. Default 0.
`bytes_to_delay_insert` - if more than this number of compressed bytes will be pending for background INSERT, the query will be delayed. 0 - do not delay. Default 0.
#### max_delay_to_insert
`max_delay_to_insert` - max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send. Default 60.
`max_delay_to_insert` - max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for background send. Default 60.
#### monitor_batch_inserts
#### background_insert_batch
`monitor_batch_inserts` - same as [distributed_directory_monitor_batch_inserts](../../../operations/settings/settings.md#distributed_directory_monitor_batch_inserts)
`background_insert_batch` - same as [distributed_background_insert_batch](../../../operations/settings/settings.md#distributed_background_insert_batch)
#### monitor_split_batch_on_failure
#### background_insert_split_batch_on_failure
`monitor_split_batch_on_failure` - same as [distributed_directory_monitor_split_batch_on_failure](../../../operations/settings/settings.md#distributed_directory_monitor_split_batch_on_failure)
`background_insert_split_batch_on_failure` - same as [distributed_background_insert_split_batch_on_failure](../../../operations/settings/settings.md#distributed_background_insert_split_batch_on_failure)
#### monitor_sleep_time_ms
#### background_insert_sleep_time_ms
`monitor_sleep_time_ms` - same as [distributed_directory_monitor_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_sleep_time_ms)
`background_insert_sleep_time_ms` - same as [distributed_background_insert_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_sleep_time_ms)
#### monitor_max_sleep_time_ms
#### background_insert_max_sleep_time_ms
`monitor_max_sleep_time_ms` - same as [distributed_directory_monitor_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_max_sleep_time_ms)
`background_insert_max_sleep_time_ms` - same as [distributed_background_insert_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_max_sleep_time_ms)
:::note
**Durability settings** (`fsync_...`):
- Affect only asynchronous INSERTs (i.e. `insert_distributed_sync=false`) when data first stored on the initiator node disk and later asynchronously send to shards.
- Affect only background INSERTs (i.e. `distributed_foreground_insert=false`) when data first stored on the initiator node disk and later, in background, send to shards.
- May significantly decrease the inserts' performance
- Affect writing the data stored inside Distributed table folder into the **node which accepted your insert**. If you need to have guarantees of writing data to underlying MergeTree tables - see durability settings (`...fsync...`) in `system.merge_tree_settings`
For **Insert limit settings** (`..._insert`) see also:
- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
- [distributed_foreground_insert](../../../operations/settings/settings.md#distributed_foreground_insert) setting
- [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) setting
- `bytes_to_throw_insert` handled before `bytes_to_delay_insert`, so you should not set it to the value less then `bytes_to_delay_insert`
:::
@ -232,7 +232,7 @@ You should be concerned about the sharding scheme in the following cases:
- Queries are used that require joining data (`IN` or `JOIN`) by a specific key. If data is sharded by this key, you can use local `IN` or `JOIN` instead of `GLOBAL IN` or `GLOBAL JOIN`, which is much more efficient.
- A large number of servers is used (hundreds or more) with a large number of small queries, for example, queries for data of individual clients (e.g. websites, advertisers, or partners). In order for the small queries to not affect the entire cluster, it makes sense to locate data for a single client on a single shard. Alternatively, you can set up bi-level sharding: divide the entire cluster into “layers”, where a layer may consist of multiple shards. Data for a single client is located on a single layer, but shards can be added to a layer as necessary, and data is randomly distributed within them. `Distributed` tables are created for each layer, and a single shared distributed table is created for global queries.
Data is written asynchronously. When inserted in the table, the data block is just written to the local file system. The data is sent to the remote servers in the background as soon as possible. The periodicity for sending data is managed by the [distributed_directory_monitor_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_sleep_time_ms) and [distributed_directory_monitor_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_max_sleep_time_ms) settings. The `Distributed` engine sends each file with inserted data separately, but you can enable batch sending of files with the [distributed_directory_monitor_batch_inserts](../../../operations/settings/settings.md#distributed_directory_monitor_batch_inserts) setting. This setting improves cluster performance by better utilizing local server and network resources. You should check whether data is sent successfully by checking the list of files (data waiting to be sent) in the table directory: `/var/lib/clickhouse/data/database/table/`. The number of threads performing background tasks can be set by [background_distributed_schedule_pool_size](../../../operations/settings/settings.md#background_distributed_schedule_pool_size) setting.
Data is written in background. When inserted in the table, the data block is just written to the local file system. The data is sent to the remote servers in the background as soon as possible. The periodicity for sending data is managed by the [distributed_background_insert_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_sleep_time_ms) and [distributed_background_insert_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_max_sleep_time_ms) settings. The `Distributed` engine sends each file with inserted data separately, but you can enable batch sending of files with the [distributed_background_insert_batch](../../../operations/settings/settings.md#distributed_background_insert_batch) setting. This setting improves cluster performance by better utilizing local server and network resources. You should check whether data is sent successfully by checking the list of files (data waiting to be sent) in the table directory: `/var/lib/clickhouse/data/database/table/`. The number of threads performing background tasks can be set by [background_distributed_schedule_pool_size](../../../operations/settings/settings.md#background_distributed_schedule_pool_size) setting.
If the server ceased to exist or had a rough restart (for example, due to a hardware failure) after an `INSERT` to a `Distributed` table, the inserted data might be lost. If a damaged data part is detected in the table directory, it is transferred to the `broken` subdirectory and no longer used.

View File

@ -2473,7 +2473,7 @@ See also:
- [distributed_replica_error_cap](#distributed_replica_error_cap)
- [distributed_replica_error_half_life](#settings-distributed_replica_error_half_life)
## distributed_directory_monitor_sleep_time_ms {#distributed_directory_monitor_sleep_time_ms}
## distributed_background_insert_sleep_time_ms {#distributed_background_insert_sleep_time_ms}
Base interval for the [Distributed](../../engines/table-engines/special/distributed.md) table engine to send data. The actual interval grows exponentially in the event of errors.
@ -2483,9 +2483,9 @@ Possible values:
Default value: 100 milliseconds.
## distributed_directory_monitor_max_sleep_time_ms {#distributed_directory_monitor_max_sleep_time_ms}
## distributed_background_insert_max_sleep_time_ms {#distributed_background_insert_max_sleep_time_ms}
Maximum interval for the [Distributed](../../engines/table-engines/special/distributed.md) table engine to send data. Limits exponential growth of the interval set in the [distributed_directory_monitor_sleep_time_ms](#distributed_directory_monitor_sleep_time_ms) setting.
Maximum interval for the [Distributed](../../engines/table-engines/special/distributed.md) table engine to send data. Limits exponential growth of the interval set in the [distributed_background_insert_sleep_time_ms](#distributed_background_insert_sleep_time_ms) setting.
Possible values:
@ -2493,7 +2493,7 @@ Possible values:
Default value: 30000 milliseconds (30 seconds).
## distributed_directory_monitor_batch_inserts {#distributed_directory_monitor_batch_inserts}
## distributed_background_insert_batch {#distributed_background_insert_batch}
Enables/disables inserted data sending in batches.
@ -2506,13 +2506,13 @@ Possible values:
Default value: 0.
## distributed_directory_monitor_split_batch_on_failure {#distributed_directory_monitor_split_batch_on_failure}
## distributed_background_insert_split_batch_on_failure {#distributed_background_insert_split_batch_on_failure}
Enables/disables splitting batches on failures.
Sometimes sending particular batch to the remote shard may fail, because of some complex pipeline after (i.e. `MATERIALIZED VIEW` with `GROUP BY`) due to `Memory limit exceeded` or similar errors. In this case, retrying will not help (and this will stuck distributed sends for the table) but sending files from that batch one by one may succeed INSERT.
So installing this setting to `1` will disable batching for such batches (i.e. temporary disables `distributed_directory_monitor_batch_inserts` for failed batches).
So installing this setting to `1` will disable batching for such batches (i.e. temporary disables `distributed_background_insert_batch` for failed batches).
Possible values:
@ -2695,15 +2695,15 @@ Possible values:
Default value: 0.
## insert_distributed_sync {#insert_distributed_sync}
## distributed_foreground_insert {#distributed_foreground_insert}
Enables or disables synchronous data insertion into a [Distributed](../../engines/table-engines/special/distributed.md/#distributed) table.
By default, when inserting data into a `Distributed` table, the ClickHouse server sends data to cluster nodes in asynchronous mode. When `insert_distributed_sync=1`, the data is processed synchronously, and the `INSERT` operation succeeds only after all the data is saved on all shards (at least one replica for each shard if `internal_replication` is true).
By default, when inserting data into a `Distributed` table, the ClickHouse server sends data to cluster nodes in background mode. When `distributed_foreground_insert=1`, the data is processed synchronously, and the `INSERT` operation succeeds only after all the data is saved on all shards (at least one replica for each shard if `internal_replication` is true).
Possible values:
- 0 — Data is inserted in asynchronous mode.
- 0 — Data is inserted in background mode.
- 1 — Data is inserted in synchronous mode.
Default value: `0`.
@ -2762,7 +2762,7 @@ Result:
## use_compact_format_in_distributed_parts_names {#use_compact_format_in_distributed_parts_names}
Uses compact format for storing blocks for async (`insert_distributed_sync`) INSERT into tables with `Distributed` engine.
Uses compact format for storing blocks for background (`distributed_foreground_insert`) INSERT into tables with `Distributed` engine.
Possible values:
@ -2772,7 +2772,7 @@ Possible values:
Default value: `1`.
:::note
- with `use_compact_format_in_distributed_parts_names=0` changes from cluster definition will not be applied for async INSERT.
- with `use_compact_format_in_distributed_parts_names=0` changes from cluster definition will not be applied for background INSERT.
- with `use_compact_format_in_distributed_parts_names=1` changing the order of the nodes in the cluster definition, will change the `shard_index`/`replica_index` so be aware.
:::

View File

@ -115,7 +115,7 @@ Parameters:
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
<distributed_foreground_insert>1</distributed_foreground_insert>
</settings>
<!-- Copying tasks description.

View File

@ -166,7 +166,7 @@ Aborts ClickHouse process (like `kill -9 {$ pid_clickhouse-server}`)
## Managing Distributed Tables
ClickHouse can manage [distributed](../../engines/table-engines/special/distributed.md) tables. When a user inserts data into these tables, ClickHouse first creates a queue of the data that should be sent to cluster nodes, then asynchronously sends it. You can manage queue processing with the [STOP DISTRIBUTED SENDS](#query_language-system-stop-distributed-sends), [FLUSH DISTRIBUTED](#query_language-system-flush-distributed), and [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends) queries. You can also synchronously insert distributed data with the [insert_distributed_sync](../../operations/settings/settings.md#insert_distributed_sync) setting.
ClickHouse can manage [distributed](../../engines/table-engines/special/distributed.md) tables. When a user inserts data into these tables, ClickHouse first creates a queue of the data that should be sent to cluster nodes, then asynchronously sends it. You can manage queue processing with the [STOP DISTRIBUTED SENDS](#query_language-system-stop-distributed-sends), [FLUSH DISTRIBUTED](#query_language-system-flush-distributed), and [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends) queries. You can also synchronously insert distributed data with the [distributed_foreground_insert](../../operations/settings/settings.md#distributed_foreground_insert) setting.
### STOP DISTRIBUTED SENDS

View File

@ -22,7 +22,7 @@ sidebar_label: Distributed
Смотрите также:
- настройка `insert_distributed_sync`
- настройка `distributed_foreground_insert`
- [MergeTree](../mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes) для примера
Пример:
@ -131,7 +131,7 @@ logs - имя кластера в конфигурационном файле с
- используются запросы, требующие соединение данных (IN, JOIN) по определённому ключу - тогда если данные шардированы по этому ключу, то можно использовать локальные IN, JOIN вместо GLOBAL IN, GLOBAL JOIN, что кардинально более эффективно.
- используется большое количество серверов (сотни и больше) и большое количество маленьких запросов (запросы отдельных клиентов - сайтов, рекламодателей, партнёров) - тогда, для того, чтобы маленькие запросы не затрагивали весь кластер, имеет смысл располагать данные одного клиента на одном шарде, или сделать двухуровневое шардирование: разбить весь кластер на «слои», где слой может состоять из нескольких шардов; данные для одного клиента располагаются на одном слое, но в один слой можно по мере необходимости добавлять шарды, в рамках которых данные распределены произвольным образом; создаются распределённые таблицы на каждый слой и одна общая распределённая таблица для глобальных запросов.
Запись данных осуществляется полностью асинхронно. При вставке в таблицу, блок данных сначала записывается в файловую систему. Затем, в фоновом режиме отправляются на удалённые серверы при первой возможности. Период отправки регулируется настройками [distributed_directory_monitor_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_sleep_time_ms) и [distributed_directory_monitor_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_max_sleep_time_ms). Движок таблиц `Distributed` отправляет каждый файл со вставленными данными отдельно, но можно включить пакетную отправку данных настройкой [distributed_directory_monitor_batch_inserts](../../../operations/settings/settings.md#distributed_directory_monitor_batch_inserts). Эта настройка улучшает производительность кластера за счет более оптимального использования ресурсов сервера-отправителя и сети. Необходимо проверять, что данные отправлены успешно, для этого проверьте список файлов (данных, ожидающих отправки) в каталоге таблицы `/var/lib/clickhouse/data/database/table/`. Количество потоков для выполнения фоновых задач можно задать с помощью настройки [background_distributed_schedule_pool_size](../../../operations/settings/settings.md#background_distributed_schedule_pool_size).
Запись данных осуществляется полностью асинхронно. При вставке в таблицу, блок данных сначала записывается в файловую систему. Затем, в фоновом режиме отправляются на удалённые серверы при первой возможности. Период отправки регулируется настройками [distributed_background_insert_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_sleep_time_ms) и [distributed_background_insert_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_max_sleep_time_ms). Движок таблиц `Distributed` отправляет каждый файл со вставленными данными отдельно, но можно включить пакетную отправку данных настройкой [distributed_background_insert_batch](../../../operations/settings/settings.md#distributed_background_insert_batch). Эта настройка улучшает производительность кластера за счет более оптимального использования ресурсов сервера-отправителя и сети. Необходимо проверять, что данные отправлены успешно, для этого проверьте список файлов (данных, ожидающих отправки) в каталоге таблицы `/var/lib/clickhouse/data/database/table/`. Количество потоков для выполнения фоновых задач можно задать с помощью настройки [background_distributed_schedule_pool_size](../../../operations/settings/settings.md#background_distributed_schedule_pool_size).
Если после INSERT-а в Distributed таблицу, сервер перестал существовать или был грубо перезапущен (например, в следствие аппаратного сбоя), то записанные данные могут быть потеряны. Если в директории таблицы обнаружен повреждённый кусок данных, то он переносится в поддиректорию broken и больше не используется.

View File

@ -2136,7 +2136,7 @@ SELECT * FROM test_table
- [distributed_replica_error_cap](#settings-distributed_replica_error_cap)
- [distributed_replica_error_half_life](#settings-distributed_replica_error_half_life)
## distributed_directory_monitor_sleep_time_ms {#distributed_directory_monitor_sleep_time_ms}
## distributed_background_insert_sleep_time_ms {#distributed_background_insert_sleep_time_ms}
Основной интервал отправки данных движком таблиц [Distributed](../../engines/table-engines/special/distributed.md). Фактический интервал растёт экспоненциально при возникновении ошибок.
@ -2146,9 +2146,9 @@ SELECT * FROM test_table
Значение по умолчанию: 100 миллисекунд.
## distributed_directory_monitor_max_sleep_time_ms {#distributed_directory_monitor_max_sleep_time_ms}
## distributed_background_insert_max_sleep_time_ms {#distributed_background_insert_max_sleep_time_ms}
Максимальный интервал отправки данных движком таблиц [Distributed](../../engines/table-engines/special/distributed.md). Ограничивает экпоненциальный рост интервала, установленого настройкой [distributed_directory_monitor_sleep_time_ms](#distributed_directory_monitor_sleep_time_ms).
Максимальный интервал отправки данных движком таблиц [Distributed](../../engines/table-engines/special/distributed.md). Ограничивает экпоненциальный рост интервала, установленого настройкой [distributed_background_insert_sleep_time_ms](#distributed_background_insert_sleep_time_ms).
Возможные значения:
@ -2156,7 +2156,7 @@ SELECT * FROM test_table
Значение по умолчанию: 30000 миллисекунд (30 секунд).
## distributed_directory_monitor_batch_inserts {#distributed_directory_monitor_batch_inserts}
## distributed_background_insert_batch {#distributed_background_insert_batch}
Включает/выключает пакетную отправку вставленных данных.
@ -2323,11 +2323,11 @@ SELECT * FROM test_table
Значение по умолчанию: 0.
## insert_distributed_sync {#insert_distributed_sync}
## distributed_foreground_insert {#distributed_foreground_insert}
Включает или отключает режим синхронного добавления данных в распределенные таблицы (таблицы с движком [Distributed](../../engines/table-engines/special/distributed.md#distributed)).
По умолчанию ClickHouse вставляет данные в распределённую таблицу в асинхронном режиме. Если `insert_distributed_sync=1`, то данные вставляются сихронно, а запрос `INSERT` считается выполненным успешно, когда данные записаны на все шарды (по крайней мере на одну реплику для каждого шарда, если `internal_replication = true`).
По умолчанию ClickHouse вставляет данные в распределённую таблицу в асинхронном режиме. Если `distributed_foreground_insert=1`, то данные вставляются сихронно, а запрос `INSERT` считается выполненным успешно, когда данные записаны на все шарды (по крайней мере на одну реплику для каждого шарда, если `internal_replication = true`).
Возможные значения:

View File

@ -111,7 +111,7 @@ $ clickhouse-copier --daemon --config zookeeper.xml --task-path /task/path --bas
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
<distributed_foreground_insert>1</distributed_foreground_insert>
</settings>
<!-- Copying tasks description.

View File

@ -128,7 +128,7 @@ SYSTEM RELOAD CONFIG [ON CLUSTER cluster_name]
## Управление распределёнными таблицами {#query-language-system-distributed}
ClickHouse может оперировать [распределёнными](../../sql-reference/statements/system.md) таблицами. Когда пользователь вставляет данные в эти таблицы, ClickHouse сначала формирует очередь из данных, которые должны быть отправлены на узлы кластера, а затем асинхронно отправляет подготовленные данные. Вы можете управлять очередью с помощью запросов [STOP DISTRIBUTED SENDS](#query_language-system-stop-distributed-sends), [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends) и [FLUSH DISTRIBUTED](#query_language-system-flush-distributed). Также есть возможность синхронно вставлять распределенные данные с помощью настройки [insert_distributed_sync](../../operations/settings/settings.md#insert_distributed_sync).
ClickHouse может оперировать [распределёнными](../../sql-reference/statements/system.md) таблицами. Когда пользователь вставляет данные в эти таблицы, ClickHouse сначала формирует очередь из данных, которые должны быть отправлены на узлы кластера, а затем асинхронно отправляет подготовленные данные. Вы можете управлять очередью с помощью запросов [STOP DISTRIBUTED SENDS](#query_language-system-stop-distributed-sends), [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends) и [FLUSH DISTRIBUTED](#query_language-system-flush-distributed). Также есть возможность синхронно вставлять распределенные данные с помощью настройки [distributed_foreground_insert](../../operations/settings/settings.md#distributed_foreground_insert).
### STOP DISTRIBUTED SENDS {#query_language-system-stop-distributed-sends}

View File

@ -43,7 +43,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2
**详见**
- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) 设置
- [distributed_foreground_insert](../../../operations/settings/settings.md#distributed_foreground_insert) 设置
- [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes) 查看示例
**分布式设置**
@ -58,24 +58,24 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2
- `max_delay_to_insert` - 最大延迟多少秒插入数据到分布式表如果有很多挂起字节异步发送。默认值60。
- `monitor_batch_inserts` - 等同于 [distributed_directory_monitor_batch_inserts](../../../operations/settings/settings.md#distributed_directory_monitor_batch_inserts)
- `background_insert_batch` - 等同于 [distributed_background_insert_batch](../../../operations/settings/settings.md#distributed_background_insert_batch)
- `monitor_split_batch_on_failure` - 等同于[distributed_directory_monitor_split_batch_on_failure](../../../operations/settings/settings.md#distributed_directory_monitor_split_batch_on_failure)
- `background_insert_split_batch_on_failure` - 等同于[distributed_background_insert_split_batch_on_failure](../../../operations/settings/settings.md#distributed_background_insert_split_batch_on_failure)
- `monitor_sleep_time_ms` - 等同于 [distributed_directory_monitor_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_sleep_time_ms)
- `background_insert_sleep_time_ms` - 等同于 [distributed_background_insert_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_sleep_time_ms)
- `monitor_max_sleep_time_ms` - 等同于 [distributed_directory_monitor_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_max_sleep_time_ms)
- `background_insert_max_sleep_time_ms` - 等同于 [distributed_background_insert_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_max_sleep_time_ms)
::note
**稳定性设置** (`fsync_...`):
- 只影响异步插入(例如:`insert_distributed_sync=false`), 当数据首先存储在启动节点磁盘上然后再异步发送到shard。
- 只影响异步插入(例如:`distributed_foreground_insert=false`), 当数据首先存储在启动节点磁盘上然后再异步发送到shard。
— 可能会显著降低`insert`的性能
- 影响将存储在分布式表文件夹中的数据写入 **接受您插入的节点** 。如果你需要保证写入数据到底层的MergeTree表中请参阅 `system.merge_tree_settings` 中的持久性设置(`...fsync...`)
**插入限制设置** (`..._insert`) 请见:
- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) 设置
- [distributed_foreground_insert](../../../operations/settings/settings.md#distributed_foreground_insert) 设置
- [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) 设置
- `bytes_to_throw_insert``bytes_to_delay_insert` 之前处理,所以你不应该设置它的值小于 `bytes_to_delay_insert`
:::
@ -209,7 +209,7 @@ SELECT 查询会被发送到所有分片,并且无论数据在分片中如何
- 使用需要特定键连接数据( IN 或 JOIN )的查询。如果数据是用该键进行分片,则应使用本地 IN 或 JOIN 而不是 GLOBAL IN 或 GLOBAL JOIN这样效率更高。
- 使用大量服务器(上百或更多),但有大量小查询(个别客户的查询 - 网站,广告商或合作伙伴)。为了使小查询不影响整个集群,让单个客户的数据处于单个分片上是有意义的。或者 你可以配置两级分片:将整个集群划分为«层»,一个层可以包含多个分片。单个客户的数据位于单个层上,根据需要将分片添加到层中,层中的数据随机分布。然后给每层创建分布式表,再创建一个全局的分布式表用于全局的查询。
数据是异步写入的。对于分布式表的 INSERT数据块只写本地文件系统。之后会尽快地在后台发送到远程服务器。发送数据的周期性是由[distributed_directory_monitor_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_sleep_time_ms)和[distributed_directory_monitor_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_directory_monitor_max_sleep_time_ms)设置。分布式引擎会分别发送每个插入数据的文件,但是你可以使用[distributed_directory_monitor_batch_inserts](../../../operations/settings/settings.md#distributed_directory_monitor_batch_inserts)设置启用批量发送文件。该设置通过更好地利用本地服务器和网络资源来提高集群性能。你应该检查表目录`/var/lib/clickhouse/data/database/table/`中的文件列表(等待发送的数据)来检查数据是否发送成功。执行后台任务的线程数可以通过[background_distributed_schedule_pool_size](../../../operations/settings/settings.md#background_distributed_schedule_pool_size)设置。
数据是异步写入的。对于分布式表的 INSERT数据块只写本地文件系统。之后会尽快地在后台发送到远程服务器。发送数据的周期性是由[distributed_background_insert_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_sleep_time_ms)和[distributed_background_insert_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_max_sleep_time_ms)设置。分布式引擎会分别发送每个插入数据的文件,但是你可以使用[distributed_background_insert_batch](../../../operations/settings/settings.md#distributed_background_insert_batch)设置启用批量发送文件。该设置通过更好地利用本地服务器和网络资源来提高集群性能。你应该检查表目录`/var/lib/clickhouse/data/database/table/`中的文件列表(等待发送的数据)来检查数据是否发送成功。执行后台任务的线程数可以通过[background_distributed_schedule_pool_size](../../../operations/settings/settings.md#background_distributed_schedule_pool_size)设置。
如果在 INSERT 到分布式表时服务器节点丢失或重启设备故障则插入的数据可能会丢失。如果在表目录中检测到损坏的数据分片则会将其转移到«broken»子目录并不再使用。

View File

@ -1088,7 +1088,7 @@ ClickHouse生成异常
- [表引擎分布式](../../engines/table-engines/special/distributed.md)
- [distributed_replica_error_half_life](#settings-distributed_replica_error_half_life)
## distributed_directory_monitor_sleep_time_ms {#distributed_directory_monitor_sleep_time_ms}
## distributed_background_insert_sleep_time_ms {#distributed_background_insert_sleep_time_ms}
对于基本间隔 [分布](../../engines/table-engines/special/distributed.md) 表引擎发送数据。 在发生错误时,实际间隔呈指数级增长。
@ -1098,9 +1098,9 @@ ClickHouse生成异常
默认值100毫秒。
## distributed_directory_monitor_max_sleep_time_ms {#distributed_directory_monitor_max_sleep_time_ms}
## distributed_background_insert_max_sleep_time_ms {#distributed_background_insert_max_sleep_time_ms}
的最大间隔 [分布](../../engines/table-engines/special/distributed.md) 表引擎发送数据。 限制在设置的区间的指数增长 [distributed_directory_monitor_sleep_time_ms](#distributed_directory_monitor_sleep_time_ms) 设置。
的最大间隔 [分布](../../engines/table-engines/special/distributed.md) 表引擎发送数据。 限制在设置的区间的指数增长 [distributed_background_insert_sleep_time_ms](#distributed_background_insert_sleep_time_ms) 设置。
可能的值:
@ -1108,7 +1108,7 @@ ClickHouse生成异常
默认值30000毫秒30秒
## distributed_directory_monitor_batch_inserts {#distributed_directory_monitor_batch_inserts}
## distributed_background_insert_batch {#distributed_background_insert_batch}
启用/禁用批量发送插入的数据。

View File

@ -100,7 +100,7 @@ clickhouse-copier --daemon --config zookeeper.xml --task-path /task/path --base-
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
<distributed_foreground_insert>1</distributed_foreground_insert>
</settings>
<!-- Copying tasks description.

View File

@ -93,7 +93,7 @@ SYSTEM RELOAD CONFIG [ON CLUSTER cluster_name]
## Managing Distributed Tables {#query-language-system-distributed}
ClickHouse可以管理 [distribute](../../engines/table-engines/special/distributed.md)表。当用户向这类表插入数据时ClickHouse首先为需要发送到集群节点的数据创建一个队列然后异步的发送它们。你可以维护队列的处理过程通过[STOP DISTRIBUTED SENDS](#query_language-system-stop-distributed-sends), [FLUSH DISTRIBUTED](#query_language-system-flush-distributed), 以及 [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends)。你也可以设置 `insert_distributed_sync`参数来以同步的方式插入分布式数据。
ClickHouse可以管理 [distribute](../../engines/table-engines/special/distributed.md)表。当用户向这类表插入数据时ClickHouse首先为需要发送到集群节点的数据创建一个队列然后异步的发送它们。你可以维护队列的处理过程通过[STOP DISTRIBUTED SENDS](#query_language-system-stop-distributed-sends), [FLUSH DISTRIBUTED](#query_language-system-flush-distributed), 以及 [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends)。你也可以设置 `distributed_foreground_insert`参数来以同步的方式插入分布式数据。
### STOP DISTRIBUTED SENDS {#query_language-system-stop-distributed-sends}

View File

@ -58,7 +58,7 @@ void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & c
/// Override important settings
settings_pull.readonly = 1;
settings_pull.prefer_localhost_replica = false;
settings_push.insert_distributed_sync = true;
settings_push.distributed_foreground_insert = true;
settings_push.prefer_localhost_replica = false;
set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
@ -66,7 +66,7 @@ void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & c
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
set_default_value(settings_push.distributed_background_insert_timeout, 0);
set_default_value(settings_push.alter_sync, 2);
}

View File

@ -236,7 +236,7 @@
M(DictCacheLockWriteNs, "Number of nanoseconds spend in waiting for write lock to update the data for the dictionaries of 'cache' types.") \
M(DictCacheLockReadNs, "Number of nanoseconds spend in waiting for read lock to lookup the data for the dictionaries of 'cache' types.") \
\
M(DistributedSyncInsertionTimeoutExceeded, "A timeout has exceeded while waiting for shards during synchronous insertion into a Distributed table (with 'insert_distributed_sync' = 1)") \
M(DistributedSyncInsertionTimeoutExceeded, "A timeout has exceeded while waiting for shards during synchronous insertion into a Distributed table (with 'distributed_foreground_insert' = 1)") \
M(DataAfterMergeDiffersFromReplica, R"(
Number of times data after merge is not byte-identical to the data on another replicas. There could be several reasons:
1. Using newer version of compression library after server update.

View File

@ -124,11 +124,13 @@ class IColumn;
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams and NATS engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
M(String, stream_like_engine_insert_queue, "", "When stream like engine reads from multiple queues, user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.", 0) \
\
M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
M(Milliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \
M(Bool, distributed_foreground_insert, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster. \n\nEnables or disables synchronous data insertion into a `Distributed` table.\n\nBy default, when inserting data into a Distributed table, the ClickHouse server sends data to cluster nodes in background. When `distributed_foreground_insert` = 1, the data is processed synchronously, and the `INSERT` operation succeeds only after all the data is saved on all shards (at least one replica for each shard if `internal_replication` is true).", 0) ALIAS(insert_distributed_sync) \
M(UInt64, distributed_background_insert_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) ALIAS(insert_distributed_timeout) \
M(Milliseconds, distributed_background_insert_sleep_time_ms, 100, "Sleep time for background INSERTs into Distributed, in case of any errors delay grows exponentially.", 0) ALIAS(distributed_directory_monitor_sleep_time_ms) \
M(Milliseconds, distributed_background_insert_max_sleep_time_ms, 30000, "Maximum sleep time for background INSERTs into Distributed, it limits exponential growth too.", 0) ALIAS(distributed_directory_monitor_max_sleep_time_ms) \
\
M(Bool, distributed_directory_monitor_batch_inserts, false, "Should StorageDistributed DirectoryMonitors try to batch individual inserts into bigger ones.", 0) \
M(Bool, distributed_directory_monitor_split_batch_on_failure, false, "Should StorageDistributed DirectoryMonitors try to split batch into smaller in case of failures.", 0) \
M(Bool, distributed_background_insert_batch, false, "Should background INSERTs into Distributed be batched into bigger blocks.", 0) ALIAS(distributed_directory_monitor_batch_inserts) \
M(Bool, distributed_background_insert_split_batch_on_failure, false, "Should batches of the background INSERT into Distributed be split into smaller batches in case of failures.", 0) ALIAS(distributed_directory_monitor_split_batch_on_failure) \
\
M(Bool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.", 0) \
M(Bool, optimize_move_to_prewhere_if_final, false, "If query has `FINAL`, the optimization `move_to_prewhere` is not always correct and it is enabled only if both settings `optimize_move_to_prewhere` and `optimize_move_to_prewhere_if_final` are turned on", 0) \
@ -304,8 +306,6 @@ class IColumn;
M(UInt64, parts_to_throw_insert, 0, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \
M(UInt64, number_of_mutations_to_delay, 0, "If the mutated table contains at least that many unfinished mutations, artificially slow down mutations of table. 0 - disabled", 0) \
M(UInt64, number_of_mutations_to_throw, 0, "If the mutated table contains at least that many unfinished mutations, throw 'Too many mutations ...' exception. 0 - disabled", 0) \
M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster. \n\nEnables or disables synchronous data insertion into a `Distributed` table.\n\nBy default, when inserting data into a Distributed table, the ClickHouse server sends data to cluster nodes in asynchronous mode. When `insert_distributed_sync` = 1, the data is processed synchronously, and the `INSERT` operation succeeds only after all the data is saved on all shards (at least one replica for each shard if `internal_replication` is true).", 0) \
M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \
M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \
M(Milliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.", 0) \
M(Milliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from/to streaming storages.", 0) \

View File

@ -370,7 +370,7 @@ Chain InterpreterInsertQuery::buildPreSinkChain(
/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(settings.insert_distributed_sync && table->isRemote()) && !async_insert && !no_squash && !(query && query->watch))
if (!(settings.distributed_foreground_insert && table->isRemote()) && !async_insert && !no_squash && !(query && query->watch))
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();

View File

@ -128,16 +128,16 @@ DistributedAsyncInsertDirectoryQueue::DistributedAsyncInsertDirectoryQueue(
, path(fs::path(disk->getPath()) / relative_path / "")
, broken_relative_path(fs::path(relative_path) / "broken")
, broken_path(fs::path(path) / "broken" / "")
, should_batch_inserts(storage.getDistributedSettingsRef().monitor_batch_inserts)
, split_batch_on_failure(storage.getDistributedSettingsRef().monitor_split_batch_on_failure)
, should_batch_inserts(storage.getDistributedSettingsRef().background_insert_batch)
, split_batch_on_failure(storage.getDistributedSettingsRef().background_insert_split_batch_on_failure)
, dir_fsync(storage.getDistributedSettingsRef().fsync_directories)
, min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes)
, current_batch_file_path(path + "current_batch.txt")
, pending_files(std::numeric_limits<size_t>::max())
, default_sleep_time(storage.getDistributedSettingsRef().monitor_sleep_time_ms.totalMilliseconds())
, default_sleep_time(storage.getDistributedSettingsRef().background_insert_sleep_time_ms.totalMilliseconds())
, sleep_time(default_sleep_time)
, max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds())
, max_sleep_time(storage.getDistributedSettingsRef().background_insert_max_sleep_time_ms.totalMilliseconds())
, log(&Poco::Logger::get(getLoggerName()))
, monitor_blocker(monitor_blocker_)
, metric_pending_bytes(CurrentMetrics::DistributedBytesToInsert, 0)
@ -234,7 +234,7 @@ void DistributedAsyncInsertDirectoryQueue::run()
}
}
else
LOG_TEST(log, "Skipping send data over distributed table.");
LOG_TEST(LogFrequencyLimiter(log, 30), "Skipping send data over distributed table.");
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
@ -726,7 +726,7 @@ SyncGuardPtr DistributedAsyncInsertDirectoryQueue::getDirectorySyncGuard(const s
std::string DistributedAsyncInsertDirectoryQueue::getLoggerName() const
{
return storage.getStorageID().getFullTableName() + ".DirectoryMonitor." + disk->getName();
return storage.getStorageID().getFullTableName() + ".DistributedInsertQueue." + disk->getName();
}
void DistributedAsyncInsertDirectoryQueue::updatePath(const std::string & new_relative_path)

View File

@ -27,7 +27,7 @@ using ProcessorPtr = std::shared_ptr<IProcessor>;
class ISource;
/** Queue for async INSERT Into Distributed engine (insert_distributed_sync=0).
/** Queue for async INSERT Into Distributed engine (distributed_foreground_insert=0).
*
* Files are added from two places:
* - from filesystem at startup (StorageDistributed::startup())
@ -36,12 +36,10 @@ class ISource;
* Later, in background, those files will be send to the remote nodes.
*
* The behaviour of this queue can be configured via the following settings:
* - distributed_directory_monitor_batch_inserts
* - distributed_directory_monitor_split_batch_on_failure
* - distributed_directory_monitor_sleep_time_ms
* - distributed_directory_monitor_max_sleep_time_ms
* NOTE: It worth to rename the settings too
* ("directory_monitor" in settings looks too internal).
* - distributed_background_insert_batch
* - distributed_background_insert_split_batch_on_failure
* - distributed_background_insert_sleep_time_ms
* - distributed_background_insert_max_sleep_time_ms
*/
class DistributedAsyncInsertDirectoryQueue
{

View File

@ -15,17 +15,17 @@ namespace DB
class ASTStorage;
#define LIST_OF_DISTRIBUTED_SETTINGS(M, ALIAS) \
M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for async INSERT, i.e. insert_distributed_sync=false)", 0) \
M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for async INSERT only) after all part operations (writes, renames, etc.).", 0) \
M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for background INSERT, i.e. distributed_foreground_insert=false)", 0) \
M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for background INSERT only) after all part operations (writes, renames, etc.).", 0) \
/** Inserts settings. */ \
M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.", 0) \
M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, the query will be delayed. 0 - do not delay.", 0) \
M(UInt64, max_delay_to_insert, 60, "Max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send.", 0) \
/** Directory monitor settings */ \
M(UInt64, monitor_batch_inserts, 0, "Default - distributed_directory_monitor_batch_inserts", 0) \
M(UInt64, monitor_split_batch_on_failure, 0, "Default - distributed_directory_monitor_split_batch_on_failure", 0) \
M(Milliseconds, monitor_sleep_time_ms, 0, "Default - distributed_directory_monitor_sleep_time_ms", 0) \
M(Milliseconds, monitor_max_sleep_time_ms, 0, "Default - distributed_directory_monitor_max_sleep_time_ms", 0) \
M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for background INSERT, an exception will be thrown. 0 - do not throw.", 0) \
M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for background INSERT, the query will be delayed. 0 - do not delay.", 0) \
M(UInt64, max_delay_to_insert, 60, "Max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for background send.", 0) \
/** Async INSERT settings */ \
M(UInt64, background_insert_batch, 0, "Default - distributed_background_insert_batch", 0) ALIAS(monitor_batch_inserts) \
M(UInt64, background_insert_split_batch_on_failure, 0, "Default - distributed_background_insert_split_batch_on_failure", 0) ALIAS(monitor_split_batch_on_failure) \
M(Milliseconds, background_insert_sleep_time_ms, 0, "Default - distributed_background_insert_sleep_time_ms", 0) ALIAS(monitor_sleep_time_ms) \
M(Milliseconds, background_insert_max_sleep_time_ms, 0, "Default - distributed_background_insert_max_sleep_time_ms", 0) ALIAS(monitor_max_sleep_time_ms) \
M(Bool, flush_on_detach, true, "Flush data to remote nodes on DETACH/DROP/server shutdown", 0) \
DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)

View File

@ -763,7 +763,7 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
return guard;
};
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds();
auto sleep_ms = context->getSettingsRef().distributed_background_insert_sleep_time_ms.totalMilliseconds();
size_t file_size;
auto it = dir_names.begin();
@ -789,7 +789,7 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
NativeWriter stream{compress, DBMS_TCP_PROTOCOL_VERSION, block.cloneEmpty()};
/// Prepare the header.
/// See also readDistributedHeader() in DirectoryMonitor (for reading side)
/// See also DistributedAsyncInsertHeader::read() in DistributedInsertQueue (for reading side)
///
/// We wrap the header into a string for compatibility with older versions:
/// a shard will able to read the header partly and ignore other parts based on its version.

View File

@ -335,8 +335,8 @@ StorageDistributed::StorageDistributed(
, distributed_settings(distributed_settings_)
, rng(randomSeed())
{
if (!distributed_settings.flush_on_detach && distributed_settings.monitor_batch_inserts)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Settings flush_on_detach=0 and monitor_batch_inserts=1 are incompatible");
if (!distributed_settings.flush_on_detach && distributed_settings.background_insert_batch)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Settings flush_on_detach=0 and background_insert_batch=1 are incompatible");
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
@ -938,8 +938,8 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
}
/// Force sync insertion if it is remote() table function
bool insert_sync = settings.insert_distributed_sync || settings.insert_shard_id || owned_cluster;
auto timeout = settings.insert_distributed_timeout;
bool insert_sync = settings.distributed_foreground_insert || settings.insert_shard_id || owned_cluster;
auto timeout = settings.distributed_background_insert_timeout;
Names columns_to_send;
if (settings.insert_allow_materialized_columns)
@ -1250,7 +1250,7 @@ void StorageDistributed::initializeFromDisk()
void StorageDistributed::shutdown()
{
monitors_blocker.cancelForever();
async_insert_blocker.cancelForever();
std::lock_guard lock(cluster_nodes_mutex);
@ -1267,8 +1267,8 @@ void StorageDistributed::drop()
// parallel.
//
// And second time shutdown() should be fast, since none of
// DirectoryMonitor should do anything, because ActionBlocker is canceled
// (in shutdown()).
// DirectoryMonitor should not do anything, because ActionBlocker is
// canceled (in shutdown()).
shutdown();
// Distributed table without sharding_key does not allows INSERTs
@ -1313,7 +1313,7 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, Co
for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();)
{
it->second.directory_monitor->shutdownAndDropAllData();
it->second.directory_queue->shutdownAndDropAllData();
it = cluster_nodes_data.erase(it);
}
@ -1368,16 +1368,16 @@ DistributedAsyncInsertDirectoryQueue & StorageDistributed::getDirectoryQueue(con
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
if (!node_data.directory_monitor)
if (!node_data.directory_queue)
{
node_data.connection_pool = DistributedAsyncInsertDirectoryQueue::createPool(name, *this);
node_data.directory_monitor = std::make_unique<DistributedAsyncInsertDirectoryQueue>(
node_data.directory_queue = std::make_unique<DistributedAsyncInsertDirectoryQueue>(
*this, disk, relative_data_path + name,
node_data.connection_pool,
monitors_blocker,
async_insert_blocker,
getContext()->getDistributedSchedulePool());
}
return *node_data.directory_monitor;
return *node_data.directory_queue;
}
std::vector<DistributedAsyncInsertDirectoryQueue::Status> StorageDistributed::getDirectoryQueueStatuses() const
@ -1386,7 +1386,7 @@ std::vector<DistributedAsyncInsertDirectoryQueue::Status> StorageDistributed::ge
std::lock_guard lock(cluster_nodes_mutex);
statuses.reserve(cluster_nodes_data.size());
for (const auto & node : cluster_nodes_data)
statuses.push_back(node.second.directory_monitor->getStatus());
statuses.push_back(node.second.directory_queue->getStatus());
return statuses;
}
@ -1614,7 +1614,7 @@ ClusterPtr StorageDistributed::skipUnusedShards(
ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
{
if (type == ActionLocks::DistributedSend)
return monitors_blocker.cancel();
return async_insert_blocker.cancel();
return {};
}
@ -1635,14 +1635,14 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
/// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE
auto table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
std::vector<std::shared_ptr<DistributedAsyncInsertDirectoryQueue>> directory_monitors;
std::vector<std::shared_ptr<DistributedAsyncInsertDirectoryQueue>> directory_queues;
{
std::lock_guard lock(cluster_nodes_mutex);
directory_monitors.reserve(cluster_nodes_data.size());
directory_queues.reserve(cluster_nodes_data.size());
for (auto & node : cluster_nodes_data)
directory_monitors.push_back(node.second.directory_monitor);
directory_queues.push_back(node.second.directory_queue);
}
bool need_flush = getDistributedSettingsRef().flush_on_detach;
@ -1650,7 +1650,7 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
LOG_INFO(log, "Skip flushing data (due to flush_on_detach=0)");
/// TODO: Maybe it should be executed in parallel
for (auto & node : directory_monitors)
for (auto & node : directory_queues)
{
if (need_flush)
node->flushAllData();
@ -1706,7 +1706,7 @@ void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
std::lock_guard lock(cluster_nodes_mutex);
for (auto & node : cluster_nodes_data)
node.second.directory_monitor->updatePath(new_path_to_table_data);
node.second.directory_queue->updatePath(new_path_to_table_data);
}
relative_data_path = new_path_to_table_data;
@ -1842,15 +1842,15 @@ void registerStorageDistributed(StorageFactory & factory)
"bytes_to_throw_insert cannot be less or equal to bytes_to_delay_insert (since it is handled first)");
}
/// Set default values from the distributed_directory_monitor_* global context settings.
if (!distributed_settings.monitor_batch_inserts.changed)
distributed_settings.monitor_batch_inserts = context->getSettingsRef().distributed_directory_monitor_batch_inserts;
if (!distributed_settings.monitor_split_batch_on_failure.changed)
distributed_settings.monitor_split_batch_on_failure = context->getSettingsRef().distributed_directory_monitor_split_batch_on_failure;
if (!distributed_settings.monitor_sleep_time_ms.changed)
distributed_settings.monitor_sleep_time_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
if (!distributed_settings.monitor_max_sleep_time_ms.changed)
distributed_settings.monitor_max_sleep_time_ms = context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms;
/// Set default values from the distributed_background_insert_* global context settings.
if (!distributed_settings.background_insert_batch.changed)
distributed_settings.background_insert_batch = context->getSettingsRef().distributed_background_insert_batch;
if (!distributed_settings.background_insert_split_batch_on_failure.changed)
distributed_settings.background_insert_split_batch_on_failure = context->getSettingsRef().distributed_background_insert_split_batch_on_failure;
if (!distributed_settings.background_insert_sleep_time_ms.changed)
distributed_settings.background_insert_sleep_time_ms = context->getSettingsRef().distributed_background_insert_sleep_time_ms;
if (!distributed_settings.background_insert_max_sleep_time_ms.changed)
distributed_settings.background_insert_max_sleep_time_ms = context->getSettingsRef().distributed_background_insert_max_sleep_time_ms;
return std::make_shared<StorageDistributed>(
args.table_id,

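A minimal sketch of the default-inheritance above, assuming a cluster named default (as in the tests below): table-level background-insert settings fall back to the session's distributed_background_insert_* values at CREATE time unless the SETTINGS clause pins them explicitly.

    SET distributed_background_insert_batch = 1;
    SET distributed_background_insert_sleep_time_ms = 100;
    -- background_insert_batch and background_insert_sleep_time_ms are inherited
    -- from the session; background_insert_max_sleep_time_ms is set explicitly:
    CREATE TABLE dist AS system.one
    ENGINE = Distributed(default, system, one)
    SETTINGS background_insert_max_sleep_time_ms = 30000;
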
View File

@ -171,7 +171,7 @@ private:
/// Get directory queue thread and connection pool created by disk and subdirectory name
///
/// Used for the INSERT into Distributed in case of insert_distributed_sync==1, from DistributedSink.
/// Used for the INSERT into Distributed in case of distributed_foreground_insert==1, from DistributedSink.
DistributedAsyncInsertDirectoryQueue & getDirectoryQueue(const DiskPtr & disk, const std::string & name);
@ -250,7 +250,7 @@ private:
/// Used for global monotonic ordering of files to send.
SimpleIncrement file_names_increment;
ActionBlocker monitors_blocker;
ActionBlocker async_insert_blocker;
String relative_data_path;
@ -266,7 +266,7 @@ private:
struct ClusterNodeData
{
std::shared_ptr<DistributedAsyncInsertDirectoryQueue> directory_monitor;
std::shared_ptr<DistributedAsyncInsertDirectoryQueue> directory_queue;
ConnectionPoolPtr connection_pool;
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;

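The renamed async_insert_blocker above is the ActionBlocker that SYSTEM STOP DISTRIBUTED SENDS cancels via getActionLock(ActionLocks::DistributedSend); a hedged sketch of the statements involved (the table name dist is illustrative):

    SYSTEM STOP DISTRIBUTED SENDS dist;   -- cancels background sends via async_insert_blocker
    INSERT INTO dist VALUES (1);          -- data is queued on disk, not sent
    SYSTEM FLUSH DISTRIBUTED dist;        -- pushes the queued data through anyway
    SYSTEM START DISTRIBUTED SENDS dist;  -- resumes the background queue
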
View File

@ -39,7 +39,7 @@
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
<distributed_foreground_insert>1</distributed_foreground_insert>
</settings>
<!-- Copying tasks description.

View File

@ -40,7 +40,7 @@
<settings>
<connect_timeout>3</connect_timeout>
<insert_distributed_sync>1</insert_distributed_sync>
<distributed_foreground_insert>1</distributed_foreground_insert>
</settings>
<tables>

View File

@ -128,7 +128,7 @@ class Task1:
)
instance.query(
"INSERT INTO hits_all SELECT * FROM system.numbers LIMIT 1002",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
def check(self):
@ -218,7 +218,7 @@ class Task2:
instance.query(
"INSERT INTO a_all SELECT toDate(17581 + number) AS date, number AS d FROM system.numbers LIMIT 85",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
def check(self):

View File

@ -78,7 +78,7 @@ class TaskTrivial:
source.query(
"INSERT INTO trivial SELECT * FROM system.numbers LIMIT 1002",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
def check(self):
@ -123,7 +123,7 @@ class TaskReplicatedWithoutArguments:
source.query(
"INSERT INTO trivial_without_arguments SELECT * FROM system.numbers LIMIT 1002",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
def check(self):

View File

@ -69,7 +69,7 @@ def started_cluster():
node1.query(
"INSERT INTO replica_1.replicated FORMAT TSV",
stdin=to_insert,
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
yield cluster

View File

@ -4,10 +4,10 @@
<!-- always send via network -->
<prefer_localhost_replica>0</prefer_localhost_replica>
<!-- override the defaults in case they change -->

<distributed_directory_monitor_split_batch_on_failure>1</distributed_directory_monitor_split_batch_on_failure>
<distributed_background_insert_split_batch_on_failure>1</distributed_background_insert_split_batch_on_failure>
<!-- wait for explicit flush -->
<distributed_directory_monitor_sleep_time_ms>86400</distributed_directory_monitor_sleep_time_ms>
<distributed_directory_monitor_max_sleep_time_ms>86400</distributed_directory_monitor_max_sleep_time_ms>
<distributed_background_insert_sleep_time_ms>86400</distributed_background_insert_sleep_time_ms>
<distributed_background_insert_max_sleep_time_ms>86400</distributed_background_insert_max_sleep_time_ms>
</default>
</profiles>
</clickhouse>

View File

@ -4,10 +4,10 @@
<!-- always send via network -->
<prefer_localhost_replica>0</prefer_localhost_replica>
<!-- disable -->
<distributed_directory_monitor_split_batch_on_failure>0</distributed_directory_monitor_split_batch_on_failure>
<distributed_background_insert_split_batch_on_failure>0</distributed_background_insert_split_batch_on_failure>
<!-- wait for explicit flush -->
<distributed_directory_monitor_sleep_time_ms>86400</distributed_directory_monitor_sleep_time_ms>
<distributed_directory_monitor_max_sleep_time_ms>86400</distributed_directory_monitor_max_sleep_time_ms>
<distributed_background_insert_sleep_time_ms>86400</distributed_background_insert_sleep_time_ms>
<distributed_background_insert_max_sleep_time_ms>86400</distributed_background_insert_max_sleep_time_ms>
</default>
</profiles>
</clickhouse>

View File

@ -4,13 +4,13 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
# node1 -- distributed_directory_monitor_split_batch_on_failure=on
# node1 -- distributed_background_insert_split_batch_on_failure=on
node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml"],
user_configs=["configs/overrides_1.xml"],
)
# node2 -- distributed_directory_monitor_split_batch_on_failure=off
# node2 -- distributed_background_insert_split_batch_on_failure=off
node2 = cluster.add_instance(
"node2",
main_configs=["configs/remote_servers.xml"],
@ -19,7 +19,7 @@ node2 = cluster.add_instance(
def get_test_settings():
settings = {"monitor_batch_inserts": [0, 1]}
settings = {"background_insert_batch": [0, 1]}
return [(k, v) for k, values in settings.items() for v in values]
@ -58,7 +58,7 @@ def started_cluster():
cluster.shutdown()
def test_distributed_directory_monitor_split_batch_on_failure_OFF(started_cluster):
def test_distributed_background_insert_split_batch_on_failure_OFF(started_cluster):
for setting, setting_value in get_test_settings():
create_tables(**{setting: setting_value})
for i in range(0, 100):
@ -73,7 +73,7 @@ def test_distributed_directory_monitor_split_batch_on_failure_OFF(started_cluste
},
)
# "Received from" is mandatory, since the exception should be thrown on the remote node.
if setting == "monitor_batch_inserts" and setting_value == 1:
if setting == "background_insert_batch" and setting_value == 1:
with pytest.raises(
QueryRuntimeException,
match=r"DB::Exception: Received from.*Memory limit \(for query\) exceeded: .*while pushing to view default\.mv",
@ -85,7 +85,7 @@ def test_distributed_directory_monitor_split_batch_on_failure_OFF(started_cluste
assert int(node2.query("select count() from dist_data")) == 100000
def test_distributed_directory_monitor_split_batch_on_failure_ON(started_cluster):
def test_distributed_background_insert_split_batch_on_failure_ON(started_cluster):
for setting, setting_value in get_test_settings():
create_tables(**{setting: setting_value})
for i in range(0, 100):

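The test above toggles the batching mode per table; under the new names the corresponding DDL looks roughly like this (a sketch; the default cluster and the data table mirror this test's fixtures):

    CREATE TABLE dist (key Int)
    ENGINE = Distributed(default, currentDatabase(), data, key)
    SETTINGS background_insert_batch = 1,
             background_insert_split_batch_on_failure = 1;
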
View File

@ -186,7 +186,7 @@ def test_insecure_insert_sync():
n1.query("TRUNCATE TABLE data")
n1.query(
"INSERT INTO dist_insecure SELECT * FROM numbers(2)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
assert int(n1.query("SELECT count() FROM dist_insecure")) == 2
n1.query("TRUNCATE TABLE data ON CLUSTER secure")
@ -208,7 +208,7 @@ def test_secure_insert_sync():
n1.query("TRUNCATE TABLE data")
n1.query(
"INSERT INTO dist_secure SELECT * FROM numbers(2)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
assert int(n1.query("SELECT count() FROM dist_secure")) == 2
n1.query("TRUNCATE TABLE data ON CLUSTER secure")
@ -240,7 +240,7 @@ def test_secure_insert_sync():
# - after that, we ensure that the connection is really established from the
# context of the SELECT query, and that no connection is established from the
# context of the INSERT query (effectively a no-op, since the INSERT is done
# in the background due to distributed_foreground_insert=false by
# default)
# default)
#
# - if the bug is there, then FLUSH DISTRIBUTED will fail, because it will go

View File

@ -1,7 +1,7 @@
<clickhouse>
<profiles>
<default>
<distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
<distributed_background_insert_batch>1</distributed_background_insert_batch>
</default>
</profiles>
</clickhouse>

View File

@ -1,7 +1,7 @@
<clickhouse>
<profiles>
<default>
<distributed_directory_monitor_batch_inserts>0</distributed_directory_monitor_batch_inserts>
<distributed_background_insert_batch>0</distributed_background_insert_batch>
</default>
</profiles>
</clickhouse>

View File

@ -1,7 +1,7 @@
<clickhouse>
<profiles>
<default>
<distributed_directory_monitor_split_batch_on_failure>1</distributed_directory_monitor_split_batch_on_failure>
<distributed_background_insert_split_batch_on_failure>1</distributed_background_insert_split_batch_on_failure>
</default>
</profiles>
</clickhouse>

View File

@ -12,20 +12,20 @@ from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
# n1 -- distributed_directory_monitor_batch_inserts=1
# n1 -- distributed_background_insert_batch=1
n1 = cluster.add_instance(
"n1",
main_configs=["configs/remote_servers.xml"],
user_configs=["configs/users.d/batch.xml"],
)
# n2 -- distributed_directory_monitor_batch_inserts=0
# n2 -- distributed_background_insert_batch=0
n2 = cluster.add_instance(
"n2",
main_configs=["configs/remote_servers.xml"],
user_configs=["configs/users.d/no_batch.xml"],
)
# n3 -- distributed_directory_monitor_batch_inserts=1/distributed_directory_monitor_split_batch_on_failure=1
# n3 -- distributed_background_insert_batch=1/distributed_background_insert_split_batch_on_failure=1
n3 = cluster.add_instance(
"n3",
main_configs=["configs/remote_servers_split.xml"],
@ -34,7 +34,7 @@ n3 = cluster.add_instance(
"configs/users.d/split.xml",
],
)
# n4 -- distributed_directory_monitor_batch_inserts=0/distributed_directory_monitor_split_batch_on_failure=1
# n4 -- distributed_background_insert_batch=0/distributed_background_insert_split_batch_on_failure=1
n4 = cluster.add_instance(
"n4",
main_configs=["configs/remote_servers_split.xml"],

View File

@ -91,7 +91,7 @@ def test_prefer_localhost_replica_0_load_balancing_in_order_sync(cluster, q):
cluster,
load_balancing="in_order",
prefer_localhost_replica=0,
insert_distributed_sync=1,
distributed_foreground_insert=1,
)
assert int(n1.query("SELECT count() FROM data")) == 10 * q
assert int(n2.query("SELECT count() FROM data")) == 10

View File

@ -1,7 +1,7 @@
<clickhouse>
<profiles>
<default>
<distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
<distributed_background_insert_batch>1</distributed_background_insert_batch>
<min_insert_block_size_rows>3</min_insert_block_size_rows>
</default>
</profiles>

View File

@ -251,7 +251,7 @@ def test_inserts_single_replica_local_internal_replication(started_cluster):
node1.query(
"INSERT INTO distributed_one_replica_internal_replication VALUES ('2000-01-01', 1)",
settings={
"insert_distributed_sync": "1",
"distributed_foreground_insert": "1",
"prefer_localhost_replica": "1",
# to make the test more deterministic
"load_balancing": "first_or_random",
@ -265,7 +265,7 @@ def test_inserts_single_replica_internal_replication(started_cluster):
node1.query(
"INSERT INTO distributed_one_replica_internal_replication VALUES ('2000-01-01', 1)",
settings={
"insert_distributed_sync": "1",
"distributed_foreground_insert": "1",
"prefer_localhost_replica": "0",
# to make the test more deterministic
"load_balancing": "first_or_random",
@ -285,7 +285,7 @@ def test_inserts_single_replica_no_internal_replication(started_cluster):
node1.query(
"INSERT INTO distributed_one_replica_no_internal_replication VALUES ('2000-01-01', 1)",
settings={
"insert_distributed_sync": "1",
"distributed_foreground_insert": "1",
"prefer_localhost_replica": "0",
},
)

View File

@ -41,7 +41,7 @@ CREATE TABLE distributed_table(date Date, val UInt64) ENGINE = Distributed(test_
def test_insertion_sync(started_cluster):
node1.query(
"""SET insert_distributed_sync = 1, insert_distributed_timeout = 0;
"""SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 0;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers LIMIT 10000"""
)
@ -49,7 +49,7 @@ def test_insertion_sync(started_cluster):
node1.query(
"""
SET insert_distributed_sync = 1, insert_distributed_timeout = 1;
SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 1;
INSERT INTO distributed_table SELECT today() - 1 as date, number as val FROM system.numbers LIMIT 10000"""
)
@ -58,21 +58,21 @@ def test_insertion_sync(started_cluster):
# Insert with explicitly specified columns.
node1.query(
"""
SET insert_distributed_sync = 1, insert_distributed_timeout = 1;
SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 1;
INSERT INTO distributed_table(date, val) VALUES ('2000-01-01', 100500)"""
)
# Insert with columns specified in different order.
node1.query(
"""
SET insert_distributed_sync = 1, insert_distributed_timeout = 1;
SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 1;
INSERT INTO distributed_table(val, date) VALUES (100500, '2000-01-01')"""
)
# Insert with an incomplete list of columns.
node1.query(
"""
SET insert_distributed_sync = 1, insert_distributed_timeout = 1;
SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 1;
INSERT INTO distributed_table(val) VALUES (100500)"""
)
@ -101,7 +101,7 @@ def test_insertion_sync_fails_on_error(started_cluster):
pm.partition_instances(node2, node1, action='REJECT --reject-with tcp-reset')
with pytest.raises(QueryRuntimeException):
node1.query('''
SET insert_distributed_sync = 1, insert_distributed_timeout = 0;
SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 0;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers''', timeout=2)
"""
@ -110,7 +110,7 @@ def test_insertion_sync_fails_with_timeout(started_cluster):
with pytest.raises(QueryRuntimeException):
node1.query(
"""
SET insert_distributed_sync = 1, insert_distributed_timeout = 1;
SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 1;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers"""
)
@ -119,7 +119,7 @@ def test_insertion_without_sync_ignores_timeout(started_cluster):
with pytest.raises(QueryTimeoutExceedException):
node1.query(
"""
SET insert_distributed_sync = 0, insert_distributed_timeout = 1;
SET distributed_foreground_insert = 0, distributed_background_insert_timeout = 1;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers""",
timeout=1.5,
)
@ -129,7 +129,7 @@ def test_insertion_sync_with_disabled_timeout(started_cluster):
with pytest.raises(QueryTimeoutExceedException):
node1.query(
"""
SET insert_distributed_sync = 1, insert_distributed_timeout = 0;
SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 0;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers""",
timeout=1,
)
@ -142,7 +142,7 @@ def test_async_inserts_into_local_shard(started_cluster):
)
node1.query(
"""INSERT INTO shard_distributed VALUES (1)""",
settings={"insert_distributed_sync": 0},
settings={"distributed_foreground_insert": 0},
)
assert TSV(node1.query("""SELECT count() FROM shard_distributed""")) == TSV("1\n")

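For reference, the two modes this test exercises, written as plain SQL (a sketch using the same distributed_table as above):

    -- Foreground: the INSERT itself ships data to the shards and fails loudly.
    SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 0;
    INSERT INTO distributed_table SELECT today() AS date, number AS val FROM system.numbers LIMIT 10;

    -- Background (the default): data is queued per shard and sent asynchronously;
    -- SYSTEM FLUSH DISTRIBUTED forces the queue to drain.
    SET distributed_foreground_insert = 0;
    INSERT INTO distributed_table SELECT today() AS date, number AS val FROM system.numbers LIMIT 10;
    SYSTEM FLUSH DISTRIBUTED distributed_table;
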
View File

@ -1,7 +1,7 @@
<clickhouse>
<profiles>
<default>
<distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
<distributed_background_insert_batch>1</distributed_background_insert_batch>
<min_insert_block_size_rows>3</min_insert_block_size_rows>
<allow_deprecated_syntax_for_merge_tree>1</allow_deprecated_syntax_for_merge_tree>
</default>

View File

@ -69,27 +69,27 @@ def create_tables(cluster, table_name):
# populate data, equal number of rows for each replica
nodes[0].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(10)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
nodes[0].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(10, 10)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
nodes[1].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(20, 10)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
nodes[1].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(30, 10)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
nodes[2].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(40, 10)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
nodes[2].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(50, 10)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
return "60\t0\t59\t1770\n"

View File

@ -79,23 +79,23 @@ def create_tables(cluster, table_name):
# populate data
nodes[0].query(
f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(1000)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
nodes[0].query(
f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(2000)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
nodes[0].query(
f"INSERT INTO {table_name}_d SELECT -number, -number FROM numbers(1000)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
nodes[0].query(
f"INSERT INTO {table_name}_d SELECT -number, -number FROM numbers(2000)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)
nodes[0].query(
f"INSERT INTO {table_name}_d SELECT number, number FROM numbers(3)",
settings={"insert_distributed_sync": 1},
settings={"distributed_foreground_insert": 1},
)

View File

@ -41,7 +41,7 @@ def test_default_column():
)
for insert_sync in [0, 1]:
settings = {"insert_distributed_sync": insert_sync}
settings = {"distributed_foreground_insert": insert_sync}
# INSERT INTO TABLE dist (x)
node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
@ -86,7 +86,7 @@ def test_materialized_column_allow_insert_materialized():
for insert_sync in [0, 1]:
settings = {
"insert_distributed_sync": insert_sync,
"distributed_foreground_insert": insert_sync,
"insert_allow_materialized_columns": 1,
}
@ -133,7 +133,7 @@ def test_materialized_column_disallow_insert_materialized():
for insert_sync in [0, 1]:
settings = {
"insert_distributed_sync": insert_sync,
"distributed_foreground_insert": insert_sync,
"insert_allow_materialized_columns": 0,
}
@ -173,7 +173,7 @@ def test_materialized_column_disallow_insert_materialized_different_shards():
for insert_sync in [0, 1]:
settings = {
"insert_distributed_sync": insert_sync,
"distributed_foreground_insert": insert_sync,
"insert_allow_materialized_columns": 0,
}

View File

@ -2,7 +2,7 @@
-- https://github.com/ClickHouse/ClickHouse/issues/1059
SET insert_distributed_sync = 1;
SET distributed_foreground_insert = 1;
DROP TABLE IF EXISTS union1;
DROP TABLE IF EXISTS union2;

View File

@ -4,14 +4,14 @@ DROP TABLE IF EXISTS dist_00612;
CREATE TABLE data_00612 (key UInt64, val UInt64) ENGINE = MergeTree ORDER BY key;
CREATE TABLE dist_00612 AS data_00612 ENGINE = Distributed(test_shard_localhost, currentDatabase(), data_00612, rand());
SET insert_distributed_sync=1;
SET distributed_foreground_insert=1;
SET prefer_localhost_replica=0;
SET max_query_size=29;
INSERT INTO dist_00612 VALUES(1, 1), (2, 2), (3, 3), (4, 4), (5, 5);
SELECT key FROM dist_00612;
SET max_query_size=262144;
SET insert_distributed_sync=0;
SET distributed_foreground_insert=0;
SET prefer_localhost_replica=1;
DROP TABLE dist_00612;
DROP TABLE data_00612;

View File

@ -1,6 +1,6 @@
-- Tags: distributed
set insert_distributed_sync = 1;
set distributed_foreground_insert = 1;
set allow_suspicious_low_cardinality_types = 1;
DROP TABLE IF EXISTS test_low_null_float;

View File

@ -1,6 +1,6 @@
-- Tags: distributed
SET insert_distributed_sync = 1;
SET distributed_foreground_insert = 1;
DROP TABLE IF EXISTS low_cardinality;
DROP TABLE IF EXISTS low_cardinality_all;

View File

@ -1,12 +1,12 @@
insert_allow_materialized_columns=0
insert_distributed_sync=0
distributed_foreground_insert=0
2018-08-01
2018-08-01
2018-08-01 2017-08-01
2018-08-01 2017-08-01
2018-08-01
2018-08-01 2017-08-01
insert_distributed_sync=1
distributed_foreground_insert=1
2018-08-01
2018-08-01
2018-08-01 2017-08-01
@ -14,14 +14,14 @@ insert_distributed_sync=1
2018-08-01
2018-08-01 2017-08-01
insert_allow_materialized_columns=1
insert_distributed_sync=0
distributed_foreground_insert=0
2018-08-01
2018-08-01
2018-08-01 2019-08-01
2018-08-01 2019-08-01
2018-08-01
2018-08-01 2019-08-01
insert_distributed_sync=1
distributed_foreground_insert=1
2018-08-01
2018-08-01
2018-08-01 2019-08-01

View File

@ -10,10 +10,10 @@ SELECT 'insert_allow_materialized_columns=0';
SET insert_allow_materialized_columns=0;
--
-- insert_distributed_sync=0
-- distributed_foreground_insert=0
--
SELECT 'insert_distributed_sync=0';
SET insert_distributed_sync=0;
SELECT 'distributed_foreground_insert=0';
SET distributed_foreground_insert=0;
set allow_deprecated_syntax_for_merge_tree=1;
CREATE TABLE local_00952 (date Date, value Date MATERIALIZED toDate('2017-08-01')) ENGINE = MergeTree(date, date, 8192);
@ -31,10 +31,10 @@ DROP TABLE distributed_00952;
DROP TABLE local_00952;
--
-- insert_distributed_sync=1
-- distributed_foreground_insert=1
--
SELECT 'insert_distributed_sync=1';
SET insert_distributed_sync=1;
SELECT 'distributed_foreground_insert=1';
SET distributed_foreground_insert=1;
CREATE TABLE local_00952 (date Date, value Date MATERIALIZED toDate('2017-08-01')) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_00952, rand());
@ -56,10 +56,10 @@ SELECT 'insert_allow_materialized_columns=1';
SET insert_allow_materialized_columns=1;
--
-- insert_distributed_sync=0
-- distributed_foreground_insert=0
--
SELECT 'insert_distributed_sync=0';
SET insert_distributed_sync=0;
SELECT 'distributed_foreground_insert=0';
SET distributed_foreground_insert=0;
CREATE TABLE local_00952 (date Date, value Date MATERIALIZED toDate('2017-08-01')) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_00952, rand());
@ -76,10 +76,10 @@ DROP TABLE distributed_00952;
DROP TABLE local_00952;
--
-- insert_distributed_sync=1
-- distributed_foreground_insert=1
--
SELECT 'insert_distributed_sync=1';
SET insert_distributed_sync=1;
SELECT 'distributed_foreground_insert=1';
SET distributed_foreground_insert=1;
CREATE TABLE local_00952 (date Date, value Date MATERIALIZED toDate('2017-08-01')) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_00952, rand());

View File

@ -1,6 +1,6 @@
-- Tags: distributed
set insert_distributed_sync=1;
set distributed_foreground_insert=1;
DROP TABLE IF EXISTS dist_00967;
DROP TABLE IF EXISTS underlying_00967;

View File

@ -4,7 +4,7 @@
SET max_threads = 1;
-- data should be inserted into the Distributed table synchronously
SET insert_distributed_sync = 1;
SET distributed_foreground_insert = 1;
DROP TABLE IF EXISTS mem1;
DROP TABLE IF EXISTS mem2;

View File

@ -3,7 +3,7 @@
-- Create dictionary, since dictGet*() uses DB::Context in executeImpl()
-- (To cover scope of the Context in PushingToViews chain)
set insert_distributed_sync=1;
set distributed_foreground_insert=1;
DROP TABLE IF EXISTS mv;
DROP DATABASE IF EXISTS dict_in_01023;

View File

@ -5,9 +5,9 @@ DROP TABLE IF EXISTS dist_test_01040;
CREATE TABLE test_01040 (key UInt64) ENGINE=TinyLog();
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards, currentDatabase(), test_01040, key) SETTINGS
monitor_batch_inserts=1,
monitor_sleep_time_ms=10,
monitor_max_sleep_time_ms=100;
background_insert_batch=1,
background_insert_sleep_time_ms=10,
background_insert_max_sleep_time_ms=100;
-- internal_replication=false
SELECT 'test_cluster_two_shards prefer_localhost_replica=0';
@ -28,9 +28,9 @@ DROP TABLE dist_test_01040;
-- internal_replication=true
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards_internal_replication, currentDatabase(), test_01040, key) SETTINGS
monitor_batch_inserts=1,
monitor_sleep_time_ms=10,
monitor_max_sleep_time_ms=100;
background_insert_batch=1,
background_insert_sleep_time_ms=10,
background_insert_max_sleep_time_ms=100;
SELECT 'test_cluster_two_shards_internal_replication prefer_localhost_replica=0';
SET prefer_localhost_replica=0;
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);

View File

@ -2,7 +2,7 @@
-- from https://github.com/ClickHouse/ClickHouse/issues/5142
set insert_distributed_sync = 1;
set distributed_foreground_insert = 1;
DROP TABLE IF EXISTS t;
DROP TABLE IF EXISTS t_d;

View File

@ -1,6 +1,6 @@
-- Tags: distributed
-- set insert_distributed_sync = 1; -- see https://github.com/ClickHouse/ClickHouse/issues/18971
-- set distributed_foreground_insert = 1; -- see https://github.com/ClickHouse/ClickHouse/issues/18971
SET allow_experimental_parallel_reading_from_replicas = 0; -- see https://github.com/ClickHouse/ClickHouse/issues/34525
SET prefer_localhost_replica = 1;

View File

@ -14,7 +14,7 @@ create table test_table as test_table_sharded
engine=Distributed(test_cluster_two_shards, currentDatabase(), test_table_sharded, hash);
SET distributed_product_mode = 'local';
SET insert_distributed_sync = 1;
SET distributed_foreground_insert = 1;
INSERT INTO test_table VALUES ('2020-04-20', 'Hello', 123);

View File

@ -1,6 +1,6 @@
-- Tags: distributed
set insert_distributed_sync = 1;
set distributed_foreground_insert = 1;
DROP TABLE IF EXISTS visits;
DROP TABLE IF EXISTS visits_dist;

View File

@ -6,7 +6,7 @@ DROP TABLE IF EXISTS t_dist;
create table t_local(a int) engine Log;
create table t_dist (a int) engine Distributed(test_shard_localhost, currentDatabase(), 't_local', cityHash64(a));
set insert_distributed_sync = 1;
set distributed_foreground_insert = 1;
insert into t_dist values (1);

View File

@ -10,7 +10,7 @@ create table shard_0.tbl (number UInt64) engine = MergeTree order by number;
create table shard_1.tbl (number UInt64) engine = MergeTree order by number;
create table distr (number UInt64) engine = Distributed(test_cluster_two_shards_different_databases, '', tbl);
set insert_distributed_sync = 1;
set distributed_foreground_insert = 1;
set insert_distributed_one_random_shard = 1;
set max_block_size = 1;
set max_insert_block_size = 1;

View File

@ -6,7 +6,7 @@ DROP TABLE IF EXISTS distributed;
CREATE TABLE local (x UInt8) ENGINE = Memory;
CREATE TABLE distributed AS local ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), local, x);
SET insert_distributed_sync = 1;
SET distributed_foreground_insert = 1;
INSERT INTO distributed SELECT number FROM numbers(256) WHERE number % 2 = 0;
SELECT count() FROM local;

View File

@ -6,7 +6,7 @@ DROP TABLE IF EXISTS distributed;
CREATE TABLE local (x UInt8) ENGINE = Memory;
CREATE TABLE distributed AS local ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), local, x);
SET insert_distributed_sync = 0, network_compression_method = 'zstd';
SET distributed_foreground_insert = 0, network_compression_method = 'zstd';
INSERT INTO distributed SELECT number FROM numbers(256);
SYSTEM FLUSH DISTRIBUTED distributed;

View File

@ -8,14 +8,14 @@ SET send_logs_level='error';
CREATE TABLE tmp_01683 (n Int8) ENGINE=Memory;
CREATE TABLE dist_01683 (n UInt64) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n);
SET insert_distributed_sync=1;
SET distributed_foreground_insert=1;
INSERT INTO dist_01683 VALUES (1),(2);
SET insert_distributed_sync=0;
SET distributed_foreground_insert=0;
INSERT INTO dist_01683 VALUES (1),(2);
SYSTEM FLUSH DISTRIBUTED dist_01683;
-- TODO: cover distributed_directory_monitor_batch_inserts=1
-- TODO: cover distributed_background_insert_batch=1
SELECT * FROM tmp_01683 ORDER BY n;

View File

@ -6,12 +6,12 @@ SET prefer_localhost_replica=0;
CREATE TABLE tmp_01781 (n LowCardinality(String)) ENGINE=Memory;
CREATE TABLE dist_01781 (n LowCardinality(String)) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01781, cityHash64(n));
SET insert_distributed_sync=1;
SET distributed_foreground_insert=1;
INSERT INTO dist_01781 VALUES ('1'),('2');
-- different LowCardinality size
INSERT INTO dist_01781 SELECT * FROM numbers(1000);
SET insert_distributed_sync=0;
SET distributed_foreground_insert=0;
SYSTEM STOP DISTRIBUTED SENDS dist_01781;
INSERT INTO dist_01781 VALUES ('1'),('2');
-- different LowCardinality size

View File

@ -1,7 +1,7 @@
<Warning> DistributedSink: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
<Warning> DistributedSink: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
<Warning> default.dist_01683.DirectoryMonitor.default: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
<Warning> default.dist_01683.DirectoryMonitor.default: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
<Warning> default.dist_01683.DistributedInsertQueue.default: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
<Warning> default.dist_01683.DistributedInsertQueue.default: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
1
1
2

View File

@ -14,16 +14,16 @@ $CLICKHOUSE_CLIENT --prefer_localhost_replica=0 -nm -q "
CREATE TABLE tmp_01683 (n Int8) ENGINE=Memory;
CREATE TABLE dist_01683 (n UInt64) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n);
SET insert_distributed_sync=1;
SET distributed_foreground_insert=1;
INSERT INTO dist_01683 VALUES (1),(2);
SET insert_distributed_sync=0;
SET distributed_foreground_insert=0;
-- force log messages from the 'SYSTEM FLUSH DISTRIBUTED' context
SYSTEM STOP DISTRIBUTED SENDS dist_01683;
INSERT INTO dist_01683 VALUES (1),(2);
SYSTEM FLUSH DISTRIBUTED dist_01683;
-- TODO: cover distributed_directory_monitor_batch_inserts=1
-- TODO: cover distributed_background_insert_batch=1
SELECT * FROM tmp_01683 ORDER BY n;

View File

@ -9,7 +9,7 @@ drop table if exists shard_0.data_01850;
create table shard_0.data_01850 (key Int) engine=Memory();
create table dist_01850 (key Int) engine=Distributed('test_cluster_two_replicas_different_databases', /* default_database= */ '', data_01850, key);
set insert_distributed_sync=1;
set distributed_foreground_insert=1;
set prefer_localhost_replica=0;
insert into dist_01850 values (1); -- { serverError 60 }

View File

@ -14,14 +14,14 @@ $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS distributed"
$CLICKHOUSE_CLIENT --query "CREATE TABLE local (x UInt8) ENGINE = Memory;"
$CLICKHOUSE_CLIENT --query "CREATE TABLE distributed AS local ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), local, x);"
$CLICKHOUSE_CLIENT --insert_distributed_sync=0 --network_compression_method='zstd' --query "INSERT INTO distributed SELECT number FROM numbers(256);"
$CLICKHOUSE_CLIENT --insert_distributed_sync=0 --network_compression_method='zstd' --query "SYSTEM FLUSH DISTRIBUTED distributed;"
$CLICKHOUSE_CLIENT --distributed_foreground_insert=0 --network_compression_method='zstd' --query "INSERT INTO distributed SELECT number FROM numbers(256);"
$CLICKHOUSE_CLIENT --distributed_foreground_insert=0 --network_compression_method='zstd' --query "SYSTEM FLUSH DISTRIBUTED distributed;"
function select_thread()
{
while true; do
$CLICKHOUSE_CLIENT --insert_distributed_sync=0 --network_compression_method='zstd' --query "SELECT count() FROM local" >/dev/null
$CLICKHOUSE_CLIENT --insert_distributed_sync=0 --network_compression_method='zstd' --query "SELECT count() FROM distributed" >/dev/null
$CLICKHOUSE_CLIENT --distributed_foreground_insert=0 --network_compression_method='zstd' --query "SELECT count() FROM local" >/dev/null
$CLICKHOUSE_CLIENT --distributed_foreground_insert=0 --network_compression_method='zstd' --query "SELECT count() FROM distributed" >/dev/null
done
}

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: no-fasttest, distributed
# Tags: no-fasttest, distributed, long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
@ -8,12 +8,12 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# This function takes 4 arguments:
# $1 - OpenTelemetry Trace Id
# $2 - value of insert_distributed_sync
# $2 - value of distributed_foreground_insert
# $3 - value of prefer_localhost_replica
# $4 - a String that helps to debug
function insert()
{
echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=$2, prefer_localhost_replica=$3 VALUES(1),(2)" |
echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS distributed_foreground_insert=$2, prefer_localhost_replica=$3 VALUES(1),(2)" |
${CLICKHOUSE_CURL} \
-X POST \
-H "traceparent: 00-$1-5150000000000515-01" \
@ -47,7 +47,7 @@ ${CLICKHOUSE_CLIENT} -nq "
#
# $1 - OpenTelemetry Trace Id
# $2 - value of insert_distributed_sync
# $2 - value of distributed_foreground_insert
function check_span_kind()
{
${CLICKHOUSE_CLIENT} -nq "

View File

@ -8,12 +8,12 @@ SET prefer_localhost_replica=0;
CREATE TABLE tmp_02482 (i UInt64, n LowCardinality(String)) ENGINE = Memory;
CREATE TABLE dist_02482(i UInt64, n LowCardinality(Nullable(String))) ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), tmp_02482, i);
SET insert_distributed_sync=1;
SET distributed_foreground_insert=1;
INSERT INTO dist_02482 VALUES (1, '1'), (2, '2');
INSERT INTO dist_02482 SELECT number, number FROM numbers(1000);
SET insert_distributed_sync=0;
SET distributed_foreground_insert=0;
SYSTEM STOP DISTRIBUTED SENDS dist_02482;

View File

@ -9,6 +9,6 @@ ENGINE = Memory;
INSERT INTO data_a_02187
SELECT *
FROM system.one
SETTINGS max_block_size = '1', min_insert_block_size_rows = '65536', min_insert_block_size_bytes = '0', max_insert_threads = '0', max_threads = '3', receive_timeout = '10', receive_data_timeout_ms = '10000', connections_with_failover_max_tries = '0', extremes = '1', use_uncompressed_cache = '0', optimize_move_to_prewhere = '1', optimize_move_to_prewhere_if_final = '0', replication_alter_partitions_sync = '2', totals_mode = 'before_having', allow_suspicious_low_cardinality_types = '1', compile_expressions = '1', min_count_to_compile_expression = '0', group_by_two_level_threshold = '100', distributed_aggregation_memory_efficient = '0', distributed_group_by_no_merge = '1', optimize_distributed_group_by_sharding_key = '1', optimize_skip_unused_shards = '1', optimize_skip_unused_shards_rewrite_in = '1', force_optimize_skip_unused_shards = '2', optimize_skip_unused_shards_nesting = '1', force_optimize_skip_unused_shards_nesting = '2', merge_tree_min_rows_for_concurrent_read = '10000', force_primary_key = '1', network_compression_method = 'ZSTD', network_zstd_compression_level = '7', log_queries = '0', log_queries_min_type = 'QUERY_FINISH', distributed_product_mode = 'local', insert_quorum = '2', insert_quorum_timeout = '0', insert_quorum_parallel = '0', select_sequential_consistency = '1', join_use_nulls = '1', any_join_distinct_right_table_keys = '1', preferred_max_column_in_block_size_bytes = '32', insert_distributed_sync = '1', insert_allow_materialized_columns = '1', use_index_for_in_with_subqueries = '1', joined_subquery_requires_alias = '0', empty_result_for_aggregation_by_empty_set = '1', allow_suspicious_codecs = '1', query_profiler_real_time_period_ns = '0', query_profiler_cpu_time_period_ns = '0', opentelemetry_start_trace_probability = '1', max_rows_to_read = '1000000', read_overflow_mode = 'break', max_rows_to_group_by = '10', group_by_overflow_mode = 'any', max_rows_to_sort = '100', sort_overflow_mode = 'break', max_result_rows = '10', max_execution_time = '3', max_execution_speed = '1', max_bytes_in_join = '100', join_algorithm = 'partial_merge', max_memory_usage = '1099511627776', log_query_threads = '1', send_logs_level = 'fatal', enable_optimize_predicate_expression = '1', prefer_localhost_replica = '1', optimize_read_in_order = '1', optimize_aggregation_in_order = '1', read_in_order_two_level_merge_threshold = '1', allow_introspection_functions = '1', check_query_single_value_result = '1', allow_experimental_live_view = '1', default_table_engine = 'Memory', mutations_sync = '2', convert_query_to_cnf = '0', optimize_arithmetic_operations_in_aggregate_functions = '1', optimize_duplicate_order_by_and_distinct = '0', optimize_multiif_to_if = '0', optimize_monotonous_functions_in_order_by = '1', optimize_functions_to_subcolumns = '1', optimize_using_constraints = '1', optimize_substitute_columns = '1', optimize_append_index = '1', transform_null_in = '1', data_type_default_nullable = '1', cast_keep_nullable = '1', cast_ipv4_ipv6_default_on_conversion_error = '0', system_events_show_zero_values = '1', enable_global_with_statement = '1', optimize_on_insert = '0', optimize_rewrite_sum_if_to_count_if = '1', distributed_ddl_output_mode = 'throw', union_default_mode = 'ALL', optimize_aggregators_of_group_by_keys = '1', optimize_group_by_function_keys = '1', short_circuit_function_evaluation = 'enable', async_insert = '1', enable_filesystem_cache = '0', allow_deprecated_database_ordinary = '1', allow_deprecated_syntax_for_merge_tree = '1', allow_experimental_nlp_functions = '1', 
allow_experimental_object_type = '1', allow_experimental_map_type = '1', optimize_use_projections = '1', input_format_null_as_default = '1', input_format_ipv4_default_on_conversion_error = '0', input_format_ipv6_default_on_conversion_error = '0', output_format_json_named_tuples_as_objects = '1', output_format_write_statistics = '0', output_format_pretty_row_numbers = '1';
SETTINGS max_block_size = '1', min_insert_block_size_rows = '65536', min_insert_block_size_bytes = '0', max_insert_threads = '0', max_threads = '3', receive_timeout = '10', receive_data_timeout_ms = '10000', connections_with_failover_max_tries = '0', extremes = '1', use_uncompressed_cache = '0', optimize_move_to_prewhere = '1', optimize_move_to_prewhere_if_final = '0', replication_alter_partitions_sync = '2', totals_mode = 'before_having', allow_suspicious_low_cardinality_types = '1', compile_expressions = '1', min_count_to_compile_expression = '0', group_by_two_level_threshold = '100', distributed_aggregation_memory_efficient = '0', distributed_group_by_no_merge = '1', optimize_distributed_group_by_sharding_key = '1', optimize_skip_unused_shards = '1', optimize_skip_unused_shards_rewrite_in = '1', force_optimize_skip_unused_shards = '2', optimize_skip_unused_shards_nesting = '1', force_optimize_skip_unused_shards_nesting = '2', merge_tree_min_rows_for_concurrent_read = '10000', force_primary_key = '1', network_compression_method = 'ZSTD', network_zstd_compression_level = '7', log_queries = '0', log_queries_min_type = 'QUERY_FINISH', distributed_product_mode = 'local', insert_quorum = '2', insert_quorum_timeout = '0', insert_quorum_parallel = '0', select_sequential_consistency = '1', join_use_nulls = '1', any_join_distinct_right_table_keys = '1', preferred_max_column_in_block_size_bytes = '32', distributed_foreground_insert = '1', insert_allow_materialized_columns = '1', use_index_for_in_with_subqueries = '1', joined_subquery_requires_alias = '0', empty_result_for_aggregation_by_empty_set = '1', allow_suspicious_codecs = '1', query_profiler_real_time_period_ns = '0', query_profiler_cpu_time_period_ns = '0', opentelemetry_start_trace_probability = '1', max_rows_to_read = '1000000', read_overflow_mode = 'break', max_rows_to_group_by = '10', group_by_overflow_mode = 'any', max_rows_to_sort = '100', sort_overflow_mode = 'break', max_result_rows = '10', max_execution_time = '3', max_execution_speed = '1', max_bytes_in_join = '100', join_algorithm = 'partial_merge', max_memory_usage = '1099511627776', log_query_threads = '1', send_logs_level = 'fatal', enable_optimize_predicate_expression = '1', prefer_localhost_replica = '1', optimize_read_in_order = '1', optimize_aggregation_in_order = '1', read_in_order_two_level_merge_threshold = '1', allow_introspection_functions = '1', check_query_single_value_result = '1', allow_experimental_live_view = '1', default_table_engine = 'Memory', mutations_sync = '2', convert_query_to_cnf = '0', optimize_arithmetic_operations_in_aggregate_functions = '1', optimize_duplicate_order_by_and_distinct = '0', optimize_multiif_to_if = '0', optimize_monotonous_functions_in_order_by = '1', optimize_functions_to_subcolumns = '1', optimize_using_constraints = '1', optimize_substitute_columns = '1', optimize_append_index = '1', transform_null_in = '1', data_type_default_nullable = '1', cast_keep_nullable = '1', cast_ipv4_ipv6_default_on_conversion_error = '0', system_events_show_zero_values = '1', enable_global_with_statement = '1', optimize_on_insert = '0', optimize_rewrite_sum_if_to_count_if = '1', distributed_ddl_output_mode = 'throw', union_default_mode = 'ALL', optimize_aggregators_of_group_by_keys = '1', optimize_group_by_function_keys = '1', short_circuit_function_evaluation = 'enable', async_insert = '1', enable_filesystem_cache = '0', allow_deprecated_database_ordinary = '1', allow_deprecated_syntax_for_merge_tree = '1', allow_experimental_nlp_functions = '1', 
allow_experimental_object_type = '1', allow_experimental_map_type = '1', optimize_use_projections = '1', input_format_null_as_default = '1', input_format_ipv4_default_on_conversion_error = '0', input_format_ipv6_default_on_conversion_error = '0', output_format_json_named_tuples_as_objects = '1', output_format_write_statistics = '0', output_format_pretty_row_numbers = '1';
DROP TABLE data_a_02187;

View File

@ -6,7 +6,7 @@ SELECT 'monitor_batch_insert={{ setting }}';
DROP TABLE IF EXISTS dist;
DROP TABLE IF EXISTS underlying;
CREATE TABLE dist (key Int) ENGINE=Distributed(test_shard_localhost, currentDatabase(), underlying) SETTINGS monitor_batch_inserts={{ setting }};
CREATE TABLE dist (key Int) ENGINE=Distributed(test_shard_localhost, currentDatabase(), underlying) SETTINGS background_insert_batch={{ setting }};
SYSTEM STOP DISTRIBUTED SENDS dist;
INSERT INTO dist SETTINGS prefer_localhost_replica=0, max_threads=1 VALUES (1);

View File

@ -6,7 +6,7 @@ DROP TABLE IF EXISTS local_tbl;
CREATE TABLE local_tbl (`key` UInt32, `value` UInt32 DEFAULT 42) ENGINE = MergeTree ORDER BY key;
CREATE TABLE dist_tbl ENGINE = Distributed('test_shard_localhost', currentDatabase(), 'local_tbl', rand());
SHOW CREATE TABLE dist_tbl;
INSERT INTO dist_tbl (key) SETTINGS insert_distributed_sync=1 VALUES (99);
INSERT INTO dist_tbl (key) SETTINGS distributed_foreground_insert=1 VALUES (99);
SELECT 'local_tbl';
SELECT * FROM local_tbl;
SELECT 'dist_tbl';

View File

@ -0,0 +1,8 @@
CREATE TABLE default.dist_monitor_batch_inserts\n(\n `dummy` UInt8\n)\nENGINE = Distributed(\'default\', \'system\', \'one\')\nSETTINGS monitor_batch_inserts = 1
CREATE TABLE default.dist_monitor_split_batch_on_failure\n(\n `dummy` UInt8\n)\nENGINE = Distributed(\'default\', \'system\', \'one\')\nSETTINGS monitor_split_batch_on_failure = 1
CREATE TABLE default.dist_monitor_sleep_time_ms\n(\n `dummy` UInt8\n)\nENGINE = Distributed(\'default\', \'system\', \'one\')\nSETTINGS monitor_sleep_time_ms = 1
CREATE TABLE default.dist_monitor_max_sleep_time_ms\n(\n `dummy` UInt8\n)\nENGINE = Distributed(\'default\', \'system\', \'one\')\nSETTINGS monitor_max_sleep_time_ms = 1
CREATE TABLE default.dist_background_insert_batch\n(\n `dummy` UInt8\n)\nENGINE = Distributed(\'default\', \'system\', \'one\')\nSETTINGS background_insert_batch = 1
CREATE TABLE default.dist_background_insert_split_batch_on_failure\n(\n `dummy` UInt8\n)\nENGINE = Distributed(\'default\', \'system\', \'one\')\nSETTINGS background_insert_split_batch_on_failure = 1
CREATE TABLE default.dist_background_insert_sleep_time_ms\n(\n `dummy` UInt8\n)\nENGINE = Distributed(\'default\', \'system\', \'one\')\nSETTINGS background_insert_sleep_time_ms = 1
CREATE TABLE default.dist_background_insert_max_sleep_time_ms\n(\n `dummy` UInt8\n)\nENGINE = Distributed(\'default\', \'system\', \'one\')\nSETTINGS background_insert_max_sleep_time_ms = 1

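Both spellings stay valid: the legacy names are kept as aliases, and SHOW CREATE echoes whichever one was used, as the reference output above records. A minimal sketch:

    CREATE TABLE dist_old AS system.one
    ENGINE = Distributed(default, system, one)
    SETTINGS monitor_batch_inserts = 1;  -- legacy alias for background_insert_batch
    SHOW CREATE dist_old;                -- prints SETTINGS monitor_batch_inserts = 1
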
View File

@ -0,0 +1,31 @@
{% for table_setting in [
'monitor_batch_inserts',
'monitor_split_batch_on_failure',
'monitor_sleep_time_ms',
'monitor_max_sleep_time_ms',
'background_insert_batch',
'background_insert_split_batch_on_failure',
'background_insert_sleep_time_ms',
'background_insert_max_sleep_time_ms',
] %}
drop table if exists dist_{{ table_setting }};
create table dist_{{ table_setting }} as system.one engine=Distributed(default, system, one) settings {{ table_setting }}=1;
show create dist_{{ table_setting }};
drop table dist_{{ table_setting }};
{% endfor %}
create table data (key Int) engine=Null();
create table dist (key Int) engine=Distributed(default, currentDatabase(), data, key);
{% for query_setting in [
'distributed_directory_monitor_sleep_time_ms',
'distributed_directory_monitor_max_sleep_time_ms',
'distributed_directory_monitor_batch_inserts',
'distributed_directory_monitor_split_batch_on_failure',
'insert_distributed_sync',
'insert_distributed_timeout',
] %}
insert into dist settings {{query_setting}}=1 values (1);
system flush distributed dist;
{% endfor %}
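
The loop above exercises the legacy query-level names; for comparison, the renamed equivalents against the same dist table would read (a sketch):

    insert into dist settings distributed_background_insert_batch=1 values (1);
    insert into dist settings distributed_foreground_insert=1 values (2);
    system flush distributed dist;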