Merge pull request #21942 from ucasFL/distributed_depth

Add settings max_distributed_depth
This commit is contained in:
Alexander Kuzmenkov 2021-04-09 15:52:58 +03:00 committed by GitHub
commit 0264124146
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 72 additions and 20 deletions

View File

@ -548,6 +548,7 @@
M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \
M(579, INCORRECT_PART_TYPE) \
M(580, CANNOT_SET_ROUNDING_MODE) \
M(581, TOO_LARGE_DISTRIBUTED_DEPTH) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -81,8 +81,9 @@
#define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447
/// Version of ClickHouse TCP protocol. Increment it manually when you change the protocol.
#define DBMS_TCP_PROTOCOL_VERSION 54447
#define DBMS_TCP_PROTOCOL_VERSION 54448
#define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096

View File

@ -441,6 +441,7 @@ class IColumn;
M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \

View File

@ -60,6 +60,9 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
writeBinary(quota_key, out);
if (server_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH)
writeVarUInt(distributed_depth, out);
if (interface == Interface::TCP)
{
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
@ -137,6 +140,9 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
readBinary(quota_key, in);
if (client_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH)
readVarUInt(distributed_depth, in);
if (interface == Interface::TCP)
{
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)

View File

@ -96,6 +96,8 @@ public:
/// Common
String quota_key;
UInt64 distributed_depth = 0;
bool empty() const { return query_kind == QueryKind::NO_QUERY; }
/** Serialization and deserialization.

View File

@ -16,6 +16,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}
namespace ClusterProxy
{
@ -92,6 +97,9 @@ void executeQuery(
const Settings & settings = context.getSettingsRef();
if (settings.max_distributed_depth && context.getClientInfo().distributed_depth > settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::vector<QueryPlanPtr> plans;
Pipes remote_pipes;
Pipes delayed_pipes;
@ -100,6 +108,8 @@ void executeQuery(
auto new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log);
new_context->getClientInfo().distributed_depth += 1;
ThrottlerPtr user_level_throttler;
if (auto * process_list_element = context.getProcessListElement())
user_level_throttler = process_list_element->getUserNetworkThrottler();

View File

@ -58,6 +58,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}
static Block adoptBlock(const Block & header, const Block & block, Poco::Logger * log)
@ -93,7 +94,7 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
const ClusterPtr & cluster_,
bool insert_sync_,
UInt64 insert_timeout_)
: context(context_)
: context(std::make_unique<Context>(context_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, query_ast(query_ast_)
@ -103,6 +104,10 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
, insert_timeout(insert_timeout_)
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
{
const auto & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
context->getClientInfo().distributed_depth += 1;
}
@ -143,7 +148,7 @@ void DistributedBlockOutputStream::write(const Block & block)
void DistributedBlockOutputStream::writeAsync(const Block & block)
{
const Settings & settings = context.getSettingsRef();
const Settings & settings = context->getSettingsRef();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
if (random_shard_insert)
@ -194,7 +199,7 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end)
{
const Settings & settings = context.getSettingsRef();
const Settings & settings = context->getSettingsRef();
const auto & addresses_with_failovers = cluster->getShardsAddresses();
const auto & shards_info = cluster->getShardsInfo();
size_t num_shards = end - start;
@ -303,7 +308,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
}
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
const Settings & settings = context.getSettingsRef();
const Settings & settings = context->getSettingsRef();
/// Do not initiate INSERT for empty block.
if (shard_block.rows() == 0)
@ -343,7 +348,8 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
if (throttler)
job.connection_entry->setThrottler(throttler);
job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, settings, context.getClientInfo());
job.stream = std::make_shared<RemoteBlockOutputStream>(
*job.connection_entry, timeouts, query_string, settings, context->getClientInfo());
job.stream->writePrefix();
}
@ -357,7 +363,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
if (!job.stream)
{
/// Forward user settings
job.local_context = std::make_unique<Context>(context);
job.local_context = std::make_unique<Context>(*context);
/// Copying of the query AST is required to avoid race,
/// in case of INSERT into multiple local shards.
@ -385,7 +391,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
void DistributedBlockOutputStream::writeSync(const Block & block)
{
const Settings & settings = context.getSettingsRef();
const Settings & settings = context->getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
size_t start = 0;
@ -562,7 +568,7 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context.getSettingsRef();
const auto & settings = context->getSettingsRef();
if (shard_info.hasInternalReplication())
{
@ -598,7 +604,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t sh
void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repeats)
{
/// Async insert does not support settings forwarding yet whereas sync one supports
InterpreterInsertQuery interp(query_ast, context);
InterpreterInsertQuery interp(query_ast, *context);
auto block_io = interp.execute();
@ -610,7 +616,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repe
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
const auto & settings = context.getSettingsRef();
const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();
bool fsync = distributed_settings.fsync_after_insert;
@ -675,8 +681,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
WriteBufferFromOwnString header_buf;
writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, header_buf);
writeStringBinary(query_string, header_buf);
context.getSettingsRef().write(header_buf);
context.getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
context->getSettingsRef().write(header_buf);
context->getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
writeVarUInt(block.rows(), header_buf);
writeVarUInt(block.bytes(), header_buf);
writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf); /// obsolete
@ -730,7 +736,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
Poco::File(first_file_tmp_path).remove();
/// Notify
auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms;
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
for (const auto & dir_name : dir_names)
{
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
@ -738,5 +744,4 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
}
}
}

View File

@ -84,7 +84,7 @@ private:
std::string getCurrentStateDescription();
private:
const Context & context;
std::unique_ptr<Context> context;
StorageDistributed & storage;
StorageMetadataPtr metadata_snapshot;
ASTPtr query_ast;

View File

@ -5,13 +5,13 @@ DROP TABLE IF EXISTS distr2;
CREATE TABLE distr (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr); -- { serverError 269 }
CREATE TABLE distr0 (x UInt8) ENGINE = Distributed(test_shard_localhost, '', distr0);
SELECT * FROM distr0; -- { serverError 306 }
SELECT * FROM distr0; -- { serverError 581 }
CREATE TABLE distr1 (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr2);
CREATE TABLE distr2 (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr1);
SELECT * FROM distr1; -- { serverError 306 }
SELECT * FROM distr2; -- { serverError 306 }
SELECT * FROM distr1; -- { serverError 581 }
SELECT * FROM distr2; -- { serverError 581 }
DROP TABLE distr0;
DROP TABLE distr1;

View File

@ -23,7 +23,7 @@ set is_done 0
while {$is_done == 0} {
send -- "\t"
expect {
"_connections" {
"_" {
set is_done 1
}
default {

View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS tt6;
CREATE TABLE tt6
(
`id` UInt32,
`first_column` UInt32,
`second_column` UInt32,
`third_column` UInt32,
`status` String
)
ENGINE = Distributed('test_shard_localhost', '', 'tt6', rand());
INSERT INTO tt6 VALUES (1, 1, 1, 1, 'ok'); -- { serverError 581 }
SELECT * FROM tt6; -- { serverError 581 }
SET max_distributed_depth = 0;
-- stack overflow
INSERT INTO tt6 VALUES (1, 1, 1, 1, 'ok'); -- { serverError 306}
-- stack overflow
SELECT * FROM tt6; -- { serverError 306 }
DROP TABLE tt6;