diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index b6a04c5cd34..85a2efb9963 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -284,78 +284,6 @@ void replaceConstantExpressions( visitor.visit(node); } -/// This is the implementation of optimize_distributed_group_by_sharding_key. -/// It returns up to which stage the query can be processed on a shard, which -/// is one of the following: -/// - QueryProcessingStage::Complete -/// - QueryProcessingStage::WithMergeableStateAfterAggregation -/// - QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit -/// - none (in this case regular WithMergeableState should be used) -std::optional getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Names & sharding_key_columns) -{ - const auto & select = query_info.query->as(); - - auto sharding_block_has = [&](const auto & exprs) -> bool - { - std::unordered_set expr_columns; - for (auto & expr : exprs) - { - auto id = expr->template as(); - if (!id) - continue; - expr_columns.emplace(id->name()); - } - - for (const auto & column : sharding_key_columns) - { - if (!expr_columns.contains(column)) - return false; - } - - return true; - }; - - // GROUP BY qualifiers - // - TODO: WITH TOTALS can be implemented - // - TODO: WITH ROLLUP can be implemented (I guess) - if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube) - return {}; - - // Window functions are not supported. 
- if (query_info.has_window) - return {}; - - // TODO: extremes support can be implemented - if (extremes) - return {}; - - // DISTINCT - if (select.distinct) - { - if (!sharding_block_has(select.select()->children)) - return {}; - } - - // GROUP BY - const ASTPtr group_by = select.groupBy(); - if (!query_info.syntax_analyzer_result->aggregates.empty() && (!group_by || !sharding_block_has(group_by->children))) - return {}; - - // ORDER BY - const ASTPtr order_by = select.orderBy(); - if (order_by) - return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit; - - // LIMIT BY - // LIMIT - // OFFSET - if (select.limitBy() || select.limitLength() || select.limitOffset()) - return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit; - - // Only simple SELECT FROM GROUP BY sharding_key can use Complete state. - return QueryProcessingStage::Complete; -} - size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & cluster) { size_t num_local_shards = cluster->getLocalShardCount(); @@ -523,9 +451,6 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( } } - if (settings.distributed_push_down_limit) - return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit; - /// Nested distributed query cannot return Complete stage, /// since the parent query need to aggregate the results after. 
if (to_stage == QueryProcessingStage::WithMergeableState) @@ -536,22 +461,89 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( if (getClusterQueriedNodes(settings, cluster) == 1) return QueryProcessingStage::Complete; - if (settings.optimize_skip_unused_shards && - settings.optimize_distributed_group_by_sharding_key && - has_sharding_key && - (settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic)) - { - auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_expr->getRequiredColumns()); - if (stage) - { - LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage)); - return *stage; - } - } + auto optimized_stage = getOptimizedQueryProcessingStage(query_info, settings); + if (optimized_stage) + return *optimized_stage; return QueryProcessingStage::WithMergeableState; } +std::optional StorageDistributed::getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const +{ + bool optimize_sharding_key_aggregation = + settings.optimize_skip_unused_shards && + settings.optimize_distributed_group_by_sharding_key && + has_sharding_key && + (settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic); + + QueryProcessingStage::Enum default_stage = QueryProcessingStage::WithMergeableStateAfterAggregation; + if (settings.distributed_push_down_limit) + default_stage = QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit; + + const auto & select = query_info.query->as(); + + auto expr_contains_sharding_key = [&](const auto & exprs) -> bool + { + std::unordered_set expr_columns; + for (auto & expr : exprs) + { + auto id = expr->template as(); + if (!id) + continue; + expr_columns.emplace(id->name()); + } + + for (const auto & column : sharding_key_expr->getRequiredColumns()) + { + if (!expr_columns.contains(column)) + return false; + } + + return true; + }; + + // GROUP BY 
qualifiers + // - TODO: WITH TOTALS can be implemented + // - TODO: WITH ROLLUP can be implemented (I guess) + if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube) + return {}; + // Window functions are not supported. + if (query_info.has_window) + return {}; + // TODO: extremes support can be implemented + if (settings.extremes) + return {}; + + // DISTINCT + if (select.distinct) + { + if (!optimize_sharding_key_aggregation || !expr_contains_sharding_key(select.select()->children)) + return {}; + } + + // GROUP BY + const ASTPtr group_by = select.groupBy(); + if (!query_info.syntax_analyzer_result->aggregates.empty() || group_by) + { + if (!optimize_sharding_key_aggregation || !group_by || !expr_contains_sharding_key(group_by->children)) + return {}; + } + + // ORDER BY + const ASTPtr order_by = select.orderBy(); + if (order_by) + return default_stage; + + // LIMIT BY + // LIMIT + // OFFSET + if (select.limitBy() || select.limitLength() || select.limitOffset()) + return default_stage; + + // Only simple SELECT FROM GROUP BY sharding_key can use Complete state. + return QueryProcessingStage::Complete; +} + Pipe StorageDistributed::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index bf48e814ae2..e09eda00224 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -176,6 +176,24 @@ private: ClusterPtr skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) const; + /// This method returns optimal query processing stage. 
+ /// + /// Here is the list of stages (from the least optimal to the most optimal): + /// - WithMergeableState + /// - WithMergeableStateAfterAggregation + /// - WithMergeableStateAfterAggregationAndLimit + /// - Complete + /// + /// Some simple queries w/o GROUP BY/DISTINCT can use a more optimal stage. + /// + /// Also in case of optimize_distributed_group_by_sharding_key=1 the queries + /// with GROUP BY/DISTINCT sharding_key can also use a more optimal stage. + /// (see also optimize_skip_unused_shards/allow_nondeterministic_optimize_skip_unused_shards) + /// + /// @return QueryProcessingStage or empty std::optional + /// (in this case regular WithMergeableState should be used) + std::optional getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const; + size_t getRandomShardIndex(const Cluster::ShardsInfo & shards); const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; } diff --git a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference index 4442b0b6b61..8d356a6966f 100644 --- a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference +++ b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference @@ -57,7 +57,9 @@ LIMIT 1 0 LIMIT OFFSET 1 1 -OFFSET +OFFSET distributed_push_down_limit=0 +1 1 +OFFSET distributed_push_down_limit=1 1 1 1 0 1 1 diff --git a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql index 4719119165a..10b47f64cc6 100644 --- a/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql +++ b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql @@ -60,8 +60,10 @@ select 'LIMIT'; select count(), * from dist_01247 group by number limit 1; select 
'LIMIT OFFSET'; select count(), * from dist_01247 group by number limit 1 offset 1; -select 'OFFSET'; -select count(), * from dist_01247 group by number offset 1; +select 'OFFSET distributed_push_down_limit=0'; +select count(), * from dist_01247 group by number offset 1 settings distributed_push_down_limit=0; +select 'OFFSET distributed_push_down_limit=1'; +select count(), * from dist_01247 group by number offset 1 settings distributed_push_down_limit=1; -- this will emulate different data on for different shards select 'WHERE LIMIT OFFSET'; select count(), * from dist_01247 where number = _shard_num-1 group by number order by number limit 1 offset 1; diff --git a/tests/queries/0_stateless/01814_distributed_push_down_limit.reference b/tests/queries/0_stateless/01814_distributed_push_down_limit.reference index f879f2cbd21..c542b5b7325 100644 --- a/tests/queries/0_stateless/01814_distributed_push_down_limit.reference +++ b/tests/queries/0_stateless/01814_distributed_push_down_limit.reference @@ -12,18 +12,6 @@ distributed_push_down_limit=1 8 9 40 40 -auto-distributed_push_down_limit -0 -1 -2 -3 -4 -5 -6 -7 -8 -9 -40 40 distributed_push_down_limit=1 with OFFSET 97 96 diff --git a/tests/queries/0_stateless/01814_distributed_push_down_limit.sh b/tests/queries/0_stateless/01814_distributed_push_down_limit.sh index 93321646037..24b27e74ba5 100755 --- a/tests/queries/0_stateless/01814_distributed_push_down_limit.sh +++ b/tests/queries/0_stateless/01814_distributed_push_down_limit.sh @@ -86,9 +86,11 @@ function test_distributed_push_down_limit_0() function test_distributed_push_down_limit_1() { local args=( - "remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814)" + "remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814, key)" 0 # offset --distributed_push_down_limit 1 + --optimize_skip_unused_shards 1 + --optimize_distributed_group_by_sharding_key 1 ) test_distributed_push_down_limit_with_query_log "${args[@]}" } @@ -97,22 +99,11 @@ function 
test_distributed_push_down_limit_1_offset() { local settings_and_opts=( --distributed_push_down_limit 1 - ) - - $CLICKHOUSE_CLIENT "${settings_and_opts[@]}" -q "select * from remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814) group by key order by key desc limit 5, 10" -} - -function test_auto_distributed_push_down_limit() -{ - local args=( - dist_01814 - 0 # offset --optimize_skip_unused_shards 1 --optimize_distributed_group_by_sharding_key 1 - --prefer_localhost_replica 0 - --distributed_push_down_limit 0 ) - test_distributed_push_down_limit_with_query_log "${args[@]}" + + $CLICKHOUSE_CLIENT "${settings_and_opts[@]}" -q "select * from remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814, key) group by key order by key desc limit 5, 10" } function main() @@ -151,16 +142,6 @@ function main() done echo "$out" - echo 'auto-distributed_push_down_limit' - for ((i = 0; i < max_tries; ++i)); do - out=$(test_auto_distributed_push_down_limit) - out_lines=( $out ) - if [[ ${#out_lines[@]} -gt 2 ]] && [[ ${out_lines[-1]} = 40 ]] && [[ ${out_lines[-2]} = 40 ]]; then - break - fi - done - echo "$out" - echo 'distributed_push_down_limit=1 with OFFSET' test_distributed_push_down_limit_1_offset }