From c87bfe102e36612ebb4b474eb822543de0cf011e Mon Sep 17 00:00:00 2001
From: Igor Nikonov
Date: Mon, 17 Jun 2024 12:24:14 +0000
Subject: [PATCH] Fix: correct local replica exclusion

- fixes 02731_parallel_replicas_join_subquery
---
 .../ClusterProxy/executeQuery.cpp           |  8 +++++--
 src/Processors/QueryPlan/ReadFromRemote.cpp | 21 +++++++++++++++----
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp
index cde33697915..0937e121426 100644
--- a/src/Interpreters/ClusterProxy/executeQuery.cpp
+++ b/src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -489,8 +489,12 @@ void executeQueryWithParallelReplicas(
             "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard");
     }
 
-    auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(
-        new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size);
+    auto replica_count = new_cluster->getShardsInfo().begin()->getAllNodeCount();
+    if (settings.max_parallel_replicas < replica_count)
+        replica_count = settings.max_parallel_replicas;
+
+    auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(replica_count, settings.parallel_replicas_mark_segment_size);
+
     auto external_tables = new_context->getExternalTables();
 
     /// do not build local plan for distributed queries for now (address it later)
diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp
index 1a96668e30e..f7cb88154af 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.cpp
+++ b/src/Processors/QueryPlan/ReadFromRemote.cpp
@@ -449,6 +449,7 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
     }
 
     std::vector<ConnectionPoolPtr> pools_to_use;
+    pools_to_use.reserve(shuffled_pool.size());
     if (exclude_local_replica)
     {
         std::vector<size_t> local_addr_possitions;
@@ -460,7 +461,9 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
                 end(shard.local_addresses),
                 [&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; });
             if (it != shard.local_addresses.end())
+            {
                 pool.pool.reset();
+            }
         }
     }
     for (const auto & pool : shuffled_pool)
@@ -469,12 +472,14 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
             pools_to_use.push_back(pool.pool);
     }
 
+    LOG_DEBUG(
+        getLogger("ReadFromParallelRemoteReplicasStep"),
+        "Number of pools to use is {}. Originally {}",
+        pools_to_use.size(),
+        shuffled_pool.size());
+
     if (pools_to_use.size() > all_replicas_count)
         pools_to_use.resize(all_replicas_count);
-    else
-        all_replicas_count = pools_to_use.size();
-
-    chassert(all_replicas_count == pools_to_use.size());
 
     if (exclude_local_replica && !pools_to_use.empty())
         pools_to_use.resize(all_replicas_count - 1);
@@ -482,6 +487,14 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
     if (pools_to_use.empty())
         return;
 
+    {
+        String pool_addresses;
+        for (const auto & pool : pools_to_use)
+            pool_addresses += pool->getAddress() + ";";
+
+        LOG_DEBUG(getLogger("ReadFromParallelRemoteReplicasStep"), "Addresses to use: {}", pool_addresses);
+    }
+
     /// local replicas has number 0
     size_t offset = (exclude_local_replica ? 1 : 0);
     for (size_t i = 0 + offset; i < all_replicas_count; ++i)
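
For context, the executeQuery.cpp hunk caps the coordinator's replica count at
settings.max_parallel_replicas instead of always using the shard's full node
count. A minimal standalone sketch of that clamping logic, not part of the
patch (chooseReplicaCount is a hypothetical name, and the plain size_t
parameters stand in for the cluster and settings values):

#include <algorithm>
#include <cstddef>
#include <iostream>

// Hypothetical helper mirroring the clamping in the executeQuery.cpp hunk:
// the coordinator should track min(nodes in shard, max_parallel_replicas)
// participants, since no more than max_parallel_replicas will be launched.
size_t chooseReplicaCount(size_t all_node_count, size_t max_parallel_replicas)
{
    return std::min(all_node_count, max_parallel_replicas);
}

int main()
{
    std::cout << chooseReplicaCount(3, 2) << '\n'; // setting limits: prints 2
    std::cout << chooseReplicaCount(3, 8) << '\n'; // shard size limits: prints 3
}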
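
Similarly, the ReadFromRemote.cpp hunks reset the local replica's connection
pool before the remote pipelines are built, keep the surviving pools in
shuffle order, and trim them to the replica budget (slot 0 is reserved for the
local plan when exclude_local_replica is set). A self-contained sketch of that
selection under simplified stand-in types; FakePool, selectRemotePools, and
the std::min guard on the final resize are assumptions for this illustration,
not part of the patch:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Stand-ins for ClickHouse's shuffled connection pool entries; only the
// host name matters for this illustration.
struct FakePool { std::string host; };
using FakePoolPtr = std::shared_ptr<FakePool>;
struct ShuffledEntry { FakePoolPtr pool; };

// Mirrors the selection in ReadFromParallelRemoteReplicasStep::initializePipeline:
// reset pools whose host matches a local address, collect the rest, then trim
// to the replica budget. The std::min guard on the last resize is an extra
// safety for this standalone sketch (the patch resizes unconditionally).
std::vector<FakePoolPtr> selectRemotePools(
    std::vector<ShuffledEntry> shuffled,
    const std::vector<std::string> & local_hosts,
    size_t all_replicas_count,
    bool exclude_local_replica)
{
    if (exclude_local_replica)
        for (auto & entry : shuffled)
            if (std::find(local_hosts.begin(), local_hosts.end(), entry.pool->host) != local_hosts.end())
                entry.pool.reset(); // exclude the local replica's pool

    std::vector<FakePoolPtr> pools_to_use;
    pools_to_use.reserve(shuffled.size());
    for (const auto & entry : shuffled)
        if (entry.pool)
            pools_to_use.push_back(entry.pool);

    if (pools_to_use.size() > all_replicas_count)
        pools_to_use.resize(all_replicas_count);

    // one slot is reserved for the local replica (replica number 0)
    if (exclude_local_replica && !pools_to_use.empty())
        pools_to_use.resize(std::min(pools_to_use.size(), all_replicas_count - 1));

    return pools_to_use;
}

int main()
{
    std::vector<ShuffledEntry> shuffled = {
        {std::make_shared<FakePool>(FakePool{"node1"})},
        {std::make_shared<FakePool>(FakePool{"node2"})},
        {std::make_shared<FakePool>(FakePool{"node3"})}};

    // With 3 replicas total and "node2" local, two remote pools remain.
    for (const auto & pool : selectRemotePools(shuffled, {"node2"}, 3, /*exclude_local_replica=*/true))
        std::cout << pool->host << ";";
    std::cout << '\n'; // prints: node1;node3;
}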