Fix optimize_distributed_group_by_sharding_key for multiple columns

Before we incorrectly check that columns from GROUP BY was a subset of
columns from sharding key, while this is not right, consider the
following example:

    select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1

Here the columns from GROUP BY is a subset of columns from sharding key,
but the optimization cannot be applied, since there is no guarantee that
particular shard contains distinct values of k1.

So instead we should check that GROUP BY contains all columns that is
required for calculating sharding key expression, i.e.:

    select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2
This commit is contained in:
Azat Khuzhin 2021-07-15 09:09:58 +03:00
parent 0893b9ff8e
commit 7b209694d5
3 changed files with 31 additions and 13 deletions

View File

@ -290,26 +290,27 @@ void replaceConstantExpressions(
/// - QueryProcessingStage::WithMergeableStateAfterAggregation
/// - QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit
/// - none (in this case regular WithMergeableState should be used)
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Block & sharding_key_block)
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Names & sharding_key_columns)
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto sharding_block_has = [&](const auto & exprs, size_t limit = SIZE_MAX) -> bool
auto sharding_block_has = [&](const auto & exprs) -> bool
{
size_t i = 0;
std::unordered_set<std::string> expr_columns;
for (auto & expr : exprs)
{
++i;
if (i > limit)
break;
auto id = expr->template as<ASTIdentifier>();
if (!id)
return false;
/// TODO: if GROUP BY contains multiIf()/if() it should contain only columns from sharding_key
if (!sharding_key_block.has(id->name()))
continue;
expr_columns.emplace(id->name());
}
for (const auto & column : sharding_key_columns)
{
if (!expr_columns.contains(column))
return false;
}
return true;
};
@ -343,7 +344,7 @@ std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const
}
else
{
if (!sharding_block_has(group_by->children, 1))
if (!sharding_block_has(group_by->children))
return {};
}
@ -547,8 +548,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
has_sharding_key &&
(settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic))
{
Block sharding_key_block = sharding_key_expr->getSampleBlock();
auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_block);
auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_expr->getRequiredColumns());
if (stage)
{
LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage));

View File

@ -115,6 +115,7 @@ GROUP BY WITH TOTALS LIMIT
2 0
4 0
GROUP BY (compound)
GROUP BY sharding_key, ...
0 0
1 0
@ -123,6 +124,15 @@ GROUP BY sharding_key, ...
GROUP BY ..., sharding_key
0 0
1 0
0 0
1 0
sharding_key (compound)
1 2 3
1 2 3
1 2 6
1 2
1 2
2
window functions
0 0
1 0

View File

@ -97,6 +97,7 @@ select 'GROUP BY WITH TOTALS LIMIT';
select count(), * from dist_01247 group by number with totals limit 1;
-- GROUP BY (compound)
select 'GROUP BY (compound)';
drop table if exists dist_01247;
drop table if exists data_01247;
create table data_01247 engine=Memory() as select number key, 0 value from numbers(2);
@ -106,6 +107,13 @@ select * from dist_01247 group by key, value;
select 'GROUP BY ..., sharding_key';
select * from dist_01247 group by value, key;
-- sharding_key (compound)
select 'sharding_key (compound)';
select k1, k2, sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1, k2; -- optimization applied
select k1, any(k2), sum(v) from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)) group by k1; -- optimization does not applied
select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)); -- optimization applied
select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v), cityHash64(k1, k2)); -- optimization does not applied
-- window functions
select 'window functions';
select key, sum(sum(value)) over (rows unbounded preceding) from dist_01247 group by key settings allow_experimental_window_functions=1;