Fix: correct execution over cluster with multiple shards

respect shard number during parallel replicas query execution
This commit is contained in:
Igor Nikonov 2023-08-10 11:43:26 +00:00
parent 5e7a438631
commit 6eb6c8a320

View File

@ -270,7 +270,28 @@ void executeQueryWithParallelReplicas(
const ClusterPtr & not_optimized_cluster) const ClusterPtr & not_optimized_cluster)
{ {
const auto & settings = context->getSettingsRef(); const auto & settings = context->getSettingsRef();
ClusterPtr new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings); auto new_context = Context::createCopy(context);
auto scalars = new_context->hasQueryContext() ? new_context->getQueryContext()->getScalars() : Scalars{};
Int64 shard_num = 0; /// shard_num is 1-based, so 0 - no shard specified
auto it = scalars.find("_shard_num");
if (it != scalars.end())
{
const Block & block = it->second;
shard_num = block.getColumns()[0]->get64(0);
}
ClusterPtr new_cluster;
/// if got valid shard_num from query initiator, then parallel replicas scope is the specified shard
/// shards are numbered in order of appearance in the cluster config
if (shard_num > 0)
{
LOG_DEBUG(&Poco::Logger::get("executeQueryWithParallelReplicas"), "Parallel replicas query in shard scope: shard_num={}", shard_num);
/// shard_num is 1-based, but getClusterWithSingleShard expects 0-based index
new_cluster = not_optimized_cluster->getClusterWithSingleShard(shard_num - 1);
}
else
new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings);
auto all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount()); auto all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount());
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(all_replicas_count); auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(all_replicas_count);
@ -284,8 +305,6 @@ void executeQueryWithParallelReplicas(
/// to then tell it about the reading method we chose. /// to then tell it about the reading method we chose.
query_info.coordinator = coordinator; query_info.coordinator = coordinator;
auto new_context = Context::createCopy(context);
auto scalars = new_context->hasQueryContext() ? new_context->getQueryContext()->getScalars() : Scalars{};
auto external_tables = new_context->getExternalTables(); auto external_tables = new_context->getExternalTables();
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>( auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(