Merge pull request #26353 from azat/optimize_distributed_group_by_sharding_key-fix

Fix optimize_distributed_group_by_sharding_key for multiple columns
This commit is contained in:
alexey-milovidov 2021-07-17 01:45:10 +03:00 committed by GitHub
commit 1701cc429d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 13 deletions

View File

@ -290,26 +290,27 @@ void replaceConstantExpressions(
/// - QueryProcessingStage::WithMergeableStateAfterAggregation
/// - QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit
/// - none (in this case regular WithMergeableState should be used)
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Block & sharding_key_block)
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Names & sharding_key_columns)
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto sharding_block_has = [&](const auto & exprs, size_t limit = SIZE_MAX) -> bool
auto sharding_block_has = [&](const auto & exprs) -> bool
{
size_t i = 0;
std::unordered_set<std::string> expr_columns;
for (auto & expr : exprs)
{
++i;
if (i > limit)
break;
auto id = expr->template as<ASTIdentifier>();
if (!id)
return false;
/// TODO: if GROUP BY contains multiIf()/if() it should contain only columns from sharding_key
if (!sharding_key_block.has(id->name()))
continue;
expr_columns.emplace(id->name());
}
for (const auto & column : sharding_key_columns)
{
if (!expr_columns.contains(column))
return false;
}
return true;
};
@ -343,7 +344,7 @@ std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const
}
else
{
if (!sharding_block_has(group_by->children, 1))
if (!sharding_block_has(group_by->children))
return {};
}
@ -547,8 +548,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
has_sharding_key &&
(settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic))
{
Block sharding_key_block = sharding_key_expr->getSampleBlock();
auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_block);
auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_expr->getRequiredColumns());
if (stage)
{
LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage));

View File

@ -115,6 +115,7 @@ GROUP BY WITH TOTALS LIMIT
2 0
4 0
GROUP BY (compound)
GROUP BY sharding_key, ...
0 0
1 0
@ -123,6 +124,15 @@ GROUP BY sharding_key, ...
GROUP BY ..., sharding_key
0 0
1 0
0 0
1 0
sharding_key (compound)
1 2 3
1 2 3
1 2 6
1 2
1 2
2
window functions
0 0
1 0

View File

@ -97,6 +97,7 @@ select 'GROUP BY WITH TOTALS LIMIT';
select count(), * from dist_01247 group by number with totals limit 1;
-- GROUP BY (compound)
select 'GROUP BY (compound)';
drop table if exists dist_01247;
drop table if exists data_01247;
create table data_01247 engine=Memory() as select number key, 0 value from numbers(2);
@ -106,6 +107,13 @@ select * from dist_01247 group by key, value;
select 'GROUP BY ..., sharding_key';
select * from dist_01247 group by value, key;
-- sharding_key (compound)
select 'sharding_key (compound)';
select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2; -- optimization applied
select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1; -- optimization is not applied
select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)); -- optimization applied
select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)); -- optimization is not applied
-- window functions
select 'window functions';
select key, sum(sum(value)) over (rows unbounded preceding) from dist_01247 group by key settings allow_experimental_window_functions=1;