From 718c284437d4f9bd7f8e6b6d40b3f2bfcf925e26 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Tue, 4 May 2021 20:40:34 +0800 Subject: [PATCH] Fix more tests --- src/Interpreters/ActionsDAG.cpp | 29 +++-- src/Interpreters/ActionsDAG.h | 5 +- src/Interpreters/ExpressionAnalyzer.cpp | 2 +- src/Interpreters/ExpressionAnalyzer.h | 1 + src/Interpreters/InterpreterSelectQuery.cpp | 119 +++++++++--------- src/Storages/MergeTree/MergeTreeData.cpp | 26 ++-- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 2 +- src/Storages/SelectQueryInfo.h | 1 + .../0_stateless/01710_projections.reference | 1 - 9 files changed, 101 insertions(+), 85 deletions(-) diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp index 6b1ff986986..a4fb08cab14 100644 --- a/src/Interpreters/ActionsDAG.cpp +++ b/src/Interpreters/ActionsDAG.cpp @@ -440,7 +440,7 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs) } NameSet ActionsDAG::foldActionsByProjection( - const NameSet & required_columns, const Block & projection_block_for_keys, const String & predicate_column_name) + const NameSet & required_columns, const Block & projection_block_for_keys, const String & predicate_column_name, bool add_missing_keys) { std::unordered_set visited_nodes; std::unordered_set visited_index_names; @@ -457,21 +457,24 @@ NameSet ActionsDAG::foldActionsByProjection( } } - for (const auto & column : required_columns) + if (add_missing_keys) { - if (visited_index_names.find(column) == visited_index_names.end()) + for (const auto & column : required_columns) { - if (const ColumnWithTypeAndName * column_with_type_name = projection_block_for_keys.findByName(column)) + if (visited_index_names.find(column) == visited_index_names.end()) { - const auto * node = &addInput(*column_with_type_name); - index.push_back(node); - visited_nodes.insert(node); - visited_index_names.insert(column); - } - else - { - // Missing column - return {}; + if (const ColumnWithTypeAndName * column_with_type_name = projection_block_for_keys.findByName(column)) + { + const auto * node = &addInput(*column_with_type_name); + visited_nodes.insert(node); + index.push_back(node); + visited_index_names.insert(column); + } + else + { + // Missing column + return {}; + } } } } diff --git a/src/Interpreters/ActionsDAG.h b/src/Interpreters/ActionsDAG.h index 3c7ff6264fa..9862cb8708e 100644 --- a/src/Interpreters/ActionsDAG.h +++ b/src/Interpreters/ActionsDAG.h @@ -169,7 +169,10 @@ public: void removeUnusedActions(const NameSet & required_names); NameSet foldActionsByProjection( - const NameSet & required_columns, const Block & projection_block_for_keys, const String & predicate_column_name = {}); + const NameSet & required_columns, + const Block & projection_block_for_keys, + const String & predicate_column_name = {}, + bool add_missing_keys = true); void reorderAggregationKeysForProjection(const std::unordered_map & key_names_pos_map); void addAggregatesViaProjection(const Block & aggregates); diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index af4369527bc..9866817c1c4 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1638,7 +1638,7 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, si if (hasWhere()) { - auto where_column_name = query.where()->getColumnName(); + where_column_name = query.where()->getColumnName(); remove_where_filter = chain.steps.at(where_step_num)->required_output.find(where_column_name)->second; } } diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index 8a86b82a2ec..ef25ee2ece5 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -209,6 +209,7 @@ struct ExpressionAnalysisResult bool has_order_by = false; bool has_window = false; + String where_column_name; bool remove_where_filter = false; bool optimize_read_in_order = false; bool optimize_aggregation_in_order = false; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 28671503f6a..3515b7e6419 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -397,7 +397,6 @@ InterpreterSelectQuery::InterpreterSelectQuery( view = nullptr; } - // TODO Check if we can have prewhere work for projections, also need to allow it in TreeRewriter if (try_move_to_prewhere && storage && query.where() && !query.prewhere()) { /// PREWHERE optimization: transfer some condition from WHERE to PREWHERE if enabled and viable @@ -422,6 +421,13 @@ InterpreterSelectQuery::InterpreterSelectQuery( } } + if (query.prewhere() && query.where()) + { + /// Filter block in WHERE instead to get better performance + query.setExpression( + ASTSelectQuery::Expression::WHERE, makeASTFunction("and", query.prewhere()->clone(), query.where()->clone())); + } + query_analyzer = std::make_unique( query_ptr, syntax_analyzer_result, context, metadata_snapshot, NameSet(required_result_column_names.begin(), required_result_column_names.end()), @@ -505,12 +511,6 @@ InterpreterSelectQuery::InterpreterSelectQuery( query.setExpression(ASTSelectQuery::Expression::WHERE, std::make_shared(0u)); need_analyze_again = true; } - if (query.prewhere() && query.where()) - { - /// Filter block in WHERE instead to get better performance - query.setExpression(ASTSelectQuery::Expression::WHERE, makeASTFunction("and", query.prewhere()->clone(), query.where()->clone())); - need_analyze_again = true; - } if (need_analyze_again) { @@ -931,33 +931,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu query_info.projection->aggregate_final = aggregate_final; } - if (expressions.filter_info) - { - if (!expressions.prewhere_info) - { - const bool does_storage_support_prewhere = !input && !input_pipe && storage && storage->supportsPrewhere(); - if (does_storage_support_prewhere && settings.optimize_move_to_prewhere) - { - /// Execute row level filter in prewhere as a part of "move to prewhere" optimization. - expressions.prewhere_info = std::make_shared( - std::move(expressions.filter_info->actions), - std::move(expressions.filter_info->column_name)); - expressions.prewhere_info->prewhere_actions->projectInput(false); - expressions.prewhere_info->remove_prewhere_column = expressions.filter_info->do_remove_column; - expressions.prewhere_info->need_filter = true; - expressions.filter_info = nullptr; - } - } - else - { - /// Add row level security actions to prewhere. - expressions.prewhere_info->row_level_filter_actions = std::move(expressions.filter_info->actions); - expressions.prewhere_info->row_level_column_name = std::move(expressions.filter_info->column_name); - expressions.prewhere_info->row_level_filter_actions->projectInput(false); - expressions.filter_info = nullptr; - } - } - if (options.only_analyze) { auto read_nothing = std::make_unique(source_header); @@ -1543,47 +1516,77 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan( } } + auto read_from_pipe = std::make_unique(std::move(pipe)); + read_from_pipe->setStepDescription("Read from NullSource"); + query_plan.addStep(std::move(read_from_pipe)); + if (query_info.projection) { if (query_info.projection->before_where) { - auto expression = std::make_shared( - query_info.projection->before_where, ExpressionActionsSettings::fromContext(context_)); - pipe.addSimpleTransform( - [&expression](const Block & header) { return std::make_shared(header, expression); }); + auto where_step = std::make_unique( + query_plan.getCurrentDataStream(), + query_info.projection->before_where, + query_info.projection->where_column_name, + query_info.projection->remove_where_filter); + + where_step->setStepDescription("WHERE"); + query_plan.addStep(std::move(where_step)); } if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate) { if (query_info.projection->before_aggregation) { - auto expression = std::make_shared( - query_info.projection->before_aggregation, ExpressionActionsSettings::fromContext(context_)); - pipe.addSimpleTransform( - [&expression](const Block & header) { return std::make_shared(header, expression); }); + auto expression_before_aggregation + = std::make_unique(query_plan.getCurrentDataStream(), query_info.projection->before_aggregation); + expression_before_aggregation->setStepDescription("Before GROUP BY"); + query_plan.addStep(std::move(expression_before_aggregation)); } + + executeMergeAggregatedImpl( + query_plan, + query_info.projection->aggregate_overflow_row, + query_info.projection->aggregate_final, + false, + context_->getSettingsRef(), + query_info.projection->aggregation_keys, + query_info.projection->aggregate_descriptions); } } - - auto read_from_pipe = std::make_unique(std::move(pipe)); - read_from_pipe->setStepDescription("Read from NullSource"); - query_plan.addStep(std::move(read_from_pipe)); - - if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate) - { - executeMergeAggregatedImpl( - query_plan, - query_info.projection->aggregate_overflow_row, - query_info.projection->aggregate_final, - false, - context_->getSettingsRef(), - query_info.projection->aggregation_keys, - query_info.projection->aggregate_descriptions); - } } void InterpreterSelectQuery::addPrewhereAliasActions() { + const Settings & settings = context->getSettingsRef(); + auto & expressions = analysis_result; + if (expressions.filter_info) + { + if (!expressions.prewhere_info) + { + const bool does_storage_support_prewhere = !input && !input_pipe && storage && storage->supportsPrewhere(); + if (does_storage_support_prewhere && settings.optimize_move_to_prewhere) + { + /// Execute row level filter in prewhere as a part of "move to prewhere" optimization. + expressions.prewhere_info = std::make_shared( + std::move(expressions.filter_info->actions), + std::move(expressions.filter_info->column_name)); + expressions.prewhere_info->prewhere_actions->projectInput(false); + expressions.prewhere_info->remove_prewhere_column = expressions.filter_info->do_remove_column; + expressions.prewhere_info->need_filter = true; + expressions.filter_info = nullptr; + } + } + else + { + /// Add row level security actions to prewhere. + expressions.prewhere_info->row_level_filter_actions = std::move(expressions.filter_info->actions); + expressions.prewhere_info->row_level_column_name = std::move(expressions.filter_info->column_name); + expressions.prewhere_info->row_level_filter_actions->projectInput(false); + expressions.filter_info = nullptr; + } + } + auto & prewhere_info = analysis_result.prewhere_info; auto & columns_to_remove_after_prewhere = analysis_result.columns_to_remove_after_prewhere; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 0d7e77f7cf4..fe4b1140d56 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3923,14 +3923,16 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( { if (analysis_result.before_where) { + candidate.where_column_name = analysis_result.where_column_name; candidate.remove_where_filter = analysis_result.remove_where_filter; candidate.before_where = analysis_result.before_where->clone(); std::cerr << fmt::format("before_where_actions = \n{}", candidate.before_where->dumpDAG()) << std::endl; required_columns = candidate.before_where->foldActionsByProjection( required_columns, projection.sample_block_for_keys, - query_ptr->as().where()->getColumnName()); + candidate.where_column_name); std::cerr << fmt::format("before_where_actions = \n{}", candidate.before_where->dumpDAG()) << std::endl; + std::cerr << fmt::format("where_required_columns = \n{}", fmt::join(required_columns, ", ")) << std::endl; if (required_columns.empty()) return false; @@ -3947,13 +3949,16 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( candidate.prewhere_info->need_filter = prewhere_info->need_filter; auto prewhere_actions = prewhere_info->prewhere_actions->clone(); - // If there is a before_where action, prewhere_action only requires columns to evaluate the prewhere expression. - // Else it should provide all columns to later actions. - NameSet prewhere_required_columns = analysis_result.before_where ? NameSet{} : required_columns; + auto prewhere_required_columns = required_columns; + // required_columns should not contain columns generated by prewhere + for (const auto & column : prewhere_actions->getResultColumns()) + required_columns.erase(column.name); std::cerr << fmt::format("prewhere_actions = \n{}", prewhere_actions->dumpDAG()) << std::endl; + // Prewhere_action should not add missing keys. prewhere_required_columns = prewhere_actions->foldActionsByProjection( - prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name); + prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name, false); std::cerr << fmt::format("prewhere_actions = \n{}", prewhere_actions->dumpDAG()) << std::endl; + std::cerr << fmt::format("prewhere_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl; if (prewhere_required_columns.empty()) return false; candidate.prewhere_info->prewhere_actions = std::make_shared(prewhere_actions, actions_settings); @@ -3962,21 +3967,23 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( { auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone(); prewhere_required_columns = row_level_filter_actions->foldActionsByProjection( - prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name); + prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name, false); + std::cerr << fmt::format("row_level_filter_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) + << std::endl; if (prewhere_required_columns.empty()) return false; candidate.prewhere_info->row_level_filter = std::make_shared(row_level_filter_actions, actions_settings); } - // TODO wait for alias analysis to be moved into expression analyzer if (prewhere_info->alias_actions) { auto alias_actions = prewhere_info->alias_actions->clone(); std::cerr << fmt::format("alias_actions = \n{}", alias_actions->dumpDAG()) << std::endl; prewhere_required_columns - = alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys); + = alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false); std::cerr << fmt::format("alias_actions = \n{}", alias_actions->dumpDAG()) << std::endl; + std::cerr << fmt::format("alias_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl; if (prewhere_required_columns.empty()) return false; candidate.prewhere_info->alias_actions = std::make_shared(alias_actions, actions_settings); @@ -4033,8 +4040,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( std::cerr << fmt::format("before_aggregation = \n{}", candidate.before_aggregation->dumpDAG()) << std::endl; auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, projection.sample_block_for_keys); std::cerr << fmt::format("before_aggregation = \n{}", candidate.before_aggregation->dumpDAG()) << std::endl; - - std::cerr << fmt::format("keys = {}", fmt::join(required_columns, ", ")) << std::endl; + std::cerr << fmt::format("aggregate_required_columns = \n{}", fmt::join(required_columns, ", ")) << std::endl; if (required_columns.empty()) continue; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 9daa1844df2..22dfe05df14 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -201,7 +201,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( auto where_step = std::make_unique( plan->getCurrentDataStream(), query_info.projection->before_where, - given_select.where()->getColumnName(), + query_info.projection->where_column_name, query_info.projection->remove_where_filter); where_step->setStepDescription("WHERE"); diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index c9991e481f1..043bfe99a6b 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -126,6 +126,7 @@ struct ProjectionCandidate const ProjectionDescription * desc; PrewhereInfoPtr prewhere_info; ActionsDAGPtr before_where; + String where_column_name; bool remove_where_filter = false; ActionsDAGPtr before_aggregation; Names required_columns; diff --git a/tests/queries/0_stateless/01710_projections.reference b/tests/queries/0_stateless/01710_projections.reference index 9af517f3c05..9f30d82e23e 100644 --- a/tests/queries/0_stateless/01710_projections.reference +++ b/tests/queries/0_stateless/01710_projections.reference @@ -1,5 +1,4 @@ 2020-10-24 00:00:00 0 -0.2906208323366036 -2020-10-24 00:00:00 0 -5.350401006920782 2020-10-24 00:00:00 1000 -0.3856164567802157 1.2352100180287104 2020-10-24 00:00:00 1.3619605237696326 0.16794469697335793 0.7637956767025532 0.8899329799574005 0.6227685185389797 0.30795997278638165 0.7637956767025532 2020-10-24 00:00:00 19 -1.9455094931672063 0.7759802460082872 0.6 0