From 32dc3fe8d1c0384221cd1f27adc53bfe45a01e28 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Sat, 13 Jul 2024 21:19:14 +0000 Subject: [PATCH] Simplify code --- src/Interpreters/ClusterProxy/executeQuery.cpp | 4 +--- src/Processors/QueryPlan/ReadFromRemote.cpp | 10 ++-------- .../02898_parallel_replicas_progress_bar.sql | 1 + 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 6039c545085..79f4344e6cc 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -519,9 +519,7 @@ void executeQueryWithParallelReplicas( "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard"); } - auto replica_count = new_cluster->getShardsInfo().begin()->getAllNodeCount(); - if (settings.max_parallel_replicas < replica_count) - replica_count = settings.max_parallel_replicas; + const auto replica_count = std::min(settings.max_parallel_replicas.value, new_cluster->getShardsInfo().begin()->getAllNodeCount()); auto coordinator = std::make_shared(replica_count, settings.parallel_replicas_mark_segment_size); diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 26c57c4a760..3da22265c5c 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -458,7 +458,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder pools_to_use.reserve(shuffled_pool.size()); if (exclude_local_replica) { - std::vector local_addr_possitions; for (auto & pool : shuffled_pool) { const auto & hostname = pool.pool->getHost(); @@ -466,17 +465,12 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder begin(shard.local_addresses), end(shard.local_addresses), [&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; }); - if (it != shard.local_addresses.end()) + if (it == shard.local_addresses.end()) { - pool.pool.reset(); + pools_to_use.push_back(pool.pool); } } } - for (const auto & pool : shuffled_pool) - { - if (pool.pool) - pools_to_use.push_back(pool.pool); - } LOG_DEBUG( getLogger("ReadFromParallelRemoteReplicasStep"), diff --git a/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.sql b/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.sql index d3bf228e0fb..9348ea1dc32 100644 --- a/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.sql +++ b/tests/queries/0_stateless/02898_parallel_replicas_progress_bar.sql @@ -26,6 +26,7 @@ WHERE query_id in (select query_id from system.query_log where current_database AND message LIKE '%Total rows to read: 3000%' SETTINGS allow_experimental_parallel_reading_from_replicas=0; -- reading in order coordinator +-- disable parallel_replicas_local_plan since the test relay on traces which only present in case of no local plan SELECT k, sipHash64(v) FROM t1 order by k limit 5 offset 998 SETTINGS optimize_read_in_order=1, parallel_replicas_local_plan=0, log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b'; SYSTEM FLUSH LOGS;