Merge pull request #57518 from ClickHouse/cleanup-around-distributed

Simple cleanup in distributed (while dealing with parallel replicas)
This commit is contained in:
Nikolai Kochetov 2023-12-06 10:22:17 +01:00 committed by GitHub
commit 718b542f40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 77 additions and 38 deletions

View File

@ -1,6 +1,6 @@
#pragma once
#include <time.h>
#include <ctime>
#include <cstdlib>
#include <climits>
#include <random>
@ -180,6 +180,7 @@ PoolWithFailoverBase<TNestedPool>::getShuffledPools(
shuffled_pools.reserve(nested_pools.size());
for (size_t i = 0; i < nested_pools.size(); ++i)
shuffled_pools.push_back(ShuffledPool{nested_pools[i].get(), &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
::sort(
shuffled_pools.begin(), shuffled_pools.end(),
[](const ShuffledPool & lhs, const ShuffledPool & rhs)

View File

@ -488,8 +488,14 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", replica_key);
}
addShard(settings, std::move(replica_addresses), /* treat_local_as_remote = */ false, current_shard_num,
std::move(insert_paths), weight, internal_replication);
addShard(
settings,
replica_addresses,
/* treat_local_as_remote = */ false,
current_shard_num,
weight,
std::move(insert_paths),
internal_replication);
}
else
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", key);
@ -525,7 +531,7 @@ Cluster::Cluster(
addresses_with_failover.emplace_back(current);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num, /* insert_paths= */ {}, /* weight= */ 1);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num, /* weight= */ 1);
++current_shard_num;
}
@ -553,15 +559,21 @@ Cluster::Cluster(
addresses_with_failover.emplace_back(current);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num, /* insert_paths= */ {}, /* weight= */ 1);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num, /* weight= */ 1);
++current_shard_num;
}
initMisc();
}
void Cluster::addShard(const Settings & settings, Addresses && addresses, bool treat_local_as_remote, UInt32 current_shard_num,
ShardInfoInsertPathForInternalReplication && insert_paths, UInt32 weight, bool internal_replication)
void Cluster::addShard(
const Settings & settings,
Addresses addresses,
bool treat_local_as_remote,
UInt32 current_shard_num,
UInt32 weight,
ShardInfoInsertPathForInternalReplication insert_paths,
bool internal_replication)
{
Addresses shard_local_addresses;
@ -572,19 +584,28 @@ void Cluster::addShard(const Settings & settings, Addresses && addresses, bool t
{
auto replica_pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings.distributed_connections_pool_size),
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password, replica.quota_key,
replica.cluster, replica.cluster_secret,
"server", replica.compression,
replica.secure, replica.priority);
replica.host_name,
replica.port,
replica.default_database,
replica.user,
replica.password,
replica.quota_key,
replica.cluster,
replica.cluster_secret,
"server",
replica.compression,
replica.secure,
replica.priority);
all_replicas_pools.emplace_back(replica_pool);
if (replica.is_local && !treat_local_as_remote)
shard_local_addresses.push_back(replica);
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
all_replicas_pools, settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(), settings.distributed_replica_error_cap);
all_replicas_pools,
settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(),
settings.distributed_replica_error_cap);
if (weight)
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());

View File

@ -291,8 +291,14 @@ private:
struct ReplicasAsShardsTag {};
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
void addShard(const Settings & settings, Addresses && addresses, bool treat_local_as_remote, UInt32 current_shard_num,
ShardInfoInsertPathForInternalReplication && insert_paths = {}, UInt32 weight = 1, bool internal_replication = false);
/// Adds a shard to the cluster, creating a connection pool per replica and one failover pool over all of them.
///  - addresses: replica addresses of the shard (taken by value; NOTE(review): the loop over them is outside the visible hunk — confirm).
///  - treat_local_as_remote: when true, replicas flagged `is_local` are not collected into the shard's local addresses.
///  - current_shard_num: number assigned to the shard (NOTE(review): callers increment it after each call — presumably 1-based, confirm).
///  - weight: how many entries pointing at this shard are appended to `slot_to_shard`; 0 appends none.
///  - insert_paths, internal_replication: NOTE(review): their use lies outside the visible part of the definition — confirm before relying on it.
void addShard(
const Settings & settings,
Addresses addresses,
bool treat_local_as_remote,
UInt32 current_shard_num,
UInt32 weight = 1,
ShardInfoInsertPathForInternalReplication insert_paths = {},
bool internal_replication = false);
/// Inter-server secret
String secret;

View File

@ -80,15 +80,6 @@ public:
UInt32 shard_count,
bool parallel_replicas_enabled);
/// Query plans built for one shard: an optional plan for its local replicas
/// and a plan that reads from its remote replicas. Each plan is owned via unique_ptr.
struct ShardPlans
{
/// If a shard has local replicas this won't be nullptr.
std::unique_ptr<QueryPlan> local_plan;
/// Contains several steps to read from all remote replicas.
std::unique_ptr<QueryPlan> remote_plan;
};
const Block header;
const ColumnsDescriptionByShardNum objects_by_shard;
const StorageSnapshotPtr storage_snapshot;

View File

@ -185,8 +185,11 @@ void executeQuery(
QueryProcessingStage::Enum processed_stage,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
SelectStreamFactory & stream_factory,
Poco::Logger * log,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
@ -253,9 +256,15 @@ void executeQuery(
const auto & addresses = cluster->getShardsAddresses().at(i);
bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseParallelReplicas();
stream_factory.createForShard(shard_info,
query_ast_for_shard, main_table, table_func_ptr,
new_context, plans, remote_shards, static_cast<UInt32>(shards),
stream_factory.createForShard(
shard_info,
query_ast_for_shard,
main_table,
table_func_ptr,
new_context,
plans,
remote_shards,
static_cast<UInt32>(shards),
parallel_replicas_enabled);
}

View File

@ -54,8 +54,11 @@ void executeQuery(
QueryProcessingStage::Enum processed_stage,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
SelectStreamFactory & stream_factory,
Poco::Logger * log,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,

View File

@ -908,12 +908,20 @@ void StorageDistributed::read(
}
ClusterProxy::executeQuery(
query_plan, header, processed_stage,
main_table, remote_table_function_ptr,
select_stream_factory, log, modified_query_ast,
local_context, query_info,
sharding_key_expr, sharding_key_column_name,
query_info.cluster, additional_shard_filter_generator);
query_plan,
header,
processed_stage,
main_table,
remote_table_function_ptr,
select_stream_factory,
log,
modified_query_ast,
local_context,
query_info,
sharding_key_expr,
sharding_key_column_name,
query_info.cluster,
additional_shard_filter_generator);
/// This is a bug, it is possible only when there are no shards to query, and this is handled earlier.
if (!query_plan.isInitialized())