diff --git a/dbms/src/DataStreams/CheckConstraintsBlockOutputStream.cpp b/dbms/src/DataStreams/CheckConstraintsBlockOutputStream.cpp index f771a5cf20c..b9f4412c034 100644 --- a/dbms/src/DataStreams/CheckConstraintsBlockOutputStream.cpp +++ b/dbms/src/DataStreams/CheckConstraintsBlockOutputStream.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/Functions/FunctionsConversion.h b/dbms/src/Functions/FunctionsConversion.h index cfa70ecc205..100737b43c7 100644 --- a/dbms/src/Functions/FunctionsConversion.h +++ b/dbms/src/Functions/FunctionsConversion.h @@ -35,7 +35,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Interpreters/ActionsVisitor.h b/dbms/src/Interpreters/ActionsVisitor.h index b850c615655..f6db551ff33 100644 --- a/dbms/src/Interpreters/ActionsVisitor.h +++ b/dbms/src/Interpreters/ActionsVisitor.h @@ -2,7 +2,6 @@ #include #include -#include #include #include @@ -13,6 +12,9 @@ namespace DB class Context; class ASTFunction; +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; + /// The case of an explicit enumeration of values. SetPtr makeExplicitSet( const ASTFunction * node, const Block & sample_block, bool create_ordered_set, diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index c6c6f08f815..56c999d0681 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -70,9 +70,51 @@ using LogAST = DebugASTLog; /// set to true to enable logs namespace ErrorCodes { extern const int UNKNOWN_IDENTIFIER; + extern const int ILLEGAL_PREWHERE; extern const int LOGICAL_ERROR; } +namespace +{ + +/// Check if there is an ignore function. It's used for disabling constant folding in query +/// predicates because some performance tests use ignore function as a non-optimize guard. +bool allowEarlyConstantFolding(const ExpressionActions & actions, const Context & context) +{ + if (!context.getSettingsRef().enable_early_constant_folding) + return false; + + for (auto & action : actions.getActions()) + { + if (action.type == action.APPLY_FUNCTION && action.function_base) + { + auto name = action.function_base->getName(); + if (name == "ignore") + return false; + } + } + return true; +} + +} + +bool sanitizeBlock(Block & block) +{ + for (auto & col : block) + { + if (!col.column) + { + if (isNotCreatable(col.type->getTypeId())) + return false; + col.column = col.type->createColumn(); + } + else if (isColumnConst(*col.column) && !col.column->empty()) + col.column = col.column->cloneEmpty(); + } + return true; +} + + ExpressionAnalyzer::ExpressionAnalyzer( const ASTPtr & query_, const SyntaxAnalyzerResultPtr & syntax_analyzer_result_, @@ -884,6 +926,13 @@ ExpressionActionsPtr ExpressionAnalyzer::getConstActions() return actions; } +ExpressionActionsPtr SelectQueryExpressionAnalyzer::simpleSelectActions() +{ + ExpressionActionsChain new_chain(context); + appendSelect(new_chain, false); + return new_chain.getLastActions(); +} + void SelectQueryExpressionAnalyzer::getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates) const { for (const auto & name_and_type : aggregation_keys) @@ -892,4 +941,231 @@ void SelectQueryExpressionAnalyzer::getAggregateInfo(Names & key_names, Aggregat aggregates = aggregate_descriptions; } +ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query, + SelectQueryExpressionAnalyzer & query_analyzer, + bool first_stage_, + bool second_stage_, + const Context & context, + const StoragePtr & storage, + bool only_types, + const FilterInfoPtr & filter_info_, + const Block & source_header) + : first_stage(first_stage_) + , second_stage(second_stage_) +{ + /// first_stage: Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. + /// second_stage: Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. + + /** First we compose a chain of actions and remember the necessary steps from it. + * Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and + * throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries. + */ + + bool finalized = false; + size_t where_step_num = 0; + + auto finalizeChain = [&](ExpressionActionsChain & chain) + { + if (!finalized) + { + chain.finalize(); + finalize(chain, context, where_step_num); + chain.clear(); + } + finalized = true; + }; + + { + ExpressionActionsChain chain(context); + Names additional_required_columns_after_prewhere; + + if (storage && (query.sample_size() || context.getSettingsRef().parallel_replicas_count > 1)) + { + Names columns_for_sampling = storage->getColumnsRequiredForSampling(); + additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(), + columns_for_sampling.begin(), columns_for_sampling.end()); + } + + if (storage && query.final()) + { + Names columns_for_final = storage->getColumnsRequiredForFinal(); + additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(), + columns_for_final.begin(), columns_for_final.end()); + } + + if (storage && filter_info_) + { + filter_info = filter_info_; + query_analyzer.appendPreliminaryFilter(chain, filter_info->actions, filter_info->column_name); + } + + if (query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere)) + { + prewhere_info = std::make_shared( + chain.steps.front().actions, query.prewhere()->getColumnName()); + + if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, context)) + { + Block before_prewhere_sample = source_header; + if (sanitizeBlock(before_prewhere_sample)) + { + prewhere_info->prewhere_actions->execute(before_prewhere_sample); + auto & column_elem = before_prewhere_sample.getByName(query.prewhere()->getColumnName()); + /// If the filter column is a constant, record it. + if (column_elem.column) + prewhere_constant_filter_description = ConstantFilterDescription(*column_elem.column); + } + } + chain.addStep(); + } + + need_aggregate = query_analyzer.hasAggregation(); + + query_analyzer.appendArrayJoin(chain, only_types || !first_stage); + + if (query_analyzer.appendJoin(chain, only_types || !first_stage)) + { + before_join = chain.getLastActions(); + if (!hasJoin()) + throw Exception("No expected JOIN", ErrorCodes::LOGICAL_ERROR); + chain.addStep(); + } + + if (query_analyzer.appendWhere(chain, only_types || !first_stage)) + { + where_step_num = chain.steps.size() - 1; + before_where = chain.getLastActions(); + if (allowEarlyConstantFolding(*before_where, context)) + { + Block before_where_sample; + if (chain.steps.size() > 1) + before_where_sample = chain.steps[chain.steps.size() - 2].actions->getSampleBlock(); + else + before_where_sample = source_header; + if (sanitizeBlock(before_where_sample)) + { + before_where->execute(before_where_sample); + auto & column_elem = before_where_sample.getByName(query.where()->getColumnName()); + /// If the filter column is a constant, record it. + if (column_elem.column) + where_constant_filter_description = ConstantFilterDescription(*column_elem.column); + } + } + chain.addStep(); + } + + if (need_aggregate) + { + query_analyzer.appendGroupBy(chain, only_types || !first_stage); + query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage); + before_aggregation = chain.getLastActions(); + + finalizeChain(chain); + + if (query_analyzer.appendHaving(chain, only_types || !second_stage)) + { + before_having = chain.getLastActions(); + chain.addStep(); + } + } + + bool has_stream_with_non_joned_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows()); + optimize_read_in_order = + context.getSettingsRef().optimize_read_in_order + && storage && query.orderBy() + && !query_analyzer.hasAggregation() + && !query.final() + && !has_stream_with_non_joned_rows; + + /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers. + query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage)); + selected_columns = chain.getLastStep().required_output; + has_order_by = query_analyzer.appendOrderBy(chain, only_types || (need_aggregate ? !second_stage : !first_stage), optimize_read_in_order); + before_order_and_select = chain.getLastActions(); + chain.addStep(); + + if (query_analyzer.appendLimitBy(chain, only_types || !second_stage)) + { + before_limit_by = chain.getLastActions(); + chain.addStep(); + } + + query_analyzer.appendProjectResult(chain); + final_projection = chain.getLastActions(); + + finalizeChain(chain); + } + + /// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys). + removeExtraColumns(); + + subqueries_for_sets = query_analyzer.getSubqueriesForSets(); + + checkActions(); +} + +void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, const Context & context_, size_t where_step_num) +{ + if (hasPrewhere()) + { + const ExpressionActionsChain::Step & step = chain.steps.at(0); + prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0); + + Names columns_to_remove; + for (size_t i = 1; i < step.required_output.size(); ++i) + { + if (step.can_remove_required_output[i]) + columns_to_remove.push_back(step.required_output[i]); + } + + if (!columns_to_remove.empty()) + { + auto columns = prewhere_info->prewhere_actions->getSampleBlock().getNamesAndTypesList(); + ExpressionActionsPtr actions = std::make_shared(columns, context_); + for (const auto & column : columns_to_remove) + actions->add(ExpressionAction::removeColumn(column)); + + prewhere_info->remove_columns_actions = std::move(actions); + } + + columns_to_remove_after_prewhere = std::move(columns_to_remove); + } + else if (hasFilter()) + { + /// Can't have prewhere and filter set simultaneously + filter_info->do_remove_column = chain.steps.at(0).can_remove_required_output.at(0); + } + if (hasWhere()) + remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0); +} + +void ExpressionAnalysisResult::removeExtraColumns() +{ + if (hasFilter()) + filter_info->actions->prependProjectInput(); + if (hasWhere()) + before_where->prependProjectInput(); + if (hasHaving()) + before_having->prependProjectInput(); +} + +void ExpressionAnalysisResult::checkActions() +{ + /// Check that PREWHERE doesn't contain unusual actions. Unusual actions are that can change number of rows. + if (hasPrewhere()) + { + auto check_actions = [](const ExpressionActionsPtr & actions) + { + if (actions) + for (const auto & action : actions->getActions()) + if (action.type == ExpressionAction::Type::JOIN || action.type == ExpressionAction::Type::ARRAY_JOIN) + throw Exception("PREWHERE cannot contain ARRAY JOIN or JOIN action", ErrorCodes::ILLEGAL_PREWHERE); + }; + + check_actions(prewhere_info->prewhere_actions); + check_actions(prewhere_info->alias_actions); + check_actions(prewhere_info->remove_columns_actions); + } +} + } diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index 0b077901c66..e762cdf4d33 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -2,11 +2,13 @@ #include #include +#include #include #include #include #include #include +#include namespace DB @@ -29,6 +31,9 @@ class ASTExpressionList; class ASTSelectQuery; struct ASTTablesInSelectQueryElement; +/// Create columns in block or return false if not possible +bool sanitizeBlock(Block & block); + /// ExpressionAnalyzer sources, intermediates and results. It splits data and logic, allows to test them separately. struct ExpressionAnalyzerData { @@ -156,10 +161,73 @@ protected: bool isRemoteStorage() const; }; +class SelectQueryExpressionAnalyzer; + +/// Result of SelectQueryExpressionAnalyzer: expressions for InterpreterSelectQuery +struct ExpressionAnalysisResult +{ + bool need_aggregate = false; + bool has_order_by = false; + + bool remove_where_filter = false; + bool optimize_read_in_order = false; + + ExpressionActionsPtr before_join; /// including JOIN + ExpressionActionsPtr before_where; + ExpressionActionsPtr before_aggregation; + ExpressionActionsPtr before_having; + ExpressionActionsPtr before_order_and_select; + ExpressionActionsPtr before_limit_by; + ExpressionActionsPtr final_projection; + + /// Columns from the SELECT list, before renaming them to aliases. + Names selected_columns; + + /// Columns will be removed after prewhere actions execution. + Names columns_to_remove_after_prewhere; + + /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. + bool first_stage = false; + /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. + bool second_stage = false; + + SubqueriesForSets subqueries_for_sets; + PrewhereInfoPtr prewhere_info; + FilterInfoPtr filter_info; + ConstantFilterDescription prewhere_constant_filter_description; + ConstantFilterDescription where_constant_filter_description; + + ExpressionAnalysisResult() = default; + + ExpressionAnalysisResult( + const ASTSelectQuery & query, + SelectQueryExpressionAnalyzer & query_analyzer, + bool first_stage, + bool second_stage, + const Context & context, + const StoragePtr & storage, + bool only_types, + const FilterInfoPtr & filter_info, + const Block & source_header); + + bool hasFilter() const { return filter_info.get(); } + bool hasJoin() const { return before_join.get(); } + bool hasPrewhere() const { return prewhere_info.get(); } + bool hasWhere() const { return before_where.get(); } + bool hasHaving() const { return before_having.get(); } + bool hasLimitBy() const { return before_limit_by.get(); } + + void removeExtraColumns(); + void checkActions(); + void finalize(const ExpressionActionsChain & chain, const Context & context, size_t where_step_num); +}; + /// SelectQuery specific ExpressionAnalyzer part. class SelectQueryExpressionAnalyzer : public ExpressionAnalyzer { public: + friend struct ExpressionAnalysisResult; + SelectQueryExpressionAnalyzer( const ASTPtr & query_, const SyntaxAnalyzerResultPtr & syntax_analyzer_result_, @@ -185,6 +253,39 @@ public: /// Tables that will need to be sent to remote servers for distributed query processing. const Tables & getExternalTables() const { return external_tables; } + ExpressionActionsPtr simpleSelectActions(); + + /// These appends are public only for tests + void appendSelect(ExpressionActionsChain & chain, bool only_types); + /// Deletes all columns except mentioned by SELECT, arranges the remaining columns and renames them to aliases. + void appendProjectResult(ExpressionActionsChain & chain) const; + + /// Create Set-s that we can from IN section to use the index on them. + void makeSetsForIndex(const ASTPtr & node); + +private: + /// If non-empty, ignore all expressions not from this list. + NameSet required_result_columns; + + /** + * Create Set from a subquery or a table expression in the query. The created set is suitable for using the index. + * The set will not be created if its size hits the limit. + */ + void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name); + + /** + * Checks if subquery is not a plain StorageSet. + * Because while making set we will read data from StorageSet which is not allowed. + * Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise. + */ + SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_of_table_name); + + JoinPtr makeTableJoin(const ASTTablesInSelectQueryElement & join_element); + void makeSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element, NamesWithAliases && required_columns_with_aliases, + SubqueryForSet & subquery_for_set) const; + + const ASTSelectQuery * getAggregatingQuery() const; + /** These methods allow you to build a chain of transformations over a block, that receives values in the desired sections of the query. * * Example usage: @@ -213,37 +314,10 @@ public: /// After aggregation: bool appendHaving(ExpressionActionsChain & chain, bool only_types); - void appendSelect(ExpressionActionsChain & chain, bool only_types); + /// appendSelect bool appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order); bool appendLimitBy(ExpressionActionsChain & chain, bool only_types); - /// Deletes all columns except mentioned by SELECT, arranges the remaining columns and renames them to aliases. - void appendProjectResult(ExpressionActionsChain & chain) const; - - /// Create Set-s that we can from IN section to use the index on them. - void makeSetsForIndex(const ASTPtr & node); - -private: - /// If non-empty, ignore all expressions not from this list. - NameSet required_result_columns; - - /** - * Create Set from a subquery or a table expression in the query. The created set is suitable for using the index. - * The set will not be created if its size hits the limit. - */ - void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name); - - /** - * Checks if subquery is not a plain StorageSet. - * Because while making set we will read data from StorageSet which is not allowed. - * Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise. - */ - SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_of_table_name); - - JoinPtr makeTableJoin(const ASTTablesInSelectQueryElement & join_element); - void makeSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element, NamesWithAliases && required_columns_with_aliases, - SubqueryForSet & subquery_for_set) const; - - const ASTSelectQuery * getAggregatingQuery() const; + /// appendProjectResult }; } diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.h b/dbms/src/Interpreters/InterpreterCreateQuery.h index ae8125b84d6..0ff5d8c60e7 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.h +++ b/dbms/src/Interpreters/InterpreterCreateQuery.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index ac7ea12d898..cf2ecf36056 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -154,9 +154,7 @@ String InterpreterSelectQuery::generateFilterActions(ExpressionActionsPtr & acti /// Using separate expression analyzer to prevent any possible alias injection auto syntax_result = SyntaxAnalyzer(*context).analyze(query_ast, storage->getColumns().getAllPhysical()); SelectQueryExpressionAnalyzer analyzer(query_ast, syntax_result, *context); - ExpressionActionsChain new_chain(*context); - analyzer.appendSelect(new_chain, false); - actions = new_chain.getLastActions(); + actions = analyzer.simpleSelectActions(); return expr_list->children.at(0)->getColumnName(); } @@ -212,22 +210,6 @@ static Context getSubqueryContext(const Context & context) return subquery_context; } -static bool sanitizeBlock(Block & block) -{ - for (auto & col : block) - { - if (!col.column) - { - if (isNotCreatable(col.type->getTypeId())) - return false; - col.column = col.type->createColumn(); - } - else if (isColumnConst(*col.column) && !col.column->empty()) - col.column = col.column->cloneEmpty(); - } - return true; -} - InterpreterSelectQuery::InterpreterSelectQuery( const ASTPtr & query_ptr_, const Context & context_, @@ -556,11 +538,18 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere) if (storage && !options.only_analyze) from_stage = storage->getQueryProcessingStage(*context); - analysis_result = analyzeExpressions( + /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. + bool first_stage = from_stage < QueryProcessingStage::WithMergeableState + && options.to_stage >= QueryProcessingStage::WithMergeableState; + /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. + bool second_stage = from_stage <= QueryProcessingStage::WithMergeableState + && options.to_stage > QueryProcessingStage::WithMergeableState; + + analysis_result = ExpressionAnalysisResult( getSelectQuery(), *query_analyzer, - from_stage, - options.to_stage, + first_stage, + second_stage, *context, storage, options.only_analyze, @@ -616,253 +605,6 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere) return analysis_result.final_projection->getSampleBlock(); } -/// Check if there is an ignore function. It's used for disabling constant folding in query -/// predicates because some performance tests use ignore function as a non-optimize guard. -static bool allowEarlyConstantFolding(const ExpressionActions & actions, const Context & context) -{ - if (!context.getSettingsRef().enable_early_constant_folding) - return false; - - for (auto & action : actions.getActions()) - { - if (action.type == action.APPLY_FUNCTION && action.function_base) - { - auto name = action.function_base->getName(); - if (name == "ignore") - return false; - } - } - return true; -} - -InterpreterSelectQuery::AnalysisResult -InterpreterSelectQuery::analyzeExpressions( - const ASTSelectQuery & query, - SelectQueryExpressionAnalyzer & query_analyzer, - QueryProcessingStage::Enum from_stage, - QueryProcessingStage::Enum to_stage, - const Context & context, - const StoragePtr & storage, - bool only_types, - const FilterInfoPtr & filter_info, - const Block & source_header) -{ - AnalysisResult res; - - /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. - res.first_stage = from_stage < QueryProcessingStage::WithMergeableState - && to_stage >= QueryProcessingStage::WithMergeableState; - /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. - res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState - && to_stage > QueryProcessingStage::WithMergeableState; - - /** First we compose a chain of actions and remember the necessary steps from it. - * Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and - * throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries. - */ - - bool has_filter = false; - bool has_prewhere = false; - bool has_where = false; - size_t where_step_num; - - auto finalizeChain = [&](ExpressionActionsChain & chain) - { - chain.finalize(); - - if (has_prewhere) - { - const ExpressionActionsChain::Step & step = chain.steps.at(0); - res.prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0); - - Names columns_to_remove; - for (size_t i = 1; i < step.required_output.size(); ++i) - { - if (step.can_remove_required_output[i]) - columns_to_remove.push_back(step.required_output[i]); - } - - if (!columns_to_remove.empty()) - { - auto columns = res.prewhere_info->prewhere_actions->getSampleBlock().getNamesAndTypesList(); - ExpressionActionsPtr actions = std::make_shared(columns, context); - for (const auto & column : columns_to_remove) - actions->add(ExpressionAction::removeColumn(column)); - - res.prewhere_info->remove_columns_actions = std::move(actions); - } - - res.columns_to_remove_after_prewhere = std::move(columns_to_remove); - } - else if (has_filter) - { - /// Can't have prewhere and filter set simultaneously - res.filter_info->do_remove_column = chain.steps.at(0).can_remove_required_output.at(0); - } - if (has_where) - res.remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0); - - has_filter = has_prewhere = has_where = false; - - chain.clear(); - }; - - { - ExpressionActionsChain chain(context); - Names additional_required_columns_after_prewhere; - - if (storage && (query.sample_size() || context.getSettingsRef().parallel_replicas_count > 1)) - { - Names columns_for_sampling = storage->getColumnsRequiredForSampling(); - additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(), - columns_for_sampling.begin(), columns_for_sampling.end()); - } - - if (storage && query.final()) - { - Names columns_for_final = storage->getColumnsRequiredForFinal(); - additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(), - columns_for_final.begin(), columns_for_final.end()); - } - - if (storage && filter_info) - { - has_filter = true; - res.filter_info = filter_info; - query_analyzer.appendPreliminaryFilter(chain, filter_info->actions, filter_info->column_name); - } - - if (query_analyzer.appendPrewhere(chain, !res.first_stage, additional_required_columns_after_prewhere)) - { - has_prewhere = true; - - res.prewhere_info = std::make_shared( - chain.steps.front().actions, query.prewhere()->getColumnName()); - - if (allowEarlyConstantFolding(*res.prewhere_info->prewhere_actions, context)) - { - Block before_prewhere_sample = source_header; - if (sanitizeBlock(before_prewhere_sample)) - { - res.prewhere_info->prewhere_actions->execute(before_prewhere_sample); - auto & column_elem = before_prewhere_sample.getByName(query.prewhere()->getColumnName()); - /// If the filter column is a constant, record it. - if (column_elem.column) - res.prewhere_constant_filter_description = ConstantFilterDescription(*column_elem.column); - } - } - chain.addStep(); - } - - res.need_aggregate = query_analyzer.hasAggregation(); - - query_analyzer.appendArrayJoin(chain, only_types || !res.first_stage); - - if (query_analyzer.appendJoin(chain, only_types || !res.first_stage)) - { - res.before_join = chain.getLastActions(); - if (!res.hasJoin()) - throw Exception("No expected JOIN", ErrorCodes::LOGICAL_ERROR); - chain.addStep(); - } - - if (query_analyzer.appendWhere(chain, only_types || !res.first_stage)) - { - where_step_num = chain.steps.size() - 1; - has_where = res.has_where = true; - res.before_where = chain.getLastActions(); - if (allowEarlyConstantFolding(*res.before_where, context)) - { - Block before_where_sample; - if (chain.steps.size() > 1) - before_where_sample = chain.steps[chain.steps.size() - 2].actions->getSampleBlock(); - else - before_where_sample = source_header; - if (sanitizeBlock(before_where_sample)) - { - res.before_where->execute(before_where_sample); - auto & column_elem = before_where_sample.getByName(query.where()->getColumnName()); - /// If the filter column is a constant, record it. - if (column_elem.column) - res.where_constant_filter_description = ConstantFilterDescription(*column_elem.column); - } - } - chain.addStep(); - } - - if (res.need_aggregate) - { - query_analyzer.appendGroupBy(chain, only_types || !res.first_stage); - query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !res.first_stage); - res.before_aggregation = chain.getLastActions(); - - finalizeChain(chain); - - if (query_analyzer.appendHaving(chain, only_types || !res.second_stage)) - { - res.has_having = true; - res.before_having = chain.getLastActions(); - chain.addStep(); - } - } - - bool has_stream_with_non_joned_rows = (res.before_join && res.before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows()); - res.optimize_read_in_order = - context.getSettingsRef().optimize_read_in_order - && storage && query.orderBy() - && !query_analyzer.hasAggregation() - && !query.final() - && !has_stream_with_non_joned_rows; - - /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers. - query_analyzer.appendSelect(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage)); - res.selected_columns = chain.getLastStep().required_output; - res.has_order_by = query_analyzer.appendOrderBy(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage), res.optimize_read_in_order); - res.before_order_and_select = chain.getLastActions(); - chain.addStep(); - - if (query_analyzer.appendLimitBy(chain, only_types || !res.second_stage)) - { - res.has_limit_by = true; - res.before_limit_by = chain.getLastActions(); - chain.addStep(); - } - - query_analyzer.appendProjectResult(chain); - res.final_projection = chain.getLastActions(); - - finalizeChain(chain); - } - - /// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys). - if (res.filter_info) - res.filter_info->actions->prependProjectInput(); - if (res.has_where) - res.before_where->prependProjectInput(); - if (res.has_having) - res.before_having->prependProjectInput(); - - res.subqueries_for_sets = query_analyzer.getSubqueriesForSets(); - - /// Check that PREWHERE doesn't contain unusual actions. Unusual actions are that can change number of rows. - if (res.prewhere_info) - { - auto check_actions = [](const ExpressionActionsPtr & actions) - { - if (actions) - for (const auto & action : actions->getActions()) - if (action.type == ExpressionAction::Type::JOIN || action.type == ExpressionAction::Type::ARRAY_JOIN) - throw Exception("PREWHERE cannot contain ARRAY JOIN or JOIN action", ErrorCodes::ILLEGAL_PREWHERE); - }; - - check_actions(res.prewhere_info->prewhere_actions); - check_actions(res.prewhere_info->alias_actions); - check_actions(res.prewhere_info->remove_columns_actions); - } - - return res; -} - static Field getWithFillFieldValue(const ASTPtr & node, const Context & context) { const auto & [field, type] = evaluateConstantExpression(node, context); @@ -1094,7 +836,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS if (expressions.first_stage) { - if (expressions.filter_info) + if (expressions.hasFilter()) { if constexpr (pipeline_with_processors) { @@ -1176,7 +918,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS } } - if (expressions.has_where) + if (expressions.hasWhere()) executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter); if (expressions.need_aggregate) @@ -1192,7 +934,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS * but there is an ORDER or LIMIT, * then we will perform the preliminary sorting and LIMIT on the remote server. */ - if (!expressions.second_stage && !expressions.need_aggregate && !expressions.has_having) + if (!expressions.second_stage && !expressions.need_aggregate && !expressions.hasHaving()) { if (expressions.has_order_by) executeOrder(pipeline, query_info.input_sorting_info); @@ -1200,7 +942,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS if (expressions.has_order_by && query.limitLength()) executeDistinct(pipeline, false, expressions.selected_columns); - if (expressions.has_limit_by) + if (expressions.hasLimitBy()) { executeExpression(pipeline, expressions.before_limit_by); executeLimitBy(pipeline); @@ -1230,7 +972,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS if (query.group_by_with_totals) { bool final = !query.group_by_with_rollup && !query.group_by_with_cube; - executeTotalsAndHaving(pipeline, expressions.has_having, expressions.before_having, aggregate_overflow_row, final); + executeTotalsAndHaving(pipeline, expressions.hasHaving(), expressions.before_having, aggregate_overflow_row, final); } if (query.group_by_with_rollup) @@ -1238,14 +980,14 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS else if (query.group_by_with_cube) executeRollupOrCube(pipeline, Modificator::CUBE); - if ((query.group_by_with_rollup || query.group_by_with_cube) && expressions.has_having) + if ((query.group_by_with_rollup || query.group_by_with_cube) && expressions.hasHaving()) { if (query.group_by_with_totals) throw Exception("WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING", ErrorCodes::NOT_IMPLEMENTED); executeHaving(pipeline, expressions.before_having); } } - else if (expressions.has_having) + else if (expressions.hasHaving()) executeHaving(pipeline, expressions.before_having); executeExpression(pipeline, expressions.before_order_and_select); @@ -1273,7 +1015,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS /** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT, * limiting the number of rows in each up to `offset + limit`. */ - if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.has_limit_by && !settings.extremes) + if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && + !query.distinct && !expressions.hasLimitBy() && !settings.extremes) { executePreLimit(pipeline); } @@ -1298,7 +1041,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS if (need_second_distinct_pass) executeDistinct(pipeline, false, expressions.selected_columns); - if (expressions.has_limit_by) + if (expressions.hasLimitBy()) { executeExpression(pipeline, expressions.before_limit_by); executeLimitBy(pipeline); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index 77a6f1ca3ac..0e4e3256332 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -152,55 +152,6 @@ private: template void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional prepared_pipe, QueryPipeline & save_context_and_storage); - struct AnalysisResult - { - bool hasJoin() const { return before_join.get(); } - bool has_where = false; - bool need_aggregate = false; - bool has_having = false; - bool has_order_by = false; - bool has_limit_by = false; - - bool remove_where_filter = false; - bool optimize_read_in_order = false; - - ExpressionActionsPtr before_join; /// including JOIN - ExpressionActionsPtr before_where; - ExpressionActionsPtr before_aggregation; - ExpressionActionsPtr before_having; - ExpressionActionsPtr before_order_and_select; - ExpressionActionsPtr before_limit_by; - ExpressionActionsPtr final_projection; - - /// Columns from the SELECT list, before renaming them to aliases. - Names selected_columns; - - /// Columns will be removed after prewhere actions execution. - Names columns_to_remove_after_prewhere; - - /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing. - bool first_stage = false; - /// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing. - bool second_stage = false; - - SubqueriesForSets subqueries_for_sets; - PrewhereInfoPtr prewhere_info; - FilterInfoPtr filter_info; - ConstantFilterDescription prewhere_constant_filter_description; - ConstantFilterDescription where_constant_filter_description; - }; - - static AnalysisResult analyzeExpressions( - const ASTSelectQuery & query, - SelectQueryExpressionAnalyzer & query_analyzer, - QueryProcessingStage::Enum from_stage, - QueryProcessingStage::Enum to_stage, - const Context & context, - const StoragePtr & storage, - bool only_types, - const FilterInfoPtr & filter_info, - const Block & source_header); - /** From which table to read. With JOIN, the "left" table is returned. */ static void getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context); @@ -284,7 +235,7 @@ private: SelectQueryInfo query_info; /// Is calculated in getSampleBlock. Is used later in readImpl. - AnalysisResult analysis_result; + ExpressionAnalysisResult analysis_result; FilterInfoPtr filter_info; QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; diff --git a/dbms/src/Interpreters/SubqueryForSet.cpp b/dbms/src/Interpreters/SubqueryForSet.cpp index 5a2a06cc411..47de516d154 100644 --- a/dbms/src/Interpreters/SubqueryForSet.cpp +++ b/dbms/src/Interpreters/SubqueryForSet.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include namespace DB diff --git a/dbms/src/Interpreters/SubqueryForSet.h b/dbms/src/Interpreters/SubqueryForSet.h index aa510faefbc..3463f708a46 100644 --- a/dbms/src/Interpreters/SubqueryForSet.h +++ b/dbms/src/Interpreters/SubqueryForSet.h @@ -1,16 +1,18 @@ #pragma once +#include +#include #include #include #include -#include namespace DB { class InterpreterSelectWithUnionQuery; - +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; /// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section. struct SubqueryForSet diff --git a/dbms/src/Interpreters/interpretSubquery.cpp b/dbms/src/Interpreters/interpretSubquery.cpp index 82545d4b3be..e06d7f159ac 100644 --- a/dbms/src/Interpreters/interpretSubquery.cpp +++ b/dbms/src/Interpreters/interpretSubquery.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Processors/Formats/Impl/ConstantExpressionTemplate.h b/dbms/src/Processors/Formats/Impl/ConstantExpressionTemplate.h index bbac2e2a999..931b05673c6 100644 --- a/dbms/src/Processors/Formats/Impl/ConstantExpressionTemplate.h +++ b/dbms/src/Processors/Formats/Impl/ConstantExpressionTemplate.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include @@ -12,6 +11,9 @@ struct LiteralInfo; using LiteralsInfo = std::vector; struct SpecialParserType; +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; + /// Deduces template of an expression by replacing literals with dummy columns. /// It allows to parse and evaluate similar expressions without using heavy IParsers and ExpressionAnalyzer. /// Using ConstantExpressionTemplate for one expression is slower then evaluateConstantExpression(...), diff --git a/dbms/src/Processors/Transforms/CreatingSetsTransform.h b/dbms/src/Processors/Transforms/CreatingSetsTransform.h index 00f64440393..aeb7a43b61b 100644 --- a/dbms/src/Processors/Transforms/CreatingSetsTransform.h +++ b/dbms/src/Processors/Transforms/CreatingSetsTransform.h @@ -1,11 +1,14 @@ #pragma once +#include #include #include #include +#include namespace DB { +class QueryStatus; struct Progress; using ProgressCallback = std::function; diff --git a/dbms/src/Processors/Transforms/ExpressionTransform.cpp b/dbms/src/Processors/Transforms/ExpressionTransform.cpp index 9bd4ba89db6..7ae2eafa0c6 100644 --- a/dbms/src/Processors/Transforms/ExpressionTransform.cpp +++ b/dbms/src/Processors/Transforms/ExpressionTransform.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace DB { diff --git a/dbms/src/Processors/Transforms/InflatingExpressionTransform.cpp b/dbms/src/Processors/Transforms/InflatingExpressionTransform.cpp index 6653aa0c5c7..83792253390 100644 --- a/dbms/src/Processors/Transforms/InflatingExpressionTransform.cpp +++ b/dbms/src/Processors/Transforms/InflatingExpressionTransform.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace DB { diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 348cb741123..505bc140043 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Storages/IStorage.cpp b/dbms/src/Storages/IStorage.cpp index 332ecbc8681..1192107ab32 100644 --- a/dbms/src/Storages/IStorage.cpp +++ b/dbms/src/Storages/IStorage.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index 29cdc0661cd..65daaa6d77c 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -52,6 +52,9 @@ using Processors = std::vector; class Pipe; using Pipes = std::vector; +class StoragePolicy; +using StoragePolicyPtr = std::shared_ptr; + struct ColumnSize { size_t marks = 0; diff --git a/dbms/src/Storages/Kafka/StorageKafka.h b/dbms/src/Storages/Kafka/StorageKafka.h index bf710f58202..8f00c111ee4 100644 --- a/dbms/src/Storages/Kafka/StorageKafka.h +++ b/dbms/src/Storages/Kafka/StorageKafka.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Storages/MergeTree/KeyCondition.h b/dbms/src/Storages/MergeTree/KeyCondition.h index 1971191514c..fd1d11c0ec8 100644 --- a/dbms/src/Storages/MergeTree/KeyCondition.h +++ b/dbms/src/Storages/MergeTree/KeyCondition.h @@ -4,7 +4,6 @@ #include #include -#include #include #include #include @@ -19,6 +18,9 @@ namespace DB class IFunction; using FunctionBasePtr = std::shared_ptr; +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; + /** Range with open or closed ends; possibly unbounded. */ struct Range diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index ab5644749ee..bfb478c7751 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -2,7 +2,6 @@ #include #include -#include #include #include #include @@ -35,6 +34,9 @@ class MergeListEntry; class AlterCommands; class MergeTreePartsMover; +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; + namespace ErrorCodes { extern const int LOGICAL_ERROR; diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h index 345f537d2aa..6e06768fe86 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -1,7 +1,6 @@ #pragma once #include #include -#include #include namespace DB diff --git a/dbms/src/Storages/StorageDistributed.h b/dbms/src/Storages/StorageDistributed.h index a3fd6f3c6e1..6b53ce33792 100644 --- a/dbms/src/Storages/StorageDistributed.h +++ b/dbms/src/Storages/StorageDistributed.h @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -23,6 +22,8 @@ class StorageDistributedDirectoryMonitor; class Volume; using VolumePtr = std::shared_ptr; +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; /** A distributed table that resides on multiple servers. * Uses data from the specified database and tables on each server. diff --git a/dbms/src/Storages/StorageJoin.cpp b/dbms/src/Storages/StorageJoin.cpp index 9288f29e58c..9fc791aab2d 100644 --- a/dbms/src/Storages/StorageJoin.cpp +++ b/dbms/src/Storages/StorageJoin.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/Storages/StorageMerge.h b/dbms/src/Storages/StorageMerge.h index c773ff3ae15..35fb8db2438 100644 --- a/dbms/src/Storages/StorageMerge.h +++ b/dbms/src/Storages/StorageMerge.h @@ -4,6 +4,7 @@ #include #include +#include namespace DB diff --git a/dbms/src/Storages/StorageMySQL.h b/dbms/src/Storages/StorageMySQL.h index 574221377dc..03563b233e1 100644 --- a/dbms/src/Storages/StorageMySQL.h +++ b/dbms/src/Storages/StorageMySQL.h @@ -6,6 +6,7 @@ #include #include +#include #include diff --git a/dbms/src/Storages/StorageNull.cpp b/dbms/src/Storages/StorageNull.cpp index bbb620132c8..c01f463400d 100644 --- a/dbms/src/Storages/StorageNull.cpp +++ b/dbms/src/Storages/StorageNull.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include diff --git a/dbms/src/Storages/registerStorages.h b/dbms/src/Storages/registerStorages.h index b88b2666a8f..c6decff5876 100644 --- a/dbms/src/Storages/registerStorages.h +++ b/dbms/src/Storages/registerStorages.h @@ -1,5 +1,6 @@ #pragma once #include +#include "config_core.h" namespace DB { diff --git a/dbms/src/TableFunctions/TableFunctionNumbers.cpp b/dbms/src/TableFunctions/TableFunctionNumbers.cpp index 9b00eb600e8..615a54dd1b4 100644 --- a/dbms/src/TableFunctions/TableFunctionNumbers.cpp +++ b/dbms/src/TableFunctions/TableFunctionNumbers.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include "registerTableFunctions.h" diff --git a/dbms/src/TableFunctions/TableFunctionS3.cpp b/dbms/src/TableFunctions/TableFunctionS3.cpp index 019c3ca4f51..34a1607178f 100644 --- a/dbms/src/TableFunctions/TableFunctionS3.cpp +++ b/dbms/src/TableFunctions/TableFunctionS3.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/TableFunctions/TableFunctionValues.cpp b/dbms/src/TableFunctions/TableFunctionValues.cpp index 96e39f434d8..b30867949e9 100644 --- a/dbms/src/TableFunctions/TableFunctionValues.cpp +++ b/dbms/src/TableFunctions/TableFunctionValues.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include "registerTableFunctions.h" diff --git a/dbms/src/TableFunctions/parseColumnsListForTableFunction.cpp b/dbms/src/TableFunctions/parseColumnsListForTableFunction.cpp index 41aaec9d1fb..7f94236f239 100644 --- a/dbms/src/TableFunctions/parseColumnsListForTableFunction.cpp +++ b/dbms/src/TableFunctions/parseColumnsListForTableFunction.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include