Try assigning replica numbers consistently independent of initiator

This commit is contained in:
Igor Nikonov 2024-07-23 13:30:40 +00:00
parent fbaca99e3a
commit 842b51c782
8 changed files with 101 additions and 98 deletions

View File

@ -40,6 +40,7 @@ namespace ErrorCodes
extern const int TOO_LARGE_DISTRIBUTED_DEPTH; extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_CLUSTER; extern const int UNEXPECTED_CLUSTER;
extern const int INCONSISTENT_CLUSTER_DEFINITION;
} }
namespace ClusterProxy namespace ClusterProxy
@ -519,22 +520,75 @@ void executeQueryWithParallelReplicas(
"`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard"); "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard");
} }
const auto replica_count = std::min<size_t>(settings.max_parallel_replicas.value, new_cluster->getShardsInfo().begin()->getAllNodeCount()); const auto & shard = new_cluster->getShardsInfo().at(0);
size_t max_replicas_to_use = settings.max_parallel_replicas;
if (max_replicas_to_use > shard.getAllNodeCount())
{
LOG_INFO(
getLogger("ReadFromParallelRemoteReplicasStep"),
"The number of replicas requested ({}) is bigger than the real number available in the cluster ({}). "
"Will use the latter number to execute the query.",
settings.max_parallel_replicas,
shard.getAllNodeCount());
max_replicas_to_use = shard.getAllNodeCount();
}
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(replica_count, settings.parallel_replicas_mark_segment_size); auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(max_replicas_to_use, settings.parallel_replicas_mark_segment_size);
auto external_tables = new_context->getExternalTables(); auto external_tables = new_context->getExternalTables();
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> shuffled_pool;
if (max_replicas_to_use < shard.getAllNodeCount())
{
shuffled_pool = shard.pool->getShuffledPools(settings);
shuffled_pool.resize(max_replicas_to_use);
}
else
{
/// if all replicas in cluster are used for query execution
/// try to preserve replicas order as in cluster definition
/// it's important for data locality during query execution
auto priority_func = [](size_t i) { return Priority{static_cast<Int64>(i)}; };
shuffled_pool = shard.pool->getShuffledPools(settings, priority_func);
}
std::vector<ConnectionPoolPtr> pools_to_use;
pools_to_use.reserve(shuffled_pool.size());
for (auto & pool : shuffled_pool)
pools_to_use.emplace_back(std::move(pool.pool));
/// do not build local plan for distributed queries for now (address it later) /// do not build local plan for distributed queries for now (address it later)
if (settings.allow_experimental_analyzer && settings.parallel_replicas_local_plan && !shard_num) if (settings.allow_experimental_analyzer && settings.parallel_replicas_local_plan && !shard_num)
{ {
/// find local replica index in pool, to assign it as replica number
std::optional<size_t> local_replica_number;
for (size_t i = 0, s = pools_to_use.size(); i < s; ++i)
{
const auto & hostname = pools_to_use[i]->getHost();
const auto found = std::find_if(
begin(shard.local_addresses),
end(shard.local_addresses),
[&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; });
if (found != shard.local_addresses.end())
{
local_replica_number = i;
break;
}
}
if (!local_replica_number)
throw Exception(
ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"Local replica is not found in '{}' cluster definition, see 'cluster_for_parallel_replicas' setting",
new_cluster->getName());
auto [local_plan, with_parallel_replicas] = createLocalPlanForParallelReplicas( auto [local_plan, with_parallel_replicas] = createLocalPlanForParallelReplicas(
query_ast, query_ast,
header, header,
new_context, new_context,
processed_stage, processed_stage,
coordinator, coordinator,
std::move(analyzed_read_from_merge_tree)); std::move(analyzed_read_from_merge_tree),
local_replica_number.value());
if (!with_parallel_replicas) if (!with_parallel_replicas)
{ {
@ -542,6 +596,8 @@ void executeQueryWithParallelReplicas(
return; return;
} }
LOG_DEBUG(logger, "Local replica got replica number {}", local_replica_number.value());
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>( auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast, query_ast,
new_cluster, new_cluster,
@ -555,7 +611,8 @@ void executeQueryWithParallelReplicas(
std::move(external_tables), std::move(external_tables),
getLogger("ReadFromParallelRemoteReplicasStep"), getLogger("ReadFromParallelRemoteReplicasStep"),
std::move(storage_limits), std::move(storage_limits),
/*exclude_local_replica*/ true); std::move(pools_to_use),
local_replica_number);
auto remote_plan = std::make_unique<QueryPlan>(); auto remote_plan = std::make_unique<QueryPlan>();
remote_plan->addStep(std::move(read_from_remote)); remote_plan->addStep(std::move(read_from_remote));
@ -587,7 +644,7 @@ void executeQueryWithParallelReplicas(
std::move(external_tables), std::move(external_tables),
getLogger("ReadFromParallelRemoteReplicasStep"), getLogger("ReadFromParallelRemoteReplicasStep"),
std::move(storage_limits), std::move(storage_limits),
/*exclude_local_replica*/ false); std::move(pools_to_use));
query_plan.addStep(std::move(read_from_remote)); query_plan.addStep(std::move(read_from_remote));
} }

