mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-17 20:02:05 +00:00
max distributed depth
Add settings max_distributed_depth fix style fix fix fix fix test fix fix
This commit is contained in:
parent
95c87d4ded
commit
56073db22d
@ -547,6 +547,7 @@
|
||||
M(577, INVALID_SHARD_ID) \
|
||||
M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \
|
||||
M(579, INCORRECT_PART_TYPE) \
|
||||
M(580, TOO_LARGE_DISTRIBUTED_DEPTH) \
|
||||
\
|
||||
M(998, POSTGRESQL_CONNECTION_FAILURE) \
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
|
@ -59,6 +59,7 @@
|
||||
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
|
||||
#define DBMS_MIN_REVISION_WITH_SERVER_LOGS 54406
|
||||
#define DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA 54415
|
||||
#define DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH 54417
|
||||
/// Minimum revision with exactly the same set of aggregation methods and rules to select them.
|
||||
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
|
||||
/// (keys will be placed in different buckets and result will not be fully aggregated).
|
||||
|
@ -435,6 +435,7 @@ class IColumn;
|
||||
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
|
||||
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
|
||||
M(Bool, database_replicated_ddl_output, true, "Return table with query execution status as a result of DDL query", 0) \
|
||||
M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \
|
||||
\
|
||||
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
|
||||
\
|
||||
|
@ -60,6 +60,9 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
|
||||
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
|
||||
writeBinary(quota_key, out);
|
||||
|
||||
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH)
|
||||
writeBinary(distributed_depth, out);
|
||||
|
||||
if (interface == Interface::TCP)
|
||||
{
|
||||
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
|
||||
@ -137,6 +140,9 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
|
||||
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
|
||||
readBinary(quota_key, in);
|
||||
|
||||
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH)
|
||||
readBinary(distributed_depth, in);
|
||||
|
||||
if (interface == Interface::TCP)
|
||||
{
|
||||
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
|
||||
|
@ -96,6 +96,8 @@ public:
|
||||
/// Common
|
||||
String quota_key;
|
||||
|
||||
UInt64 distributed_depth = 0;
|
||||
|
||||
bool empty() const { return query_kind == QueryKind::NO_QUERY; }
|
||||
|
||||
/** Serialization and deserialization.
|
||||
|
@ -16,6 +16,11 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
|
||||
}
|
||||
|
||||
namespace ClusterProxy
|
||||
{
|
||||
|
||||
@ -92,6 +97,9 @@ void executeQuery(
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
|
||||
if (settings.max_distributed_depth && context.getClientInfo().distributed_depth > settings.max_distributed_depth)
|
||||
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
|
||||
|
||||
std::vector<QueryPlanPtr> plans;
|
||||
Pipes remote_pipes;
|
||||
Pipes delayed_pipes;
|
||||
@ -100,6 +108,8 @@ void executeQuery(
|
||||
|
||||
auto new_context = updateSettingsForCluster(*query_info.cluster, context, settings, log);
|
||||
|
||||
new_context->getClientInfo().distributed_depth += 1;
|
||||
|
||||
ThrottlerPtr user_level_throttler;
|
||||
if (auto * process_list_element = context.getProcessListElement())
|
||||
user_level_throttler = process_list_element->getUserNetworkThrottler();
|
||||
|
@ -58,6 +58,7 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int TIMEOUT_EXCEEDED;
|
||||
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
|
||||
}
|
||||
|
||||
static Block adoptBlock(const Block & header, const Block & block, Poco::Logger * log)
|
||||
@ -93,7 +94,7 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
|
||||
const ClusterPtr & cluster_,
|
||||
bool insert_sync_,
|
||||
UInt64 insert_timeout_)
|
||||
: context(context_)
|
||||
: context(std::make_unique<Context>(context_))
|
||||
, storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, query_ast(query_ast_)
|
||||
@ -103,6 +104,10 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
|
||||
, insert_timeout(insert_timeout_)
|
||||
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
|
||||
{
|
||||
const auto & settings = context->getSettingsRef();
|
||||
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
|
||||
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
|
||||
context->getClientInfo().distributed_depth += 1;
|
||||
}
|
||||
|
||||
|
||||
@ -143,7 +148,7 @@ void DistributedBlockOutputStream::write(const Block & block)
|
||||
|
||||
void DistributedBlockOutputStream::writeAsync(const Block & block)
|
||||
{
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
|
||||
|
||||
if (random_shard_insert)
|
||||
@ -194,7 +199,7 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
|
||||
|
||||
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end)
|
||||
{
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
const auto & addresses_with_failovers = cluster->getShardsAddresses();
|
||||
const auto & shards_info = cluster->getShardsInfo();
|
||||
size_t num_shards = end - start;
|
||||
@ -303,7 +308,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
|
||||
}
|
||||
|
||||
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
|
||||
/// Do not initiate INSERT for empty block.
|
||||
if (shard_block.rows() == 0)
|
||||
@ -343,7 +348,8 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
|
||||
if (throttler)
|
||||
job.connection_entry->setThrottler(throttler);
|
||||
|
||||
job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, settings, context.getClientInfo());
|
||||
job.stream = std::make_shared<RemoteBlockOutputStream>(
|
||||
*job.connection_entry, timeouts, query_string, settings, context->getClientInfo());
|
||||
job.stream->writePrefix();
|
||||
}
|
||||
|
||||
@ -357,7 +363,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
|
||||
if (!job.stream)
|
||||
{
|
||||
/// Forward user settings
|
||||
job.local_context = std::make_unique<Context>(context);
|
||||
job.local_context = std::make_unique<Context>(*context);
|
||||
|
||||
/// Copying of the query AST is required to avoid race,
|
||||
/// in case of INSERT into multiple local shards.
|
||||
@ -385,7 +391,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
|
||||
|
||||
void DistributedBlockOutputStream::writeSync(const Block & block)
|
||||
{
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
const auto & shards_info = cluster->getShardsInfo();
|
||||
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
|
||||
size_t start = 0;
|
||||
@ -562,7 +568,7 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
|
||||
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t shard_id)
|
||||
{
|
||||
const auto & shard_info = cluster->getShardsInfo()[shard_id];
|
||||
const auto & settings = context.getSettingsRef();
|
||||
const auto & settings = context->getSettingsRef();
|
||||
|
||||
if (shard_info.hasInternalReplication())
|
||||
{
|
||||
@ -598,7 +604,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t sh
|
||||
void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repeats)
|
||||
{
|
||||
/// Async insert does not support settings forwarding yet whereas sync one supports
|
||||
InterpreterInsertQuery interp(query_ast, context);
|
||||
InterpreterInsertQuery interp(query_ast, *context);
|
||||
|
||||
auto block_io = interp.execute();
|
||||
|
||||
@ -610,7 +616,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repe
|
||||
|
||||
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
|
||||
{
|
||||
const auto & settings = context.getSettingsRef();
|
||||
const auto & settings = context->getSettingsRef();
|
||||
const auto & distributed_settings = storage.getDistributedSettingsRef();
|
||||
|
||||
bool fsync = distributed_settings.fsync_after_insert;
|
||||
@ -675,8 +681,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
|
||||
WriteBufferFromOwnString header_buf;
|
||||
writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, header_buf);
|
||||
writeStringBinary(query_string, header_buf);
|
||||
context.getSettingsRef().write(header_buf);
|
||||
context.getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
|
||||
context->getSettingsRef().write(header_buf);
|
||||
context->getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
|
||||
writeVarUInt(block.rows(), header_buf);
|
||||
writeVarUInt(block.bytes(), header_buf);
|
||||
writeStringBinary(block.cloneEmpty().dumpStructure(), header_buf);
|
||||
@ -724,7 +730,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
|
||||
Poco::File(first_file_tmp_path).remove();
|
||||
|
||||
/// Notify
|
||||
auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms;
|
||||
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
|
||||
for (const auto & dir_name : dir_names)
|
||||
{
|
||||
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
|
||||
@ -732,5 +738,4 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ private:
|
||||
std::string getCurrentStateDescription();
|
||||
|
||||
private:
|
||||
const Context & context;
|
||||
std::unique_ptr<Context> context;
|
||||
StorageDistributed & storage;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
ASTPtr query_ast;
|
||||
|
@ -5,13 +5,13 @@ DROP TABLE IF EXISTS distr2;
|
||||
CREATE TABLE distr (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr); -- { serverError 269 }
|
||||
|
||||
CREATE TABLE distr0 (x UInt8) ENGINE = Distributed(test_shard_localhost, '', distr0);
|
||||
SELECT * FROM distr0; -- { serverError 306 }
|
||||
SELECT * FROM distr0; -- { serverError 580 }
|
||||
|
||||
CREATE TABLE distr1 (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr2);
|
||||
CREATE TABLE distr2 (x UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), distr1);
|
||||
|
||||
SELECT * FROM distr1; -- { serverError 306 }
|
||||
SELECT * FROM distr2; -- { serverError 306 }
|
||||
SELECT * FROM distr1; -- { serverError 580 }
|
||||
SELECT * FROM distr2; -- { serverError 580 }
|
||||
|
||||
DROP TABLE distr0;
|
||||
DROP TABLE distr1;
|
||||
|
@ -23,7 +23,7 @@ set is_done 0
|
||||
while {$is_done == 0} {
|
||||
send -- "\t"
|
||||
expect {
|
||||
"_connections" {
|
||||
"_" {
|
||||
set is_done 1
|
||||
}
|
||||
default {
|
||||
|
26
tests/queries/0_stateless/01763_max_distributed_depth.sql
Normal file
26
tests/queries/0_stateless/01763_max_distributed_depth.sql
Normal file
@ -0,0 +1,26 @@
|
||||
DROP TABLE IF EXISTS tt6;
|
||||
|
||||
CREATE TABLE tt6
|
||||
(
|
||||
`id` UInt32,
|
||||
`first_column` UInt32,
|
||||
`second_column` UInt32,
|
||||
`third_column` UInt32,
|
||||
`status` String
|
||||
|
||||
)
|
||||
ENGINE = Distributed('test_shard_localhost', '', 'tt6', rand());
|
||||
|
||||
INSERT INTO tt6 VALUES (1, 1, 1, 1, 'ok'); -- { serverError 580 }
|
||||
|
||||
SELECT * FROM tt6; -- { serverError 580 }
|
||||
|
||||
SET max_distributed_depth = 0;
|
||||
|
||||
-- stack overflow
|
||||
INSERT INTO tt6 VALUES (1, 1, 1, 1, 'ok'); -- { serverError 306}
|
||||
|
||||
-- stack overflow
|
||||
SELECT * FROM tt6; -- { serverError 306 }
|
||||
|
||||
DROP TABLE tt6;
|
Loading…
Reference in New Issue
Block a user