Fix: correct local replica exclusion

- fixes 02731_parallel_replicas_join_subquery
This commit is contained in:
Igor Nikonov 2024-06-17 12:24:14 +00:00
parent 70a33e633c
commit c87bfe102e
2 changed files with 23 additions and 6 deletions

View File

@ -489,8 +489,12 @@ void executeQueryWithParallelReplicas(
"`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard");
}
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(
new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size);
auto replica_count = new_cluster->getShardsInfo().begin()->getAllNodeCount();
if (settings.max_parallel_replicas < replica_count)
replica_count = settings.max_parallel_replicas;
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(replica_count, settings.parallel_replicas_mark_segment_size);
auto external_tables = new_context->getExternalTables();
/// do not build local plan for distributed queries for now (address it later)

View File

@ -449,6 +449,7 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
}
std::vector<ConnectionPoolPtr> pools_to_use;
pools_to_use.reserve(shuffled_pool.size());
if (exclude_local_replica)
{
std::vector<size_t> local_addr_possitions;
@ -460,21 +461,25 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
end(shard.local_addresses),
[&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; });
if (it != shard.local_addresses.end())
{
pool.pool.reset();
}
}
}
for (const auto & pool : shuffled_pool)
{
if (pool.pool)
pools_to_use.push_back(pool.pool);
}
LOG_DEBUG(
getLogger("ReadFromParallelRemoteReplicasStep"),
"Number of pools to use is {}. Originally {}",
pools_to_use.size(),
shuffled_pool.size());
if (pools_to_use.size() > all_replicas_count)
pools_to_use.resize(all_replicas_count);
else
all_replicas_count = pools_to_use.size();
chassert(all_replicas_count == pools_to_use.size());
if (exclude_local_replica && !pools_to_use.empty())
pools_to_use.resize(all_replicas_count - 1);
@ -482,6 +487,14 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
if (pools_to_use.empty())
return;
{
String pool_addresses;
for (const auto & pool : pools_to_use)
pool_addresses += pool->getAddress() + ";";
LOG_DEBUG(getLogger("ReadFromParallelRemoteReplicasStep"), "Addresses to use: {}", pool_addresses);
}
/// local replicas has number 0
size_t offset = (exclude_local_replica ? 1 : 0);
for (size_t i = 0 + offset; i < all_replicas_count; ++i)