From 4f6c880232cac8e9d9c6e0c79111de5a9fed8c91 Mon Sep 17 00:00:00 2001 From: Denis Glazachev Date: Mon, 25 Jan 2021 18:31:59 +0400 Subject: [PATCH] Pass and handle a chain of multiple prewhere infos --- src/Interpreters/InterpreterSelectQuery.cpp | 72 ++++++---- .../getHeaderForProcessingStage.cpp | 11 +- .../MergeTreeBaseSelectProcessor.cpp | 41 +++--- .../MergeTree/MergeTreeBaseSelectProcessor.h | 8 +- .../MergeTree/MergeTreeBlockReadUtils.cpp | 19 +-- .../MergeTree/MergeTreeBlockReadUtils.h | 7 +- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 30 ++-- .../MergeTree/MergeTreeRangeReader.cpp | 136 ++++++++++++------ src/Storages/MergeTree/MergeTreeRangeReader.h | 13 +- src/Storages/MergeTree/MergeTreeReadPool.cpp | 8 +- src/Storages/MergeTree/MergeTreeReadPool.h | 9 +- .../MergeTreeReverseSelectProcessor.cpp | 11 +- .../MergeTreeReverseSelectProcessor.h | 2 +- .../MergeTree/MergeTreeSelectProcessor.cpp | 11 +- .../MergeTree/MergeTreeSelectProcessor.h | 2 +- ...rgeTreeThreadSelectBlockInputProcessor.cpp | 8 +- ...MergeTreeThreadSelectBlockInputProcessor.h | 2 +- src/Storages/SelectQueryInfo.h | 6 +- src/Storages/StorageBuffer.cpp | 25 ++-- 19 files changed, 247 insertions(+), 174 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 4f6b76e9b53..9dd63362dbd 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1186,36 +1186,40 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c { Pipe pipe(std::make_shared(source_header)); - if (query_info.prewhere_info) + if (query_info.prewhere_info_list) { - if (query_info.prewhere_info->alias_actions) + for (const auto & prewhere_info : *query_info.prewhere_info_list) { + if (prewhere_info.alias_actions) + { + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared( + header, prewhere_info.alias_actions); + }); + } + pipe.addSimpleTransform([&](const Block & header) { - return std::make_shared(header, query_info.prewhere_info->alias_actions); + return std::make_shared( + header, + prewhere_info.prewhere_actions, + prewhere_info.prewhere_column_name, + prewhere_info.remove_prewhere_column); }); - } - pipe.addSimpleTransform([&](const Block & header) - { - return std::make_shared( - header, - query_info.prewhere_info->prewhere_actions, - query_info.prewhere_info->prewhere_column_name, - query_info.prewhere_info->remove_prewhere_column); - }); - - // To remove additional columns - // In some cases, we did not read any marks so that the pipeline.streams is empty - // Thus, some columns in prewhere are not removed as expected - // This leads to mismatched header in distributed table - if (query_info.prewhere_info->remove_columns_actions) - { - pipe.addSimpleTransform([&](const Block & header) + // To remove additional columns + // In some cases, we did not read any marks so that the pipeline.streams is empty + // Thus, some columns in prewhere are not removed as expected + // This leads to mismatched header in distributed table + if (prewhere_info.remove_columns_actions) { - return std::make_shared( - header, query_info.prewhere_info->remove_columns_actions); - }); + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared( + header, prewhere_info.remove_columns_actions); + }); + } } } @@ -1552,17 +1556,23 @@ void InterpreterSelectQuery::executeFetchColumns( if (prewhere_info) { - query_info.prewhere_info = std::make_shared( - std::make_shared(prewhere_info->prewhere_actions), - prewhere_info->prewhere_column_name); + if (!query_info.prewhere_info_list) + query_info.prewhere_info_list = std::make_shared(); + + query_info.prewhere_info_list->emplace_back( + std::make_shared(prewhere_info->prewhere_actions), + prewhere_info->prewhere_column_name); + + auto & new_prewhere_info = query_info.prewhere_info_list->back(); if (prewhere_info->alias_actions) - query_info.prewhere_info->alias_actions = std::make_shared(prewhere_info->alias_actions); - if (prewhere_info->remove_columns_actions) - query_info.prewhere_info->remove_columns_actions = std::make_shared(prewhere_info->remove_columns_actions); + new_prewhere_info.alias_actions = std::make_shared(prewhere_info->alias_actions); - query_info.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column; - query_info.prewhere_info->need_filter = prewhere_info->need_filter; + if (prewhere_info->remove_columns_actions) + new_prewhere_info.remove_columns_actions = std::make_shared(prewhere_info->remove_columns_actions); + + new_prewhere_info.remove_prewhere_column = prewhere_info->remove_prewhere_column; + new_prewhere_info.need_filter = prewhere_info->need_filter; } /// Create optimizer with prepared actions. diff --git a/src/Interpreters/getHeaderForProcessingStage.cpp b/src/Interpreters/getHeaderForProcessingStage.cpp index e341a5637f4..761f04e81ee 100644 --- a/src/Interpreters/getHeaderForProcessingStage.cpp +++ b/src/Interpreters/getHeaderForProcessingStage.cpp @@ -42,11 +42,14 @@ Block getHeaderForProcessingStage( case QueryProcessingStage::FetchColumns: { Block header = metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID()); - if (query_info.prewhere_info) + if (query_info.prewhere_info_list) { - query_info.prewhere_info->prewhere_actions->execute(header); - if (query_info.prewhere_info->remove_prewhere_column) - header.erase(query_info.prewhere_info->prewhere_column_name); + for (const auto & prewhere_info : *query_info.prewhere_info_list) + { + prewhere_info.prewhere_actions->execute(header); + if (prewhere_info.remove_prewhere_column) + header.erase(prewhere_info.prewhere_column_name); + } } return header; } diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index c852151f27d..3405a211c98 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -22,17 +22,17 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor( Block header, const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, - const PrewhereInfoPtr & prewhere_info_, + const PrewhereInfoListPtr & prewhere_info_list_, UInt64 max_block_size_rows_, UInt64 preferred_block_size_bytes_, UInt64 preferred_max_column_in_block_size_bytes_, const MergeTreeReaderSettings & reader_settings_, bool use_uncompressed_cache_, const Names & virt_column_names_) - : SourceWithProgress(getHeader(std::move(header), prewhere_info_, virt_column_names_)) + : SourceWithProgress(getHeader(std::move(header), prewhere_info_list_, virt_column_names_)) , storage(storage_) , metadata_snapshot(metadata_snapshot_) - , prewhere_info(prewhere_info_) + , prewhere_info_list(prewhere_info_list_) , max_block_size_rows(max_block_size_rows_) , preferred_block_size_bytes(preferred_block_size_bytes_) , preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_) @@ -70,18 +70,18 @@ Chunk MergeTreeBaseSelectProcessor::generate() void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task) { - if (prewhere_info) + if (prewhere_info_list) { if (reader->getColumns().empty()) { - current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, true); + current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info_list, true); } else { MergeTreeRangeReader * pre_reader_ptr = nullptr; if (pre_reader != nullptr) { - current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, false); + current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info_list, false); pre_reader_ptr = ¤t_task.pre_range_reader; } @@ -309,34 +309,37 @@ void MergeTreeBaseSelectProcessor::injectVirtualColumns(Chunk & chunk, MergeTree chunk.setColumns(columns, num_rows); } -void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info) +void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoListPtr & prewhere_info_list) { - if (prewhere_info) - { - if (prewhere_info->alias_actions) - prewhere_info->alias_actions->execute(block); + if (!prewhere_info_list) + return; - prewhere_info->prewhere_actions->execute(block); - auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name); + for (const auto & prewhere_info : *prewhere_info_list) + { + if (prewhere_info.alias_actions) + prewhere_info.alias_actions->execute(block); + + prewhere_info.prewhere_actions->execute(block); + auto & prewhere_column = block.getByName(prewhere_info.prewhere_column_name); if (!prewhere_column.type->canBeUsedInBooleanContext()) throw Exception("Invalid type for filter in PREWHERE: " + prewhere_column.type->getName(), - ErrorCodes::LOGICAL_ERROR); + ErrorCodes::LOGICAL_ERROR); - if (prewhere_info->remove_prewhere_column) - block.erase(prewhere_info->prewhere_column_name); + if (prewhere_info.remove_prewhere_column) + block.erase(prewhere_info.prewhere_column_name); else { - auto & ctn = block.getByName(prewhere_info->prewhere_column_name); + auto & ctn = block.getByName(prewhere_info.prewhere_column_name); ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst(); } } } Block MergeTreeBaseSelectProcessor::getHeader( - Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns) + Block block, const PrewhereInfoListPtr & prewhere_info_list, const Names & virtual_columns) { - executePrewhereActions(block, prewhere_info); + executePrewhereActions(block, prewhere_info_list); injectVirtualColumns(block, nullptr, virtual_columns); return block; } diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h index 00ef131ae45..a3d7520b89a 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h @@ -23,7 +23,7 @@ public: Block header, const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, - const PrewhereInfoPtr & prewhere_info_, + const PrewhereInfoListPtr & prewhere_info_list_, UInt64 max_block_size_rows_, UInt64 preferred_block_size_bytes_, UInt64 preferred_max_column_in_block_size_bytes_, @@ -33,7 +33,7 @@ public: ~MergeTreeBaseSelectProcessor() override; - static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info); + static void executePrewhereActions(Block & block, const PrewhereInfoListPtr & prewhere_info_list); protected: Chunk generate() final; @@ -49,7 +49,7 @@ protected: static void injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns); static void injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns); - static Block getHeader(Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns); + static Block getHeader(Block block, const PrewhereInfoListPtr & prewhere_info_list, const Names & virtual_columns); void initializeRangeReaders(MergeTreeReadTask & task); @@ -57,7 +57,7 @@ protected: const MergeTreeData & storage; StorageMetadataPtr metadata_snapshot; - PrewhereInfoPtr prewhere_info; + PrewhereInfoListPtr prewhere_info_list; UInt64 max_block_size_rows; UInt64 preferred_block_size_bytes; diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index f8b5e0a9c0a..f3191a76120 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -118,11 +118,10 @@ NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetada MergeTreeReadTask::MergeTreeReadTask( const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, const size_t part_index_in_query_, const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_, - const NamesAndTypesList & pre_columns_, const bool remove_prewhere_column_, const bool should_reorder_, - MergeTreeBlockSizePredictorPtr && size_predictor_) + const NamesAndTypesList & pre_columns_, const bool should_reorder_, MergeTreeBlockSizePredictorPtr && size_predictor_) : data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_}, ordered_names{ordered_names_}, column_name_set{column_name_set_}, columns{columns_}, pre_columns{pre_columns_}, - remove_prewhere_column{remove_prewhere_column_}, should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)} + should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)} { } @@ -258,7 +257,7 @@ MergeTreeReadTaskColumns getReadTaskColumns( const StorageMetadataPtr & metadata_snapshot, const MergeTreeData::DataPartPtr & data_part, const Names & required_columns, - const PrewhereInfoPtr & prewhere_info, + const PrewhereInfoListPtr & prewhere_info_list, bool check_columns) { Names column_names = required_columns; @@ -267,12 +266,14 @@ MergeTreeReadTaskColumns getReadTaskColumns( /// inject columns required for defaults evaluation bool should_reorder = !injectRequiredColumns(storage, metadata_snapshot, data_part, column_names).empty(); - if (prewhere_info) + if (prewhere_info_list) { - if (prewhere_info->alias_actions) - pre_column_names = prewhere_info->alias_actions->getRequiredColumns(); - else - pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns(); + for (const auto & prewhere_info : *prewhere_info_list) + { + const auto required_column_names = (prewhere_info.alias_actions ? + prewhere_info.alias_actions->getRequiredColumns() : prewhere_info.prewhere_actions->getRequiredColumns()); + pre_column_names.insert(pre_column_names.end(), required_column_names.begin(), required_column_names.end()); + } if (pre_column_names.empty()) pre_column_names.push_back(column_names[0]); diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.h b/src/Storages/MergeTree/MergeTreeBlockReadUtils.h index 31d609e4242..f2537c554c3 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.h +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.h @@ -42,8 +42,6 @@ struct MergeTreeReadTask const NamesAndTypesList & columns; /// column names to read during PREWHERE const NamesAndTypesList & pre_columns; - /// should PREWHERE column be returned to requesting side? - const bool remove_prewhere_column; /// resulting block may require reordering in accordance with `ordered_names` const bool should_reorder; /// Used to satistfy preferred_block_size_bytes limitation @@ -57,8 +55,7 @@ struct MergeTreeReadTask MergeTreeReadTask( const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, const size_t part_index_in_query_, const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_, - const NamesAndTypesList & pre_columns_, const bool remove_prewhere_column_, const bool should_reorder_, - MergeTreeBlockSizePredictorPtr && size_predictor_); + const NamesAndTypesList & pre_columns_, const bool should_reorder_, MergeTreeBlockSizePredictorPtr && size_predictor_); virtual ~MergeTreeReadTask(); }; @@ -78,7 +75,7 @@ MergeTreeReadTaskColumns getReadTaskColumns( const StorageMetadataPtr & metadata_snapshot, const MergeTreeData::DataPartPtr & data_part, const Names & required_columns, - const PrewhereInfoPtr & prewhere_info, + const PrewhereInfoListPtr & prewhere_info_list, bool check_columns); struct MergeTreeBlockSizePredictor diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 4e1f307137a..248efc140fd 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -833,14 +833,20 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( plan->addStep(std::move(adding_column)); } - if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions) + if (query_info.prewhere_info_list) { - auto expression_step = std::make_unique( - plan->getCurrentDataStream(), - query_info.prewhere_info->remove_columns_actions->getActionsDAG().clone()); + for (const auto & prewhere_info : *query_info.prewhere_info_list) + { + if (prewhere_info.remove_columns_actions) + { + auto expression_step = std::make_unique( + plan->getCurrentDataStream(), + prewhere_info.remove_columns_actions->getActionsDAG().clone()); - expression_step->setStepDescription("Remove unused columns after PREWHERE"); - plan->addStep(std::move(expression_step)); + expression_step->setStepDescription("Remove unused columns after PREWHERE"); + plan->addStep(std::move(expression_step)); + } + } } return plan; @@ -948,7 +954,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( std::move(parts), data, metadata_snapshot, - query_info.prewhere_info, + query_info.prewhere_info_list, true, column_names, MergeTreeReadPool::BackoffSettings(settings), @@ -964,7 +970,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes, data, metadata_snapshot, use_uncompressed_cache, - query_info.prewhere_info, reader_settings, virt_columns); + query_info.prewhere_info_list, reader_settings, virt_columns); if (i == 0) { @@ -987,7 +993,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( auto source = std::make_shared( data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache, - query_info.prewhere_info, true, reader_settings, virt_columns, part.part_index_in_query); + query_info.prewhere_info_list, true, reader_settings, virt_columns, part.part_index_in_query); res.emplace_back(std::move(source)); } @@ -1187,7 +1193,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( column_names, ranges_to_get_from_part, use_uncompressed_cache, - query_info.prewhere_info, + query_info.prewhere_info_list, true, reader_settings, virt_columns, @@ -1205,7 +1211,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( column_names, ranges_to_get_from_part, use_uncompressed_cache, - query_info.prewhere_info, + query_info.prewhere_info_list, true, reader_settings, virt_columns, @@ -1359,7 +1365,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( column_names, part_it->ranges, use_uncompressed_cache, - query_info.prewhere_info, + query_info.prewhere_info_list, true, reader_settings, virt_columns, diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/src/Storages/MergeTree/MergeTreeRangeReader.cpp index c13146bd35c..2ca2b30a5eb 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -443,32 +443,79 @@ size_t MergeTreeRangeReader::ReadResult::numZerosInTail(const UInt8 * begin, con return count; } -void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter) +void MergeTreeRangeReader::ReadResult::addFilter(const ColumnPtr & new_filter) { - if (!new_filter && filter) - throw Exception("Can't replace existing filter with empty.", ErrorCodes::LOGICAL_ERROR); - if (filter) { - size_t new_size = new_filter->size(); + if (!new_filter) + throw Exception("Can't add an empty filter to the existing one.", ErrorCodes::LOGICAL_ERROR); + const auto new_size = new_filter->size(); if (new_size != total_rows_per_granule) - throw Exception("Can't set filter because it's size is " + toString(new_size) + " but " + throw Exception("Can't add the new filter because it's size is " + toString(new_size) + " but " + toString(total_rows_per_granule) + " rows was read.", ErrorCodes::LOGICAL_ERROR); } ConstantFilterDescription const_description(*new_filter); if (const_description.always_true) - setFilterConstTrue(); + { + if (!filter) + setFilterConstTrue(); + } else if (const_description.always_false) + { clear(); + } else { - FilterDescription filter_description(*new_filter); - filter_holder = filter_description.data_holder ? filter_description.data_holder : new_filter; - filter = typeid_cast(filter_holder.get()); - if (!filter) - throw Exception("setFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR); + FilterDescription description(*new_filter); + auto new_holder = (description.data_holder ? description.data_holder : new_filter); + auto * new_holder_cast = typeid_cast(new_holder.get()); + + if (!new_holder_cast) + throw Exception("addFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR); + + if (filter) + { + MutableColumnPtr new_mutable_holder = IColumn::mutate(std::move(new_holder)); + auto * new_mutable_holder_cast = typeid_cast(new_mutable_holder.get()); + + if (!new_mutable_holder_cast) + throw Exception("addFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR); + + const auto & data = filter->getData(); + auto it = data.begin(); + + auto & new_data = new_mutable_holder_cast->getData(); + auto n_it = new_data.begin(); + + while (it != data.end() && n_it != new_data.end()) + { + *n_it = (*n_it && *it); + ++it; + ++n_it; + } + + ConstantFilterDescription new_const_description(*new_mutable_holder); + if (new_const_description.always_true) + { + setFilterConstTrue(); + } + else if (new_const_description.always_false) + { + clear(); + } + else + { + filter_holder = std::move(new_mutable_holder); + filter = new_mutable_holder_cast; + } + } + else + { + filter_holder = std::move(new_holder); + filter = new_holder_cast; + } } } @@ -489,11 +536,14 @@ size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn: MergeTreeRangeReader::MergeTreeRangeReader( IMergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_, - const PrewhereInfoPtr & prewhere_, + const PrewhereInfoListPtr & prewhere_info_list_, bool last_reader_in_chain_) : merge_tree_reader(merge_tree_reader_) - , index_granularity(&(merge_tree_reader->data_part->index_granularity)), prev_reader(prev_reader_) - , prewhere(prewhere_), last_reader_in_chain(last_reader_in_chain_), is_initialized(true) + , index_granularity(&(merge_tree_reader->data_part->index_granularity)) + , prev_reader(prev_reader_) + , prewhere_info_list(prewhere_info_list_) + , last_reader_in_chain(last_reader_in_chain_) + , is_initialized(true) { if (prev_reader) sample_block = prev_reader->getSampleBlock(); @@ -501,16 +551,19 @@ MergeTreeRangeReader::MergeTreeRangeReader( for (const auto & name_and_type : merge_tree_reader->getColumns()) sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name}); - if (prewhere) + if (prewhere_info_list) { - if (prewhere->alias_actions) - prewhere->alias_actions->execute(sample_block, true); + for (const auto & prewhere_info : *prewhere_info_list) + { + if (prewhere_info.alias_actions) + prewhere_info.alias_actions->execute(sample_block, true); - if (prewhere->prewhere_actions) - prewhere->prewhere_actions->execute(sample_block, true); + if (prewhere_info.prewhere_actions) + prewhere_info.prewhere_actions->execute(sample_block, true); - if (prewhere->remove_prewhere_column) - sample_block.erase(prewhere->prewhere_column_name); + if (prewhere_info.remove_prewhere_column) + sample_block.erase(prewhere_info.prewhere_column_name); + } } } @@ -701,7 +754,13 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar if (read_result.num_rows == 0) return read_result; - executePrewhereActionsAndFilterColumns(read_result); + if (prewhere_info_list) + { + for (const auto & prewhere_info : *prewhere_info_list) + { + executePrewhereActionsAndFilterColumns(read_result, prewhere_info); + } + } return read_result; } @@ -798,11 +857,8 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t & return columns; } -void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result) +void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result, const PrewhereInfo & prewhere_info) { - if (!prewhere) - return; - const auto & header = merge_tree_reader->getColumns(); size_t num_columns = header.size(); @@ -831,14 +887,14 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type) block.insert({result.columns[pos], name_and_type->type, name_and_type->name}); - if (prewhere->alias_actions) - prewhere->alias_actions->execute(block); + if (prewhere_info.alias_actions) + prewhere_info.alias_actions->execute(block); /// Columns might be projected out. We need to store them here so that default columns can be evaluated later. result.block_before_prewhere = block; - prewhere->prewhere_actions->execute(block); + prewhere_info.prewhere_actions->execute(block); - prewhere_column_pos = block.getPositionByName(prewhere->prewhere_column_name); + prewhere_column_pos = block.getPositionByName(prewhere_info.prewhere_column_name); result.columns.clear(); result.columns.reserve(block.columns()); @@ -848,15 +904,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r filter.swap(result.columns[prewhere_column_pos]); } - if (result.getFilter()) - { - /// TODO: implement for prewhere chain. - /// In order to do it we need combine filter and result.filter, where filter filters only '1' in result.filter. - throw Exception("MergeTreeRangeReader chain with several prewhere actions in not implemented.", - ErrorCodes::LOGICAL_ERROR); - } - - result.setFilter(filter); + result.addFilter(filter); /// If there is a WHERE, we filter in there, and only optimize IO and shrink columns here if (!last_reader_in_chain) @@ -866,7 +914,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r if (result.totalRowsPerGranule() == 0) result.setFilterConstFalse(); /// If we need to filter in PREWHERE - else if (prewhere->need_filter || result.need_filter) + else if (prewhere_info.need_filter || result.need_filter) { /// If there is a filter and without optimized if (result.getFilter() && last_reader_in_chain) @@ -907,11 +955,11 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r /// Check if the PREWHERE column is needed if (!result.columns.empty()) { - if (prewhere->remove_prewhere_column) + if (prewhere_info.remove_prewhere_column) result.columns.erase(result.columns.begin() + prewhere_column_pos); else result.columns[prewhere_column_pos] = - getSampleBlock().getByName(prewhere->prewhere_column_name).type-> + getSampleBlock().getByName(prewhere_info.prewhere_column_name).type-> createColumnConst(result.num_rows, 1u)->convertToFullColumnIfConst(); } } @@ -919,7 +967,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r else { result.columns[prewhere_column_pos] = result.getFilterHolder()->convertToFullColumnIfConst(); - if (getSampleBlock().getByName(prewhere->prewhere_column_name).type->isNullable()) + if (getSampleBlock().getByName(prewhere_info.prewhere_column_name).type->isNullable()) result.columns[prewhere_column_pos] = makeNullable(std::move(result.columns[prewhere_column_pos])); result.clearFilter(); // Acting as a flag to not filter in PREWHERE } diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.h b/src/Storages/MergeTree/MergeTreeRangeReader.h index 381b87ecffd..8f8482d1abf 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -13,7 +13,8 @@ using ColumnUInt8 = ColumnVector; class IMergeTreeReader; class MergeTreeIndexGranularity; struct PrewhereInfo; -using PrewhereInfoPtr = std::shared_ptr; +using PrewhereInfoList = std::vector; +using PrewhereInfoListPtr = std::shared_ptr; /// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part. /// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark. @@ -24,7 +25,7 @@ public: MergeTreeRangeReader( IMergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_, - const PrewhereInfoPtr & prewhere_, + const PrewhereInfoListPtr & prewhere_info_list, bool last_reader_in_chain_); MergeTreeRangeReader() = default; @@ -153,8 +154,8 @@ public: void addRows(size_t rows) { num_read_rows += rows; } void addRange(const MarkRange & range) { started_ranges.push_back({rows_per_granule.size(), range}); } - /// Set filter or replace old one. Filter must have more zeroes than previous. - void setFilter(const ColumnPtr & new_filter); + /// Apply a filter on top of the existing one (AND'ed) or set it if there isn't any. + void addFilter(const ColumnPtr & new_filter); /// For each granule calculate the number of filtered rows at the end. Remove them and update filter. void optimize(bool can_read_incomplete_granules); /// Remove all rows from granules. @@ -212,12 +213,12 @@ private: ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges); Columns continueReadingChain(ReadResult & result, size_t & num_rows); - void executePrewhereActionsAndFilterColumns(ReadResult & result); + void executePrewhereActionsAndFilterColumns(ReadResult & result, const PrewhereInfo & prewhere_info); IMergeTreeReader * merge_tree_reader = nullptr; const MergeTreeIndexGranularity * index_granularity = nullptr; MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly. - PrewhereInfoPtr prewhere; + PrewhereInfoListPtr prewhere_info_list; Stream stream; diff --git a/src/Storages/MergeTree/MergeTreeReadPool.cpp b/src/Storages/MergeTree/MergeTreeReadPool.cpp index d9a250e3f7a..a3a580fa7f2 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -24,7 +24,7 @@ MergeTreeReadPool::MergeTreeReadPool( RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, - const PrewhereInfoPtr & prewhere_info_, + const PrewhereInfoListPtr & prewhere_info_list_, const bool check_columns_, const Names & column_names_, const BackoffSettings & backoff_settings_, @@ -37,7 +37,7 @@ MergeTreeReadPool::MergeTreeReadPool( , column_names{column_names_} , do_not_steal_tasks{do_not_steal_tasks_} , predict_block_size_bytes{preferred_block_size_bytes_ > 0} - , prewhere_info{prewhere_info_} + , prewhere_info_list{prewhere_info_list_} , parts_ranges{std::move(parts_)} { /// parts don't contain duplicate MergeTreeDataPart's. @@ -139,7 +139,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, return std::make_unique( part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names, per_part_column_name_set[part_idx], per_part_columns[part_idx], per_part_pre_columns[part_idx], - prewhere_info && prewhere_info->remove_prewhere_column, per_part_should_reorder[part_idx], std::move(curr_task_size_predictor)); + per_part_should_reorder[part_idx], std::move(curr_task_size_predictor)); } MarkRanges MergeTreeReadPool::getRestMarks(const IMergeTreeDataPart & part, const MarkRange & from) const @@ -229,7 +229,7 @@ std::vector MergeTreeReadPool::fillPerPartInfo( per_part_sum_marks.push_back(sum_marks); auto [required_columns, required_pre_columns, should_reorder] = - getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info, check_columns); + getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info_list, check_columns); /// will be used to distinguish between PREWHERE and WHERE columns when applying filter const auto & required_column_names = required_columns.getNames(); diff --git a/src/Storages/MergeTree/MergeTreeReadPool.h b/src/Storages/MergeTree/MergeTreeReadPool.h index aa6811661e6..ec9523ccbe3 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.h +++ b/src/Storages/MergeTree/MergeTreeReadPool.h @@ -71,10 +71,9 @@ private: public: MergeTreeReadPool( const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_, - RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_, - const bool check_columns_, const Names & column_names_, - const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_, - const bool do_not_steal_tasks_ = false); + RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, + const PrewhereInfoListPtr & prewhere_info_list, const bool check_columns_, const Names & column_names_, + const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_, const bool do_not_steal_tasks_ = false); MergeTreeReadTaskPtr getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names); @@ -107,7 +106,7 @@ private: std::vector per_part_pre_columns; std::vector per_part_should_reorder; std::vector per_part_size_predictor; - PrewhereInfoPtr prewhere_info; + PrewhereInfoListPtr prewhere_info_list; struct Part { diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp index ee0a77ba3cf..35df1106339 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp @@ -22,7 +22,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( Names required_columns_, MarkRanges mark_ranges_, bool use_uncompressed_cache_, - const PrewhereInfoPtr & prewhere_info_, + const PrewhereInfoListPtr & prewhere_info_list_, bool check_columns, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, @@ -31,7 +31,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( : MergeTreeBaseSelectProcessor{ metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), - storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_, + storage_, metadata_snapshot_, prewhere_info_list_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, reader_settings_, use_uncompressed_cache_, virt_column_names_}, required_columns{std::move(required_columns_)}, @@ -56,7 +56,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( ordered_names = header_without_virtual_columns.getNames(); - task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info, check_columns); + task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info_list, check_columns); /// will be used to distinguish between PREWHERE and WHERE columns when applying filter const auto & column_names = task_columns.columns.getNames(); @@ -71,7 +71,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); - if (prewhere_info) + if (prewhere_info_list) pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); } @@ -100,8 +100,7 @@ try task = std::make_unique( data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, - task_columns.columns, task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column, - task_columns.should_reorder, std::move(size_predictor)); + task_columns.columns, task_columns.pre_columns, task_columns.should_reorder, std::move(size_predictor)); return true; } diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h index c9fd06c5534..b6da7166457 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h @@ -26,7 +26,7 @@ public: Names required_columns_, MarkRanges mark_ranges, bool use_uncompressed_cache, - const PrewhereInfoPtr & prewhere_info, + const PrewhereInfoListPtr & prewhere_info_list, bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 65f9b1eba3b..cdb97f47a47 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -22,7 +22,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( Names required_columns_, MarkRanges mark_ranges_, bool use_uncompressed_cache_, - const PrewhereInfoPtr & prewhere_info_, + const PrewhereInfoListPtr & prewhere_info_list_, bool check_columns_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, @@ -31,7 +31,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( : MergeTreeBaseSelectProcessor{ metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), - storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_, + storage_, metadata_snapshot_, prewhere_info_list_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, reader_settings_, use_uncompressed_cache_, virt_column_names_}, required_columns{std::move(required_columns_)}, @@ -69,7 +69,7 @@ try task_columns = getReadTaskColumns( storage, metadata_snapshot, data_part, - required_columns, prewhere_info, check_columns); + required_columns, prewhere_info_list, check_columns); auto size_predictor = (preferred_block_size_bytes == 0) ? nullptr @@ -81,8 +81,7 @@ try task = std::make_unique( data_part, all_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns, - task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column, - task_columns.should_reorder, std::move(size_predictor)); + task_columns.pre_columns, task_columns.should_reorder, std::move(size_predictor)); if (!reader) { @@ -94,7 +93,7 @@ try reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); - if (prewhere_info) + if (prewhere_info_list) pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); } diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 925c437f1ce..521bbbfdba4 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -26,7 +26,7 @@ public: Names required_columns_, MarkRanges mark_ranges, bool use_uncompressed_cache, - const PrewhereInfoPtr & prewhere_info, + const PrewhereInfoListPtr & prewhere_info_list, bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, diff --git a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp index f57247e39ab..eb1a80acb49 100644 --- a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp @@ -18,12 +18,12 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, const bool use_uncompressed_cache_, - const PrewhereInfoPtr & prewhere_info_, + const PrewhereInfoListPtr & prewhere_info_list_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_) : MergeTreeBaseSelectProcessor{ - pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, + pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_list_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, reader_settings_, use_uncompressed_cache_, virt_column_names_}, @@ -78,7 +78,7 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask() owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, IMergeTreeReader::ValueSizeMap{}, profile_callback); - if (prewhere_info) + if (prewhere_info_list) pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, rest_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, IMergeTreeReader::ValueSizeMap{}, profile_callback); @@ -94,7 +94,7 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask() owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, reader->getAvgValueSizeHints(), profile_callback); - if (prewhere_info) + if (prewhere_info_list) pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, rest_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, reader->getAvgValueSizeHints(), profile_callback); diff --git a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h index 2b2ed36fc18..dd3ba8c973c 100644 --- a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h +++ b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h @@ -24,7 +24,7 @@ public: const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, const bool use_uncompressed_cache_, - const PrewhereInfoPtr & prewhere_info_, + const PrewhereInfoListPtr & prewhere_info_list_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_); diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 5a3ada6288b..68f2f8f1361 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -32,6 +32,8 @@ struct PrewhereInfo : prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {} }; +using PrewhereInfoList = std::vector; + /// Same as PrewhereInfo, but with ActionsDAG struct PrewhereDAGInfo { @@ -75,7 +77,7 @@ struct InputOrderInfo bool operator !=(const InputOrderInfo & other) const { return !(*this == other); } }; -using PrewhereInfoPtr = std::shared_ptr; +using PrewhereInfoListPtr = std::shared_ptr; using PrewhereDAGInfoPtr = std::shared_ptr; using FilterInfoPtr = std::shared_ptr; using InputOrderInfoPtr = std::shared_ptr; @@ -104,7 +106,7 @@ struct SelectQueryInfo TreeRewriterResultPtr syntax_analyzer_result; - PrewhereInfoPtr prewhere_info; + PrewhereInfoListPtr prewhere_info_list; ReadInOrderOptimizerPtr order_optimizer; /// Can be modified while reading from storage diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index ce74567c62b..53fee054f4b 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -314,21 +314,26 @@ void StorageBuffer::read( } else { - if (query_info.prewhere_info) + if (query_info.prewhere_info_list) { - pipe_from_buffers.addSimpleTransform([&](const Block & header) - { - return std::make_shared( - header, query_info.prewhere_info->prewhere_actions, - query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column); - }); - - if (query_info.prewhere_info->alias_actions) + for (const auto & prewhere_info : *query_info.prewhere_info_list) { pipe_from_buffers.addSimpleTransform([&](const Block & header) { - return std::make_shared(header, query_info.prewhere_info->alias_actions); + return std::make_shared( + header, prewhere_info.prewhere_actions, + prewhere_info.prewhere_column_name, + prewhere_info.remove_prewhere_column); }); + + if (prewhere_info.alias_actions) + { + pipe_from_buffers.addSimpleTransform([&](const Block & header) + { + return std::make_shared( + header, prewhere_info.alias_actions); + }); + } } }