diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index f3efad0d699..9e0185b96cd 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1346,10 +1346,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
         /** If there was more than one stream,
           * then DISTINCT needs to be performed once again after merging all streams.
           */
-        if (query.distinct)
+        if (!from_aggregation_stage && query.distinct)
             executeDistinct(query_plan, false, expressions.selected_columns, false);
 
-        if (expressions.hasLimitBy())
+        if (!from_aggregation_stage && expressions.hasLimitBy())
         {
             executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY");
             executeLimitBy(query_plan);
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index 9aaa6692560..ff734d02462 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -284,86 +284,6 @@ void replaceConstantExpressions(
     visitor.visit(node);
 }
 
-/// This is the implementation of optimize_distributed_group_by_sharding_key.
-/// It returns up to which stage the query can be processed on a shard, which
-/// is one of the following:
-/// - QueryProcessingStage::Complete
-/// - QueryProcessingStage::WithMergeableStateAfterAggregation
-/// - QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit
-/// - none (in this case regular WithMergeableState should be used)
-std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Names & sharding_key_columns)
-{
-    const auto & select = query_info.query->as<ASTSelectQuery &>();
-
-    auto sharding_block_has = [&](const auto & exprs) -> bool
-    {
-        std::unordered_set<std::string> expr_columns;
-        for (auto & expr : exprs)
-        {
-            auto id = expr->template as<ASTIdentifier>();
-            if (!id)
-                continue;
-            expr_columns.emplace(id->name());
-        }
-
-        for (const auto & column : sharding_key_columns)
-        {
-            if (!expr_columns.contains(column))
-                return false;
-        }
-
-        return true;
-    };
-
-    // GROUP BY qualifiers
-    // - TODO: WITH TOTALS can be implemented
-    // - TODO: WITH ROLLUP can be implemented (I guess)
-    if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
-        return {};
-
-    // Window functions are not supported.
-    if (query_info.has_window)
-        return {};
-
-    // TODO: extremes support can be implemented
-    if (extremes)
-        return {};
-
-    // DISTINCT
-    if (select.distinct)
-    {
-        if (!sharding_block_has(select.select()->children))
-            return {};
-    }
-
-    // GROUP BY
-    const ASTPtr group_by = select.groupBy();
-    if (!group_by)
-    {
-        if (!select.distinct)
-            return {};
-    }
-    else
-    {
-        if (!sharding_block_has(group_by->children))
-            return {};
-    }
-
-    // ORDER BY
-    const ASTPtr order_by = select.orderBy();
-    if (order_by)
-        return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
-
-    // LIMIT BY
-    // LIMIT
-    // OFFSET
-    if (select.limitBy() || select.limitLength() || select.limitOffset())
-        return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
-
-    // Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
-    return QueryProcessingStage::Complete;
-}
-
 size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & cluster)
 {
     size_t num_local_shards = cluster->getLocalShardCount();
@@ -527,13 +447,12 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
         {
             /// NOTE: distributed_group_by_no_merge=1 does not respect distributed_push_down_limit
             /// (since in this case queries processed separately and the initiator is just a proxy in this case).
+            if (to_stage != QueryProcessingStage::Complete)
+                throw Exception("Queries with distributed_group_by_no_merge=1 should be processed to Complete stage", ErrorCodes::LOGICAL_ERROR);
             return QueryProcessingStage::Complete;
         }
     }
 
-    if (settings.distributed_push_down_limit)
-        return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
-
     /// Nested distributed query cannot return Complete stage,
     /// since the parent query need to aggregate the results after.
     if (to_stage == QueryProcessingStage::WithMergeableState)
@@ -542,24 +461,108 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
     /// If there is only one node, the query can be fully processed by the
     /// shard, initiator will work as a proxy only.
     if (getClusterQueriedNodes(settings, cluster) == 1)
-        return QueryProcessingStage::Complete;
-
-    if (settings.optimize_skip_unused_shards &&
-        settings.optimize_distributed_group_by_sharding_key &&
-        has_sharding_key &&
-        (settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic))
     {
-        auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_expr->getRequiredColumns());
-        if (stage)
-        {
-            LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage));
-            return *stage;
-        }
+        /// In case the query was processed to
+        /// WithMergeableStateAfterAggregation/WithMergeableStateAfterAggregationAndLimit
+        /// (which are greater than the Complete stage)
+        /// we cannot return Complete (it will break aliases and similar),
+        /// relevant for Distributed over Distributed
+        return std::max(to_stage, QueryProcessingStage::Complete);
+    }
+
+    auto optimized_stage = getOptimizedQueryProcessingStage(query_info, settings);
+    if (optimized_stage)
+    {
+        if (*optimized_stage == QueryProcessingStage::Complete)
+            return std::min(to_stage, *optimized_stage);
+        return *optimized_stage;
     }
 
     return QueryProcessingStage::WithMergeableState;
 }
 
+std::optional<QueryProcessingStage::Enum> StorageDistributed::getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const
+{
+    bool optimize_sharding_key_aggregation =
+        settings.optimize_skip_unused_shards &&
+        settings.optimize_distributed_group_by_sharding_key &&
+        has_sharding_key &&
+        (settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic);
+
+    QueryProcessingStage::Enum default_stage = QueryProcessingStage::WithMergeableStateAfterAggregation;
+    if (settings.distributed_push_down_limit)
+        default_stage = QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
+
+    const auto & select = query_info.query->as<ASTSelectQuery &>();
+
+    auto expr_contains_sharding_key = [&](const auto & exprs) -> bool
+    {
+        std::unordered_set<std::string> expr_columns;
+        for (auto & expr : exprs)
+        {
+            auto id = expr->template as<ASTIdentifier>();
+            if (!id)
+                continue;
+            expr_columns.emplace(id->name());
+        }
+
+        for (const auto & column : sharding_key_expr->getRequiredColumns())
+        {
+            if (!expr_columns.contains(column))
+                return false;
+        }
+
+        return true;
+    };
+
+    // GROUP BY qualifiers
+    // - TODO: WITH TOTALS can be implemented
+    // - TODO: WITH ROLLUP can be implemented (I guess)
+    if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
+        return {};
+    // Window functions are not supported.
+    if (query_info.has_window)
+        return {};
+    // TODO: extremes support can be implemented
+    if (settings.extremes)
+        return {};
+
+    // DISTINCT
+    if (select.distinct)
+    {
+        if (!optimize_sharding_key_aggregation || !expr_contains_sharding_key(select.select()->children))
+            return {};
+    }
+
+    // GROUP BY
+    const ASTPtr group_by = select.groupBy();
+    if (!query_info.syntax_analyzer_result->aggregates.empty() || group_by)
+    {
+        if (!optimize_sharding_key_aggregation || !group_by || !expr_contains_sharding_key(group_by->children))
+            return {};
+    }
+
+    // LIMIT BY
+    if (const ASTPtr limit_by = select.limitBy())
+    {
+        if (!optimize_sharding_key_aggregation || !expr_contains_sharding_key(limit_by->children))
+            return {};
+    }
+
+    // ORDER BY
+    if (const ASTPtr order_by = select.orderBy())
+        return default_stage;
+
+    // LIMIT BY
+    // LIMIT
+    // OFFSET
+    if (select.limitLength() || select.limitOffset())
+        return default_stage;
+
+    // Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
+    return QueryProcessingStage::Complete;
+}
+
 Pipe StorageDistributed::read(
     const Names & column_names,
     const StorageMetadataPtr & metadata_snapshot,
diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h
index 4331817386e..f8b16dec7be 100644
--- a/src/Storages/StorageDistributed.h
+++ b/src/Storages/StorageDistributed.h
@@ -177,6 +177,24 @@ private:
     ClusterPtr skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) const;
 
+    /// This method returns the optimal query processing stage.
+    ///
+    /// Here is the list of stages (from the least optimal to the most optimal):
+    /// - WithMergeableState
+    /// - WithMergeableStateAfterAggregation
+    /// - WithMergeableStateAfterAggregationAndLimit
+    /// - Complete
+    ///
+    /// Some simple queries w/o GROUP BY/DISTINCT can use a more optimal stage.
+    ///
+    /// Also in case of optimize_distributed_group_by_sharding_key=1 the queries
+    /// with GROUP BY/DISTINCT over the sharding_key can also use a more optimal stage.
+    /// (see also optimize_skip_unused_shards/allow_nondeterministic_optimize_skip_unused_shards)
+    ///
+    /// @return QueryProcessingStage or an empty std::optional
+    /// (in this case regular WithMergeableState should be used)
+    std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const;
+
     size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
 
     const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }
 
diff --git a/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.reference b/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.reference
index b667c57a14c..b2b0b43e490 100644
--- a/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.reference
+++ b/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.reference
@@ -25,6 +25,8 @@ ORDER BY LIMIT
 LIMIT BY
 0
 1
+0
+1
 LIMIT BY LIMIT
 0
 GROUP BY ORDER BY
diff --git a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference
index 4442b0b6b61..a4a6b87de25 100644
--- a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference
+++ b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference
@@ -57,7 +57,9 @@ LIMIT
 1 0
 LIMIT OFFSET
 1 1
-OFFSET
+OFFSET distributed_push_down_limit=0
+1 1
+OFFSET distributed_push_down_limit=1
 1 1
 1 0
 1 1
@@ -65,6 +67,8 @@ WHERE LIMIT OFFSET
 1 1
 LIMIT BY 1
 1 0
+1 0
+1 1
 1 1
 GROUP BY (Distributed-over-Distributed)
 4 0
diff --git a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql
index 1dcdd795bc1..a8dc0d91c37 100644
--- a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql
+++ b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql
@@ -60,8 +60,10 @@ select 'LIMIT';
 select count(), * from dist_01247 group by number limit 1;
 select 'LIMIT OFFSET';
 select count(), * from dist_01247 group by number limit 1 offset 1;
-select 'OFFSET';
-select count(), * from dist_01247 group by number offset 1;
+select 'OFFSET distributed_push_down_limit=0';
+select count(), * from dist_01247 group by number offset 1 settings distributed_push_down_limit=0;
+select 'OFFSET distributed_push_down_limit=1';
+select count(), * from dist_01247 group by number offset 1 settings distributed_push_down_limit=1;
 -- this will emulate different data on for different shards
 select 'WHERE LIMIT OFFSET';
 select count(), * from dist_01247 where number = _shard_num-1 group by number order by number limit 1 offset 1;
diff --git a/tests/queries/0_stateless/01814_distributed_push_down_limit.reference b/tests/queries/0_stateless/01814_distributed_push_down_limit.reference
index f879f2cbd21..c542b5b7325 100644
--- a/tests/queries/0_stateless/01814_distributed_push_down_limit.reference
+++ b/tests/queries/0_stateless/01814_distributed_push_down_limit.reference
@@ -12,18 +12,6 @@ distributed_push_down_limit=1
 8
 9
 40 40
-auto-distributed_push_down_limit
-0
-1
-2
-3
-4
-5
-6
-7
-8
-9
-40 40
 distributed_push_down_limit=1 with OFFSET
 97
 96
diff --git a/tests/queries/0_stateless/01814_distributed_push_down_limit.sh b/tests/queries/0_stateless/01814_distributed_push_down_limit.sh
index 93321646037..24b27e74ba5 100755
--- a/tests/queries/0_stateless/01814_distributed_push_down_limit.sh
+++ b/tests/queries/0_stateless/01814_distributed_push_down_limit.sh
@@ -86,9 +86,11 @@ function test_distributed_push_down_limit_0()
 function test_distributed_push_down_limit_1()
 {
     local args=(
-        "remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814)"
+        "remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814, key)"
         0 # offset
         --distributed_push_down_limit 1
+        --optimize_skip_unused_shards 1
+        --optimize_distributed_group_by_sharding_key 1
     )
     test_distributed_push_down_limit_with_query_log "${args[@]}"
 }
@@ -97,22 +99,11 @@ function test_distributed_push_down_limit_1_offset()
 {
     local settings_and_opts=(
         --distributed_push_down_limit 1
-    )
-
-    $CLICKHOUSE_CLIENT "${settings_and_opts[@]}" -q "select * from remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814) group by key order by key desc limit 5, 10"
-}
-
-function test_auto_distributed_push_down_limit()
-{
-    local args=(
-        dist_01814
-        0 # offset
         --optimize_skip_unused_shards 1
         --optimize_distributed_group_by_sharding_key 1
-        --prefer_localhost_replica 0
-        --distributed_push_down_limit 0
     )
-    test_distributed_push_down_limit_with_query_log "${args[@]}"
+
+    $CLICKHOUSE_CLIENT "${settings_and_opts[@]}" -q "select * from remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814, key) group by key order by key desc limit 5, 10"
 }
 
 function main()
@@ -151,16 +142,6 @@ function main()
     done
     echo "$out"
 
-    echo 'auto-distributed_push_down_limit'
-    for ((i = 0; i < max_tries; ++i)); do
-        out=$(test_auto_distributed_push_down_limit)
-        out_lines=( $out )
-        if [[ ${#out_lines[@]} -gt 2 ]] && [[ ${out_lines[-1]} = 40 ]] && [[ ${out_lines[-2]} = 40 ]]; then
-            break
-        fi
-    done
-    echo "$out"
-
     echo 'distributed_push_down_limit=1 with OFFSET'
     test_distributed_push_down_limit_1_offset
 }
diff --git a/tests/queries/0_stateless/01951_distributed_push_down_limit.reference b/tests/queries/0_stateless/01951_distributed_push_down_limit.reference
new file mode 100644
index 00000000000..9e803a171c4
--- /dev/null
+++ b/tests/queries/0_stateless/01951_distributed_push_down_limit.reference
@@ -0,0 +1,32 @@
+-- { echo }
+explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=0;
+Expression (Projection)
+  Limit (preliminary LIMIT)
+    MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
+      SettingQuotaAndLimits (Set limits and quota after reading from storage)
+        Union
+          MergingSorted (Merge sorted streams for ORDER BY)
+            MergeSorting (Merge sorted blocks for ORDER BY)
+              PartialSorting (Sort each block for ORDER BY)
+                Expression (Before ORDER BY)
+                  SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                    Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
+                      SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                        ReadFromStorage (SystemNumbers)
+          ReadFromRemote (Read from remote replica)
+explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1;
+Expression (Projection)
+  Limit (preliminary LIMIT)
+    MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
+      SettingQuotaAndLimits (Set limits and quota after reading from storage)
+        Union
+          Limit (preliminary LIMIT)
+            MergingSorted (Merge sorted streams for ORDER BY)
+              MergeSorting (Merge sorted blocks for ORDER BY)
+                PartialSorting (Sort each block for ORDER BY)
+                  Expression (Before ORDER BY)
+                    SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                      Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
+                        SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                          ReadFromStorage (SystemNumbers)
+          ReadFromRemote (Read from remote replica)
diff --git a/tests/queries/0_stateless/01951_distributed_push_down_limit.sql b/tests/queries/0_stateless/01951_distributed_push_down_limit.sql
new file mode 100644
index 00000000000..0d6e2069215
--- /dev/null
+++ b/tests/queries/0_stateless/01951_distributed_push_down_limit.sql
@@ -0,0 +1,3 @@
+-- { echo }
+explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=0;
+explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1;
diff --git a/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.reference b/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.reference
new file mode 100644
index 00000000000..10787068f43
--- /dev/null
+++ b/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.reference
@@ -0,0 +1,115 @@
+-- { echo }
+explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
+Expression (Projection)
+  Distinct
+    SettingQuotaAndLimits (Set limits and quota after reading from storage)
+      Union
+        Distinct (Preliminary DISTINCT)
+          Expression (Before ORDER BY)
+            SettingQuotaAndLimits (Set limits and quota after reading from storage)
+              Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
+                SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                  ReadFromStorage (SystemNumbers)
+        ReadFromRemote (Read from remote replica)
+explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
+SettingQuotaAndLimits (Set limits and quota after reading from storage)
+  Union
+    Expression (Projection)
+      Distinct
+        Distinct (Preliminary DISTINCT)
+          Expression (Before ORDER BY)
+            SettingQuotaAndLimits (Set limits and quota after reading from storage)
+              Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
+                SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                  ReadFromStorage (SystemNumbers)
+    ReadFromRemote (Read from remote replica)
+explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
+Expression (Projection)
+  LimitBy
+    Expression (Before LIMIT BY)
+      SettingQuotaAndLimits (Set limits and quota after reading from storage)
+        Union
+          LimitBy
+            Expression ((Before LIMIT BY + Before ORDER BY))
+              SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
+                  SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                    ReadFromStorage (SystemNumbers)
+          ReadFromRemote (Read from remote replica)
+explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
+SettingQuotaAndLimits (Set limits and quota after reading from storage)
+  Union
+    Expression (Projection)
+      LimitBy
+        Expression ((Before LIMIT BY + Before ORDER BY))
+          SettingQuotaAndLimits (Set limits and quota after reading from storage)
+            Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
+              SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                ReadFromStorage (SystemNumbers)
+    ReadFromRemote (Read from remote replica)
+explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
+Expression (Projection)
+  Distinct
+    MergingSorted (Merge sorted streams for ORDER BY, without aggregation)
+      SettingQuotaAndLimits (Set limits and quota after reading from storage)
+        Union
+          MergingSorted (Merge sorted streams for ORDER BY)
+            MergeSorting (Merge sorted blocks for ORDER BY)
+              PartialSorting (Sort each block for ORDER BY)
+                Distinct (Preliminary DISTINCT)
+                  Expression (Before ORDER BY)
+                    SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                      Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
+                        SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                          ReadFromStorage (SystemNumbers)
+          ReadFromRemote (Read from remote replica)
+explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
+Expression (Projection)
+  MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
+    SettingQuotaAndLimits (Set limits and quota after reading from storage)
+      Union
+        Distinct
+          MergingSorted (Merge sorted streams for ORDER BY)
+            MergeSorting (Merge sorted blocks for ORDER BY)
+              PartialSorting (Sort each block for ORDER BY)
+                Distinct (Preliminary DISTINCT)
+                  Expression (Before ORDER BY)
+                    SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                      Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
+                        SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                          ReadFromStorage (SystemNumbers)
+        ReadFromRemote (Read from remote replica)
+explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
+Expression (Projection)
+  LimitBy
+    Expression (Before LIMIT BY)
+      MergingSorted (Merge sorted streams for ORDER BY, without aggregation)
+        SettingQuotaAndLimits (Set limits and quota after reading from storage)
+          Union
+            LimitBy
+              Expression (Before LIMIT BY)
+                MergingSorted (Merge sorted streams for ORDER BY)
+                  MergeSorting (Merge sorted blocks for ORDER BY)
+                    PartialSorting (Sort each block for ORDER BY)
+                      Expression (Before ORDER BY)
+                        SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                          Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
+                            SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                              ReadFromStorage (SystemNumbers)
+            ReadFromRemote (Read from remote replica)
+explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
+Expression (Projection)
+  MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
+    SettingQuotaAndLimits (Set limits and quota after reading from storage)
+      Union
+        LimitBy
+          Expression (Before LIMIT BY)
+            MergingSorted (Merge sorted streams for ORDER BY)
+              MergeSorting (Merge sorted blocks for ORDER BY)
+                PartialSorting (Sort each block for ORDER BY)
+                  Expression (Before ORDER BY)
+                    SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                      Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
+                        SettingQuotaAndLimits (Set limits and quota after reading from storage)
+                          ReadFromStorage (SystemNumbers)
+        ReadFromRemote (Read from remote replica)
diff --git a/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.sql b/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.sql
new file mode 100644
index 00000000000..2ae872f72b0
--- /dev/null
+++ b/tests/queries/0_stateless/01952_optimize_distributed_group_by_sharding_key.sql
@@ -0,0 +1,13 @@
+set optimize_skip_unused_shards=1;
+set optimize_distributed_group_by_sharding_key=1;
+
+-- { echo }
+explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
+explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
+explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
+explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
+
+explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
+explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
+explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
+explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
diff --git a/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.reference b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.reference
new file mode 100644
index 00000000000..6c680840239
--- /dev/null
+++ b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.reference
@@ -0,0 +1,8 @@
+-- { echo }
+select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=0;
+0
+select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=1;
+0
+select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_group_by_no_merge=1;
+0
+0
diff --git a/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.sql b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.sql
new file mode 100644
index 00000000000..0925df1888d
--- /dev/null
+++ b/tests/queries/0_stateless/02001_dist_on_dist_WithMergeableStateAfterAggregation.sql
@@ -0,0 +1,6 @@
+drop table if exists dist;
+create table dist as system.one engine=Distributed('test_shard_localhost', system, one);
+-- { echo }
+select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=0;
+select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=1;
+select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_group_by_no_merge=1;
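As an illustration (not part of the patch itself), the query below, taken verbatim from the new 01951 test, is a minimal way to observe the behaviour this change enables: with distributed_push_down_limit=1 each shard sorts and applies the LIMIT locally (the extra "Limit (preliminary LIMIT)" step under "Union" in the reference output above), so at most 10 rows per shard are sent to the initiator instead of the full 1e6-row stream, while with distributed_push_down_limit=0 the limit is applied only on the initiator.

select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1;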