diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 2bbb92cf0b8..8f9c4bcc655 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -447,6 +447,8 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( { /// NOTE: distributed_group_by_no_merge=1 does not respect distributed_push_down_limit /// (since in this case queries processed separately and the initiator is just a proxy in this case). + if (to_stage != QueryProcessingStage::Complete) + throw Exception("Queries with distributed_group_by_no_merge=1 should be processed to Complete stage", ErrorCodes::LOGICAL_ERROR); return QueryProcessingStage::Complete; } } @@ -459,11 +461,22 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( /// If there is only one node, the query can be fully processed by the /// shard, initiator will work as a proxy only. if (getClusterQueriedNodes(settings, cluster) == 1) - return QueryProcessingStage::Complete; + { + /// In case the query was processed to + /// WithMergeableStateAfterAggregation/WithMergeableStateAfterAggregationAndLimit + /// (which are greater the Complete stage) + /// we cannot return Complete (will break aliases and similar), + /// relevant for Distributed over Distributed + return std::max(to_stage, QueryProcessingStage::Complete); + } auto optimized_stage = getOptimizedQueryProcessingStage(query_info, settings); if (optimized_stage) + { + if (*optimized_stage == QueryProcessingStage::Complete) + return std::min(to_stage, *optimized_stage); return *optimized_stage; + } return QueryProcessingStage::WithMergeableState; } diff --git a/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.reference b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.reference new file mode 100644 index 00000000000..6c680840239 --- /dev/null +++ b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.reference @@ -0,0 +1,8 @@ +-- { echo } +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=0; +0 +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=1; +0 +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_group_by_no_merge=1; +0 +0 diff --git a/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.sql b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.sql new file mode 100644 index 00000000000..0925df1888d --- /dev/null +++ b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.sql @@ -0,0 +1,6 @@ +drop table if exists dist; +create table dist as system.one engine=Distributed('test_shard_localhost', system, one); +-- { echo } +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=0; +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=1; +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_group_by_no_merge=1;