View File

@ -27,7 +27,8 @@ std::pair<std::unique_ptr<QueryPlan>, bool> createLocalPlanForParallelReplicas(
ContextPtr context, ContextPtr context,
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
ParallelReplicasReadingCoordinatorPtr coordinator, ParallelReplicasReadingCoordinatorPtr coordinator,
QueryPlanStepPtr analyzed_read_from_merge_tree) QueryPlanStepPtr analyzed_read_from_merge_tree,
size_t replica_number)
{ {
checkStackSize(); checkStackSize();
@ -84,8 +85,8 @@ std::pair<std::unique_ptr<QueryPlan>, bool> createLocalPlanForParallelReplicas(
MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse> MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse>
{ return coordinator->handleRequest(std::move(req)); }; { return coordinator->handleRequest(std::move(req)); };
auto read_from_merge_tree_parallel_replicas auto read_from_merge_tree_parallel_replicas = reading->createLocalParallelReplicasReadingStep(
= reading->createLocalParallelReplicasReadingStep(analyzed_result_ptr, std::move(all_ranges_cb), std::move(read_task_cb)); analyzed_result_ptr, std::move(all_ranges_cb), std::move(read_task_cb), replica_number);
node->step = std::move(read_from_merge_tree_parallel_replicas); node->step = std::move(read_from_merge_tree_parallel_replicas);
addConvertingActions(*query_plan, header, /*has_missing_objects=*/false); addConvertingActions(*query_plan, header, /*has_missing_objects=*/false);

View File

@ -14,5 +14,6 @@ std::pair<std::unique_ptr<QueryPlan>, bool> createLocalPlanForParallelReplicas(
ContextPtr context, ContextPtr context,
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
ParallelReplicasReadingCoordinatorPtr coordinator, ParallelReplicasReadingCoordinatorPtr coordinator,
QueryPlanStepPtr read_from_merge_tree); QueryPlanStepPtr read_from_merge_tree,
size_t replica_number);
} }

View File

@ -348,9 +348,9 @@ ReadFromMergeTree::ReadFromMergeTree(
std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplicasReadingStep( std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplicasReadingStep(
AnalysisResultPtr analyzed_result_ptr_, AnalysisResultPtr analyzed_result_ptr_,
MergeTreeAllRangesCallback all_ranges_callback_, MergeTreeAllRangesCallback all_ranges_callback_,
MergeTreeReadTaskCallback read_task_callback_) MergeTreeReadTaskCallback read_task_callback_,
size_t replica_number)
{ {
const auto number_of_local_replica = 0;
const bool enable_parallel_reading = true; const bool enable_parallel_reading = true;
return std::make_unique<ReadFromMergeTree>( return std::make_unique<ReadFromMergeTree>(
prepared_parts, prepared_parts,
@ -364,11 +364,11 @@ std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplica
requested_num_streams, requested_num_streams,
max_block_numbers_to_read, max_block_numbers_to_read,
log, log,
analyzed_result_ptr_, std::move(analyzed_result_ptr_),
enable_parallel_reading, enable_parallel_reading,
all_ranges_callback_, all_ranges_callback_,
read_task_callback_, read_task_callback_,
number_of_local_replica); replica_number);
} }
Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings) Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings)

View File

