From b5a5778589dd5912975e53aa5014ba49e7c33c41 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 26 Jan 2021 21:45:37 +0300 Subject: [PATCH] Distributed: Add ability to limit amount of pending bytes for async INSERT Right now, with distributed_directory_monitor_batch_inserts=1 and insert_distributed_sync=0, an INSERT into a Distributed table stores the blocks that should be sent to the remote (and, in case of prefer_localhost_replica=0, to the localhost too) on the local filesystem and sends them in the background. However, there is no limit for this storage, and if the remote is unavailable (or some other error occurs), these pending blocks may take significant space, which is not always the desired behaviour. Add a new Distributed setting, bytes_to_throw_insert, that sets the limit on how many pending bytes are allowed; if the limit is reached, an exception is thrown. It is set to 0 by default, to avoid surprises. --- .../table-engines/special/distributed.md | 7 +++++++ src/Common/ErrorCodes.cpp | 1 + .../DistributedBlockOutputStream.cpp | 2 ++ src/Storages/Distributed/DistributedSettings.h | 2 ++ src/Storages/StorageDistributed.cpp | 17 +++++++++++++++++ src/Storages/StorageDistributed.h | 2 ++ ..._distributed_bytes_to_throw_insert.reference | 0 .../01670_distributed_bytes_to_throw_insert.sql | 14 ++++++++++++++ tests/queries/0_stateless/arcadia_skip_list.txt | 1 + 9 files changed, 46 insertions(+) create mode 100644 tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.reference create mode 100644 tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.sql diff --git a/docs/en/engines/table-engines/special/distributed.md b/docs/en/engines/table-engines/special/distributed.md index 7fffa962480..f7b77da7294 100644 --- a/docs/en/engines/table-engines/special/distributed.md +++ b/docs/en/engines/table-engines/special/distributed.md @@ -31,6 +31,13 @@ Also it accept the following settings: - `fsync_directories` - do the `fsync` for directories. 
Guarantees that the OS refreshed directory metadata after operations related to asynchronous inserts on Distributed table (after insert, after sending the data to shard, etc). +- `bytes_to_throw_insert` - if more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw. + + See also: + + - [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting + - [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) setting + !!! note "Note" **Durability settings** (`fsync_...`): diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index f81e377da2b..fa921ef7c1c 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -540,6 +540,7 @@ M(571, DATABASE_REPLICATION_FAILED) \ M(572, TOO_MANY_QUERY_PLAN_OPTIMIZATIONS) \ M(573, EPOLL_ERROR) \ + M(574, DISTRIBUTED_TOO_MANY_PENDING_BYTES) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 51bd6d83105..2c185e76dbd 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -628,6 +628,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: /// and keep monitor thread out from reading incomplete data std::string first_file_tmp_path; + storage.throwInsertIfNeeded(); + auto reservation = storage.getStoragePolicy()->reserveAndCheck(block.bytes()); const auto disk = reservation->getDisk(); auto disk_path = disk->getPath(); diff --git a/src/Storages/Distributed/DistributedSettings.h b/src/Storages/Distributed/DistributedSettings.h index 9df787428df..0ad966ab913 100644 --- a/src/Storages/Distributed/DistributedSettings.h +++ b/src/Storages/Distributed/DistributedSettings.h @@ -17,6 +17,8 @@ class ASTStorage; #define 
LIST_OF_DISTRIBUTED_SETTINGS(M) \ M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for async INSERT, i.e. insert_distributed_sync=false)", 0) \ M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for async INSERT only) after all part operations (writes, renames, etc.).", 0) \ + /** Inserts settings. */ \ + M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.", 0) \ DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 804cd99ae19..1d3f5968536 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -85,6 +86,7 @@ namespace ErrorCodes extern const int UNABLE_TO_SKIP_UNUSED_SHARDS; extern const int INVALID_SHARD_ID; extern const int ALTER_OF_COLUMN_IS_FORBIDDEN; + extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES; } namespace ActionLocks @@ -975,6 +977,21 @@ void StorageDistributed::renameOnDisk(const String & new_path_to_table_data) relative_data_path = new_path_to_table_data; } +void StorageDistributed::throwInsertIfNeeded() const +{ + if (!distributed_settings.bytes_to_throw_insert) + return; + + /// TODO: update the counters + UInt64 total_bytes = *totalBytes(global_context.getSettingsRef()); + if (total_bytes > distributed_settings.bytes_to_throw_insert) + { + throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES, + "Too many bytes pending for async INSERT: {} (bytes_to_throw_insert={})", + formatReadableSizeWithBinarySuffix(total_bytes), + formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_throw_insert)); + } +} void registerStorageDistributed(StorageFactory & factory) { diff --git 
a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 0a1ec5f4503..7e5b8c24ce8 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -174,6 +174,8 @@ private: const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; } + void throwInsertIfNeeded() const; + private: String remote_database; String remote_table; diff --git a/tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.reference b/tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.sql b/tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.sql new file mode 100644 index 00000000000..1415c895290 --- /dev/null +++ b/tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.sql @@ -0,0 +1,14 @@ +drop table if exists dist_01670; +drop table if exists data_01670; + +create table data_01670 (key Int) engine=Null(); +create table dist_01670 (key Int) engine=Distributed(test_shard_localhost, currentDatabase(), data_01670) settings bytes_to_throw_insert=1; +system stop distributed sends dist_01670; +-- first batch is always OK, since there is no pending bytes yet +insert into dist_01670 select * from numbers(1) settings prefer_localhost_replica=0; +-- second will fail, because of bytes_to_throw_insert=1 +-- (previous block definitelly takes more, since it has header) +insert into dist_01670 select * from numbers(1) settings prefer_localhost_replica=0; -- { serverError 574 } +system flush distributed dist_01670; +drop table dist_01670; +drop table data_01670; diff --git a/tests/queries/0_stateless/arcadia_skip_list.txt b/tests/queries/0_stateless/arcadia_skip_list.txt index a823ba38c1c..59f0aa37e49 100644 --- a/tests/queries/0_stateless/arcadia_skip_list.txt +++ b/tests/queries/0_stateless/arcadia_skip_list.txt @@ -197,6 +197,7 @@ 
01181_db_atomic_drop_on_cluster 01658_test_base64Encode_mysql_compatibility 01659_test_base64Decode_mysql_compatibility +01670_distributed_bytes_to_throw_insert 01674_htm_xml_coarse_parse 01675_data_type_coroutine 01676_clickhouse_client_autocomplete