From 6bc0a628cd4a803c290ca9116c8f6f9b3d6bea03 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 25 Jun 2021 17:49:28 +0300 Subject: [PATCH 1/2] Remove PrewhereDAGInfo. --- src/Interpreters/ExpressionAnalyzer.cpp | 3 +- src/Interpreters/ExpressionAnalyzer.h | 2 +- src/Interpreters/InterpreterSelectQuery.cpp | 73 +++++-------------- .../getHeaderForProcessingStage.cpp | 4 +- .../QueryPlan/ReadFromMergeTree.cpp | 5 +- src/Processors/QueryPlan/ReadFromMergeTree.h | 1 + src/Storages/IStorage.cpp | 7 +- .../MergeTreeBaseSelectProcessor.cpp | 36 ++++++--- .../MergeTree/MergeTreeBaseSelectProcessor.h | 6 +- .../MergeTree/MergeTreeBlockReadUtils.cpp | 6 +- src/Storages/MergeTree/MergeTreeData.cpp | 29 +++----- .../MergeTree/MergeTreeRangeReader.cpp | 2 +- src/Storages/MergeTree/MergeTreeRangeReader.h | 22 +++++- .../MergeTreeReverseSelectProcessor.cpp | 3 +- .../MergeTreeReverseSelectProcessor.h | 1 + .../MergeTree/MergeTreeSelectProcessor.cpp | 3 +- .../MergeTree/MergeTreeSelectProcessor.h | 1 + ...rgeTreeThreadSelectBlockInputProcessor.cpp | 3 +- ...MergeTreeThreadSelectBlockInputProcessor.h | 2 + src/Storages/SelectQueryInfo.h | 26 +------ src/Storages/StorageBuffer.cpp | 7 +- 21 files changed, 108 insertions(+), 134 deletions(-) diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index fe52b30da7b..96f898e3fe6 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1514,7 +1514,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult( if (auto actions = query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere)) { - prewhere_info = std::make_shared(actions, query.prewhere()->getColumnName()); + prewhere_info = std::make_shared(actions, query.prewhere()->getColumnName()); if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings)) { @@ -1734,7 +1734,6 @@ void ExpressionAnalysisResult::checkActions() const check_actions(prewhere_info->prewhere_actions); check_actions(prewhere_info->alias_actions); - check_actions(prewhere_info->remove_columns_actions); } } diff --git a/src/Interpreters/ExpressionAnalyzer.h b/src/Interpreters/ExpressionAnalyzer.h index 70ff5643b7c..272a5166102 100644 --- a/src/Interpreters/ExpressionAnalyzer.h +++ b/src/Interpreters/ExpressionAnalyzer.h @@ -239,7 +239,7 @@ struct ExpressionAnalysisResult /// Columns will be removed after prewhere actions execution. NameSet columns_to_remove_after_prewhere; - PrewhereDAGInfoPtr prewhere_info; + PrewhereInfoPtr prewhere_info; FilterDAGInfoPtr filter_info; ConstantFilterDescription prewhere_constant_filter_description; ConstantFilterDescription where_constant_filter_description; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 7cca527cbc1..4d741bfc484 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -958,11 +958,11 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu if (expressions.prewhere_info) { - if (expressions.prewhere_info->row_level_filter_actions) + if (expressions.prewhere_info->row_level_filter) { auto row_level_filter_step = std::make_unique( query_plan.getCurrentDataStream(), - expressions.prewhere_info->row_level_filter_actions, + expressions.prewhere_info->row_level_filter, expressions.prewhere_info->row_level_column_name, false); @@ -978,18 +978,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu prewhere_step->setStepDescription("PREWHERE"); query_plan.addStep(std::move(prewhere_step)); - - // To remove additional columns in dry run - // For example, sample column which can be removed in this stage - // TODO There seems to be no place initializing remove_columns_actions - if (expressions.prewhere_info->remove_columns_actions) - { - auto remove_columns = std::make_unique( - query_plan.getCurrentDataStream(), expressions.prewhere_info->remove_columns_actions); - - remove_columns->setStepDescription("Remove unnecessary columns after PREWHERE"); - query_plan.addStep(std::move(remove_columns)); - } } } else @@ -1479,33 +1467,29 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan( if (prewhere_info.alias_actions) { - pipe.addSimpleTransform( - [&](const Block & header) { return std::make_shared(header, prewhere_info.alias_actions); }); + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, + std::make_shared(prewhere_info.alias_actions)); + }); } if (prewhere_info.row_level_filter) { pipe.addSimpleTransform([&](const Block & header) { - return std::make_shared(header, prewhere_info.row_level_filter, prewhere_info.row_level_column_name, true); + return std::make_shared(header, + std::make_shared(prewhere_info.row_level_filter), + prewhere_info.row_level_column_name, true); }); } pipe.addSimpleTransform([&](const Block & header) { return std::make_shared( - header, prewhere_info.prewhere_actions, prewhere_info.prewhere_column_name, prewhere_info.remove_prewhere_column); + header, std::make_shared(prewhere_info.prewhere_actions), + prewhere_info.prewhere_column_name, prewhere_info.remove_prewhere_column); }); - - // To remove additional columns - // In some cases, we did not read any marks so that the pipeline.streams is empty - // Thus, some columns in prewhere are not removed as expected - // This leads to mismatched header in distributed table - if (prewhere_info.remove_columns_actions) - { - pipe.addSimpleTransform( - [&](const Block & header) { return std::make_shared(header, prewhere_info.remove_columns_actions); }); - } } auto read_from_pipe = std::make_unique(std::move(pipe)); @@ -1560,7 +1544,7 @@ void InterpreterSelectQuery::addPrewhereAliasActions() if (does_storage_support_prewhere && settings.optimize_move_to_prewhere) { /// Execute row level filter in prewhere as a part of "move to prewhere" optimization. - expressions.prewhere_info = std::make_shared( + expressions.prewhere_info = std::make_shared( std::move(expressions.filter_info->actions), std::move(expressions.filter_info->column_name)); expressions.prewhere_info->prewhere_actions->projectInput(false); @@ -1572,9 +1556,9 @@ void InterpreterSelectQuery::addPrewhereAliasActions() else { /// Add row level security actions to prewhere. - expressions.prewhere_info->row_level_filter_actions = std::move(expressions.filter_info->actions); + expressions.prewhere_info->row_level_filter = std::move(expressions.filter_info->actions); expressions.prewhere_info->row_level_column_name = std::move(expressions.filter_info->column_name); - expressions.prewhere_info->row_level_filter_actions->projectInput(false); + expressions.prewhere_info->row_level_filter->projectInput(false); expressions.filter_info = nullptr; } } @@ -1613,9 +1597,9 @@ void InterpreterSelectQuery::addPrewhereAliasActions() auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames(); required_columns_from_prewhere.insert(prewhere_required_columns.begin(), prewhere_required_columns.end()); - if (prewhere_info->row_level_filter_actions) + if (prewhere_info->row_level_filter) { - auto row_level_required_columns = prewhere_info->row_level_filter_actions->getRequiredColumns().getNames(); + auto row_level_required_columns = prewhere_info->row_level_filter->getRequiredColumns().getNames(); required_columns_from_prewhere.insert(row_level_required_columns.begin(), row_level_required_columns.end()); } } @@ -1898,28 +1882,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc auto & prewhere_info = analysis_result.prewhere_info; if (prewhere_info) - { - auto actions_settings = ExpressionActionsSettings::fromContext(context, CompileExpressions::yes); - - query_info.prewhere_info = std::make_shared(); - query_info.prewhere_info->prewhere_actions - = std::make_shared(prewhere_info->prewhere_actions, actions_settings); - - if (prewhere_info->row_level_filter_actions) - query_info.prewhere_info->row_level_filter - = std::make_shared(prewhere_info->row_level_filter_actions, actions_settings); - if (prewhere_info->alias_actions) - query_info.prewhere_info->alias_actions - = std::make_shared(prewhere_info->alias_actions, actions_settings); - if (prewhere_info->remove_columns_actions) - query_info.prewhere_info->remove_columns_actions - = std::make_shared(prewhere_info->remove_columns_actions, actions_settings); - - query_info.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name; - query_info.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column; - query_info.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name; - query_info.prewhere_info->need_filter = prewhere_info->need_filter; - } + query_info.prewhere_info = prewhere_info; /// Create optimizer with prepared actions. /// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge. diff --git a/src/Interpreters/getHeaderForProcessingStage.cpp b/src/Interpreters/getHeaderForProcessingStage.cpp index 335575a6362..19837cc05d9 100644 --- a/src/Interpreters/getHeaderForProcessingStage.cpp +++ b/src/Interpreters/getHeaderForProcessingStage.cpp @@ -98,12 +98,12 @@ Block getHeaderForProcessingStage( if (prewhere_info.row_level_filter) { - prewhere_info.row_level_filter->execute(header); + header = prewhere_info.row_level_filter->updateHeader(std::move(header)); header.erase(prewhere_info.row_level_column_name); } if (prewhere_info.prewhere_actions) - prewhere_info.prewhere_actions->execute(header); + header = prewhere_info.prewhere_actions->updateHeader(std::move(header)); if (prewhere_info.remove_prewhere_column) header.erase(prewhere_info.prewhere_column_name); diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index fd5de98b4c0..2dc8246cde7 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -94,6 +94,7 @@ ReadFromMergeTree::ReadFromMergeTree( , data(data_) , query_info(query_info_) , prewhere_info(getPrewhereInfo(query_info)) + , actions_settings(ExpressionActionsSettings::fromContext(context_)) , metadata_snapshot(std::move(metadata_snapshot_)) , metadata_snapshot_base(std::move(metadata_snapshot_base_)) , context(std::move(context_)) @@ -157,7 +158,7 @@ Pipe ReadFromMergeTree::readFromPool( i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes, data, metadata_snapshot, use_uncompressed_cache, - prewhere_info, reader_settings, virt_column_names); + prewhere_info, actions_settings, reader_settings, virt_column_names); if (i == 0) { @@ -180,7 +181,7 @@ ProcessorPtr ReadFromMergeTree::createSource( return std::make_shared( data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes, preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, - prewhere_info, true, reader_settings, virt_column_names, part.part_index_in_query); + prewhere_info, actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query); } Pipe ReadFromMergeTree::readInOrder( diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 6e1efffdb02..a5184d28593 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -90,6 +90,7 @@ private: const MergeTreeData & data; SelectQueryInfo query_info; PrewhereInfoPtr prewhere_info; + ExpressionActionsSettings actions_settings; StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot_base; diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index 83c91dffd7f..c73eb62d039 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -198,7 +198,7 @@ NameDependencies IStorage::getDependentViewsByColumn(ContextPtr context) const return name_deps; } -std::string PrewhereDAGInfo::dump() const +std::string PrewhereInfo::dump() const { WriteBufferFromOwnString ss; ss << "PrewhereDagInfo\n"; @@ -213,11 +213,6 @@ std::string PrewhereDAGInfo::dump() const ss << "prewhere_actions " << prewhere_actions->dumpDAG() << "\n"; } - if (remove_columns_actions) - { - ss << "remove_columns_actions " << remove_columns_actions->dumpDAG() << "\n"; - } - ss << "remove_prewhere_column " << remove_prewhere_column << ", need_filter " << need_filter << "\n"; diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index d9cb949042c..68f754b08fb 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -26,6 +26,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor( const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_, + ExpressionActionsSettings actions_settings, UInt64 max_block_size_rows_, UInt64 preferred_block_size_bytes_, UInt64 preferred_max_column_in_block_size_bytes_, @@ -49,6 +50,23 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor( for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it) if (header_without_virtual_columns.has(*it)) header_without_virtual_columns.erase(*it); + + if (prewhere_info) + { + prewhere_actions = std::make_unique(); + if (prewhere_info->alias_actions) + prewhere_actions->alias_actions = std::make_shared(prewhere_info->alias_actions, actions_settings); + + if (prewhere_info->row_level_filter) + prewhere_actions->row_level_filter = std::make_shared(prewhere_info->row_level_filter, actions_settings); + + prewhere_actions->prewhere_actions = std::make_shared(prewhere_info->prewhere_actions, actions_settings); + + prewhere_actions->row_level_column_name = prewhere_info->row_level_column_name; + prewhere_actions->prewhere_column_name = prewhere_info->prewhere_column_name; + prewhere_actions->remove_prewhere_column = prewhere_info->remove_prewhere_column; + prewhere_actions->need_filter = prewhere_info->need_filter; + } } @@ -78,14 +96,14 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu { if (reader->getColumns().empty()) { - current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, true); + current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), true); } else { MergeTreeRangeReader * pre_reader_ptr = nullptr; if (pre_reader != nullptr) { - current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, false); + current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), false); pre_reader_ptr = ¤t_task.pre_range_reader; } @@ -396,16 +414,17 @@ void MergeTreeBaseSelectProcessor::injectVirtualColumns( chunk.setColumns(columns, num_rows); } -void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info) +Block MergeTreeBaseSelectProcessor::transformHeader( + Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns) { if (prewhere_info) { if (prewhere_info->alias_actions) - prewhere_info->alias_actions->execute(block); + block = prewhere_info->alias_actions->updateHeader(std::move(block)); if (prewhere_info->row_level_filter) { - prewhere_info->row_level_filter->execute(block); + block = prewhere_info->row_level_filter->updateHeader(std::move(block)); auto & row_level_column = block.getByName(prewhere_info->row_level_column_name); if (!row_level_column.type->canBeUsedInBooleanContext()) { @@ -417,7 +436,7 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P } if (prewhere_info->prewhere_actions) - prewhere_info->prewhere_actions->execute(block); + block = prewhere_info->prewhere_actions->updateHeader(std::move(block)); auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name); if (!prewhere_column.type->canBeUsedInBooleanContext()) @@ -434,12 +453,7 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst(); } } -} -Block MergeTreeBaseSelectProcessor::transformHeader( - Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns) -{ - executePrewhereActions(block, prewhere_info); injectVirtualColumns(block, nullptr, partition_value_type, virtual_columns); return block; } diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h index 4615dec089f..2ae39dbb058 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h @@ -13,7 +13,7 @@ namespace DB class IMergeTreeReader; class UncompressedCache; class MarkCache; - +struct PrewhereActions; /// Base class for MergeTreeThreadSelectProcessor and MergeTreeSelectProcessor class MergeTreeBaseSelectProcessor : public SourceWithProgress @@ -24,6 +24,7 @@ public: const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_, + ExpressionActionsSettings actions_settings, UInt64 max_block_size_rows_, UInt64 preferred_block_size_bytes_, UInt64 preferred_max_column_in_block_size_bytes_, @@ -36,8 +37,6 @@ public: static Block transformHeader( Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns); - static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info); - protected: Chunk generate() final; @@ -61,6 +60,7 @@ protected: StorageMetadataPtr metadata_snapshot; PrewhereInfoPtr prewhere_info; + std::unique_ptr prewhere_actions; UInt64 max_block_size_rows; UInt64 preferred_block_size_bytes; diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index 15b4fbd31c0..b8698ae3e01 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -272,16 +272,16 @@ MergeTreeReadTaskColumns getReadTaskColumns( if (prewhere_info) { if (prewhere_info->alias_actions) - pre_column_names = prewhere_info->alias_actions->getRequiredColumns(); + pre_column_names = prewhere_info->alias_actions->getRequiredColumnsNames(); else { - pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns(); + pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames(); if (prewhere_info->row_level_filter) { NameSet names(pre_column_names.begin(), pre_column_names.end()); - for (auto & name : prewhere_info->row_level_filter->getRequiredColumns()) + for (auto & name : prewhere_info->row_level_filter->getRequiredColumnsNames()) { if (names.count(name) == 0) pre_column_names.push_back(name); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index abc37f52ff9..f6d542d5f2c 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3940,15 +3940,9 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( if (analysis_result.prewhere_info) { - const auto & prewhere_info = analysis_result.prewhere_info; - candidate.prewhere_info = std::make_shared(); - candidate.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name; - candidate.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column; - // std::cerr << fmt::format("remove prewhere column : {}", candidate.prewhere_info->remove_prewhere_column) << std::endl; - candidate.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name; - candidate.prewhere_info->need_filter = prewhere_info->need_filter; + candidate.prewhere_info = analysis_result.prewhere_info; - auto prewhere_actions = prewhere_info->prewhere_actions->clone(); + auto prewhere_actions = candidate.prewhere_info->prewhere_actions->clone(); auto prewhere_required_columns = required_columns; // required_columns should not contain columns generated by prewhere for (const auto & column : prewhere_actions->getResultColumns()) @@ -3956,28 +3950,27 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( // std::cerr << fmt::format("prewhere_actions = \n{}", prewhere_actions->dumpDAG()) << std::endl; // Prewhere_action should not add missing keys. prewhere_required_columns = prewhere_actions->foldActionsByProjection( - prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name, false); + prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->prewhere_column_name, false); // std::cerr << fmt::format("prewhere_actions = \n{}", prewhere_actions->dumpDAG()) << std::endl; // std::cerr << fmt::format("prewhere_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl; if (prewhere_required_columns.empty()) return false; - candidate.prewhere_info->prewhere_actions = std::make_shared(prewhere_actions, actions_settings); + candidate.prewhere_info->prewhere_actions = prewhere_actions; - if (prewhere_info->row_level_filter_actions) + if (candidate.prewhere_info->row_level_filter) { - auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone(); + auto row_level_filter_actions = candidate.prewhere_info->row_level_filter->clone(); prewhere_required_columns = row_level_filter_actions->foldActionsByProjection( - prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name, false); + prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->row_level_column_name, false); // std::cerr << fmt::format("row_level_filter_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl; if (prewhere_required_columns.empty()) return false; - candidate.prewhere_info->row_level_filter - = std::make_shared(row_level_filter_actions, actions_settings); + candidate.prewhere_info->row_level_filter = row_level_filter_actions; } - if (prewhere_info->alias_actions) + if (candidate.prewhere_info->alias_actions) { - auto alias_actions = prewhere_info->alias_actions->clone(); + auto alias_actions = candidate.prewhere_info->alias_actions->clone(); // std::cerr << fmt::format("alias_actions = \n{}", alias_actions->dumpDAG()) << std::endl; prewhere_required_columns = alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false); @@ -3985,7 +3978,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( // std::cerr << fmt::format("alias_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl; if (prewhere_required_columns.empty()) return false; - candidate.prewhere_info->alias_actions = std::make_shared(alias_actions, actions_settings); + candidate.prewhere_info->alias_actions = alias_actions; } required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end()); } diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/src/Storages/MergeTree/MergeTreeRangeReader.cpp index 27682b81c94..8072aa6a3dc 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -520,7 +520,7 @@ size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn: MergeTreeRangeReader::MergeTreeRangeReader( IMergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_, - const PrewhereInfoPtr & prewhere_info_, + const PrewhereActions * prewhere_info_, bool last_reader_in_chain_) : merge_tree_reader(merge_tree_reader_) , index_granularity(&(merge_tree_reader->data_part->index_granularity)) diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.h b/src/Storages/MergeTree/MergeTreeRangeReader.h index 18075e52bdd..7c36ca49c99 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -15,6 +15,24 @@ class MergeTreeIndexGranularity; struct PrewhereInfo; using PrewhereInfoPtr = std::shared_ptr; +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; + +struct PrewhereActions +{ + /// Actions which are executed in order to alias columns are used for prewhere actions. + ExpressionActionsPtr alias_actions; + /// Actions for row level security filter. Applied separately before prewhere_actions. + /// This actions are separate because prewhere condition should not be executed over filtered rows. + ExpressionActionsPtr row_level_filter; + /// Actions which are executed on block in order to get filter column for prewhere step. + ExpressionActionsPtr prewhere_actions; + String row_level_column_name; + String prewhere_column_name; + bool remove_prewhere_column = false; + bool need_filter = false; +}; + /// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part. /// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark. /// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks. @@ -24,7 +42,7 @@ public: MergeTreeRangeReader( IMergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_, - const PrewhereInfoPtr & prewhere_info_, + const PrewhereActions * prewhere_info_, bool last_reader_in_chain_); MergeTreeRangeReader() = default; @@ -217,7 +235,7 @@ private: IMergeTreeReader * merge_tree_reader = nullptr; const MergeTreeIndexGranularity * index_granularity = nullptr; MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly. - PrewhereInfoPtr prewhere_info; + const PrewhereActions * prewhere_info; Stream stream; diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp index e9527efaa4a..81833b76735 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp @@ -23,6 +23,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( MarkRanges mark_ranges_, bool use_uncompressed_cache_, const PrewhereInfoPtr & prewhere_info_, + ExpressionActionsSettings actions_settings, bool check_columns, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, @@ -31,7 +32,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( : MergeTreeBaseSelectProcessor{ metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), - storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_, + storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, reader_settings_, use_uncompressed_cache_, virt_column_names_}, required_columns{std::move(required_columns_)}, diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h index c9fd06c5534..b807c2d912c 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h @@ -27,6 +27,7 @@ public: MarkRanges mark_ranges, bool use_uncompressed_cache, const PrewhereInfoPtr & prewhere_info, + ExpressionActionsSettings actions_settings, bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 980afa170e9..ce342a69fe0 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -23,6 +23,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( MarkRanges mark_ranges_, bool use_uncompressed_cache_, const PrewhereInfoPtr & prewhere_info_, + ExpressionActionsSettings actions_settings, bool check_columns_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, @@ -31,7 +32,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( : MergeTreeBaseSelectProcessor{ metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), - storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_, + storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, reader_settings_, use_uncompressed_cache_, virt_column_names_}, required_columns{std::move(required_columns_)}, diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 925c437f1ce..b63107b6dbf 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -27,6 +27,7 @@ public: MarkRanges mark_ranges, bool use_uncompressed_cache, const PrewhereInfoPtr & prewhere_info, + ExpressionActionsSettings actions_settings, bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, diff --git a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp index 2f0aad77d96..daefb17038a 100644 --- a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp @@ -19,11 +19,12 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess const StorageMetadataPtr & metadata_snapshot_, const bool use_uncompressed_cache_, const PrewhereInfoPtr & prewhere_info_, + ExpressionActionsSettings actions_settings, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_) : MergeTreeBaseSelectProcessor{ - pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_, + pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, reader_settings_, use_uncompressed_cache_, virt_column_names_}, thread{thread_}, diff --git a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h index 2b2ed36fc18..30c551eede0 100644 --- a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h +++ b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h @@ -25,7 +25,9 @@ public: const StorageMetadataPtr & metadata_snapshot_, const bool use_uncompressed_cache_, const PrewhereInfoPtr & prewhere_info_, + ExpressionActionsSettings actions_settings, const MergeTreeReaderSettings & reader_settings_, + const Names & virt_column_names_); String getName() const override { return "MergeTreeThread"; } diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 73cf3893a89..a7d2ae3e7dd 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -21,9 +21,6 @@ using ActionsDAGPtr = std::shared_ptr; struct PrewhereInfo; using PrewhereInfoPtr = std::shared_ptr; -struct PrewhereDAGInfo; -using PrewhereDAGInfoPtr = std::shared_ptr; - struct FilterInfo; using FilterInfoPtr = std::shared_ptr; @@ -45,34 +42,19 @@ using ClusterPtr = std::shared_ptr; struct PrewhereInfo { /// Actions which are executed in order to alias columns are used for prewhere actions. - ExpressionActionsPtr alias_actions; + ActionsDAGPtr alias_actions; /// Actions for row level security filter. Applied separately before prewhere_actions. /// This actions are separate because prewhere condition should not be executed over filtered rows. - ExpressionActionsPtr row_level_filter; + ActionsDAGPtr row_level_filter; /// Actions which are executed on block in order to get filter column for prewhere step. - ExpressionActionsPtr prewhere_actions; - /// Actions which are executed after reading from storage in order to remove unused columns. - ExpressionActionsPtr remove_columns_actions; - String row_level_column_name; - String prewhere_column_name; - bool remove_prewhere_column = false; - bool need_filter = false; -}; - -/// Same as PrewhereInfo, but with ActionsDAG. -struct PrewhereDAGInfo -{ - ActionsDAGPtr alias_actions; - ActionsDAGPtr row_level_filter_actions; ActionsDAGPtr prewhere_actions; - ActionsDAGPtr remove_columns_actions; String row_level_column_name; String prewhere_column_name; bool remove_prewhere_column = false; bool need_filter = false; - PrewhereDAGInfo() = default; - explicit PrewhereDAGInfo(ActionsDAGPtr prewhere_actions_, String prewhere_column_name_) + PrewhereInfo() = default; + explicit PrewhereInfo(ActionsDAGPtr prewhere_actions_, String prewhere_column_name_) : prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {} std::string dump() const; diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 55dc2d12c9d..a433cd248c7 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -369,13 +369,14 @@ void StorageBuffer::read( { if (query_info.prewhere_info) { + auto actions_settings = ExpressionActionsSettings::fromContext(local_context); if (query_info.prewhere_info->alias_actions) { pipe_from_buffers.addSimpleTransform([&](const Block & header) { return std::make_shared( header, - query_info.prewhere_info->alias_actions); + std::make_shared(query_info.prewhere_info->alias_actions, actions_settings)); }); } @@ -385,7 +386,7 @@ void StorageBuffer::read( { return std::make_shared( header, - query_info.prewhere_info->row_level_filter, + std::make_shared(query_info.prewhere_info->row_level_filter, actions_settings), query_info.prewhere_info->row_level_column_name, false); }); @@ -395,7 +396,7 @@ void StorageBuffer::read( { return std::make_shared( header, - query_info.prewhere_info->prewhere_actions, + std::make_shared(query_info.prewhere_info->prewhere_actions, actions_settings), query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column); }); From bf827936b78a9db2594c6f99975a20dd29c92db7 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 29 Jun 2021 14:53:34 +0300 Subject: [PATCH 2/2] Rename PrewhereActions --- src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp | 2 +- src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h | 4 ++-- src/Storages/MergeTree/MergeTreeRangeReader.cpp | 2 +- src/Storages/MergeTree/MergeTreeRangeReader.h | 7 ++++--- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index 68f754b08fb..9334baef964 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -53,7 +53,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor( if (prewhere_info) { - prewhere_actions = std::make_unique(); + prewhere_actions = std::make_unique(); if (prewhere_info->alias_actions) prewhere_actions->alias_actions = std::make_shared(prewhere_info->alias_actions, actions_settings); diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h index 2ae39dbb058..8da9b002e16 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h @@ -13,7 +13,7 @@ namespace DB class IMergeTreeReader; class UncompressedCache; class MarkCache; -struct PrewhereActions; +struct PrewhereExprInfo; /// Base class for MergeTreeThreadSelectProcessor and MergeTreeSelectProcessor class MergeTreeBaseSelectProcessor : public SourceWithProgress @@ -60,7 +60,7 @@ protected: StorageMetadataPtr metadata_snapshot; PrewhereInfoPtr prewhere_info; - std::unique_ptr prewhere_actions; + std::unique_ptr prewhere_actions; UInt64 max_block_size_rows; UInt64 preferred_block_size_bytes; diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/src/Storages/MergeTree/MergeTreeRangeReader.cpp index 8072aa6a3dc..2347280a4a0 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -520,7 +520,7 @@ size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn: MergeTreeRangeReader::MergeTreeRangeReader( IMergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_, - const PrewhereActions * prewhere_info_, + const PrewhereExprInfo * prewhere_info_, bool last_reader_in_chain_) : merge_tree_reader(merge_tree_reader_) , index_granularity(&(merge_tree_reader->data_part->index_granularity)) diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.h b/src/Storages/MergeTree/MergeTreeRangeReader.h index 7c36ca49c99..8cdf485ff1e 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -18,7 +18,8 @@ using PrewhereInfoPtr = std::shared_ptr; class ExpressionActions; using ExpressionActionsPtr = std::shared_ptr; -struct PrewhereActions +/// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG +struct PrewhereExprInfo { /// Actions which are executed in order to alias columns are used for prewhere actions. ExpressionActionsPtr alias_actions; @@ -42,7 +43,7 @@ public: MergeTreeRangeReader( IMergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_, - const PrewhereActions * prewhere_info_, + const PrewhereExprInfo * prewhere_info_, bool last_reader_in_chain_); MergeTreeRangeReader() = default; @@ -235,7 +236,7 @@ private: IMergeTreeReader * merge_tree_reader = nullptr; const MergeTreeIndexGranularity * index_granularity = nullptr; MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly. - const PrewhereActions * prewhere_info; + const PrewhereExprInfo * prewhere_info; Stream stream;