diff --git a/dbms/src/DataStreams/FilterBlockInputStream.cpp b/dbms/src/DataStreams/FilterBlockInputStream.cpp index ac117127b5c..6de31ac47a5 100644 --- a/dbms/src/DataStreams/FilterBlockInputStream.cpp +++ b/dbms/src/DataStreams/FilterBlockInputStream.cpp @@ -17,14 +17,16 @@ namespace ErrorCodes } -FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, ssize_t filter_column_) - : expression(expression_), filter_column(filter_column_) +FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, + ssize_t filter_column_, bool remove_filter) + : expression(expression_), filter_column(filter_column_), remove_filter(remove_filter) { children.push_back(input); } -FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name) - : expression(expression_) +FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, + const String & filter_column_name, bool remove_filter) + : expression(expression_), remove_filter(remove_filter) { children.push_back(input); @@ -46,6 +48,9 @@ FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input FilterDescription filter_description_check(*column_elem.column); column_elem.column = column_elem.type->createColumnConst(header.rows(), UInt64(1)); } + + if (remove_filter) + header.erase(filter_column_name); } @@ -75,7 +80,7 @@ Block FilterBlockInputStream::readImpl() Block res; if (constant_filter_description.always_false) - return res; + return removeFilterIfNeed(std::move(res)); /// Until non-empty block after filtering or end of stream. while (1) @@ -87,7 +92,7 @@ Block FilterBlockInputStream::readImpl() expression->execute(res); if (constant_filter_description.always_true) - return res; + return removeFilterIfNeed(std::move(res)); size_t columns = res.columns(); ColumnPtr column = res.safeGetByPosition(filter_column).column; @@ -106,7 +111,7 @@ Block FilterBlockInputStream::readImpl() } if (constant_filter_description.always_true) - return res; + return removeFilterIfNeed(std::move(res)); FilterDescription filter_and_holder(*column); @@ -148,7 +153,7 @@ Block FilterBlockInputStream::readImpl() /// Replace the column with the filter by a constant. res.safeGetByPosition(filter_column).column = res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, UInt64(1)); /// No need to touch the rest of the columns. - return res; + return removeFilterIfNeed(std::move(res)); } /// Filter the rest of the columns. @@ -176,9 +181,18 @@ Block FilterBlockInputStream::readImpl() current_column.column = current_column.column->filter(*filter_and_holder.data, -1); } - return res; + return removeFilterIfNeed(std::move(res)); } } +Block FilterBlockInputStream::removeFilterIfNeed(Block && block) +{ + if (block && remove_filter) + block.erase(static_cast(filter_column)); + + return block; +} + + } diff --git a/dbms/src/DataStreams/FilterBlockInputStream.h b/dbms/src/DataStreams/FilterBlockInputStream.h index 8bebda86fd4..cfed66df8a5 100644 --- a/dbms/src/DataStreams/FilterBlockInputStream.h +++ b/dbms/src/DataStreams/FilterBlockInputStream.h @@ -21,8 +21,8 @@ private: public: /// filter_column_ - the number of the column with filter conditions. - FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, ssize_t filter_column_); - FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_); + FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, ssize_t filter_column_, bool remove_filter = false); + FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_, bool remove_filter = false); String getName() const override; Block getTotals() override; @@ -35,8 +35,11 @@ private: ExpressionActionsPtr expression; Block header; ssize_t filter_column; + bool remove_filter; ConstantFilterDescription constant_filter_description; + + Block removeFilterIfNeed(Block && block); }; } diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp index 14fdb99090c..ddaba9a0451 100644 --- a/dbms/src/Interpreters/ExpressionActions.cpp +++ b/dbms/src/Interpreters/ExpressionActions.cpp @@ -1016,10 +1016,25 @@ void ExpressionActionsChain::finalize() for (int i = static_cast(steps.size()) - 1; i >= 0; --i) { Names required_output = steps[i].required_output; + std::unordered_map required_output_indexes; + for (size_t j = 0; j < required_output.size(); ++j) + required_output_indexes[required_output[j]] = j; + auto & can_remove_required_output = steps[i].can_remove_required_output; + if (i + 1 < static_cast(steps.size())) { + const NameSet & additional_input = steps[i + 1].additional_input; for (const auto & it : steps[i + 1].actions->getRequiredColumnsWithTypes()) - required_output.push_back(it.name); + { + if (additional_input.count(it.name) == 0) + { + auto iter = required_output_indexes.find(it.name); + if (iter == required_output_indexes.end()) + required_output.push_back(it.name); + else if (!can_remove_required_output.empty()) + *can_remove_required_output[iter->second] = false; + } + } } steps[i].actions->finalize(required_output); } diff --git a/dbms/src/Interpreters/ExpressionActions.h b/dbms/src/Interpreters/ExpressionActions.h index f29e53a1d7e..71d3bad4f9f 100644 --- a/dbms/src/Interpreters/ExpressionActions.h +++ b/dbms/src/Interpreters/ExpressionActions.h @@ -230,7 +230,9 @@ struct ExpressionActionsChain struct Step { ExpressionActionsPtr actions; + NameSet additional_input; Names required_output; + std::vector> can_remove_required_output; /// Has the same size with required_output, is filled in finalize() Step(const ExpressionActionsPtr & actions_ = nullptr, const Names & required_output_ = Names()) : actions(actions_), required_output(required_output_) {} diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 4bea5fffcee..11a12d4504f 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -60,6 +60,7 @@ #include #include #include +#include namespace DB @@ -2429,7 +2430,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty return true; } -bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool only_types) +bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool only_types, std::shared_ptr & remove_filter) { assertSelect(); @@ -2437,15 +2438,61 @@ bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool onl return false; initChain(chain, source_columns); - ExpressionActionsChain::Step & step = chain.steps.back(); - - step.required_output.push_back(select_query->prewhere_expression->getColumnName()); + auto & step = chain.getLastStep(); getRootActions(select_query->prewhere_expression, only_types, false, step.actions); + String prewhere_column_name = select_query->prewhere_expression->getColumnName(); + step.required_output.push_back(prewhere_column_name); + step.can_remove_required_output = {remove_filter = std::make_shared(true)}; + + { + /// Remove unused source_columns from prewhere actions. + auto tmp_actions = std::make_shared(source_columns, settings); + getRootActions(select_query->prewhere_expression, only_types, false, tmp_actions); + tmp_actions->finalize({prewhere_column_name}); + auto required_columns = tmp_actions->getRequiredColumns(); + NameSet required_source_columns(required_columns.begin(), required_columns.end()); + + auto names = step.actions->getSampleBlock().getNames(); + NameSet name_set(names.begin(), names.end()); + + for (const auto & column : source_columns) + if (required_source_columns.count(column.name) == 0) + name_set.erase(column.name); + + Names required_output(name_set.begin(), name_set.end()); + step.actions->finalize(required_output); + } + + { + /// Add empty action with input = {prewhere actions output} + {unused source columns} + /// Reasons: + /// 1. Remove remove source columns which are used only in prewhere actions during prewhere actions execution. + /// Example: select A prewhere B > 0. B can be removed at prewhere step. + /// 2. Store side columns which were calculated during prewhere actions execution if they are used. + /// Example: select F(A) prewhere F(A) > 0. F(A) can be saved from prewhere step. + /// 3. Check if we can remove filer column at prewhere step. If we can, action will store single REMOVE_COLUMN. + ColumnsWithTypeAndName columns = step.actions->getSampleBlock().getColumnsWithTypeAndName(); + auto required_columns = step.actions->getRequiredColumns(); + NameSet prewhere_input_names(required_columns.begin(), required_columns.end()); + NameSet unused_source_columns; + + for (const auto & column : source_columns) + { + if (prewhere_input_names.count(column.name) == 0) + { + columns.emplace_back(column.type, column.name); + unused_source_columns.emplace(column.name); + } + } + + chain.steps.emplace_back(std::make_shared(std::move(columns), settings)); + chain.steps.back().additional_input = std::move(unused_source_columns); + } return true; } -bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_types) +bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_types, std::shared_ptr & remove_filter) { assertSelect(); @@ -2456,6 +2503,7 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t ExpressionActionsChain::Step & step = chain.steps.back(); step.required_output.push_back(select_query->where_expression->getColumnName()); + step.can_remove_required_output = {remove_filter = std::make_shared(true)}; getRootActions(select_query->where_expression, only_types, false, step.actions); return true; diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index 3ba255ae0e6..78341bdb298 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -103,8 +103,8 @@ public: /// Before aggregation: bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types); bool appendJoin(ExpressionActionsChain & chain, bool only_types); - bool appendPrewhere(ExpressionActionsChain & chain, bool only_types); - bool appendWhere(ExpressionActionsChain & chain, bool only_types); + bool appendPrewhere(ExpressionActionsChain & chain, bool only_types, std::shared_ptr & remove_filter); + bool appendWhere(ExpressionActionsChain & chain, bool only_types, std::shared_ptr & remove_filter); bool appendGroupBy(ExpressionActionsChain & chain, bool only_types); void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 209e4bdef91..cc7ad92ddd0 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -251,6 +251,8 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression * throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries. */ + std::shared_ptr remove_where_filter; + { res.need_aggregate = query_analyzer->hasAggregation(); @@ -263,7 +265,7 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression chain.addStep(); } - if (query_analyzer->appendWhere(chain, !res.first_stage)) + if (query_analyzer->appendWhere(chain, !res.first_stage, remove_where_filter)) { res.has_where = true; res.before_where = chain.getLastActions(); @@ -310,7 +312,10 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression /// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys). if (res.has_where) + { res.before_where->prependProjectInput(); + res.remove_where_filter = *remove_where_filter; + } if (res.has_having) res.before_having->prependProjectInput(); @@ -339,10 +344,11 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt { ExpressionActionsChain chain; PrewhereInfoPtr prewhere_info; - ExpressionActionsPtr remove_prewhere_column_actions; + std::shared_ptr remove_prewhere_filter; + /** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */ - QueryProcessingStage::Enum from_stage = executeFetchColumns( - pipeline, dry_run, chain, prewhere_info, remove_prewhere_column_actions); + QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline, dry_run, chain, + prewhere_info, remove_prewhere_filter); if (from_stage == QueryProcessingStage::WithMergeableState && to_stage == QueryProcessingStage::WithMergeableState) @@ -353,8 +359,9 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage)); expressions = analyzeExpressions(from_stage, chain); + if (prewhere_info) - prewhere_info->remove_prewhere_column = remove_prewhere_column_actions->getActions().size() > 0; /// Added REMOVE_COLUMN + prewhere_info->remove_prewhere_column = *remove_prewhere_filter; } const Settings & settings = context.getSettingsRef(); @@ -391,7 +398,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt } if (expressions.has_where) - executeWhere(pipeline, expressions.before_where); + executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter); if (expressions.need_aggregate) executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final); @@ -522,7 +529,7 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, siz QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns( Pipeline & pipeline, bool dry_run, ExpressionActionsChain & chain, - PrewhereInfoPtr & prewhere_info, ExpressionActionsPtr & remove_prewhere_column_actions) + PrewhereInfoPtr & prewhere_info, std::shared_ptr & remove_prewhere_filter) { /// List of columns to read to execute the query. Names required_columns = query_analyzer->getRequiredSourceColumns(); @@ -688,12 +695,10 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns( optimize_prewhere(*merge_tree); } - if (!dry_run && query_analyzer->appendPrewhere(chain, false)) + if (!dry_run && query_analyzer->appendPrewhere(chain, false, remove_prewhere_filter)) { query_info.prewhere_info = prewhere_info = std::make_shared( - chain.getLastActions(), query.prewhere_expression->getColumnName()); - chain.addStep(); - remove_prewhere_column_actions = chain.getLastActions(); + chain.steps.front().actions, query.prewhere_expression->getColumnName()); chain.addStep(); } @@ -759,11 +764,11 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns( } -void InterpreterSelectQuery::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression) +void InterpreterSelectQuery::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter) { pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, expression, query.where_expression->getColumnName()); + stream = std::make_shared(stream, expression, query.where_expression->getColumnName(), remove_fiter); }); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index 67950218f06..29cf2a86ada 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -124,8 +124,10 @@ private: bool has_order_by = false; bool has_limit_by = false; + bool remove_prewhere_filter = false; + bool remove_where_filter = false; + ExpressionActionsPtr before_join; /// including JOIN - ExpressionActionsPtr before_prewhere; ExpressionActionsPtr before_where; ExpressionActionsPtr before_aggregation; ExpressionActionsPtr before_having; @@ -159,9 +161,9 @@ private: /// Fetch data from the table. Returns the stage to which the query was processed in Storage. QueryProcessingStage::Enum executeFetchColumns(Pipeline & pipeline, bool dry_run, ExpressionActionsChain & chain, PrewhereInfoPtr & prewhere_info, - ExpressionActionsPtr & remove_prewhere_column_actions); + std::shared_ptr & remove_prewhere_filter); - void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression); + void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter); void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final); void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final); void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row); diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp index 1dfe032a30c..e6992fe4c5d 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -613,25 +613,25 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r if (!result.block) return; - /// Calculate the number of rows in block in order to create const column. - size_t rows = result.block.rows(); - /// If block has single column, it's filter. We need to count bytes in it in order to get the number of rows. - if (result.block.columns() == 1) + auto getNumRows = [&]() { - if (result.getFilter()) - rows = countBytesInFilter(result.getFilter()->getData()); + /// If block has single column, it's filter. We need to count bytes in it in order to get the number of rows. + if (result.block.columns() > 1) + return result.block.rows(); + else if (result.getFilter()) + return countBytesInFilter(result.getFilter()->getData()); else - rows = prev_rows; - } + return prev_rows; + }; if (remove_prewhere_column) result.block.erase(*prewhere_column_name); else - prewhere_column.column = prewhere_column.type->createColumnConst(rows, UInt64(1)); + prewhere_column.column = prewhere_column.type->createColumnConst(getNumRows(), UInt64(1)); /// If block is empty, create column in order to store rows number. if (last_reader_in_chain && result.block.columns() == 0) - result.block.insert({ColumnNothing::create(rows), std::make_shared(), "_nothing"}); + result.block.insert({ColumnNothing::create(getNumRows()), std::make_shared(), "_nothing"}); } } diff --git a/dbms/src/Storages/SelectQueryInfo.h b/dbms/src/Storages/SelectQueryInfo.h index fb33c5019ed..d21847cd9cc 100644 --- a/dbms/src/Storages/SelectQueryInfo.h +++ b/dbms/src/Storages/SelectQueryInfo.h @@ -27,7 +27,7 @@ struct PrewhereInfo bool remove_prewhere_column = false; PrewhereInfo() = default; - explicit PrewhereInfo(ExpressionActionsPtr && prewhere_actions_, String prewhere_column_name_) + explicit PrewhereInfo(ExpressionActionsPtr prewhere_actions_, String prewhere_column_name_) : prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {} };