From bfb5b8f441c93de24c835e12847db28efc0ad394 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Mon, 16 Oct 2023 18:24:05 +0200 Subject: [PATCH] Implement replica estimation on top of the analyzer --- src/Planner/Planner.cpp | 18 +++-- src/Planner/PlannerJoinTree.cpp | 129 +++++++++++++++++--------------- 2 files changed, 82 insertions(+), 65 deletions(-) diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index b92a7fb0fea..7341ee4f1ba 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -40,10 +40,11 @@ #include #include -#include -#include -#include #include +#include +#include +#include +#include #include #include @@ -143,6 +144,7 @@ void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context) * getQueryProcessingStage method will be called. * * StorageDistributed skip unused shards optimization relies on this. + * Parallel replicas estimation relies on this too. * * To collect filters that will be applied to specific table in case we have JOINs requires * to run query plan optimization pipeline. @@ -156,6 +158,11 @@ void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context) void collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context) { bool collect_filters = false; + const auto & query_context = planner_context->getQueryContext(); + const auto & settings = query_context->getSettingsRef(); + + bool parallel_replicas_estimation_enabled + = query_context->canUseParallelReplicasOnInitiator() && settings.parallel_replicas_min_number_of_rows_per_replica > 0; for (auto & [table_expression, table_expression_data] : planner_context->getTableExpressionNodeToData()) { @@ -165,7 +172,8 @@ void collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const Planne continue; const auto & storage = table_node ? 
table_node->getStorage() : table_function_node->getStorage(); - if (typeid_cast<const StorageDistributed *>(storage.get())) + if (typeid_cast<const StorageDistributed *>(storage.get()) + || (parallel_replicas_estimation_enabled && std::dynamic_pointer_cast<MergeTreeData>(storage))) { collect_filters = true; break; @@ -187,8 +195,6 @@ void collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const Planne dummy_storage_to_table_expression_data.emplace(dummy_storage, table_expression_data); } - const auto & query_context = planner_context->getQueryContext(); - SelectQueryOptions select_query_options; Planner planner(updated_query_tree, select_query_options); planner.buildQueryPlanIfNeeded(); diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 16314e54122..1e4c51dda59 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -526,6 +526,41 @@ FilterDAGInfo buildAdditionalFiltersIfNeeded(const StoragePtr & storage, return buildFilterInfo(additional_filter_ast, table_expression_query_info.table_expression, planner_context); } +static UInt64 mainQueryNodeBlockSizeByLimit(const SelectQueryInfo & select_query_info) +{ + auto const & main_query_node = select_query_info.query_tree->as<QueryNode &>(); + + /// Constness of limit and offset is validated during query analysis stage + size_t limit_length = 0; + if (main_query_node.hasLimit()) + limit_length = main_query_node.getLimit()->as<ConstantNode &>().getValue().safeGet<UInt64>(); + + size_t limit_offset = 0; + if (main_query_node.hasOffset()) + limit_offset = main_query_node.getOffset()->as<ConstantNode &>().getValue().safeGet<UInt64>(); + + /** If not specified DISTINCT, WHERE, GROUP BY, HAVING, ORDER BY, JOIN, LIMIT BY, LIMIT WITH TIES + * but LIMIT is specified, and limit + offset < max_block_size, + * then as the block size we will use limit + offset (not to read more from the table than requested), + * and also set the number of threads to 1. 
+ */ + if (main_query_node.hasLimit() + && !main_query_node.isDistinct() + && !main_query_node.isLimitWithTies() + && !main_query_node.hasPrewhere() + && !main_query_node.hasWhere() + && select_query_info.filter_asts.empty() + && !main_query_node.hasGroupBy() + && !main_query_node.hasHaving() + && !main_query_node.hasOrderBy() + && !main_query_node.hasLimitBy() + && !select_query_info.need_aggregate + && !select_query_info.has_window + && limit_length <= std::numeric_limits<UInt64>::max() - limit_offset) + return limit_length + limit_offset; + return 0; +} + JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression, const SelectQueryInfo & select_query_info, const SelectQueryOptions & select_query_options, @@ -580,54 +615,27 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres } UInt64 max_block_size = settings.max_block_size; - - auto & main_query_node = select_query_info.query_tree->as<QueryNode &>(); - + UInt64 max_block_size_limited = 0; if (is_single_table_expression) { - size_t limit_length = 0; - if (main_query_node.hasLimit()) - { - /// Constness of limit is validated during query analysis stage - limit_length = main_query_node.getLimit()->as<ConstantNode &>().getValue().safeGet<UInt64>(); - } - - size_t limit_offset = 0; - if (main_query_node.hasOffset()) - { - /// Constness of offset is validated during query analysis stage - limit_offset = main_query_node.getOffset()->as<ConstantNode &>().getValue().safeGet<UInt64>(); - } - /** If not specified DISTINCT, WHERE, GROUP BY, HAVING, ORDER BY, JOIN, LIMIT BY, LIMIT WITH TIES * but LIMIT is specified, and limit + offset < max_block_size, * then as the block size we will use limit + offset (not to read more from the table than requested), * and also set the number of threads to 1. 
*/ - if (main_query_node.hasLimit() && - !main_query_node.isDistinct() && - !main_query_node.isLimitWithTies() && - !main_query_node.hasPrewhere() && - !main_query_node.hasWhere() && - select_query_info.filter_asts.empty() && - !main_query_node.hasGroupBy() && - !main_query_node.hasHaving() && - !main_query_node.hasOrderBy() && - !main_query_node.hasLimitBy() && - !select_query_info.need_aggregate && - !select_query_info.has_window && - limit_length <= std::numeric_limits<UInt64>::max() - limit_offset) + max_block_size_limited = mainQueryNodeBlockSizeByLimit(select_query_info); + if (max_block_size_limited) { - if (limit_length + limit_offset < max_block_size) + if (max_block_size_limited < max_block_size) { - max_block_size = std::max<UInt64>(1, limit_length + limit_offset); + max_block_size = std::max<UInt64>(1, max_block_size_limited); max_streams = 1; max_threads_execute_query = 1; } - if (limit_length + limit_offset < select_query_info.local_storage_limits.local_limits.size_limits.max_rows) + if (max_block_size_limited < select_query_info.local_storage_limits.local_limits.size_limits.max_rows) { - table_expression_query_info.limit = limit_length + limit_offset; + table_expression_query_info.limit = max_block_size_limited; } } @@ -689,32 +697,35 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres if (storage_merge_tree && query_context->canUseParallelReplicasOnInitiator() && settings.parallel_replicas_min_number_of_rows_per_replica > 0) { - /// This is trash - /// It uses the old InterpreterSelectQuery to do the estimation of how many rows will be read - /// Ideally we should be able to use estimateNumberOfRowsToRead over the storage, but to do this - /// properly we need all the actions/filters, which aren't available yet - /// If we instead delay this check for later several things will happen: - /// * The header might be different (updatePrewhereOutputsIfNeeded) - /// * The storage will have been initiated (thus already preparing parallel replicas) - 
auto query_options = SelectQueryOptions( - QueryProcessingStage::WithMergeableState, - /* depth */ 1, - /* is_subquery_= */ true) - .ignoreProjections() - .ignoreAlias(); - InterpreterSelectQuery select( - table_expression_query_info.original_query, - query_context, - query_options, - table_expression_query_info.prepared_sets); - select.adjustParallelReplicasAfterAnalysis(); - planner_context->getMutableQueryContext()->setSetting( - "allow_experimental_parallel_reading_from_replicas", - select.getContext()->getSettingsRef().allow_experimental_parallel_reading_from_replicas.operator Field()); - planner_context->getMutableQueryContext()->setSetting( - "max_parallel_replicas", select.getContext()->getSettingsRef().max_parallel_replicas.operator Field()); - } + ActionDAGNodes filter_nodes; + if (table_expression_query_info.filter_actions_dag) + filter_nodes.nodes = table_expression_query_info.filter_actions_dag->getOutputs(); + UInt64 rows_to_read = storage_merge_tree->estimateNumberOfRowsToRead( + query_context, storage_snapshot, table_expression_query_info, filter_nodes); + if (max_block_size_limited && max_block_size_limited < rows_to_read) + rows_to_read = max_block_size_limited; + + size_t number_of_replicas_to_use = rows_to_read / settings.parallel_replicas_min_number_of_rows_per_replica; + LOG_TRACE( + &Poco::Logger::get("Planner"), + "Estimated {} rows to read. 
It is enough work for {} parallel replicas", + rows_to_read, + number_of_replicas_to_use); + + if (number_of_replicas_to_use <= 1) + { + planner_context->getMutableQueryContext()->setSetting( + "allow_experimental_parallel_reading_from_replicas", Field(0)); + planner_context->getMutableQueryContext()->setSetting("max_parallel_replicas", UInt64{0}); + LOG_DEBUG(&Poco::Logger::get("Planner"), "Disabling parallel replicas because there aren't enough rows to read"); + } + else if (number_of_replicas_to_use < settings.max_parallel_replicas) + { + planner_context->getMutableQueryContext()->setSetting("max_parallel_replicas", number_of_replicas_to_use); + LOG_DEBUG(&Poco::Logger::get("Planner"), "Reducing the number of replicas to use to {}", number_of_replicas_to_use); + } + } const auto & prewhere_actions = table_expression_data.getPrewhereFilterActions();