Fix Distributed over Distributed for WithMergeableStateAfterAggregation* stages

In case if one Distributed has multiple shards, and underlying
Distributed has only one, there can be the case when the query will be
tried to process from Complete to WithMergeableStateAfterAggregation,
which is obviously wrong.
This commit is contained in:
Azat Khuzhin 2021-08-03 10:10:08 +03:00
parent ff12f5102a
commit 97851bde08
3 changed files with 28 additions and 1 deletions

View File

@ -447,6 +447,8 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
{
/// NOTE: distributed_group_by_no_merge=1 does not respect distributed_push_down_limit
/// (since in this case queries processed separately and the initiator is just a proxy in this case).
if (to_stage != QueryProcessingStage::Complete)
throw Exception("Queries with distributed_group_by_no_merge=1 should be processed to Complete stage", ErrorCodes::LOGICAL_ERROR);
return QueryProcessingStage::Complete;
}
}
@ -459,11 +461,22 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
/// If there is only one node, the query can be fully processed by the
/// shard, initiator will work as a proxy only.
if (getClusterQueriedNodes(settings, cluster) == 1)
return QueryProcessingStage::Complete;
{
/// In case the query was processed to
/// WithMergeableStateAfterAggregation/WithMergeableStateAfterAggregationAndLimit
/// (which are greater the Complete stage)
/// we cannot return Complete (will break aliases and similar),
/// relevant for Distributed over Distributed
return std::max(to_stage, QueryProcessingStage::Complete);
}
auto optimized_stage = getOptimizedQueryProcessingStage(query_info, settings);
if (optimized_stage)
{
if (*optimized_stage == QueryProcessingStage::Complete)
return std::min(to_stage, *optimized_stage);
return *optimized_stage;
}
return QueryProcessingStage::WithMergeableState;
}

View File

@ -0,0 +1,8 @@
-- { echo }
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=0;
0
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=1;
0
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_group_by_no_merge=1;
0
0

View File

@ -0,0 +1,6 @@
drop table if exists dist;
create table dist as system.one engine=Distributed('test_shard_localhost', system, one);
-- { echo }
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=0;
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=1;
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_group_by_no_merge=1;