@ -129,7 +129,8 @@ public:
std::unique_ptr<ReadFromMergeTree> createLocalParallelReplicasReadingStep( std::unique_ptr<ReadFromMergeTree> createLocalParallelReplicasReadingStep(
AnalysisResultPtr analyzed_result_ptr_, AnalysisResultPtr analyzed_result_ptr_,
MergeTreeAllRangesCallback all_ranges_callback_, MergeTreeAllRangesCallback all_ranges_callback_,
MergeTreeReadTaskCallback read_task_callback_); MergeTreeReadTaskCallback read_task_callback_,
size_t replica_number);
static constexpr auto name = "ReadFromMergeTree"; static constexpr auto name = "ReadFromMergeTree";
String getName() const override { return name; } String getName() const override { return name; }

View File

@ -371,7 +371,8 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
Tables external_tables_, Tables external_tables_,
LoggerPtr log_, LoggerPtr log_,
std::shared_ptr<const StorageLimitsList> storage_limits_, std::shared_ptr<const StorageLimitsList> storage_limits_,
bool exclude_local_replica_) std::vector<ConnectionPoolPtr> pools_to_use_,
std::optional<size_t> exclude_pool_index_)
: ISourceStep(DataStream{.header = std::move(header_)}) : ISourceStep(DataStream{.header = std::move(header_)})
, cluster(cluster_) , cluster(cluster_)
, query_ast(query_ast_) , query_ast(query_ast_)
@ -384,24 +385,20 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
, external_tables{external_tables_} , external_tables{external_tables_}
, storage_limits(std::move(storage_limits_)) , storage_limits(std::move(storage_limits_))
, log(log_) , log(log_)
, exclude_local_replica(exclude_local_replica_) , pools_to_use(std::move(pools_to_use_))
, exclude_pool_index(exclude_pool_index_)
{ {
chassert(cluster->getShardCount() == 1); chassert(cluster->getShardCount() == 1);
std::vector<String> replicas; std::vector<String> replicas;
replicas.reserve(cluster->getShardsAddresses().front().size()); replicas.reserve(pools_to_use.size());
for (const auto & addr : cluster->getShardsAddresses().front()) for (size_t i = 0, l = pools_to_use.size(); i < l; ++i)
{ {
if (exclude_local_replica && addr.is_local) if (exclude_pool_index.has_value() && i == exclude_pool_index)
continue; continue;
/// replace hostname with replica name if the hostname started with replica namespace, replicas.push_back(pools_to_use[i]->getAddress());
/// it makes description shorter and more readable
if (!addr.database_replica_name.empty() && addr.host_name.starts_with(addr.database_replica_name))
replicas.push_back(fmt::format("{}", addr.database_replica_name));
else
replicas.push_back(fmt::format("{}", addr.host_name));
} }
auto description = fmt::format("Query: {} Replicas: {}", formattedAST(query_ast), fmt::join(replicas, ", ")); auto description = fmt::format("Query: {} Replicas: {}", formattedAST(query_ast), fmt::join(replicas, ", "));
@ -421,86 +418,29 @@ void ReadFromParallelRemoteReplicasStep::enforceAggregationInOrder()
void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{ {
Pipes pipes; Pipes pipes;
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
const auto & shard = cluster->getShardsInfo().at(0);
size_t max_replicas_to_use = current_settings.max_parallel_replicas;
if (max_replicas_to_use > shard.getAllNodeCount())
{
LOG_INFO(
getLogger("ReadFromParallelRemoteReplicasStep"),
"The number of replicas requested ({}) is bigger than the real number available in the cluster ({}). "
"Will use the latter number to execute the query.",
current_settings.max_parallel_replicas,
shard.getAllNodeCount());
max_replicas_to_use = shard.getAllNodeCount();
}
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> shuffled_pool;
if (max_replicas_to_use < shard.getAllNodeCount())
{
shuffled_pool = shard.pool->getShuffledPools(current_settings);
}
else
{
/// try to preserve replicas order if all replicas in cluster are used for query execution
/// it's important for data locality during query execution
auto priority_func = [](size_t i) { return Priority{static_cast<Int64>(i)}; };
shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
}
std::vector<ConnectionPoolPtr> pools_to_use;
pools_to_use.reserve(shuffled_pool.size());
for (const auto & pool : shuffled_pool)
{
if (exclude_local_replica)
{
const auto & hostname = pool.pool->getHost();
auto it = std::find_if(
begin(shard.local_addresses),
end(shard.local_addresses),
[&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; });
if (it == shard.local_addresses.end())
pools_to_use.push_back(pool.pool);
}
else
{
pools_to_use.push_back(pool.pool);
}
}
pools_to_use.resize(std::min(pools_to_use.size(), max_replicas_to_use));
// if local plan is used for local replica, we should exclude one remote replica
if (exclude_local_replica && !pools_to_use.empty())
pools_to_use.resize(max_replicas_to_use - 1);
LOG_DEBUG(
getLogger("ReadFromParallelRemoteReplicasStep"),
"Number of pools to use is {}. Originally {}",
pools_to_use.size(),
shuffled_pool.size());
if (pools_to_use.empty())
return;
std::vector<std::string_view> addresses; std::vector<std::string_view> addresses;
addresses.reserve(pools_to_use.size()); addresses.reserve(pools_to_use.size());
for (const auto & pool : pools_to_use) for (size_t i = 0, l = pools_to_use.size(); i < l; ++i)
addresses.emplace_back(pool->getAddress()); {
if (exclude_pool_index.has_value() && i == exclude_pool_index)
continue;
addresses.emplace_back(pools_to_use[i]->getAddress());
}
LOG_DEBUG(getLogger("ReadFromParallelRemoteReplicasStep"), "Addresses to use: {}", fmt::join(addresses, ", ")); LOG_DEBUG(getLogger("ReadFromParallelRemoteReplicasStep"), "Addresses to use: {}", fmt::join(addresses, ", "));
/// when using local plan for local replica, 0 is assigned to local replica as replica num, - in this case, starting from 1 here for (size_t i = 0, l = pools_to_use.size(); i < l; ++i)
size_t replica_num = (exclude_local_replica ? 1 : 0);
for (const auto & pool : pools_to_use)
{ {
if (exclude_pool_index.has_value() && i == exclude_pool_index)
continue;
IConnections::ReplicaInfo replica_info{ IConnections::ReplicaInfo replica_info{
/// we should use this number specifically because efficiency of data distribution by consistent hash depends on it. /// we should use this number specifically because efficiency of data distribution by consistent hash depends on it.
.number_of_current_replica = replica_num, .number_of_current_replica = i,
}; };
++replica_num;
addPipeForSingeReplica(pipes, pool, replica_info); addPipeForSingeReplica(pipes, pools_to_use[i], replica_info);
} }
auto pipe = Pipe::unitePipes(std::move(pipes)); auto pipe = Pipe::unitePipes(std::move(pipes));

View File

@ -79,7 +79,8 @@ public:
Tables external_tables_, Tables external_tables_,
LoggerPtr log_, LoggerPtr log_,
std::shared_ptr<const StorageLimitsList> storage_limits_, std::shared_ptr<const StorageLimitsList> storage_limits_,
bool exclude_local_replica = false); std::vector<ConnectionPoolPtr> pools_to_use,
std::optional<size_t> exclude_pool_index_ = std::nullopt);
String getName() const override { return "ReadFromRemoteParallelReplicas"; } String getName() const override { return "ReadFromRemoteParallelReplicas"; }
@ -102,7 +103,8 @@ private:
Tables external_tables; Tables external_tables;
std::shared_ptr<const StorageLimitsList> storage_limits; std::shared_ptr<const StorageLimitsList> storage_limits;
LoggerPtr log; LoggerPtr log;
bool exclude_local_replica; std::vector<ConnectionPoolPtr> pools_to_use;
std::optional<size_t> exclude_pool_index;
}; };
} }

View File

@ -185,7 +185,8 @@ public:
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
{ {
if (++sent_initial_requests > replicas_count) if (++sent_initial_requests > replicas_count)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Initiator received more initial requests than there are replicas"); throw Exception(
ErrorCodes::LOGICAL_ERROR, "Initiator received more initial requests than there are replicas: replica_num={}", announcement.replica_num);
doHandleInitialAllRangesAnnouncement(std::move(announcement)); doHandleInitialAllRangesAnnouncement(std::move(announcement));
} }