diff --git a/docs/en/engines/table-engines/special/distributed.md b/docs/en/engines/table-engines/special/distributed.md index 7fffa962480..f7b77da7294 100644 --- a/docs/en/engines/table-engines/special/distributed.md +++ b/docs/en/engines/table-engines/special/distributed.md @@ -31,6 +31,13 @@ Also it accept the following settings: - `fsync_directories` - do the `fsync` for directories. Guarantees that the OS refreshed directory metadata after operations related to asynchronous inserts on Distributed table (after insert, after sending the data to shard, etc). +- `bytes_to_throw_insert` - if more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw. + + See also: + + - [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting + - [prefer_localhost_replica](../../../operations/settings/settings.md#settings-prefer-localhost-replica) setting + !!! note "Note" **Durability settings** (`fsync_...`): diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index f81e377da2b..fa921ef7c1c 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -540,6 +540,7 @@ M(571, DATABASE_REPLICATION_FAILED) \ M(572, TOO_MANY_QUERY_PLAN_OPTIMIZATIONS) \ M(573, EPOLL_ERROR) \ + M(574, DISTRIBUTED_TOO_MANY_PENDING_BYTES) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 51bd6d83105..2c185e76dbd 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -628,6 +628,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: /// and keep monitor thread out from reading incomplete data std::string first_file_tmp_path; + storage.throwInsertIfNeeded(); + auto reservation = storage.getStoragePolicy()->reserveAndCheck(block.bytes()); const auto disk = reservation->getDisk(); auto disk_path = disk->getPath(); diff --git a/src/Storages/Distributed/DistributedSettings.h b/src/Storages/Distributed/DistributedSettings.h index 9df787428df..0ad966ab913 100644 --- a/src/Storages/Distributed/DistributedSettings.h +++ b/src/Storages/Distributed/DistributedSettings.h @@ -17,6 +17,8 @@ class ASTStorage; #define LIST_OF_DISTRIBUTED_SETTINGS(M) \ M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for async INSERT, i.e. insert_distributed_sync=false)", 0) \ M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for async INSERT only) after all part operations (writes, renames, etc.).", 0) \ + /** Inserts settings. */ \ + M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for async INSERT, an exception will be thrown. 0 - do not throw.", 0) \ DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 804cd99ae19..1d3f5968536 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -85,6 +86,7 @@ namespace ErrorCodes extern const int UNABLE_TO_SKIP_UNUSED_SHARDS; extern const int INVALID_SHARD_ID; extern const int ALTER_OF_COLUMN_IS_FORBIDDEN; + extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES; } namespace ActionLocks @@ -975,6 +977,21 @@ void StorageDistributed::renameOnDisk(const String & new_path_to_table_data) relative_data_path = new_path_to_table_data; } +void StorageDistributed::throwInsertIfNeeded() const +{ + if (!distributed_settings.bytes_to_throw_insert) + return; + + /// TODO: update the counters + UInt64 total_bytes = *totalBytes(global_context.getSettingsRef()); + if (total_bytes > distributed_settings.bytes_to_throw_insert) + { + throw Exception(ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES, + "Too many bytes pending for async INSERT: {} (bytes_to_throw_insert={})", + formatReadableSizeWithBinarySuffix(total_bytes), + formatReadableSizeWithBinarySuffix(distributed_settings.bytes_to_throw_insert)); + } +} void registerStorageDistributed(StorageFactory & factory) { diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 0a1ec5f4503..7e5b8c24ce8 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -174,6 +174,8 @@ private: const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; } + void throwInsertIfNeeded() const; + private: String remote_database; String remote_table; diff --git a/tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.reference b/tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.sql b/tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.sql new file mode 100644 index 00000000000..1415c895290 --- /dev/null +++ b/tests/queries/0_stateless/01670_distributed_bytes_to_throw_insert.sql @@ -0,0 +1,14 @@ +drop table if exists dist_01670; +drop table if exists data_01670; + +create table data_01670 (key Int) engine=Null(); +create table dist_01670 (key Int) engine=Distributed(test_shard_localhost, currentDatabase(), data_01670) settings bytes_to_throw_insert=1; +system stop distributed sends dist_01670; +-- first batch is always OK, since there is no pending bytes yet +insert into dist_01670 select * from numbers(1) settings prefer_localhost_replica=0; +-- second will fail, because of bytes_to_throw_insert=1 +-- (previous block definitelly takes more, since it has header) +insert into dist_01670 select * from numbers(1) settings prefer_localhost_replica=0; -- { serverError 574 } +system flush distributed dist_01670; +drop table dist_01670; +drop table data_01670; diff --git a/tests/queries/0_stateless/arcadia_skip_list.txt b/tests/queries/0_stateless/arcadia_skip_list.txt index a823ba38c1c..59f0aa37e49 100644 --- a/tests/queries/0_stateless/arcadia_skip_list.txt +++ b/tests/queries/0_stateless/arcadia_skip_list.txt @@ -197,6 +197,7 @@ 01181_db_atomic_drop_on_cluster 01658_test_base64Encode_mysql_compatibility 01659_test_base64Decode_mysql_compatibility +01670_distributed_bytes_to_throw_insert 01674_htm_xml_coarse_parse 01675_data_type_coroutine 01676_clickhouse_client_autocomplete