From 6eb6c8a320c919ad33d5e9b1b5f7093eb071be1c Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Thu, 10 Aug 2023 11:43:26 +0000 Subject: [PATCH] Fix: correct execution over cluster with multiple shards respect shard number during parallel replicas query execution --- .../ClusterProxy/executeQuery.cpp | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index e4a48fdcac0..fbc7bbd5bbb 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -270,7 +270,28 @@ void executeQueryWithParallelReplicas( const ClusterPtr & not_optimized_cluster) { const auto & settings = context->getSettingsRef(); - ClusterPtr new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings); + auto new_context = Context::createCopy(context); + auto scalars = new_context->hasQueryContext() ? new_context->getQueryContext()->getScalars() : Scalars{}; + + Int64 shard_num = 0; /// shard_num is 1-based, so 0 - no shard specified + auto it = scalars.find("_shard_num"); + if (it != scalars.end()) + { + const Block & block = it->second; + shard_num = block.getColumns()[0]->get64(0); + } + + ClusterPtr new_cluster; + /// if got valid shard_num from query initiator, then parallel replicas scope is the specified shard + /// shards are numbered in order of appearance in the cluster config + if (shard_num > 0) + { + LOG_DEBUG(&Poco::Logger::get("executeQueryWithParallelReplicas"), "Parallel replicas query in shard scope: shard_num={}", shard_num); + /// shard_num is 1-based, but getClusterWithSingleShard expects 0-based index + new_cluster = not_optimized_cluster->getClusterWithSingleShard(shard_num - 1); + } + else + new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings); auto all_replicas_count = std::min(static_cast(settings.max_parallel_replicas), new_cluster->getShardCount()); auto coordinator = std::make_shared(all_replicas_count); @@ -284,8 +305,6 @@ void executeQueryWithParallelReplicas( /// to then tell it about the reading method we chose. query_info.coordinator = coordinator; - auto new_context = Context::createCopy(context); - auto scalars = new_context->hasQueryContext() ? new_context->getQueryContext()->getScalars() : Scalars{}; auto external_tables = new_context->getExternalTables(); auto read_from_remote = std::make_unique(