From 7b209694d500cfcdf753e76a7d63d4aee1d7098b Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 15 Jul 2021 09:09:58 +0300 Subject: [PATCH] Fix optimize_distributed_group_by_sharding_key for multiple columns Before we incorrectly check that columns from GROUP BY was a subset of columns from sharding key, while this is not right, consider the following example: select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1 Here the columns from GROUP BY is a subset of columns from sharding key, but the optimization cannot be applied, since there is no guarantee that particular shard contains distinct values of k1. So instead we should check that GROUP BY contains all columns that is required for calculating sharding key expression, i.e.: select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2 --- src/Storages/StorageDistributed.cpp | 26 +++++++++---------- ...istributed_group_by_sharding_key.reference | 10 +++++++ ...mize_distributed_group_by_sharding_key.sql | 8 ++++++ 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 21fa06e19f0..7fa59179aed 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -290,26 +290,27 @@ void replaceConstantExpressions( /// - QueryProcessingStage::WithMergeableStateAfterAggregation /// - QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit /// - none (in this case regular WithMergeableState should be used) -std::optional getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Block & sharding_key_block) +std::optional getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Names & sharding_key_columns) { const auto & select = query_info.query->as(); - auto sharding_block_has = [&](const auto & exprs, size_t limit = SIZE_MAX) -> bool + auto sharding_block_has = [&](const auto & exprs) -> bool { - size_t i = 0; + std::unordered_set expr_columns; for (auto & expr : exprs) { - ++i; - if (i > limit) - break; - auto id = expr->template as(); if (!id) - return false; - /// TODO: if GROUP BY contains multiIf()/if() it should contain only columns from sharding_key - if (!sharding_key_block.has(id->name())) + continue; + expr_columns.emplace(id->name()); + } + + for (const auto & column : sharding_key_columns) + { + if (!expr_columns.contains(column)) return false; } + return true; }; @@ -343,7 +344,7 @@ std::optional getOptimizedQueryProcessingStage(const } else { - if (!sharding_block_has(group_by->children, 1)) + if (!sharding_block_has(group_by->children)) return {}; } @@ -547,8 +548,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( has_sharding_key && (settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic)) { - Block sharding_key_block = sharding_key_expr->getSampleBlock(); - auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_block); + auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_expr->getRequiredColumns()); if (stage) { LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage)); diff --git a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference index acaf6531101..4442b0b6b61 100644 --- a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference +++ b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference @@ -115,6 +115,7 @@ GROUP BY WITH TOTALS LIMIT 2 0 4 0 +GROUP BY (compound) GROUP BY sharding_key, ... 0 0 1 0 @@ -123,6 +124,15 @@ GROUP BY sharding_key, ... GROUP BY ..., sharding_key 0 0 1 0 +0 0 +1 0 +sharding_key (compound) +1 2 3 +1 2 3 +1 2 6 +1 2 +1 2 +2 window functions 0 0 1 0 diff --git a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql index 6b6300a4871..4719119165a 100644 --- a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql +++ b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql @@ -97,6 +97,7 @@ select 'GROUP BY WITH TOTALS LIMIT'; select count(), * from dist_01247 group by number with totals limit 1; -- GROUP BY (compound) +select 'GROUP BY (compound)'; drop table if exists dist_01247; drop table if exists data_01247; create table data_01247 engine=Memory() as select number key, 0 value from numbers(2); @@ -106,6 +107,13 @@ select * from dist_01247 group by key, value; select 'GROUP BY ..., sharding_key'; select * from dist_01247 group by value, key; +-- sharding_key (compound) +select 'sharding_key (compound)'; +select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2; -- optimization applied +select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1; -- optimization does not applied +select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)); -- optimization applied +select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)); -- optimization does not applied + -- window functions select 'window functions'; select key, sum(sum(value)) over (rows unbounded preceding) from dist_01247 group by key settings allow_experimental_window_functions=1;