Simple cleanup in distributed (while dealing with parallel replicas)

Igor Nikonov 2023-12-05 12:25:02 +00:00
parent 5770def9af
commit 87a4c0bde4
7 changed files with 61 additions and 34 deletions

src/Common/PoolWithFailoverBase.h

@@ -1,6 +1,6 @@
 #pragma once

-#include <time.h>
+#include <ctime>
 #include <cstdlib>
 #include <climits>
 #include <random>
@@ -180,6 +180,7 @@ PoolWithFailoverBase<TNestedPool>::getShuffledPools(
     shuffled_pools.reserve(nested_pools.size());
     for (size_t i = 0; i < nested_pools.size(); ++i)
         shuffled_pools.push_back(ShuffledPool{nested_pools[i].get(), &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
+
     ::sort(
         shuffled_pools.begin(), shuffled_pools.end(),
         [](const ShuffledPool & lhs, const ShuffledPool & rhs)
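
Aside: swapping <time.h> for <ctime> follows the C++ guideline that the <c*> headers declare the C library in namespace std (the global-namespace names are only optionally provided). A minimal illustrative sketch, not part of the commit:

    #include <ctime>
    #include <iostream>

    int main()
    {
        // <ctime> guarantees std::time / std::ctime; relying on the
        // unqualified ::time from <time.h> is what the cleanup drops.
        std::time_t now = std::time(nullptr);
        std::cout << std::ctime(&now);
        return 0;
    }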

src/Interpreters/Cluster.cpp

@@ -488,8 +488,14 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
                 throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", replica_key);
             }

-            addShard(settings, std::move(replica_addresses), /* treat_local_as_remote = */ false, current_shard_num,
-                std::move(insert_paths), weight, internal_replication);
+            addShard(
+                settings,
+                replica_addresses,
+                /* treat_local_as_remote = */ false,
+                current_shard_num,
+                std::move(insert_paths),
+                weight,
+                internal_replication);
         }
         else
             throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", key);
@@ -560,7 +566,7 @@ Cluster::Cluster(
     initMisc();
 }

-void Cluster::addShard(const Settings & settings, Addresses && addresses, bool treat_local_as_remote, UInt32 current_shard_num,
+void Cluster::addShard(const Settings & settings, Addresses addresses, bool treat_local_as_remote, UInt32 current_shard_num,
                        ShardInfoInsertPathForInternalReplication && insert_paths, UInt32 weight, bool internal_replication)
 {
     Addresses shard_local_addresses;
@@ -572,19 +578,28 @@ void Cluster::addShard(const Settings & settings, Addresses && addresses, bool t
     {
         auto replica_pool = ConnectionPoolFactory::instance().get(
             static_cast<unsigned>(settings.distributed_connections_pool_size),
-            replica.host_name, replica.port,
-            replica.default_database, replica.user, replica.password, replica.quota_key,
-            replica.cluster, replica.cluster_secret,
-            "server", replica.compression,
-            replica.secure, replica.priority);
+            replica.host_name,
+            replica.port,
+            replica.default_database,
+            replica.user,
+            replica.password,
+            replica.quota_key,
+            replica.cluster,
+            replica.cluster_secret,
+            "server",
+            replica.compression,
+            replica.secure,
+            replica.priority);
         all_replicas_pools.emplace_back(replica_pool);
         if (replica.is_local && !treat_local_as_remote)
             shard_local_addresses.push_back(replica);
     }

     ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
-        all_replicas_pools, settings.load_balancing,
-        settings.distributed_replica_error_half_life.totalSeconds(), settings.distributed_replica_error_cap);
+        all_replicas_pools,
+        settings.load_balancing,
+        settings.distributed_replica_error_half_life.totalSeconds(),
+        settings.distributed_replica_error_cap);

     if (weight)
         slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());

src/Interpreters/Cluster.h

@@ -291,7 +291,7 @@ private:
     struct ReplicasAsShardsTag {};
     Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);

-    void addShard(const Settings & settings, Addresses && addresses, bool treat_local_as_remote, UInt32 current_shard_num,
+    void addShard(const Settings & settings, Addresses addresses, bool treat_local_as_remote, UInt32 current_shard_num,
                   ShardInfoInsertPathForInternalReplication && insert_paths = {}, UInt32 weight = 1, bool internal_replication = false);

     /// Inter-server secret
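
Aside: the signature change from Addresses && to plain Addresses (pass-by-value), together with dropping std::move(replica_addresses) at the call site in Cluster.cpp above, lets one signature serve both copying and moving callers. A minimal sketch under assumed names — Addresses is stood in by a vector of strings, and addShardSketch is hypothetical:

    #include <string>
    #include <utility>
    #include <vector>

    using Addresses = std::vector<std::string>; // stand-in for Cluster::Addresses

    // A by-value sink parameter: lvalue arguments are copied, rvalue
    // arguments are moved, so the same signature covers both kinds of caller.
    void addShardSketch(Addresses addresses)
    {
        // the function owns `addresses` and may keep or modify it freely
        (void)addresses;
    }

    int main()
    {
        Addresses replicas{"host1:9000", "host2:9000"};
        addShardSketch(replicas);            // copies; `replicas` stays valid
        addShardSketch(std::move(replicas)); // moves; `replicas` is left unspecified
        return 0;
    }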

src/Interpreters/ClusterProxy/SelectStreamFactory.h

@@ -80,15 +80,6 @@ public:
         UInt32 shard_count,
         bool parallel_replicas_enabled);

-    struct ShardPlans
-    {
-        /// If a shard has local replicas this won't be nullptr
-        std::unique_ptr<QueryPlan> local_plan;
-
-        /// Contains several steps to read from all remote replicas
-        std::unique_ptr<QueryPlan> remote_plan;
-    };
-
     const Block header;
     const ColumnsDescriptionByShardNum objects_by_shard;
     const StorageSnapshotPtr storage_snapshot;

src/Interpreters/ClusterProxy/executeQuery.cpp

@@ -185,8 +185,11 @@ void executeQuery(
     QueryProcessingStage::Enum processed_stage,
     const StorageID & main_table,
     const ASTPtr & table_func_ptr,
-    SelectStreamFactory & stream_factory, Poco::Logger * log,
-    const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
+    SelectStreamFactory & stream_factory,
+    Poco::Logger * log,
+    const ASTPtr & query_ast,
+    ContextPtr context,
+    const SelectQueryInfo & query_info,
     const ExpressionActionsPtr & sharding_key_expr,
     const std::string & sharding_key_column_name,
     const ClusterPtr & not_optimized_cluster,
@@ -253,9 +256,15 @@

         const auto & addresses = cluster->getShardsAddresses().at(i);
         bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseParallelReplicas();

-        stream_factory.createForShard(shard_info,
-            query_ast_for_shard, main_table, table_func_ptr,
-            new_context, plans, remote_shards, static_cast<UInt32>(shards),
+        stream_factory.createForShard(
+            shard_info,
+            query_ast_for_shard,
+            main_table,
+            table_func_ptr,
+            new_context,
+            plans,
+            remote_shards,
+            static_cast<UInt32>(shards),
             parallel_replicas_enabled);
     }
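
Aside: the context line bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseParallelReplicas(); is the per-shard gate that the reformatted call forwards to createForShard. A stand-alone restatement with hypothetical names, assuming only what that line itself shows:

    #include <cstddef>

    // Parallel replicas are attempted for a shard only when it has more
    // than one replica address and the query context permits them.
    bool shardUsesParallelReplicas(std::size_t replica_count, bool context_allows)
    {
        return replica_count > 1 && context_allows;
    }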

src/Interpreters/ClusterProxy/executeQuery.h

@@ -54,8 +54,11 @@ void executeQuery(
     QueryProcessingStage::Enum processed_stage,
     const StorageID & main_table,
     const ASTPtr & table_func_ptr,
-    SelectStreamFactory & stream_factory, Poco::Logger * log,
-    const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
+    SelectStreamFactory & stream_factory,
+    Poco::Logger * log,
+    const ASTPtr & query_ast,
+    ContextPtr context,
+    const SelectQueryInfo & query_info,
     const ExpressionActionsPtr & sharding_key_expr,
     const std::string & sharding_key_column_name,
     const ClusterPtr & not_optimized_cluster,

src/Storages/StorageDistributed.cpp

@@ -908,12 +908,20 @@ void StorageDistributed::read(
     }

     ClusterProxy::executeQuery(
-        query_plan, header, processed_stage,
-        main_table, remote_table_function_ptr,
-        select_stream_factory, log, modified_query_ast,
-        local_context, query_info,
-        sharding_key_expr, sharding_key_column_name,
-        query_info.cluster, additional_shard_filter_generator);
+        query_plan,
+        header,
+        processed_stage,
+        main_table,
+        remote_table_function_ptr,
+        select_stream_factory,
+        log,
+        modified_query_ast,
+        local_context,
+        query_info,
+        sharding_key_expr,
+        sharding_key_column_name,
+        query_info.cluster,
+        additional_shard_filter_generator);

     /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
     if (!query_plan.isInitialized())