Simplify code

This commit is contained in:
Igor Nikonov 2024-07-13 21:19:14 +00:00
parent f42fe31396
commit 32dc3fe8d1
3 changed files with 4 additions and 11 deletions

View File

@ -519,9 +519,7 @@ void executeQueryWithParallelReplicas(
"`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard"); "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard");
} }
auto replica_count = new_cluster->getShardsInfo().begin()->getAllNodeCount(); const auto replica_count = std::min(settings.max_parallel_replicas.value, new_cluster->getShardsInfo().begin()->getAllNodeCount());
if (settings.max_parallel_replicas < replica_count)
replica_count = settings.max_parallel_replicas;
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(replica_count, settings.parallel_replicas_mark_segment_size); auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(replica_count, settings.parallel_replicas_mark_segment_size);

View File

@ -458,7 +458,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
pools_to_use.reserve(shuffled_pool.size()); pools_to_use.reserve(shuffled_pool.size());
if (exclude_local_replica) if (exclude_local_replica)
{ {
std::vector<size_t> local_addr_possitions;
for (auto & pool : shuffled_pool) for (auto & pool : shuffled_pool)
{ {
const auto & hostname = pool.pool->getHost(); const auto & hostname = pool.pool->getHost();
@ -466,17 +465,12 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
begin(shard.local_addresses), begin(shard.local_addresses),
end(shard.local_addresses), end(shard.local_addresses),
[&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; }); [&hostname](const Cluster::Address & local_addr) { return hostname == local_addr.host_name; });
if (it != shard.local_addresses.end()) if (it == shard.local_addresses.end())
{ {
pool.pool.reset();
}
}
}
for (const auto & pool : shuffled_pool)
{
if (pool.pool)
pools_to_use.push_back(pool.pool); pools_to_use.push_back(pool.pool);
} }
}
}
LOG_DEBUG( LOG_DEBUG(
getLogger("ReadFromParallelRemoteReplicasStep"), getLogger("ReadFromParallelRemoteReplicasStep"),

View File

@ -26,6 +26,7 @@ WHERE query_id in (select query_id from system.query_log where current_database
AND message LIKE '%Total rows to read: 3000%' SETTINGS allow_experimental_parallel_reading_from_replicas=0; AND message LIKE '%Total rows to read: 3000%' SETTINGS allow_experimental_parallel_reading_from_replicas=0;
-- reading in order coordinator -- reading in order coordinator
-- disable parallel_replicas_local_plan since the test relay on traces which only present in case of no local plan
SELECT k, sipHash64(v) FROM t1 order by k limit 5 offset 998 SETTINGS optimize_read_in_order=1, parallel_replicas_local_plan=0, log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b'; SELECT k, sipHash64(v) FROM t1 order by k limit 5 offset 998 SETTINGS optimize_read_in_order=1, parallel_replicas_local_plan=0, log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b';
SYSTEM FLUSH LOGS; SYSTEM FLUSH LOGS;