Avoid running LIMIT BY/DISTINCT step on the initiator for optimize_distributed_group_by_sharding_key

Previously, queries like the following ran the LimitBy/Distinct step on the
initiator:

  select distinct sharding_key from dist order by k

However, this step can be omitted on the initiator.
This commit is contained in:
Azat Khuzhin 2021-07-17 20:21:13 +03:00
parent c830964240
commit ff12f5102a
6 changed files with 143 additions and 5 deletions

View File

@ -1341,10 +1341,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
/** If there was more than one stream, /** If there was more than one stream,
* then DISTINCT needs to be performed once again after merging all streams. * then DISTINCT needs to be performed once again after merging all streams.
*/ */
if (query.distinct) if (!from_aggregation_stage && query.distinct)
executeDistinct(query_plan, false, expressions.selected_columns, false); executeDistinct(query_plan, false, expressions.selected_columns, false);
if (expressions.hasLimitBy()) if (!from_aggregation_stage && expressions.hasLimitBy())
{ {
executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY"); executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY");
executeLimitBy(query_plan); executeLimitBy(query_plan);

View File

@ -529,15 +529,21 @@ std::optional<QueryProcessingStage::Enum> StorageDistributed::getOptimizedQueryP
return {}; return {};
} }
// LIMIT BY
if (const ASTPtr limit_by = select.limitBy())
{
if (!optimize_sharding_key_aggregation || !expr_contains_sharding_key(limit_by->children))
return {};
}
// ORDER BY // ORDER BY
const ASTPtr order_by = select.orderBy(); if (const ASTPtr order_by = select.orderBy())
if (order_by)
return default_stage; return default_stage;
// LIMIT BY // LIMIT BY
// LIMIT // LIMIT
// OFFSET // OFFSET
if (select.limitBy() || select.limitLength() || select.limitOffset()) if (select.limitLength() || select.limitOffset())
return default_stage; return default_stage;
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state. // Only simple SELECT FROM GROUP BY sharding_key can use Complete state.

View File

@ -25,6 +25,8 @@ ORDER BY LIMIT
LIMIT BY LIMIT BY
0 0
1 1
0
1
LIMIT BY LIMIT LIMIT BY LIMIT
0 0
GROUP BY ORDER BY GROUP BY ORDER BY

View File

@ -67,6 +67,8 @@ WHERE LIMIT OFFSET
1 1 1 1
LIMIT BY 1 LIMIT BY 1
1 0 1 0
1 0
1 1
1 1 1 1
GROUP BY (Distributed-over-Distributed) GROUP BY (Distributed-over-Distributed)
4 0 4 0

View File

@ -0,0 +1,115 @@
-- { echo }
explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
Expression (Projection)
Distinct
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Expression (Projection)
Distinct
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
Expression (Projection)
LimitBy
Expression (Before LIMIT BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
LimitBy
Expression ((Before LIMIT BY + Before ORDER BY))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Expression (Projection)
LimitBy
Expression ((Before LIMIT BY + Before ORDER BY))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
Expression (Projection)
Distinct
MergingSorted (Merge sorted streams for ORDER BY, without aggregation)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
Expression (Projection)
MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Distinct
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
Expression (Projection)
LimitBy
Expression (Before LIMIT BY)
MergingSorted (Merge sorted streams for ORDER BY, without aggregation)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
LimitBy
Expression (Before LIMIT BY)
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
Expression (Projection)
MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
LimitBy
Expression (Before LIMIT BY)
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)

View File

@ -0,0 +1,13 @@
-- Checks the EXPLAIN plans of DISTINCT and DISTINCT ON queries (with and without ORDER BY)
-- over remote() sharded by cityHash64(k1, k2), with optimize_distributed_group_by_sharding_key enabled.
-- Each query is tagged "optimized" when its DISTINCT / DISTINCT ON columns are the full
-- sharding key (k1, k2), and "not optimized" when only k1 is used.
-- In the expected plans, "optimized" queries have no Distinct/LimitBy step above the Union.
set optimize_skip_unused_shards=1;
set optimize_distributed_group_by_sharding_key=1;
-- NOTE(review): comments are kept above the echo hint on purpose; text after it appears to be
-- echoed into the reference output, so adding lines below would change it. Confirm before editing.
-- { echo }
explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized