diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp index 16cca6a7f5d..bd79fc38ae9 100644 --- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp @@ -280,8 +280,7 @@ std::unique_ptr createLocalPlanForParallelReplicas( MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional { return coordinator->handleRequest(std::move(req)); }; - const auto number_of_local_replica = new_context->getSettingsRef().max_parallel_replicas - 1; - + const auto number_of_local_replica = 0; auto read_from_merge_tree_parallel_replicas = reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, all_ranges_cb, read_task_cb, number_of_local_replica); node->step = std::move(read_from_merge_tree_parallel_replicas); diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 23d8f2b496f..6949c35e0ca 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -434,14 +434,11 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder shard.getAllNodeCount()); all_replicas_count = shard.getAllNodeCount(); } - if (exclude_local_replica) - --all_replicas_count; std::vector shuffled_pool; if (all_replicas_count < shard.getAllNodeCount()) { shuffled_pool = shard.pool->getShuffledPools(current_settings); - shuffled_pool.resize(all_replicas_count); } else { @@ -451,10 +448,37 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func); } - for (size_t i=0; i < all_replicas_count; ++i) + std::vector pools_to_use; + if (exclude_local_replica) { - IConnections::ReplicaInfo replica_info + std::vector local_addr_possitions; + for (auto & pool : shuffled_pool) { + const auto & hostname = 
pool.pool->getHost(); + auto it = std::find_if( + begin(shard.local_addresses), + end(shard.local_addresses), + [&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; }); + if (it != shard.local_addresses.end()) + pool.pool.reset(); + } + } + for (const auto & pool : shuffled_pool) + { + if (pool.pool) + pools_to_use.push_back(pool.pool); + } + + if (pools_to_use.size() > all_replicas_count) + pools_to_use.resize(all_replicas_count); + else + all_replicas_count = pools_to_use.size(); + + /// the local replica has number 0 + size_t offset = (exclude_local_replica ? 1 : 0); + for (size_t i = 0 + offset; i < all_replicas_count + offset; ++i) + { + IConnections::ReplicaInfo replica_info{ .all_replicas_count = all_replicas_count, /// we should use this number specifically because efficiency of data distribution by consistent hash depends on it. .number_of_current_replica = i,