Fix distributed queries with zero shards and aggregation

This commit is contained in:
Azat Khuzhin 2021-08-08 12:38:24 +03:00
parent 560e71dcfa
commit 702d9955c0
3 changed files with 21 additions and 5 deletions

View File

@ -415,17 +415,22 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
ClusterPtr cluster = getCluster();
query_info.cluster = cluster;
size_t nodes = getClusterQueriedNodes(settings, cluster);
/// Always calculate optimized cluster here, to avoid conditions during read()
/// (Anyway it will be calculated in the read())
if (getClusterQueriedNodes(settings, cluster) > 1 && settings.optimize_skip_unused_shards)
if (nodes > 1 && settings.optimize_skip_unused_shards)
{
ClusterPtr optimized_cluster = getOptimizedCluster(local_context, metadata_snapshot, query_info.query);
if (optimized_cluster)
{
LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
makeFormattedListOfShards(optimized_cluster));
cluster = optimized_cluster;
query_info.optimized_cluster = cluster;
nodes = getClusterQueriedNodes(settings, cluster);
}
else
{
@ -460,7 +465,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
/// If there is only one node, the query can be fully processed by the
/// shard, initiator will work as a proxy only.
if (getClusterQueriedNodes(settings, cluster) == 1)
if (nodes == 1)
{
/// In case the query was processed to
/// WithMergeableStateAfterAggregation/WithMergeableStateAfterAggregationAndLimit
@ -469,6 +474,13 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
/// relevant for Distributed over Distributed
return std::max(to_stage, QueryProcessingStage::Complete);
}
else if (nodes == 0)
{
/// In case of 0 shards, the query should be processed fully on the initiator,
/// since we need to apply aggregations.
/// That's why we need to return FetchColumns.
return QueryProcessingStage::FetchColumns;
}
auto optimized_stage = getOptimizedQueryProcessingStage(query_info, settings);
if (optimized_stage)

View File

@ -0,0 +1,4 @@
-- { echo }
select * from remote('127.{1,2}', system, one, dummy) where 0 settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=1;
select count() from remote('127.{1,2}', system, one, dummy) where 0 settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=1;
0

View File

@ -1,3 +1,3 @@
create table dist_01756 (dummy UInt8) ENGINE = Distributed('test_cluster_two_shards', 'system', 'one', dummy);
select ignore(1), * from dist_01756 where 0 settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=1;
drop table dist_01756;
-- { echo }
select * from remote('127.{1,2}', system, one, dummy) where 0 settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=1;
select count() from remote('127.{1,2}', system, one, dummy) where 0 settings optimize_skip_unused_shards=1, force_optimize_skip_unused_shards=1;