From 56073db22d65cef453cfff0758235db012ee626a Mon Sep 17 00:00:00 2001
From: feng lv
Date: Sat, 20 Mar 2021 12:10:22 +0000
Subject: [PATCH] Add setting max_distributed_depth

Add a setting max_distributed_depth that limits how deeply a query may
recurse through Distributed tables. The current depth travels with the
query in ClientInfo (new field distributed_depth, protocol revision
54417) and is incremented on every hop; a hop whose depth exceeds the
limit fails with the new error TOO_LARGE_DISTRIBUTED_DEPTH (580)
instead of recursing until the stack overflows. A value of 0 disables
the check.

Squashed with subsequent style and test fixes.
---
 src/Common/ErrorCodes.cpp                     |  1 +
 src/Core/Defines.h                            |  1 +
 src/Core/Settings.h                           |  1 +
 src/Interpreters/ClientInfo.cpp               |  6 ++++
 src/Interpreters/ClientInfo.h                 |  2 ++
 .../ClusterProxy/executeQuery.cpp             | 10 ++++++
 .../DistributedBlockOutputStream.cpp          | 33 +++++++++++--------
 .../DistributedBlockOutputStream.h            |  2 +-
 .../00987_distributed_stack_overflow.sql      |  6 ++--
 ..._autocomplete_word_break_characters.expect |  2 +-
 .../01763_max_distributed_depth.reference     |  0
 .../01763_max_distributed_depth.sql           | 26 +++++++++++++++
 12 files changed, 71 insertions(+), 19 deletions(-)
 create mode 100644 tests/queries/0_stateless/01763_max_distributed_depth.reference
 create mode 100644 tests/queries/0_stateless/01763_max_distributed_depth.sql

diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp
index 586c0fbde4d..8d8a650def1 100644
--- a/src/Common/ErrorCodes.cpp
+++ b/src/Common/ErrorCodes.cpp
@@ -547,6 +547,7 @@
     M(577, INVALID_SHARD_ID) \
     M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \
     M(579, INCORRECT_PART_TYPE) \
+    M(580, TOO_LARGE_DISTRIBUTED_DEPTH) \
     \
     M(998, POSTGRESQL_CONNECTION_FAILURE) \
     M(999, KEEPER_EXCEPTION) \
diff --git a/src/Core/Defines.h b/src/Core/Defines.h
index 468c1187e91..4751c9832c7 100644
--- a/src/Core/Defines.h
+++ b/src/Core/Defines.h
@@ -59,6 +59,7 @@
 #define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
 #define DBMS_MIN_REVISION_WITH_SERVER_LOGS 54406
 #define DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA 54415
+#define DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH 54417
 /// Minimum revision with exactly the same set of aggregation methods and rules to select them.
 /// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
 /// (keys will be placed in different buckets and result will not be fully aggregated).
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index cf7bda7d1a1..0af2c23b659 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -435,6 +435,7 @@ class IColumn;
     M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
     M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
     M(Bool, database_replicated_ddl_output, true, "Return table with query execution status as a result of DDL query", 0) \
+    M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \
     \
     /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
     \
diff --git a/src/Interpreters/ClientInfo.cpp b/src/Interpreters/ClientInfo.cpp
index 5449f397f49..84aac46f449 100644
--- a/src/Interpreters/ClientInfo.cpp
+++ b/src/Interpreters/ClientInfo.cpp
@@ -60,6 +60,9 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
     if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
         writeBinary(quota_key, out);
 
+    if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH)
+        writeBinary(distributed_depth, out);
+
     if (interface == Interface::TCP)
     {
         if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
@@ -137,6 +140,9 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
     if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
         readBinary(quota_key, in);
 
+    if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH)
+        readBinary(distributed_depth, in);
+
     if (interface == Interface::TCP)
     {
         if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h
index bc471dc3aa4..21aae45bfab 100644
--- a/src/Interpreters/ClientInfo.h
+++ b/src/Interpreters/ClientInfo.h
@@ -96,6 +96,8 @@ public:
     /// Common
     String quota_key;
 
+    UInt64 distributed_depth = 0;
+
     bool empty() const { return query_kind == QueryKind::NO_QUERY; }
 
     /** Serialization and deserialization.
diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp
index 59cbae67770..c75aba8a79c 100644
--- a/src/Interpreters/ClusterProxy/executeQuery.cpp
+++ b/src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -16,6 +16,11 @@
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
+}
+
 namespace ClusterProxy
 {
 
@@ -92,6 +97,9 @@ void executeQuery(
 
     const Settings & settings = context.getSettingsRef();
 
+    if (settings.max_distributed_depth && context.getClientInfo().distributed_depth > settings.max_distributed_depth)
+        throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
+
     std::vector<QueryPlanPtr> plans;
     Pipes remote_pipes;
     Pipes delayed_pipes;
@@ -100,6 +108,8 @@ void executeQuery(
 
     auto new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log);
 
+    new_context->getClientInfo().distributed_depth += 1;
+
     ThrottlerPtr user_level_throttler;
     if (auto * process_list_element = context.getProcessListElement())
         user_level_throttler = process_list_element->getUserNetworkThrottler();
diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp
index f8ba4221842..a81fed62f2b 100644
--- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp
+++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp
@@ -58,6 +58,7 @@ namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
     extern const int TIMEOUT_EXCEEDED;
+    extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
 }
 
 static Block adoptBlock(const Block & header, const Block & block, Poco::Logger * log)
@@ -93,7 +94,7 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
     const ClusterPtr & cluster_,
     bool insert_sync_,
     UInt64 insert_timeout_)
-    : context(context_)
+    : context(std::make_unique<Context>(context_))
     , storage(storage_)
     , metadata_snapshot(metadata_snapshot_)
     , query_ast(query_ast_)
@@ -103,6 +104,10 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
     , insert_timeout(insert_timeout_)
     , log(&Poco::Logger::get("DistributedBlockOutputStream"))
 {
+    const auto & settings = context->getSettingsRef();
+    if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
+        throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
+    context->getClientInfo().distributed_depth += 1;
 }
 
 
@@ -143,7 +148,7 @@ void DistributedBlockOutputStream::write(const Block & block)
 
 void DistributedBlockOutputStream::writeAsync(const Block & block)
 {
-    const Settings & settings = context.getSettingsRef();
+    const Settings & settings = context->getSettingsRef();
     bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
 
     if (random_shard_insert)
@@ -194,7 +199,7 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
 
 void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end)
 {
-    const Settings & settings = context.getSettingsRef();
+    const Settings & settings = context->getSettingsRef();
     const auto & addresses_with_failovers = cluster->getShardsAddresses();
     const auto & shards_info = cluster->getShardsInfo();
     size_t num_shards = end - start;
@@ -303,7 +308,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
         }
 
         const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
-        const Settings & settings = context.getSettingsRef();
+        const Settings & settings = context->getSettingsRef();
 
         /// Do not initiate INSERT for empty block.
         if (shard_block.rows() == 0)
@@ -343,7 +348,8 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
                 if (throttler)
                     job.connection_entry->setThrottler(throttler);
 
-            job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, settings, context.getClientInfo());
+            job.stream = std::make_shared<RemoteBlockOutputStream>(
+                *job.connection_entry, timeouts, query_string, settings, context->getClientInfo());
             job.stream->writePrefix();
         }
 
@@ -357,7 +363,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
             if (!job.stream)
             {
                 /// Forward user settings
-                job.local_context = std::make_unique<Context>(context);
+                job.local_context = std::make_unique<Context>(*context);
 
                 /// Copying of the query AST is required to avoid race,
                 /// in case of INSERT into multiple local shards.
@@ -385,7 +391,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
 
 void DistributedBlockOutputStream::writeSync(const Block & block)
 {
-    const Settings & settings = context.getSettingsRef();
+    const Settings & settings = context->getSettingsRef();
     const auto & shards_info = cluster->getShardsInfo();
     bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
     size_t start = 0;
@@ -562,7 +568,7 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
 void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t shard_id)
 {
     const auto & shard_info = cluster->getShardsInfo()[shard_id];
-    const auto & settings = context.getSettingsRef();
+    const auto & settings = context->getSettingsRef();
 
     if (shard_info.hasInternalReplication())
     {
@@ -598,7 +604,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t sh
 
 void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repeats)
 {
     /// Async insert does not support settings forwarding yet whereas sync one supports
-    InterpreterInsertQuery interp(query_ast, context);
+    InterpreterInsertQuery interp(query_ast, *context);
 
     auto block_io = interp.execute();
@@ -610,7 +616,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repe
 
 void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
 {
-    const auto & settings = context.getSettingsRef();
+    const auto & settings = context->getSettingsRef();
     const auto & distributed_settings = storage.getDistributedSettingsRef();
 
     bool fsync = distributed_settings.fsync_after_insert;
@@ -675,8 +681,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
         WriteBufferFromOwnString header_buf;
         writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, header_buf);
         writeStringBinary(query_string, header_buf);
-        context.getSettingsRef().write(header_buf);
-        context.getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
+        context->getSettingsRef().write(header_buf);
+        context->getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
         writeVarUInt(block.rows(), header_buf);
         writeVarUInt(block.bytes(), header_buf);
         writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf);
@@ -724,7 +730,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
         Poco::File(first_file_tmp_path).remove();
 
         /// Notify
-        auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms;
+        auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
         for (const auto & dir_name : dir_names)
         {
             auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
@@ -732,5 +738,4 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
     }
 }
 
-
 }
diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.h b/src/Storages/Distributed/DistributedBlockOutputStream.h
index ca57ad46fbb..8a1cef43f44 100644
--- a/src/Storages/Distributed/DistributedBlockOutputStream.h
+++ b/src/Storages/Distributed/DistributedBlockOutputStream.h
@@ -84,7 +84,7 @@ private:
     std::string getCurrentStateDescription();
 
 private:
-    const Context & context;
+    std::unique_ptr<Context> context;
     StorageDistributed & storage;
     StorageMetadataPtr metadata_snapshot;
     ASTPtr query_ast;
diff --git a/tests/queries/0_stateless/00987_distributed_stack_overflow.sql b/tests/queries/0_stateless/00987_distributed_stack_overflow.sql
index 4baa6969b31..fe323ebb359 100644
--- a/tests/queries/0_stateless/00987_distributed_stack_overflow.sql
+++ b/tests/queries/0_stateless/00987_distributed_stack_overflow.sql
@@ -5,13 +5,13 @@ DROP TABLE IF EXISTS distr2;
 CREATE TABLE distr (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr); -- { serverError 269 }
 
 CREATE TABLE distr0 (x UInt8) ENGINE = Distributed(test_shard_localhost, '', distr0);
-SELECT * FROM distr0; -- { serverError 306 }
+SELECT * FROM distr0; -- { serverError 580 }
 
 CREATE TABLE distr1 (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr2);
 CREATE TABLE distr2 (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr1);
 
-SELECT * FROM distr1; -- { serverError 306 }
-SELECT * FROM distr2; -- { serverError 306 }
+SELECT * FROM distr1; -- { serverError 580 }
+SELECT * FROM distr2; -- { serverError 580 }
 
 DROP TABLE distr0;
 DROP TABLE distr1;
diff --git a/tests/queries/0_stateless/01370_client_autocomplete_word_break_characters.expect b/tests/queries/0_stateless/01370_client_autocomplete_word_break_characters.expect
index 50ef009dee9..a6d52b39918 100755
--- a/tests/queries/0_stateless/01370_client_autocomplete_word_break_characters.expect
+++ b/tests/queries/0_stateless/01370_client_autocomplete_word_break_characters.expect
@@ -23,7 +23,7 @@ set is_done 0
 while {$is_done == 0} {
     send -- "\t"
     expect {
-        "_connections" {
+        "_" {
             set is_done 1
         }
         default {
diff --git a/tests/queries/0_stateless/01763_max_distributed_depth.reference b/tests/queries/0_stateless/01763_max_distributed_depth.reference
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/queries/0_stateless/01763_max_distributed_depth.sql b/tests/queries/0_stateless/01763_max_distributed_depth.sql
new file mode 100644
index 00000000000..172a28ac13e
--- /dev/null
+++ b/tests/queries/0_stateless/01763_max_distributed_depth.sql
@@ -0,0 +1,26 @@
+DROP TABLE IF EXISTS tt6;
+
+CREATE TABLE tt6
+(
+    `id` UInt32,
+    `first_column` UInt32,
+    `second_column` UInt32,
+    `third_column` UInt32,
+    `status` String
+
+)
+ENGINE = Distributed('test_shard_localhost', '', 'tt6', rand());
+
+INSERT INTO tt6 VALUES (1, 1, 1, 1, 'ok'); -- { serverError 580 }
+
+SELECT * FROM tt6; -- { serverError 580 }
+
+SET max_distributed_depth = 0;
+
+-- stack overflow
+INSERT INTO tt6 VALUES (1, 1, 1, 1, 'ok'); -- { serverError 306 }
+
+-- stack overflow
+SELECT * FROM tt6; -- { serverError 306 }
+
+DROP TABLE tt6;
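
Note on the mechanism: the check added to ClusterProxy::executeQuery and to
the DistributedBlockOutputStream constructor is one guard-and-increment
pattern. A minimal standalone sketch of it follows; Settings and ClientInfo
here are reduced stand-ins for the real ClickHouse classes, not the actual
API:

    #include <cstdint>
    #include <stdexcept>

    // Reduced stand-ins for the real Settings and ClientInfo classes.
    struct Settings { uint64_t max_distributed_depth = 5; };
    struct ClientInfo { uint64_t distributed_depth = 0; };

    // Mirrors the guard in the patch: a limit of 0 disables the check;
    // otherwise a hop whose inherited depth already exceeds the limit
    // fails (TOO_LARGE_DISTRIBUTED_DEPTH, error 580, in the real code).
    void checkAndBumpDistributedDepth(const Settings & settings, ClientInfo & client_info)
    {
        if (settings.max_distributed_depth
            && client_info.distributed_depth > settings.max_distributed_depth)
            throw std::runtime_error("Maximum distributed depth exceeded");

        ++client_info.distributed_depth; // travels to the next hop with the query
    }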
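
Note on wire compatibility: the new ClientInfo field is gated on protocol
revision 54417 in both ClientInfo::write and ClientInfo::read, so
mixed-version clusters keep a consistent stream layout; an old peer never
sees the field and leaves distributed_depth at its default of 0. A sketch of
the same gating, using a plain std::ostream instead of ClickHouse's
WriteBuffer for brevity:

    #include <cstdint>
    #include <ostream>

    constexpr uint64_t DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH = 54417;

    // The field goes on the wire only when the peer's revision is new
    // enough, so older peers keep receiving the exact layout they expect.
    void writeDistributedDepth(std::ostream & out, uint64_t peer_revision, uint64_t distributed_depth)
    {
        if (peer_revision >= DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH)
            out.write(reinterpret_cast<const char *>(&distributed_depth), sizeof(distributed_depth));
    }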
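
Note on the tests: tt6 in 01763_max_distributed_depth.sql is a Distributed
table that points at itself, so every hop re-enters the guard with the depth
it inherited. With the default limit of 5 the query fails with error 580
after a handful of hops; with max_distributed_depth = 0 the check is disabled
and the recursion only stops at TOO_DEEP_RECURSION (306). Reusing
checkAndBumpDistributedDepth from the sketch above, a hypothetical driver
shows the cut-off:

    #include <iostream>

    int main()
    {
        Settings settings; // max_distributed_depth = 5, the patch's default
        ClientInfo info;   // depth 0 on the initiating server

        try
        {
            for (;;) // tt6 forwarding the query to itself, hop after hop
                checkAndBumpDistributedDepth(settings, info);
        }
        catch (const std::exception & e)
        {
            // Prints: stopped at depth 6: Maximum distributed depth exceeded
            std::cout << "stopped at depth " << info.distributed_depth
                      << ": " << e.what() << '\n';
        }
    }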