From 702d9955c0e04b15203b250f95ba4eae5347df9f Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 8 Aug 2021 12:38:24 +0300 Subject: [PATCH] Fix distributed queries with zero shards and aggregation --- src/Storages/StorageDistributed.cpp | 16 ++++++++++++++-- ...mize_skip_unused_shards_zero_shards.reference | 4 ++++ ...9_optimize_skip_unused_shards_zero_shards.sql | 6 +++--- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index bad2abf3cdc..fcd0e255e5c 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -415,17 +415,22 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( ClusterPtr cluster = getCluster(); query_info.cluster = cluster; + size_t nodes = getClusterQueriedNodes(settings, cluster); + /// Always calculate optimized cluster here, to avoid conditions during read() /// (Anyway it will be calculated in the read()) - if (getClusterQueriedNodes(settings, cluster) > 1 && settings.optimize_skip_unused_shards) + if (nodes > 1 && settings.optimize_skip_unused_shards) { ClusterPtr optimized_cluster = getOptimizedCluster(local_context, metadata_snapshot, query_info.query); if (optimized_cluster) { LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}", makeFormattedListOfShards(optimized_cluster)); + cluster = optimized_cluster; query_info.optimized_cluster = cluster; + + nodes = getClusterQueriedNodes(settings, cluster); } else { @@ -460,7 +465,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( /// If there is only one node, the query can be fully processed by the /// shard, initiator will work as a proxy only. - if (getClusterQueriedNodes(settings, cluster) == 1) + if (nodes == 1) { /// In case the query was processed to /// WithMergeableStateAfterAggregation/WithMergeableStateAfterAggregationAndLimit @@ -469,6 +474,13 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( /// relevant for Distributed over Distributed return std::max(to_stage, QueryProcessingStage::Complete); } + else if (nodes == 0) + { + /// In case of 0 shards, the query should be processed fully on the initiator, + /// since we need to apply aggregations. + /// That's why we need to return FetchColumns. + return QueryProcessingStage::FetchColumns; + } auto optimized_stage = getOptimizedQueryProcessingStage(query_info, settings); if (optimized_stage) diff --git a/tests/queries/0_stateless/01759_optimize_skip_unused_shards_zero_shards.reference b/tests/queries/0_stateless/01759_optimize_skip_unused_shards_zero_shards.reference index e69de29bb2d..109c1835a6e 100644 --- a/tests/queries/0_stateless/01759_optimize_skip_unused_shards_zero_shards.reference +++ b/tests/queries/0_stateless/01759_optimize_skip_unused_shards_zero_shards.reference @@ -0,0 +1,4 @@ +-- { echo } +select * from remote('127.{1,2}', system, one, dummy) where 0 settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=1; +select count() from remote('127.{1,2}', system, one, dummy) where 0 settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=1; +0 diff --git a/tests/queries/0_stateless/01759_optimize_skip_unused_shards_zero_shards.sql b/tests/queries/0_stateless/01759_optimize_skip_unused_shards_zero_shards.sql index 2ddf318313f..08b9581ace2 100644 --- a/tests/queries/0_stateless/01759_optimize_skip_unused_shards_zero_shards.sql +++ b/tests/queries/0_stateless/01759_optimize_skip_unused_shards_zero_shards.sql @@ -1,3 +1,3 @@ -create table dist_01756 (dummy UInt8) ENGINE = Distributed('test_cluster_two_shards', 'system', 'one', dummy); -select ignore(1), * from dist_01756 where 0 settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=1; -drop table dist_01756; +-- { echo } +select * from remote('127.{1,2}', system, one, dummy) where 0 settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=1; +select count() from remote('127.{1,2}', system, one, dummy) where 0 settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=1;