From 97851bde088ab3289064b808da7dd63d541d8ea7 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 3 Aug 2021 10:10:08 +0300 Subject: [PATCH] Fix Distributed over Distributed for WithMergeableStateAfterAggregation* stages When one Distributed has multiple shards and the underlying Distributed has only one, there can be a case where the query is attempted to be processed from Complete to WithMergeableStateAfterAggregation, which is obviously wrong. --- src/Storages/StorageDistributed.cpp | 15 ++++++++++++++- ...t_WithMergeableStateAfterAggregation.reference | 8 ++++++++ ...on_dist_WithMergeableStateAfterAggregation.sql | 6 ++++++ 3 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.reference create mode 100644 tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.sql diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 2bbb92cf0b8..8f9c4bcc655 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -447,6 +447,8 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( { /// NOTE: distributed_group_by_no_merge=1 does not respect distributed_push_down_limit /// (since in this case queries processed separately and the initiator is just a proxy in this case). + if (to_stage != QueryProcessingStage::Complete) + throw Exception("Queries with distributed_group_by_no_merge=1 should be processed to Complete stage", ErrorCodes::LOGICAL_ERROR); return QueryProcessingStage::Complete; } } @@ -459,11 +461,22 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( /// If there is only one node, the query can be fully processed by the /// shard, initiator will work as a proxy only. 
if (getClusterQueriedNodes(settings, cluster) == 1) - return QueryProcessingStage::Complete; + { + /// In case the query was processed to + /// WithMergeableStateAfterAggregation/WithMergeableStateAfterAggregationAndLimit + /// (which are greater than the Complete stage) + /// we cannot return Complete (will break aliases and similar), + /// relevant for Distributed over Distributed + return std::max(to_stage, QueryProcessingStage::Complete); + } auto optimized_stage = getOptimizedQueryProcessingStage(query_info, settings); if (optimized_stage) + { + if (*optimized_stage == QueryProcessingStage::Complete) + return std::min(to_stage, *optimized_stage); return *optimized_stage; + } return QueryProcessingStage::WithMergeableState; } diff --git a/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.reference b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.reference new file mode 100644 index 00000000000..6c680840239 --- /dev/null +++ b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.reference @@ -0,0 +1,8 @@ +-- { echo } +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=0; +0 +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=1; +0 +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_group_by_no_merge=1; +0 +0 diff --git a/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.sql b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.sql new file mode 100644 index 00000000000..0925df1888d --- /dev/null +++ b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.sql @@ -0,0 +1,6 @@ +drop table if exists dist; +create table dist as system.one 
engine=Distributed('test_shard_localhost', system, one); +-- { echo } +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=0; +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=1; +select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_group_by_no_merge=1;