From 6965ac26c302c3c35c22d5cb8c2fc85b93a57e02 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Wed, 27 Jan 2021 21:43:41 +0300
Subject: [PATCH] Distributed: Add ability to delay/throttle INSERT until
 pending data is reduced

Add two new settings for the Distributed engine:
- bytes_to_delay_insert
- max_delay_to_insert

If at the beginning of an INSERT there is too much pending data (more
than bytes_to_delay_insert), the INSERT waits until the pending data
shrinks, but for no more than max_delay_to_insert seconds.

If after this there is still too much pending data, an exception is
thrown.

Also new profile events were added (by analogy to MergeTree):
- DistributedDelayedInserts (although you can use system.errors instead
  of this, but still)
- DistributedRejectedInserts
- DistributedDelayedInsertsMilliseconds
---
 .../table-engines/special/distributed.md      | 13 ++-
 src/Common/ProfileEvents.cpp                  |  3 +
 .../DistributedBlockOutputStream.cpp          |  2 +-
 .../Distributed/DistributedSettings.h         |  2 +
 src/Storages/StorageDistributed.cpp           | 60 ++++++++++++-
 src/Storages/StorageDistributed.h             |  2 +-
 ...istributed_bytes_to_delay_insert.reference |  3 +
 ...01675_distributed_bytes_to_delay_insert.sh | 86 +++++++++++++++++++
 .../queries/0_stateless/arcadia_skip_list.txt |  1 +
 9 files changed, 163 insertions(+), 9 deletions(-)
 create mode 100644 tests/queries/0_stateless/01675_distributed_bytes_to_delay_insert.reference
 create mode 100755 tests/queries/0_stateless/01675_distributed_bytes_to_delay_insert.sh

diff --git a/docs/en/engines/table-engines/special/distributed.md b/docs/en/engines/table-engines/special/distributed.md
index f7b77da7294..097829f6621 100644
--- a/docs/en/engines/table-engines/special/distributed.md
+++ b/docs/en/engines/table-engines/special/distributed.md
@@ -31,12 +31,11 @@ Also it accept the following settings:

 -   `fsync_directories` - do the `fsync` for directories. Guarantees that the OS refreshed directory metadata after operations related to asynchronous inserts on Distributed table (after insert, after sending the data to shard, etc).

--   `bytes_to_throw_insert` - if more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.
+-   `bytes_to_throw_insert` - if more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw. Default 0.

-    See also:
+-   `bytes_to_delay_insert` - if more than this number of compressed bytes will be pending for async INSERT, the query will be delayed. 0 - do not delay. Default 0.

-    - [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
-    - [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) setting
+-   `max_delay_to_insert` - max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send. Default 60.

 !!! note "Note"

@@ -46,6 +45,12 @@ Also it accept the following settings:
     - May significantly decrease the inserts' performance
     - Affect writing the data stored inside Distributed table folder into the **node which accepted your insert**. If you need to have guarantees of writing data to underlying MergeTree tables - see durability settings (`...fsync...`) in `system.merge_tree_settings`

+    For **Insert limit settings** (`..._insert`) see also:
+
+    - [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
+    - [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) setting
+    - `bytes_to_throw_insert` is handled before `bytes_to_delay_insert`, so you should not set it to a value less than `bytes_to_delay_insert`
+
 Example:

 ``` sql
diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index c459bf41352..2453371edab 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -68,6 +68,9 @@
     M(DelayedInserts, "Number of times the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
     M(RejectedInserts, "Number of times the INSERT of a block to a MergeTree table was rejected with 'Too many parts' exception due to high number of active data parts for partition.") \
     M(DelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
+    M(DistributedDelayedInserts, "Number of times the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \
+    M(DistributedRejectedInserts, "Number of times the INSERT of a block to a Distributed table was rejected with 'Too many bytes' exception due to high number of pending bytes.") \
+    M(DistributedDelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a Distributed table was throttled due to high number of pending bytes.") \
     M(DuplicatedInsertedBlocks, "Number of times the INSERTed block to a ReplicatedMergeTree table was deduplicated.") \
     \
     M(ZooKeeperInit, "") \
diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp
index a5e31d928f2..95b124d2c01 100644
--- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp
+++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp
@@ -628,7 +628,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
     /// and keep monitor thread out from reading incomplete data
     std::string first_file_tmp_path;

-    storage.throwInsertIfNeeded();
+    storage.delayInsertOrThrowIfNeeded();

     auto reservation = storage.getStoragePolicy()->reserveAndCheck(block.bytes());
     const auto disk = reservation->getDisk();
diff --git a/src/Storages/Distributed/DistributedSettings.h b/src/Storages/Distributed/DistributedSettings.h
index 0ad966ab913..7296fa11ffd 100644
--- a/src/Storages/Distributed/DistributedSettings.h
+++ b/src/Storages/Distributed/DistributedSettings.h
@@ -19,6 +19,8 @@ class ASTStorage;
     M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for async INSERT only) after all part operations (writes, renames, etc.).", 0) \
     /** Inserts settings. */ \
     M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.", 0) \
+    M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, the query will be delayed. 0 - do not delay.", 0) \
+    M(UInt64, max_delay_to_insert, 60, "Max delay of inserting data into Distributed table in seconds, if there are a lot of pending bytes for async send.", 0) \

 DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)

diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index 8b7cb2fde02..039cf63eca2 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -14,6 +14,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -70,6 +71,13 @@ const UInt64 FORCE_OPTIMIZE_SKIP_UNUSED_SHARDS_ALWAYS = 2;
 const UInt64 DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION = 2;
 }

+namespace ProfileEvents
+{
+    extern const Event DistributedRejectedInserts;
+    extern const Event DistributedDelayedInserts;
+    extern const Event DistributedDelayedInsertsMilliseconds;
+}
+
 namespace DB
 {

@@ -87,6 +95,7 @@ namespace ErrorCodes
     extern const int INVALID_SHARD_ID;
     extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
     extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
+    extern const int ARGUMENT_OUT_OF_BOUND;
 }

 namespace ActionLocks
@@ -977,19 +986,53 @@ void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
     relative_data_path = new_path_to_table_data;
 }

-void StorageDistributed::throwInsertIfNeeded() const
+void StorageDistributed::delayInsertOrThrowIfNeeded() const
 {
-    if (!distributed_settings.bytes_to_throw_insert)
+    if (!distributed_settings.bytes_to_throw_insert &&
+        !distributed_settings.bytes_to_delay_insert)
         return;

     UInt64 total_bytes = *totalBytes(global_context.getSettingsRef());
-    if (total_bytes > distributed_settings.bytes_to_throw_insert)
+
+    if (distributed_settings.bytes_to_throw_insert && total_bytes > distributed_settings.bytes_to_throw_insert)
     {
+        ProfileEvents::increment(ProfileEvents::DistributedRejectedInserts);
         throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
             "Too many bytes pending for async INSERT: {} (bytes_to_throw_insert={})",
             formatReadableSizeWithBinarySuffix(total_bytes),
             formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_throw_insert));
     }
+
+    if (distributed_settings.bytes_to_delay_insert && total_bytes > distributed_settings.bytes_to_delay_insert)
+    {
+        /// Step is 5% of the delay and minimal one second.
+        /// NOTE: max_delay_to_insert is in seconds, and step is in ms.
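+        /// For example, max_delay_to_insert=60 gives a step of 60 * 1000 * 0.05 = 3000 ms,
+        /// while max_delay_to_insert=10 would give 500 ms and is raised to the minimal 1000 ms step.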
+        const size_t step_ms = std::max(1'000., double(distributed_settings.max_delay_to_insert) * 1'000 * 0.05);
+        UInt64 delayed_ms = 0;
+
+        do {
+            delayed_ms += step_ms;
+            std::this_thread::sleep_for(std::chrono::milliseconds(step_ms));
+        } while (*totalBytes(global_context.getSettingsRef()) > distributed_settings.bytes_to_delay_insert && delayed_ms < distributed_settings.max_delay_to_insert * 1000);
+
+        ProfileEvents::increment(ProfileEvents::DistributedDelayedInserts);
+        ProfileEvents::increment(ProfileEvents::DistributedDelayedInsertsMilliseconds, delayed_ms);
+
+        UInt64 new_total_bytes = *totalBytes(global_context.getSettingsRef());
+        LOG_INFO(log, "Too many bytes pending for async INSERT: was {}, now {}, INSERT was delayed for {} ms",
+            formatReadableSizeWithBinarySuffix(total_bytes),
+            formatReadableSizeWithBinarySuffix(new_total_bytes),
+            delayed_ms);
+
+        if (new_total_bytes > distributed_settings.bytes_to_delay_insert)
+        {
+            ProfileEvents::increment(ProfileEvents::DistributedRejectedInserts);
+            throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES,
+                "Too many bytes pending for async INSERT: {} (bytes_to_delay_insert={})",
+                formatReadableSizeWithBinarySuffix(new_total_bytes),
+                formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_delay_insert));
+        }
+    }
 }

 void registerStorageDistributed(StorageFactory & factory)
@@ -1057,6 +1100,17 @@ void registerStorageDistributed(StorageFactory & factory)
         distributed_settings.loadFromQuery(*args.storage_def);
     }

+    if (distributed_settings.max_delay_to_insert < 1)
+        throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
+            "max_delay_to_insert cannot be less than 1");
+
+    if (distributed_settings.bytes_to_throw_insert && distributed_settings.bytes_to_delay_insert &&
+        distributed_settings.bytes_to_throw_insert <= distributed_settings.bytes_to_delay_insert)
+    {
+        throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
+            "bytes_to_throw_insert cannot be less than or equal to bytes_to_delay_insert (since it is handled first)");
+    }
+
     return StorageDistributed::create(
         args.table_id, args.columns, args.constraints,
         remote_database, remote_table, cluster_name,
diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h
index 7e5b8c24ce8..5904124505a 100644
--- a/src/Storages/StorageDistributed.h
+++ b/src/Storages/StorageDistributed.h
@@ -174,7 +174,7 @@ private:

     const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }

-    void throwInsertIfNeeded() const;
+    void delayInsertOrThrowIfNeeded() const;

 private:
     String remote_database;
diff --git a/tests/queries/0_stateless/01675_distributed_bytes_to_delay_insert.reference b/tests/queries/0_stateless/01675_distributed_bytes_to_delay_insert.reference
new file mode 100644
index 00000000000..d8c50c741ea
--- /dev/null
+++ b/tests/queries/0_stateless/01675_distributed_bytes_to_delay_insert.reference
@@ -0,0 +1,3 @@
+max_delay_to_insert will throw
+max_delay_to_insert will succeed
+flushed
diff --git a/tests/queries/0_stateless/01675_distributed_bytes_to_delay_insert.sh b/tests/queries/0_stateless/01675_distributed_bytes_to_delay_insert.sh
new file mode 100755
index 00000000000..bad12e4cd58
--- /dev/null
+++ b/tests/queries/0_stateless/01675_distributed_bytes_to_delay_insert.sh
@@ -0,0 +1,86 @@
+#!/usr/bin/env bash
+
+# NOTE: $SECONDS has one-second accuracy, so we need some delta, hence the -1 in the time conditions.
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
"$CURDIR"/../shell_config.sh + +max_delay_to_insert=5 + +${CLICKHOUSE_CLIENT} -nq " +drop table if exists dist_01675; +drop table if exists data_01675; +" + +${CLICKHOUSE_CLIENT} -nq " +create table data_01675 (key Int) engine=Null(); +create table dist_01675 (key Int) engine=Distributed(test_shard_localhost, currentDatabase(), data_01675) settings bytes_to_delay_insert=1, max_delay_to_insert=$max_delay_to_insert; +system stop distributed sends dist_01675; +" + +# +# Case 1: max_delay_to_insert will throw. +# +echo "max_delay_to_insert will throw" + +start_seconds=$SECONDS +${CLICKHOUSE_CLIENT} --testmode -nq " +-- first batch is always OK, since there is no pending bytes yet +insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0; +-- second will fail, because of bytes_to_delay_insert=1 and max_delay_to_insert=5, +-- while distributed sends is stopped. +-- +-- (previous block definitelly takes more, since it has header) +insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0; -- { serverError 574 } +system flush distributed dist_01675; +" +end_seconds=$SECONDS + +if (( (end_seconds-start_seconds)<(max_delay_to_insert-1) )); then + echo "max_delay_to_insert was not satisfied ($end_seconds-$start_seconds)" +fi + +# +# Case 2: max_delay_to_insert will finally finished. +# +echo "max_delay_to_insert will succeed" + +max_delay_to_insert=10 +${CLICKHOUSE_CLIENT} -nq " +drop table dist_01675; +create table dist_01675 (key Int) engine=Distributed(test_shard_localhost, currentDatabase(), data_01675) settings bytes_to_delay_insert=1, max_delay_to_insert=$max_delay_to_insert; +system stop distributed sends dist_01675; +" + +flush_delay=4 +function flush_distributed_worker() +{ + sleep $flush_delay + ${CLICKHOUSE_CLIENT} -q "system flush distributed dist_01675" + echo flushed +} +flush_distributed_worker & + +start_seconds=$SECONDS +${CLICKHOUSE_CLIENT} --testmode -nq " +-- first batch is always OK, since there is no pending bytes yet +insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0; +-- second will succcedd, due to SYSTEM FLUSH DISTRIBUTED in background. +insert into dist_01675 select * from numbers(1) settings prefer_localhost_replica=0; +" +end_seconds=$SECONDS +wait + +if (( (end_seconds-start_seconds)<(flush_delay-1) )); then + echo "max_delay_to_insert was not wait flush_delay ($end_seconds-$start_seconds)" +fi +if (( (end_seconds-start_seconds)>=(max_delay_to_insert-1) )); then + echo "max_delay_to_insert was overcommited ($end_seconds-$start_seconds)" +fi + + +${CLICKHOUSE_CLIENT} -nq " +drop table dist_01675; +drop table data_01675; +" diff --git a/tests/queries/0_stateless/arcadia_skip_list.txt b/tests/queries/0_stateless/arcadia_skip_list.txt index 8f3b4e56134..4b0f2ea3ed5 100644 --- a/tests/queries/0_stateless/arcadia_skip_list.txt +++ b/tests/queries/0_stateless/arcadia_skip_list.txt @@ -200,6 +200,7 @@ 01659_test_base64Decode_mysql_compatibility 01670_distributed_bytes_to_throw_insert 01674_htm_xml_coarse_parse +01675_distributed_bytes_to_delay_insert 01675_data_type_coroutine 01676_clickhouse_client_autocomplete 01671_aggregate_function_group_bitmap_data