Fix: resize pools_to_use correctly (keep local replica in it)

This commit is contained in:
Igor Nikonov 2024-07-23 15:43:25 +00:00
parent 217e50d12e
commit f5630f86e4

View File

@ -540,14 +540,15 @@ void executeQueryWithParallelReplicas(
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> shuffled_pool;
if (max_replicas_to_use < shard.getAllNodeCount())
{
// will be shuffled according to `load_balancing` setting
shuffled_pool = shard.pool->getShuffledPools(settings);
shuffled_pool.resize(max_replicas_to_use);
}
else
{
/// if all replicas in cluster are used for query execution
/// try to preserve replicas order as in cluster definition
/// it's important for data locality during query execution
/// If all replicas in cluster are used for query execution,
/// try to preserve replicas order as in cluster definition.
/// It's important for data locality during query execution
/// independently of the query initiator
auto priority_func = [](size_t i) { return Priority{static_cast<Int64>(i)}; };
shuffled_pool = shard.pool->getShuffledPools(settings, priority_func);
}
@ -560,8 +561,8 @@ void executeQueryWithParallelReplicas(
/// do not build local plan for distributed queries for now (address it later)
if (settings.allow_experimental_analyzer && settings.parallel_replicas_local_plan && !shard_num)
{
/// find local replica index in pool, to assign it as replica number
std::optional<size_t> local_replica_number;
/// find local replica index in pool
std::optional<size_t> local_replica_index;
for (size_t i = 0, s = pools_to_use.size(); i < s; ++i)
{
const auto & hostname = pools_to_use[i]->getHost();
@ -571,16 +572,25 @@ void executeQueryWithParallelReplicas(
[&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; });
if (found != shard.local_addresses.end())
{
local_replica_number = i;
local_replica_index = i;
break;
}
}
if (!local_replica_number)
if (!local_replica_index)
throw Exception(
ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"Local replica is not found in '{}' cluster definition, see 'cluster_for_parallel_replicas' setting",
new_cluster->getName());
// resize the pool but keep the local replica in it (and update its index)
chassert(max_replicas_to_use <= pools_to_use.size());
if (local_replica_index >= max_replicas_to_use)
{
std::swap(pools_to_use[max_replicas_to_use - 1], pools_to_use[local_replica_index.value()]);
local_replica_index = max_replicas_to_use - 1;
}
pools_to_use.resize(max_replicas_to_use);
auto [local_plan, with_parallel_replicas] = createLocalPlanForParallelReplicas(
query_ast,
header,
@ -588,7 +598,7 @@ void executeQueryWithParallelReplicas(
processed_stage,
coordinator,
std::move(analyzed_read_from_merge_tree),
local_replica_number.value());
local_replica_index.value());
if (!with_parallel_replicas)
{
@ -596,7 +606,7 @@ void executeQueryWithParallelReplicas(
return;
}
LOG_DEBUG(logger, "Local replica got replica number {}", local_replica_number.value());
LOG_DEBUG(logger, "Local replica got replica number {}", local_replica_index.value());
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast,
@ -612,7 +622,7 @@ void executeQueryWithParallelReplicas(
getLogger("ReadFromParallelRemoteReplicasStep"),
std::move(storage_limits),
std::move(pools_to_use),
local_replica_number);
local_replica_index);
auto remote_plan = std::make_unique<QueryPlan>();
remote_plan->addStep(std::move(read_from_remote));
@ -631,6 +641,9 @@ void executeQueryWithParallelReplicas(
}
else
{
chassert(max_replicas_to_use <= pools_to_use.size());
pools_to_use.resize(max_replicas_to_use);
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast,
new_cluster,