diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index c126f3bca88..a5b6ff061bc 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1341,10 +1341,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu /** If there was more than one stream, * then DISTINCT needs to be performed once again after merging all streams. */ - if (query.distinct) + if (!from_aggregation_stage && query.distinct) executeDistinct(query_plan, false, expressions.selected_columns, false); - if (expressions.hasLimitBy()) + if (!from_aggregation_stage && expressions.hasLimitBy()) { executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY"); executeLimitBy(query_plan); diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 85a2efb9963..2bbb92cf0b8 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -529,15 +529,21 @@ std::optional StorageDistributed::getOptimizedQueryP return {}; } + // LIMIT BY + if (const ASTPtr limit_by = select.limitBy()) + { + if (!optimize_sharding_key_aggregation || !expr_contains_sharding_key(limit_by->children)) + return {}; + } + // ORDER BY - const ASTPtr order_by = select.orderBy(); - if (order_by) + if (const ASTPtr order_by = select.orderBy()) return default_stage; // LIMIT BY // LIMIT // OFFSET - if (select.limitBy() || select.limitLength() || select.limitOffset()) + if (select.limitLength() || select.limitOffset()) return default_stage; // Only simple SELECT FROM GROUP BY sharding_key can use Complete state. diff --git a/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.reference b/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.reference index b667c57a14c..b2b0b43e490 100644 --- a/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.reference +++ b/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.reference @@ -25,6 +25,8 @@ ORDER BY LIMIT LIMIT BY 0 1 +0 +1 LIMIT BY LIMIT 0 GROUP BY ORDER BY diff --git a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference index 8d356a6966f..a4a6b87de25 100644 --- a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference +++ b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference @@ -67,6 +67,8 @@ WHERE LIMIT OFFSET 1 1 LIMIT BY 1 1 0 +1 0 +1 1 1 1 GROUP BY (Distributed-over-Distributed) 4 0 diff --git a/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.reference b/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.reference new file mode 100644 index 00000000000..10787068f43 --- /dev/null +++ b/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.reference @@ -0,0 +1,115 @@ +-- { echo } +explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized +Expression (Projection) + Distinct + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Union + Distinct (Preliminary DISTINCT) + Expression (Before ORDER BY) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + ReadFromStorage (SystemNumbers) + ReadFromRemote (Read from remote replica) +explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized +SettingQuotaAndLimits (Set limits and quota after reading from storage) + Union + Expression (Projection) + Distinct + Distinct (Preliminary DISTINCT) + Expression (Before ORDER BY) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + ReadFromStorage (SystemNumbers) + ReadFromRemote (Read from remote replica) +explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized +Expression (Projection) + LimitBy + Expression (Before LIMIT BY) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Union + LimitBy + Expression ((Before LIMIT BY + Before ORDER BY)) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + ReadFromStorage (SystemNumbers) + ReadFromRemote (Read from remote replica) +explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized +SettingQuotaAndLimits (Set limits and quota after reading from storage) + Union + Expression (Projection) + LimitBy + Expression ((Before LIMIT BY + Before ORDER BY)) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + ReadFromStorage (SystemNumbers) + ReadFromRemote (Read from remote replica) +explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized +Expression (Projection) + Distinct + MergingSorted (Merge sorted streams for ORDER BY, without aggregation) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Union + MergingSorted (Merge sorted streams for ORDER BY) + MergeSorting (Merge sorted blocks for ORDER BY) + PartialSorting (Sort each block for ORDER BY) + Distinct (Preliminary DISTINCT) + Expression (Before ORDER BY) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + ReadFromStorage (SystemNumbers) + ReadFromRemote (Read from remote replica) +explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized +Expression (Projection) + MergingSorted (Merge sorted streams after aggregation stage for ORDER BY) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Union + Distinct + MergingSorted (Merge sorted streams for ORDER BY) + MergeSorting (Merge sorted blocks for ORDER BY) + PartialSorting (Sort each block for ORDER BY) + Distinct (Preliminary DISTINCT) + Expression (Before ORDER BY) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + ReadFromStorage (SystemNumbers) + ReadFromRemote (Read from remote replica) +explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized +Expression (Projection) + LimitBy + Expression (Before LIMIT BY) + MergingSorted (Merge sorted streams for ORDER BY, without aggregation) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Union + LimitBy + Expression (Before LIMIT BY) + MergingSorted (Merge sorted streams for ORDER BY) + MergeSorting (Merge sorted blocks for ORDER BY) + PartialSorting (Sort each block for ORDER BY) + Expression (Before ORDER BY) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + ReadFromStorage (SystemNumbers) + ReadFromRemote (Read from remote replica) +explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized +Expression (Projection) + MergingSorted (Merge sorted streams after aggregation stage for ORDER BY) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Union + LimitBy + Expression (Before LIMIT BY) + MergingSorted (Merge sorted streams for ORDER BY) + MergeSorting (Merge sorted blocks for ORDER BY) + PartialSorting (Sort each block for ORDER BY) + Expression (Before ORDER BY) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) + SettingQuotaAndLimits (Set limits and quota after reading from storage) + ReadFromStorage (SystemNumbers) + ReadFromRemote (Read from remote replica) diff --git a/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.sql b/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.sql new file mode 100644 index 00000000000..2ae872f72b0 --- /dev/null +++ b/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.sql @@ -0,0 +1,13 @@ +set optimize_skip_unused_shards=1; +set optimize_distributed_group_by_sharding_key=1; + +-- { echo } +explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized +explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized +explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized +explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized + +explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized +explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized +explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized +explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized