From fa8ad1a7c6295839ecd36db734d809e47c105550 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 25 Jun 2021 19:22:39 +0300 Subject: [PATCH 01/11] reduce memory usage in queries with 'ORDER BY primary_key' --- .../gtest_blocks_size_merging_streams.cpp | 4 +- .../Merges/AggregatingSortedTransform.h | 2 +- .../Merges/CollapsingSortedTransform.h | 2 +- .../FinishAggregatingInOrderTransform.h | 2 +- .../Merges/GraphiteRollupSortedTransform.h | 2 +- src/Processors/Merges/IMergingTransform.cpp | 16 ++-- src/Processors/Merges/IMergingTransform.h | 7 +- .../Merges/MergingSortedTransform.cpp | 3 +- .../Merges/MergingSortedTransform.h | 1 + .../Merges/ReplacingSortedTransform.h | 2 +- .../Merges/SummingSortedTransform.h | 2 +- .../Merges/VersionedCollapsingTransform.h | 2 +- .../QueryPlan/FinishSortingStep.cpp | 5 +- .../QueryPlan/ReadFromMergeTree.cpp | 19 +++-- src/Processors/QueryPlan/ReadFromMergeTree.h | 2 +- .../Transforms/MergeSortingTransform.cpp | 1 + .../MergeTree/MergeTreeDataMergerMutator.cpp | 2 +- .../MergeTreeInOrderSelectProcessor.cpp | 80 ++++++++++++++++++ .../MergeTreeInOrderSelectProcessor.h | 39 +++++++++ .../MergeTreeReverseSelectProcessor.cpp | 66 ++------------- .../MergeTreeReverseSelectProcessor.h | 44 ++-------- .../MergeTree/MergeTreeSelectProcessor.cpp | 83 ++++--------------- .../MergeTree/MergeTreeSelectProcessor.h | 20 ++--- .../01926_order_by_desc_limit.reference | 2 + .../0_stateless/01926_order_by_desc_limit.sql | 21 +++++ 25 files changed, 226 insertions(+), 203 deletions(-) create mode 100644 src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp create mode 100644 src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h create mode 100644 tests/queries/0_stateless/01926_order_by_desc_limit.reference create mode 100644 tests/queries/0_stateless/01926_order_by_desc_limit.sql diff --git a/src/DataStreams/tests/gtest_blocks_size_merging_streams.cpp b/src/DataStreams/tests/gtest_blocks_size_merging_streams.cpp index 
0ce450c4e6c..2fc2dc77011 100644 --- a/src/DataStreams/tests/gtest_blocks_size_merging_streams.cpp +++ b/src/DataStreams/tests/gtest_blocks_size_merging_streams.cpp @@ -84,7 +84,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest) EXPECT_EQ(pipe.numOutputPorts(), 3); auto transform = std::make_shared(pipe.getHeader(), pipe.numOutputPorts(), sort_description, - DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true); + DEFAULT_MERGE_BLOCK_SIZE, 0, false, nullptr, false, true); pipe.addTransform(std::move(transform)); @@ -129,7 +129,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes) EXPECT_EQ(pipe.numOutputPorts(), 3); auto transform = std::make_shared(pipe.getHeader(), pipe.numOutputPorts(), sort_description, - DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true); + DEFAULT_MERGE_BLOCK_SIZE, 0, false, nullptr, false, true); pipe.addTransform(std::move(transform)); diff --git a/src/Processors/Merges/AggregatingSortedTransform.h b/src/Processors/Merges/AggregatingSortedTransform.h index a0425d4c376..b4a7d4c8106 100644 --- a/src/Processors/Merges/AggregatingSortedTransform.h +++ b/src/Processors/Merges/AggregatingSortedTransform.h @@ -16,7 +16,7 @@ public: const Block & header, size_t num_inputs, SortDescription description_, size_t max_block_size) : IMergingTransform( - num_inputs, header, header, true, + num_inputs, header, header, true, false, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/CollapsingSortedTransform.h b/src/Processors/Merges/CollapsingSortedTransform.h index 9e6bd306eee..059e90c1de8 100644 --- a/src/Processors/Merges/CollapsingSortedTransform.h +++ b/src/Processors/Merges/CollapsingSortedTransform.h @@ -20,7 +20,7 @@ public: WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) : IMergingTransform( - num_inputs, header, header, true, + num_inputs, header, header, true, false, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/FinishAggregatingInOrderTransform.h 
b/src/Processors/Merges/FinishAggregatingInOrderTransform.h index 4f9e53bd7d5..f3bcc9dd878 100644 --- a/src/Processors/Merges/FinishAggregatingInOrderTransform.h +++ b/src/Processors/Merges/FinishAggregatingInOrderTransform.h @@ -19,7 +19,7 @@ public: SortDescription description, size_t max_block_size) : IMergingTransform( - num_inputs, header, header, true, + num_inputs, header, header, true, false, header, num_inputs, params, diff --git a/src/Processors/Merges/GraphiteRollupSortedTransform.h b/src/Processors/Merges/GraphiteRollupSortedTransform.h index 5104801aa0d..9432482fe16 100644 --- a/src/Processors/Merges/GraphiteRollupSortedTransform.h +++ b/src/Processors/Merges/GraphiteRollupSortedTransform.h @@ -15,7 +15,7 @@ public: SortDescription description_, size_t max_block_size, Graphite::Params params_, time_t time_of_merge_) : IMergingTransform( - num_inputs, header, header, true, + num_inputs, header, header, true, false, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/IMergingTransform.cpp b/src/Processors/Merges/IMergingTransform.cpp index eff786b150f..8be21e3e291 100644 --- a/src/Processors/Merges/IMergingTransform.cpp +++ b/src/Processors/Merges/IMergingTransform.cpp @@ -14,9 +14,11 @@ IMergingTransformBase::IMergingTransformBase( size_t num_inputs, const Block & input_header, const Block & output_header, - bool have_all_inputs_) + bool have_all_inputs_, + bool expected_one_block_) : IProcessor(InputPorts(num_inputs, input_header), {output_header}) , have_all_inputs(have_all_inputs_) + , expected_one_block(expected_one_block_) { } @@ -64,10 +66,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs() continue; if (input_states[i].is_initialized) - { - // input.setNotNeeded(); continue; - } input.setNeeded(); @@ -77,12 +76,17 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs() continue; } - auto chunk = input.pull(); + /// setNotNeeded after reading first chunk, because in optimistic case + 
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n') + /// we won't have to read any chunks anymore; + auto chunk = input.pull(expected_one_block); if (!chunk.hasRows()) { - if (!input.isFinished()) + { + input.setNeeded(); all_inputs_has_data = false; + } continue; } diff --git a/src/Processors/Merges/IMergingTransform.h b/src/Processors/Merges/IMergingTransform.h index ce673131ab6..695aa4eb896 100644 --- a/src/Processors/Merges/IMergingTransform.h +++ b/src/Processors/Merges/IMergingTransform.h @@ -16,7 +16,8 @@ public: size_t num_inputs, const Block & input_header, const Block & output_header, - bool have_all_inputs_); + bool have_all_inputs_, + bool expected_one_block_); OutputPort & getOutputPort() { return outputs.front(); } @@ -66,6 +67,7 @@ private: std::vector input_states; std::atomic have_all_inputs; bool is_initialized = false; + bool expected_one_block = false; IProcessor::Status prepareInitializeInputs(); }; @@ -81,8 +83,9 @@ public: const Block & input_header, const Block & output_header, bool have_all_inputs_, + bool expected_one_block_, Args && ... args) - : IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_) + : IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, expected_one_block_) , algorithm(std::forward(args) ...) 
{ } diff --git a/src/Processors/Merges/MergingSortedTransform.cpp b/src/Processors/Merges/MergingSortedTransform.cpp index ec1bdc59683..f185ec6ad8e 100644 --- a/src/Processors/Merges/MergingSortedTransform.cpp +++ b/src/Processors/Merges/MergingSortedTransform.cpp @@ -13,12 +13,13 @@ MergingSortedTransform::MergingSortedTransform( SortDescription description_, size_t max_block_size, UInt64 limit_, + bool expected_one_block_, WriteBuffer * out_row_sources_buf_, bool quiet_, bool use_average_block_sizes, bool have_all_inputs_) : IMergingTransform( - num_inputs, header, header, have_all_inputs_, + num_inputs, header, header, have_all_inputs_, expected_one_block_, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/MergingSortedTransform.h b/src/Processors/Merges/MergingSortedTransform.h index 93bd36d8aec..0e616eb2c78 100644 --- a/src/Processors/Merges/MergingSortedTransform.h +++ b/src/Processors/Merges/MergingSortedTransform.h @@ -17,6 +17,7 @@ public: SortDescription description, size_t max_block_size, UInt64 limit_ = 0, + bool expected_one_block_ = false, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false, bool use_average_block_sizes = false, diff --git a/src/Processors/Merges/ReplacingSortedTransform.h b/src/Processors/Merges/ReplacingSortedTransform.h index 757e19e2cbe..c1df68f3a76 100644 --- a/src/Processors/Merges/ReplacingSortedTransform.h +++ b/src/Processors/Merges/ReplacingSortedTransform.h @@ -18,7 +18,7 @@ public: WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) : IMergingTransform( - num_inputs, header, header, true, + num_inputs, header, header, true, false, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/SummingSortedTransform.h b/src/Processors/Merges/SummingSortedTransform.h index 22361bb1a44..730d6604950 100644 --- a/src/Processors/Merges/SummingSortedTransform.h +++ b/src/Processors/Merges/SummingSortedTransform.h @@ -19,7 +19,7 @@ public: 
const Names & partition_key_columns, size_t max_block_size) : IMergingTransform( - num_inputs, header, header, true, + num_inputs, header, header, true, false, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/VersionedCollapsingTransform.h b/src/Processors/Merges/VersionedCollapsingTransform.h index f593734c603..6251f842898 100644 --- a/src/Processors/Merges/VersionedCollapsingTransform.h +++ b/src/Processors/Merges/VersionedCollapsingTransform.h @@ -19,7 +19,7 @@ public: WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) : IMergingTransform( - num_inputs, header, header, true, + num_inputs, header, header, true, false, header, num_inputs, std::move(description_), diff --git a/src/Processors/QueryPlan/FinishSortingStep.cpp b/src/Processors/QueryPlan/FinishSortingStep.cpp index a2e056b3029..6f21865592b 100644 --- a/src/Processors/QueryPlan/FinishSortingStep.cpp +++ b/src/Processors/QueryPlan/FinishSortingStep.cpp @@ -58,11 +58,14 @@ void FinishSortingStep::transformPipeline(QueryPipeline & pipeline, const BuildQ if (pipeline.getNumStreams() > 1) { UInt64 limit_for_merging = (need_finish_sorting ? 
0 : limit); + bool expected_one_block = limit_for_merging && limit_for_merging < max_block_size; auto transform = std::make_shared( pipeline.getHeader(), pipeline.getNumStreams(), prefix_description, - max_block_size, limit_for_merging); + max_block_size, + limit_for_merging, + expected_one_block); pipeline.addTransform(std::move(transform)); } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index fd5de98b4c0..3698fa09020 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include @@ -175,12 +175,13 @@ template ProcessorPtr ReadFromMergeTree::createSource( const RangesInDataPart & part, const Names & required_columns, - bool use_uncompressed_cache) + bool use_uncompressed_cache, + bool one_range_per_task) { return std::make_shared( data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes, preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, - prewhere_info, true, reader_settings, virt_column_names, part.part_index_in_query); + prewhere_info, true, reader_settings, virt_column_names, part.part_index_in_query, one_range_per_task); } Pipe ReadFromMergeTree::readInOrder( @@ -190,11 +191,15 @@ Pipe ReadFromMergeTree::readInOrder( bool use_uncompressed_cache) { Pipes pipes; + /// For reading in order it makes sense to read only + /// one range per task to reduce number of read rows. + bool one_range_per_task = read_type != ReadType::Default; + for (const auto & part : parts_with_range) { auto source = read_type == ReadType::InReverseOrder - ? createSource(part, required_columns, use_uncompressed_cache) - : createSource(part, required_columns, use_uncompressed_cache); + ? 
createSource(part, required_columns, use_uncompressed_cache, one_range_per_task) + : createSource(part, required_columns, use_uncompressed_cache, one_range_per_task); pipes.emplace_back(std::move(source)); } @@ -445,6 +450,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( } parts_with_ranges.emplace_back(part); } + ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction); new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part)); } @@ -482,7 +488,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( pipe.getHeader(), pipe.numOutputPorts(), sort_description, - max_block_size); + max_block_size, + 0, true); pipe.addTransform(std::move(transform)); } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 6e1efffdb02..d44a6e80cf6 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -111,7 +111,7 @@ private: Pipe readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache); template - ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache); + ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache, bool one_range_per_task); Pipe spreadMarkRangesAmongStreams( RangesInDataParts && parts_with_ranges, diff --git a/src/Processors/Transforms/MergeSortingTransform.cpp b/src/Processors/Transforms/MergeSortingTransform.cpp index 1806693db3a..ca78a29071e 100644 --- a/src/Processors/Transforms/MergeSortingTransform.cpp +++ b/src/Processors/Transforms/MergeSortingTransform.cpp @@ -200,6 +200,7 @@ void MergeSortingTransform::consume(Chunk chunk) description, max_merged_block_size, limit, + false, nullptr, quiet, use_average_block_sizes, diff --git 
a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 766d988500d..200ff4f681d 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -891,7 +891,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor { case MergeTreeData::MergingParams::Ordinary: merged_transform = std::make_unique( - header, pipes.size(), sort_description, merge_block_size, 0, rows_sources_write_buf.get(), true, blocks_are_granules_size); + header, pipes.size(), sort_description, merge_block_size, 0, false, rows_sources_write_buf.get(), true, blocks_are_granules_size); break; case MergeTreeData::MergingParams::Collapsing: diff --git a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp new file mode 100644 index 00000000000..8e959249d93 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp @@ -0,0 +1,80 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int MEMORY_LIMIT_EXCEEDED; +} + +MergeTreeInOrderSelectProcessor::MergeTreeInOrderSelectProcessor( + const MergeTreeData & storage_, + const StorageMetadataPtr & metadata_snapshot_, + const MergeTreeData::DataPartPtr & owned_data_part_, + UInt64 max_block_size_rows_, + size_t preferred_block_size_bytes_, + size_t preferred_max_column_in_block_size_bytes_, + Names required_columns_, + MarkRanges mark_ranges_, + bool use_uncompressed_cache_, + const PrewhereInfoPtr & prewhere_info_, + bool check_columns_, + const MergeTreeReaderSettings & reader_settings_, + const Names & virt_column_names_, + bool one_range_per_task_, + bool quiet) + : MergeTreeSelectProcessor{ + storage_, metadata_snapshot_, owned_data_part_, max_block_size_rows_, + preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, + required_columns_, std::move(mark_ranges_), 
use_uncompressed_cache_, prewhere_info_, + check_columns_, reader_settings_, virt_column_names_, one_range_per_task_} +{ + if (!quiet) + LOG_DEBUG(log, "Reading {} ranges in order from part {}, approx. {} rows starting from {}", + all_mark_ranges.size(), data_part->name, total_rows, + data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin)); +} + +bool MergeTreeInOrderSelectProcessor::getNewTask() +try +{ + /// Produce no more than one task + if (all_mark_ranges.empty()) + { + finish(); + return false; + } + + auto size_predictor = (preferred_block_size_bytes == 0) + ? nullptr + : std::make_unique(data_part, ordered_names, metadata_snapshot->getSampleBlock()); + + MarkRanges mark_ranges_for_task; + if (one_range_per_task) + { + mark_ranges_for_task = { std::move(all_mark_ranges.front()) }; + all_mark_ranges.pop_front(); + } + else + { + mark_ranges_for_task = std::move(all_mark_ranges); + all_mark_ranges.clear(); + } + + task = std::make_unique( + data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, task_columns.columns, + task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column, + task_columns.should_reorder, std::move(size_predictor)); + + return true; +} +catch (...) +{ + /// Suspicion of the broken part. A part is added to the queue for verification. + if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) + storage.reportBrokenPart(data_part->name); + throw; +} + +} diff --git a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h new file mode 100644 index 00000000000..1a300620278 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h @@ -0,0 +1,39 @@ +#pragma once +#include + +namespace DB +{ + + +/// Used to read data from single part with select query in order of primary key. +/// Cares about PREWHERE, virtual columns, indexes etc. 
+/// To read data from multiple parts, Storage (MergeTree) creates multiple such objects. +class MergeTreeInOrderSelectProcessor : public MergeTreeSelectProcessor +{ +public: + MergeTreeInOrderSelectProcessor( + const MergeTreeData & storage, + const StorageMetadataPtr & metadata_snapshot, + const MergeTreeData::DataPartPtr & owned_data_part, + UInt64 max_block_size_rows, + size_t preferred_block_size_bytes, + size_t preferred_max_column_in_block_size_bytes, + Names required_columns_, + MarkRanges mark_ranges, + bool use_uncompressed_cache, + const PrewhereInfoPtr & prewhere_info, + bool check_columns, + const MergeTreeReaderSettings & reader_settings, + const Names & virt_column_names = {}, + bool one_range_per_task_ = false, + bool quiet = false); + + String getName() const override { return "MergeTreeInOrder"; } + +private: + bool getNewTask() override; + + Poco::Logger * log = &Poco::Logger::get("MergeTreeInOrderSelectProcessor"); +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp index e9527efaa4a..c144a7d16e7 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp @@ -1,8 +1,4 @@ #include -#include -#include -#include - namespace DB { @@ -23,62 +19,27 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( MarkRanges mark_ranges_, bool use_uncompressed_cache_, const PrewhereInfoPtr & prewhere_info_, - bool check_columns, + bool check_columns_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, - size_t part_index_in_query_, + bool one_range_per_task_, bool quiet) - : - MergeTreeBaseSelectProcessor{ - metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), - storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_, + : MergeTreeSelectProcessor{ + storage_, metadata_snapshot_, 
owned_data_part_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, - reader_settings_, use_uncompressed_cache_, virt_column_names_}, - required_columns{std::move(required_columns_)}, - data_part{owned_data_part_}, - all_mark_ranges(std::move(mark_ranges_)), - part_index_in_query(part_index_in_query_), - path(data_part->getFullRelativePath()) + required_columns_, std::move(mark_ranges_), use_uncompressed_cache_, prewhere_info_, + check_columns_, reader_settings_, virt_column_names_, one_range_per_task_} { - /// Let's estimate total number of rows for progress bar. - for (const auto & range : all_mark_ranges) - total_marks_count += range.end - range.begin; - - size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges); - if (!quiet) LOG_DEBUG(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}", all_mark_ranges.size(), data_part->name, total_rows, data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin)); - - addTotalRowsApprox(total_rows); - - ordered_names = header_without_virtual_columns.getNames(); - - task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info, check_columns); - - /// will be used to distinguish between PREWHERE and WHERE columns when applying filter - const auto & column_names = task_columns.columns.getNames(); - column_name_set = NameSet{column_names.begin(), column_names.end()}; - - if (use_uncompressed_cache) - owned_uncompressed_cache = storage.getContext()->getUncompressedCache(); - - owned_mark_cache = storage.getContext()->getMarkCache(); - - reader = data_part->getReader(task_columns.columns, metadata_snapshot, - all_mark_ranges, owned_uncompressed_cache.get(), - owned_mark_cache.get(), reader_settings); - - if (prewhere_info) - pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges, - owned_uncompressed_cache.get(), 
owned_mark_cache.get(), reader_settings); } bool MergeTreeReverseSelectProcessor::getNewTask() try { - if ((chunks.empty() && all_mark_ranges.empty()) || total_marks_count == 0) + if (chunks.empty() && all_mark_ranges.empty()) { finish(); return false; @@ -141,17 +102,4 @@ Chunk MergeTreeReverseSelectProcessor::readFromPart() return res; } -void MergeTreeReverseSelectProcessor::finish() -{ - /** Close the files (before destroying the object). - * When many sources are created, but simultaneously reading only a few of them, - * buffers don't waste memory. - */ - reader.reset(); - pre_reader.reset(); - data_part.reset(); -} - -MergeTreeReverseSelectProcessor::~MergeTreeReverseSelectProcessor() = default; - } diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h index c9fd06c5534..6f507f57138 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h @@ -1,19 +1,15 @@ #pragma once -#include -#include -#include -#include -#include -#include +#include namespace DB { /// Used to read data from single part with select query +/// in reverse order of primary key. /// Cares about PREWHERE, virtual columns, indexes etc. /// To read data from multiple parts, Storage (MergeTree) creates multiple such objects. 
-class MergeTreeReverseSelectProcessor : public MergeTreeBaseSelectProcessor +class MergeTreeReverseSelectProcessor : public MergeTreeSelectProcessor { public: MergeTreeReverseSelectProcessor( @@ -30,46 +26,16 @@ public: bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, - size_t part_index_in_query = 0, + bool one_range_per_task_ = false, bool quiet = false); - ~MergeTreeReverseSelectProcessor() override; - String getName() const override { return "MergeTreeReverse"; } - /// Closes readers and unlock part locks - void finish(); - -protected: - +private: bool getNewTask() override; Chunk readFromPart() override; -private: - Block header; - - /// Used by Task - Names required_columns; - /// Names from header. Used in order to order columns in read blocks. - Names ordered_names; - NameSet column_name_set; - - MergeTreeReadTaskColumns task_columns; - - /// Data part will not be removed if the pointer owns it - MergeTreeData::DataPartPtr data_part; - - /// Mark ranges we should read (in ascending order) - MarkRanges all_mark_ranges; - /// Total number of marks we should read - size_t total_marks_count = 0; - /// Value of _part_index virtual column (used only in SelectExecutor) - size_t part_index_in_query = 0; - - String path; - Chunks chunks; - Poco::Logger * log = &Poco::Logger::get("MergeTreeReverseSelectProcessor"); }; diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 980afa170e9..4bb4799f396 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -26,10 +26,8 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( bool check_columns_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, - size_t part_index_in_query_, - bool quiet) - : - MergeTreeBaseSelectProcessor{ + bool one_range_per_task_) + : MergeTreeBaseSelectProcessor{ 
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, @@ -37,78 +35,33 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( required_columns{std::move(required_columns_)}, data_part{owned_data_part_}, all_mark_ranges(std::move(mark_ranges_)), - part_index_in_query(part_index_in_query_), - check_columns(check_columns_) + one_range_per_task(one_range_per_task_), + check_columns(check_columns_), + total_rows(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges)) { - /// Let's estimate total number of rows for progress bar. - for (const auto & range : all_mark_ranges) - total_marks_count += range.end - range.begin; - - size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges); - - if (!quiet) - LOG_DEBUG(log, "Reading {} ranges from part {}, approx. {} rows starting from {}", - all_mark_ranges.size(), data_part->name, total_rows, - data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin)); - - addTotalRowsApprox(total_rows); - ordered_names = header_without_virtual_columns.getNames(); -} - - -bool MergeTreeSelectProcessor::getNewTask() -try -{ - /// Produce no more than one task - if (!is_first_task || total_marks_count == 0) - { - finish(); - return false; - } - is_first_task = false; + /// will be used to distinguish between PREWHERE and WHERE columns when applying filter + const auto & column_names = task_columns.columns.getNames(); + column_name_set = NameSet{column_names.begin(), column_names.end()}; task_columns = getReadTaskColumns( storage, metadata_snapshot, data_part, required_columns, prewhere_info, check_columns); - auto size_predictor = (preferred_block_size_bytes == 0) - ? 
nullptr - : std::make_unique(data_part, ordered_names, metadata_snapshot->getSampleBlock()); + if (use_uncompressed_cache) + owned_uncompressed_cache = storage.getContext()->getUncompressedCache(); - /// will be used to distinguish between PREWHERE and WHERE columns when applying filter - const auto & column_names = task_columns.columns.getNames(); - column_name_set = NameSet{column_names.begin(), column_names.end()}; + owned_mark_cache = storage.getContext()->getMarkCache(); - task = std::make_unique( - data_part, all_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns, - task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column, - task_columns.should_reorder, std::move(size_predictor)); + reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges, + owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); - if (!reader) - { - if (use_uncompressed_cache) - owned_uncompressed_cache = storage.getContext()->getUncompressedCache(); - - owned_mark_cache = storage.getContext()->getMarkCache(); - - reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges, + if (prewhere_info) + pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); - if (prewhere_info) - pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges, - owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); - } - - return true; + addTotalRowsApprox(total_rows); + ordered_names = header_without_virtual_columns.getNames(); } -catch (...) -{ - /// Suspicion of the broken part. A part is added to the queue for verification. 
- if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) - storage.reportBrokenPart(data_part->name); - throw; -} - void MergeTreeSelectProcessor::finish() { @@ -121,8 +74,6 @@ void MergeTreeSelectProcessor::finish() data_part.reset(); } - MergeTreeSelectProcessor::~MergeTreeSelectProcessor() = default; - } diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 925c437f1ce..7b938a77d69 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -1,6 +1,6 @@ #pragma once #include -#include +#include #include #include #include @@ -30,21 +30,18 @@ public: bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, - size_t part_index_in_query = 0, - bool quiet = false); + bool one_range_per_task_ = false); ~MergeTreeSelectProcessor() override; - String getName() const override { return "MergeTree"; } + String getName() const override = 0; /// Closes readers and unlock part locks void finish(); protected: - bool getNewTask() override; - -private: + bool getNewTask() override = 0; /// Used by Task Names required_columns; @@ -59,15 +56,14 @@ private: /// Mark ranges we should read (in ascending order) MarkRanges all_mark_ranges; - /// Total number of marks we should read - size_t total_marks_count = 0; /// Value of _part_index virtual column (used only in SelectExecutor) size_t part_index_in_query = 0; + /// If true, every task will be created only with one range. + /// It reduces amount of read data for queries with small LIMIT. 
+ bool one_range_per_task = false; bool check_columns; - bool is_first_task = true; - - Poco::Logger * log = &Poco::Logger::get("MergeTreeSelectProcessor"); + size_t total_rows; }; } diff --git a/tests/queries/0_stateless/01926_order_by_desc_limit.reference b/tests/queries/0_stateless/01926_order_by_desc_limit.reference new file mode 100644 index 00000000000..6ed281c757a --- /dev/null +++ b/tests/queries/0_stateless/01926_order_by_desc_limit.reference @@ -0,0 +1,2 @@ +1 +1 diff --git a/tests/queries/0_stateless/01926_order_by_desc_limit.sql b/tests/queries/0_stateless/01926_order_by_desc_limit.sql new file mode 100644 index 00000000000..7ea102e11e9 --- /dev/null +++ b/tests/queries/0_stateless/01926_order_by_desc_limit.sql @@ -0,0 +1,21 @@ +DROP TABLE IF EXISTS order_by_desc; + +CREATE TABLE order_by_desc (u UInt32, s String) +ENGINE MergeTree ORDER BY u PARTITION BY u % 100 +SETTINGS index_granularity = 1024; + +INSERT INTO order_by_desc SELECT number, repeat('a', 1024) FROM numbers(1024 * 300); +OPTIMIZE TABLE order_by_desc FINAL; + +SELECT s FROM order_by_desc ORDER BY u DESC LIMIT 10 FORMAT Null +SETTINGS max_memory_usage = '400M'; + +SELECT s FROM order_by_desc ORDER BY u LIMIT 10 FORMAT Null +SETTINGS max_memory_usage = '400M'; + +SYSTEM FLUSH LOGS; + +SELECT read_rows < 110000 FROM system.query_log +WHERE type = 'QueryFinish' AND current_database = currentDatabase() +AND event_time > now() - INTERVAL 10 SECOND +AND lower(query) LIKE lower('SELECT s FROM order_by_desc ORDER BY u%'); From fafae3ab8d05d63adf9fc169707723d52d14b983 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 25 Jun 2021 19:58:25 +0300 Subject: [PATCH 02/11] fix tests --- src/Storages/MergeTree/MergeTreeSelectProcessor.cpp | 5 ----- .../01551_mergetree_read_in_order_spread.reference | 6 +++--- tests/queries/0_stateless/01861_explain_pipeline.reference | 4 ++-- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp 
b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 4bb4799f396..f849bfff1e9 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -7,11 +7,6 @@ namespace DB { -namespace ErrorCodes -{ - extern const int MEMORY_LIMIT_EXCEEDED; -} - MergeTreeSelectProcessor::MergeTreeSelectProcessor( const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, diff --git a/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference b/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference index 2843b305f0a..cdc595a3c57 100644 --- a/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference +++ b/tests/queries/0_stateless/01551_mergetree_read_in_order_spread.reference @@ -9,9 +9,9 @@ ExpressionTransform (SettingQuotaAndLimits) (ReadFromMergeTree) ExpressionTransform × 4 - MergeTree 0 → 1 + MergeTreeInOrder 0 → 1 MergingSortedTransform 2 → 1 ExpressionTransform × 2 - MergeTree × 2 0 → 1 + MergeTreeInOrder × 2 0 → 1 ExpressionTransform - MergeTree 0 → 1 + MergeTreeInOrder 0 → 1 diff --git a/tests/queries/0_stateless/01861_explain_pipeline.reference b/tests/queries/0_stateless/01861_explain_pipeline.reference index 9d62fb9f6b8..63ba55f5a04 100644 --- a/tests/queries/0_stateless/01861_explain_pipeline.reference +++ b/tests/queries/0_stateless/01861_explain_pipeline.reference @@ -5,7 +5,7 @@ ExpressionTransform ExpressionTransform ReplacingSorted 2 → 1 ExpressionTransform × 2 - MergeTree × 2 0 → 1 + MergeTreeInOrder × 2 0 → 1 0 0 1 1 2 2 @@ -22,4 +22,4 @@ ExpressionTransform × 2 Copy × 2 1 → 2 AddingSelector × 2 ExpressionTransform × 2 - MergeTree × 2 0 → 1 + MergeTreeInOrder × 2 0 → 1 From 2828b4ed73eb196c1b5210065e46e7a1dc5a4eeb Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Sun, 27 Jun 2021 17:43:20 +0300 Subject: [PATCH 03/11] fix tests --- src/Storages/ya.make | 1 + tests/queries/skip_list.json | 3 ++- 2 files changed, 3 insertions(+), 1 
deletion(-) diff --git a/src/Storages/ya.make b/src/Storages/ya.make index d8692524210..61369177ee7 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -58,6 +58,7 @@ SRCS( MergeTree/MergeTreeDataSelectExecutor.cpp MergeTree/MergeTreeDataWriter.cpp MergeTree/MergeTreeDeduplicationLog.cpp + MergeTree/MergeTreeInOrderSelectProcessor.cpp MergeTree/MergeTreeIndexAggregatorBloomFilter.cpp MergeTree/MergeTreeIndexBloomFilter.cpp MergeTree/MergeTreeIndexConditionBloomFilter.cpp diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index 52c2d468498..fb69464bf23 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -516,7 +516,8 @@ "01913_if_int_decimal", "01913_join_push_down_bug", "01921_with_fill_with_totals", - "01924_argmax_bitmap_state" + "01924_argmax_bitmap_state", + "01926_order_by_desc_limit" ], "parallel": [ From cb042afa9da982c59fd5192e8b3beb0f722dc60c Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 13 Jul 2021 17:24:45 +0300 Subject: [PATCH 04/11] pushdown limit while reading in order of primary key --- src/Interpreters/InterpreterSelectQuery.cpp | 6 ++++-- .../Merges/AggregatingSortedTransform.h | 2 +- .../Merges/CollapsingSortedTransform.h | 2 +- .../FinishAggregatingInOrderTransform.h | 2 +- .../Merges/GraphiteRollupSortedTransform.h | 2 +- src/Processors/Merges/IMergingTransform.cpp | 6 +++--- src/Processors/Merges/IMergingTransform.h | 8 ++++---- .../Merges/MergingSortedTransform.cpp | 4 ++-- .../Merges/MergingSortedTransform.h | 2 +- .../Merges/ReplacingSortedTransform.h | 2 +- .../Merges/SummingSortedTransform.h | 2 +- .../Merges/VersionedCollapsingTransform.h | 2 +- .../QueryPlan/FinishSortingStep.cpp | 4 ++-- .../QueryPlan/ReadFromMergeTree.cpp | 19 ++++++++++--------- src/Processors/QueryPlan/ReadFromMergeTree.h | 4 ++-- .../MergeTreeInOrderSelectProcessor.cpp | 8 ++++---- .../MergeTreeInOrderSelectProcessor.h | 2 +- .../MergeTreeReverseSelectProcessor.cpp | 4 ++-- 
.../MergeTreeReverseSelectProcessor.h | 2 +- .../MergeTree/MergeTreeSelectProcessor.cpp | 4 ++-- .../MergeTree/MergeTreeSelectProcessor.h | 4 ++-- src/Storages/ReadInOrderOptimizer.cpp | 5 +++-- src/Storages/ReadInOrderOptimizer.h | 2 +- src/Storages/SelectQueryInfo.h | 5 +++-- 24 files changed, 54 insertions(+), 49 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 22314b0aab6..a337c4ab74d 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1926,11 +1926,13 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc } } + /// If we don't have filtration, we can pushdown limit to reading stage for optimizations. + UInt64 limit = (query.where() && query.prewhere()) ? getLimitForSorting(query, context) : 0; if (query_info.projection) query_info.projection->input_order_info - = query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context); + = query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context, limit); else - query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context); + query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit); } StreamLocalLimits limits; diff --git a/src/Processors/Merges/AggregatingSortedTransform.h b/src/Processors/Merges/AggregatingSortedTransform.h index b4a7d4c8106..e8bf90c2b31 100644 --- a/src/Processors/Merges/AggregatingSortedTransform.h +++ b/src/Processors/Merges/AggregatingSortedTransform.h @@ -16,7 +16,7 @@ public: const Block & header, size_t num_inputs, SortDescription description_, size_t max_block_size) : IMergingTransform( - num_inputs, header, header, true, false, + num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, header, num_inputs, std::move(description_), 
diff --git a/src/Processors/Merges/CollapsingSortedTransform.h b/src/Processors/Merges/CollapsingSortedTransform.h index 059e90c1de8..87c466f31e8 100644 --- a/src/Processors/Merges/CollapsingSortedTransform.h +++ b/src/Processors/Merges/CollapsingSortedTransform.h @@ -20,7 +20,7 @@ public: WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) : IMergingTransform( - num_inputs, header, header, true, false, + num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/FinishAggregatingInOrderTransform.h b/src/Processors/Merges/FinishAggregatingInOrderTransform.h index f3bcc9dd878..6d5e334311f 100644 --- a/src/Processors/Merges/FinishAggregatingInOrderTransform.h +++ b/src/Processors/Merges/FinishAggregatingInOrderTransform.h @@ -19,7 +19,7 @@ public: SortDescription description, size_t max_block_size) : IMergingTransform( - num_inputs, header, header, true, false, + num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, header, num_inputs, params, diff --git a/src/Processors/Merges/GraphiteRollupSortedTransform.h b/src/Processors/Merges/GraphiteRollupSortedTransform.h index 9432482fe16..46272f00eed 100644 --- a/src/Processors/Merges/GraphiteRollupSortedTransform.h +++ b/src/Processors/Merges/GraphiteRollupSortedTransform.h @@ -15,7 +15,7 @@ public: SortDescription description_, size_t max_block_size, Graphite::Params params_, time_t time_of_merge_) : IMergingTransform( - num_inputs, header, header, true, false, + num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/IMergingTransform.cpp b/src/Processors/Merges/IMergingTransform.cpp index 8be21e3e291..cba78390c97 100644 --- a/src/Processors/Merges/IMergingTransform.cpp +++ 
b/src/Processors/Merges/IMergingTransform.cpp @@ -15,10 +15,10 @@ IMergingTransformBase::IMergingTransformBase( const Block & input_header, const Block & output_header, bool have_all_inputs_, - bool expected_one_block_) + bool has_limit_below_one_block_) : IProcessor(InputPorts(num_inputs, input_header), {output_header}) , have_all_inputs(have_all_inputs_) - , expected_one_block(expected_one_block_) + , has_limit_below_one_block(has_limit_below_one_block_) { } @@ -79,7 +79,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs() /// setNotNeeded after reading first chunk, because in optimismtic case /// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n') /// we won't have to read any chunks anymore; - auto chunk = input.pull(expected_one_block); + auto chunk = input.pull(has_limit_below_one_block); if (!chunk.hasRows()) { if (!input.isFinished()) diff --git a/src/Processors/Merges/IMergingTransform.h b/src/Processors/Merges/IMergingTransform.h index 695aa4eb896..8b0a44ae025 100644 --- a/src/Processors/Merges/IMergingTransform.h +++ b/src/Processors/Merges/IMergingTransform.h @@ -17,7 +17,7 @@ public: const Block & input_header, const Block & output_header, bool have_all_inputs_, - bool expected_one_block_); + bool has_limit_below_one_block_); OutputPort & getOutputPort() { return outputs.front(); } @@ -67,7 +67,7 @@ private: std::vector input_states; std::atomic have_all_inputs; bool is_initialized = false; - bool expected_one_block = false; + bool has_limit_below_one_block = false; IProcessor::Status prepareInitializeInputs(); }; @@ -83,9 +83,9 @@ public: const Block & input_header, const Block & output_header, bool have_all_inputs_, - bool expected_one_block_, + bool has_limit_below_one_block_, Args && ... 
args) - : IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, expected_one_block_) + : IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, has_limit_below_one_block_) , algorithm(std::forward(args) ...) { } diff --git a/src/Processors/Merges/MergingSortedTransform.cpp b/src/Processors/Merges/MergingSortedTransform.cpp index f185ec6ad8e..92fafa4242c 100644 --- a/src/Processors/Merges/MergingSortedTransform.cpp +++ b/src/Processors/Merges/MergingSortedTransform.cpp @@ -13,13 +13,13 @@ MergingSortedTransform::MergingSortedTransform( SortDescription description_, size_t max_block_size, UInt64 limit_, - bool expected_one_block_, + bool has_limit_below_one_block_, WriteBuffer * out_row_sources_buf_, bool quiet_, bool use_average_block_sizes, bool have_all_inputs_) : IMergingTransform( - num_inputs, header, header, have_all_inputs_, expected_one_block_, + num_inputs, header, header, have_all_inputs_, has_limit_below_one_block_, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/MergingSortedTransform.h b/src/Processors/Merges/MergingSortedTransform.h index 0e616eb2c78..1fa9b1275bd 100644 --- a/src/Processors/Merges/MergingSortedTransform.h +++ b/src/Processors/Merges/MergingSortedTransform.h @@ -17,7 +17,7 @@ public: SortDescription description, size_t max_block_size, UInt64 limit_ = 0, - bool expected_one_block_ = false, + bool has_limit_below_one_block_ = false, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false, bool use_average_block_sizes = false, diff --git a/src/Processors/Merges/ReplacingSortedTransform.h b/src/Processors/Merges/ReplacingSortedTransform.h index c1df68f3a76..e760cdf0d2b 100644 --- a/src/Processors/Merges/ReplacingSortedTransform.h +++ b/src/Processors/Merges/ReplacingSortedTransform.h @@ -18,7 +18,7 @@ public: WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) : IMergingTransform( - num_inputs, header, header, true, 
false, + num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/SummingSortedTransform.h b/src/Processors/Merges/SummingSortedTransform.h index 730d6604950..0287caed5aa 100644 --- a/src/Processors/Merges/SummingSortedTransform.h +++ b/src/Processors/Merges/SummingSortedTransform.h @@ -19,7 +19,7 @@ public: const Names & partition_key_columns, size_t max_block_size) : IMergingTransform( - num_inputs, header, header, true, false, + num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, header, num_inputs, std::move(description_), diff --git a/src/Processors/Merges/VersionedCollapsingTransform.h b/src/Processors/Merges/VersionedCollapsingTransform.h index 6251f842898..f260e20f1da 100644 --- a/src/Processors/Merges/VersionedCollapsingTransform.h +++ b/src/Processors/Merges/VersionedCollapsingTransform.h @@ -19,7 +19,7 @@ public: WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) : IMergingTransform( - num_inputs, header, header, true, false, + num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, header, num_inputs, std::move(description_), diff --git a/src/Processors/QueryPlan/FinishSortingStep.cpp b/src/Processors/QueryPlan/FinishSortingStep.cpp index 6f21865592b..d3eac99c8ac 100644 --- a/src/Processors/QueryPlan/FinishSortingStep.cpp +++ b/src/Processors/QueryPlan/FinishSortingStep.cpp @@ -58,14 +58,14 @@ void FinishSortingStep::transformPipeline(QueryPipeline & pipeline, const BuildQ if (pipeline.getNumStreams() > 1) { UInt64 limit_for_merging = (need_finish_sorting ? 
0 : limit); - bool expected_one_block = limit_for_merging && limit_for_merging < max_block_size; + bool has_limit_below_one_block = limit_for_merging && limit_for_merging < max_block_size; auto transform = std::make_shared( pipeline.getHeader(), pipeline.getNumStreams(), prefix_description, max_block_size, limit_for_merging, - expected_one_block); + has_limit_below_one_block); pipeline.addTransform(std::move(transform)); } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 8930adf662a..3f048b6c355 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -177,30 +177,31 @@ ProcessorPtr ReadFromMergeTree::createSource( const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache, - bool one_range_per_task) + bool has_limit_below_one_block) { return std::make_shared( data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes, preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, - prewhere_info, actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query, one_range_per_task); + prewhere_info, actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query, has_limit_below_one_block); } Pipe ReadFromMergeTree::readInOrder( RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, - bool use_uncompressed_cache) + bool use_uncompressed_cache, + UInt64 limit) { Pipes pipes; /// For reading in order it makes sense to read only /// one range per task to reduce number of read rows. - bool one_range_per_task = read_type != ReadType::Default; + bool has_limit_below_one_block = read_type != ReadType::Default && limit && limit < max_block_size; for (const auto & part : parts_with_range) { auto source = read_type == ReadType::InReverseOrder - ? 
createSource(part, required_columns, use_uncompressed_cache, one_range_per_task) - : createSource(part, required_columns, use_uncompressed_cache, one_range_per_task); + ? createSource(part, required_columns, use_uncompressed_cache, has_limit_below_one_block) + : createSource(part, required_columns, use_uncompressed_cache, has_limit_below_one_block); pipes.emplace_back(std::move(source)); } @@ -226,7 +227,7 @@ Pipe ReadFromMergeTree::read( return readFromPool(parts_with_range, required_columns, max_streams, min_marks_for_concurrent_read, use_uncompressed_cache); - auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache); + auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache, 0); /// Use ConcatProcessor to concat sources together. /// It is needed to read in parts order (and so in PK order) if single thread is used. @@ -460,8 +461,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( ? ReadFromMergeTree::ReadType::InOrder : ReadFromMergeTree::ReadType::InReverseOrder; - pipes.emplace_back(read(std::move(new_parts), column_names, read_type, - requested_num_streams, info.min_marks_for_concurrent_read, info.use_uncompressed_cache)); + pipes.emplace_back(readInOrder(std::move(new_parts), column_names, read_type, + info.use_uncompressed_cache, input_order_info->limit)); } if (need_preliminary_merge) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 12d5959bf29..ecba08bea29 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -109,10 +109,10 @@ private: Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache); Pipe readFromPool(RangesInDataParts parts_with_ranges, Names required_columns, size_t max_streams, size_t min_marks_for_concurrent_read, 
bool use_uncompressed_cache); - Pipe readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache); + Pipe readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache, UInt64 limit); template - ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache, bool one_range_per_task); + ProcessorPtr createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache, bool has_limit_below_one_block); Pipe spreadMarkRangesAmongStreams( RangesInDataParts && parts_with_ranges, diff --git a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp index 0ec60568836..7a6bc747411 100644 --- a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp @@ -23,13 +23,13 @@ MergeTreeInOrderSelectProcessor::MergeTreeInOrderSelectProcessor( bool check_columns_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, - bool one_range_per_task_, + bool has_limit_below_one_block_, bool quiet) : MergeTreeSelectProcessor{ storage_, metadata_snapshot_, owned_data_part_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, required_columns_, std::move(mark_ranges_), use_uncompressed_cache_, prewhere_info_, - std::move(actions_settings), check_columns_, reader_settings_, virt_column_names_, one_range_per_task_} + std::move(actions_settings), check_columns_, reader_settings_, virt_column_names_, has_limit_below_one_block_} { if (!quiet) LOG_DEBUG(log, "Reading {} ranges in order from part {}, approx. 
{} rows starting from {}", @@ -40,7 +40,6 @@ MergeTreeInOrderSelectProcessor::MergeTreeInOrderSelectProcessor( bool MergeTreeInOrderSelectProcessor::getNewTask() try { - /// Produce no more than one task if (all_mark_ranges.empty()) { finish(); @@ -52,7 +51,8 @@ try : std::make_unique(data_part, ordered_names, metadata_snapshot->getSampleBlock()); MarkRanges mark_ranges_for_task; - if (one_range_per_task) + /// If we need to read few rows, set one range per task to reduce number of read data. + if (has_limit_below_one_block) { mark_ranges_for_task = { std::move(all_mark_ranges.front()) }; all_mark_ranges.pop_front(); diff --git a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h index d1bc1bfe4c4..f0471c27c0f 100644 --- a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.h @@ -26,7 +26,7 @@ public: bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, - bool one_range_per_task_ = false, + bool has_limit_below_one_block_ = false, bool quiet = false); String getName() const override { return "MergeTreeInOrder"; } diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp index d93650e8a2c..47e7b69ae39 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp @@ -23,13 +23,13 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( bool check_columns_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, - bool one_range_per_task_, + bool has_limit_below_one_block_, bool quiet) : MergeTreeSelectProcessor{ storage_, metadata_snapshot_, owned_data_part_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, required_columns_, std::move(mark_ranges_), 
use_uncompressed_cache_, prewhere_info_, - std::move(actions_settings), check_columns_, reader_settings_, virt_column_names_, one_range_per_task_} + std::move(actions_settings), check_columns_, reader_settings_, virt_column_names_, has_limit_below_one_block_} { if (!quiet) LOG_DEBUG(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}", diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h index 33fe51c1bb6..22588727211 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h @@ -27,7 +27,7 @@ public: bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, - bool one_range_per_task_ = false, + bool has_limit_below_one_block_ = false, bool quiet = false); String getName() const override { return "MergeTreeReverse"; } diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 3351e64c5b0..a8544110374 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -22,7 +22,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( bool check_columns_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, - bool one_range_per_task_) + bool has_limit_below_one_block_) : MergeTreeBaseSelectProcessor{ metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_, @@ -31,7 +31,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( required_columns{std::move(required_columns_)}, data_part{owned_data_part_}, all_mark_ranges(std::move(mark_ranges_)), - one_range_per_task(one_range_per_task_), + 
has_limit_below_one_block(has_limit_below_one_block_), check_columns(check_columns_), total_rows(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges)) { diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index c172a8fdb14..a9a464041ff 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -31,7 +31,7 @@ public: bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, - bool one_range_per_task_ = false); + bool has_limit_below_one_block_ = false); ~MergeTreeSelectProcessor() override; @@ -61,7 +61,7 @@ protected: size_t part_index_in_query = 0; /// If true, every task will be created only with one range. /// It reduces amount of read data for queries with small LIMIT. - bool one_range_per_task = false; + bool has_limit_below_one_block = false; bool check_columns; size_t total_rows; diff --git a/src/Storages/ReadInOrderOptimizer.cpp b/src/Storages/ReadInOrderOptimizer.cpp index 87273330b34..912d284bfc0 100644 --- a/src/Storages/ReadInOrderOptimizer.cpp +++ b/src/Storages/ReadInOrderOptimizer.cpp @@ -37,7 +37,7 @@ ReadInOrderOptimizer::ReadInOrderOptimizer( array_join_result_to_source = syntax_result->array_join_result_to_source; } -InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(const StorageMetadataPtr & metadata_snapshot, ContextPtr context) const +InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, UInt64 limit) const { Names sorting_key_columns = metadata_snapshot->getSortingKeyColumns(); if (!metadata_snapshot->hasSortingKey()) @@ -155,7 +155,8 @@ InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(const StorageMetadataPtr & if (order_key_prefix_descr.empty()) return {}; - return std::make_shared(std::move(order_key_prefix_descr), read_direction); + + return 
std::make_shared(std::move(order_key_prefix_descr), read_direction, limit); } } diff --git a/src/Storages/ReadInOrderOptimizer.h b/src/Storages/ReadInOrderOptimizer.h index 0abf2923a98..2686d081855 100644 --- a/src/Storages/ReadInOrderOptimizer.h +++ b/src/Storages/ReadInOrderOptimizer.h @@ -22,7 +22,7 @@ public: const SortDescription & required_sort_description, const TreeRewriterResultPtr & syntax_result); - InputOrderInfoPtr getInputOrder(const StorageMetadataPtr & metadata_snapshot, ContextPtr context) const; + InputOrderInfoPtr getInputOrder(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, UInt64 limit = 0) const; private: /// Actions for every element of order expression to analyze functions for monotonicity diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index cf2c4d72f59..3b3c0fa1258 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -83,9 +83,10 @@ struct InputOrderInfo { SortDescription order_key_prefix_descr; int direction; + UInt64 limit; - InputOrderInfo(const SortDescription & order_key_prefix_descr_, int direction_) - : order_key_prefix_descr(order_key_prefix_descr_), direction(direction_) {} + InputOrderInfo(const SortDescription & order_key_prefix_descr_, int direction_, UInt64 limit_) + : order_key_prefix_descr(order_key_prefix_descr_), direction(direction_), limit(limit_) {} bool operator ==(const InputOrderInfo & other) const { From 03c785931adbc60e647463673ee837183d0c9517 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 16 Jul 2021 17:27:38 +0300 Subject: [PATCH 05/11] fix pushdown of limit to reading stage --- src/Interpreters/InterpreterSelectQuery.cpp | 2 +- src/Processors/QueryPlan/ReadFromMergeTree.cpp | 4 ++-- src/Storages/MergeTree/MergeTreeSelectProcessor.cpp | 2 ++ src/Storages/MergeTree/MergeTreeSelectProcessor.h | 1 + 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp 
b/src/Interpreters/InterpreterSelectQuery.cpp index 14b977f6864..0750dd6e27c 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1925,7 +1925,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc } /// If we don't have filtration, we can pushdown limit to reading stage for optimizations. - UInt64 limit = (query.where() && query.prewhere()) ? getLimitForSorting(query, context) : 0; + UInt64 limit = (query.where() || query.prewhere()) ? 0 : getLimitForSorting(query, context); if (query_info.projection) query_info.projection->input_order_info = query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context, limit); diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 612b3d5da3e..6abf37eb008 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -181,8 +181,8 @@ ProcessorPtr ReadFromMergeTree::createSource( { return std::make_shared( data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes, - preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, - prewhere_info, actions_settings, true, reader_settings, virt_column_names, has_limit_below_one_block); + preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, prewhere_info, + actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query, has_limit_below_one_block); } Pipe ReadFromMergeTree::readInOrder( diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 638e04820f3..440b47b12cf 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -22,6 +22,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( 
bool check_columns, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, + size_t part_index_in_query_, bool has_limit_below_one_block_) : MergeTreeBaseSelectProcessor{ metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), @@ -31,6 +32,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( required_columns{std::move(required_columns_)}, data_part{owned_data_part_}, all_mark_ranges(std::move(mark_ranges_)), + part_index_in_query(part_index_in_query_), has_limit_below_one_block(has_limit_below_one_block_), total_rows(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges)) { diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 83f1a0dab94..621a3bd740c 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -31,6 +31,7 @@ public: bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, + size_t part_index_in_query_ = 0, bool has_limit_below_one_block_ = false); ~MergeTreeSelectProcessor() override; From 048e089a589df66a1713a9ffcea78cb4f3748f42 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 16 Jul 2021 17:47:03 +0300 Subject: [PATCH 06/11] remove useless code --- src/Storages/MergeTree/MergeTreeSelectProcessor.h | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 621a3bd740c..0d83823de42 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -36,15 +36,10 @@ public: ~MergeTreeSelectProcessor() override; - String getName() const override = 0; - /// Closes readers and unlock part locks void finish(); protected: - - bool getNewTask() override = 0; - /// Used by Task Names required_columns; /// Names from 
header. Used in order to order columns in read blocks. From f61ea15d56e0624accec14f7f1e966ad3e1e3ce0 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 16 Jul 2021 20:40:00 +0300 Subject: [PATCH 07/11] fix pushdown of limit to reading stage --- src/Interpreters/InterpreterSelectQuery.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 0750dd6e27c..95a8da4216c 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1925,7 +1925,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc } /// If we don't have filtration, we can pushdown limit to reading stage for optimizations. - UInt64 limit = (query.where() || query.prewhere()) ? 0 : getLimitForSorting(query, context); + UInt64 limit = (query.where() || query.prewhere() || query.groupBy()) ? 0 : getLimitForSorting(query, context); if (query_info.projection) query_info.projection->input_order_info = query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context, limit); From fc9d72e75dd4176b73e712f7b1db47ca3a371d20 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Mon, 2 Aug 2021 15:03:55 +0300 Subject: [PATCH 08/11] fix performance of short queries with large number of columns --- src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp | 4 ++-- src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h | 2 +- src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp | 2 +- src/Storages/MergeTree/MergeTreeReadPool.cpp | 2 +- src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp | 2 +- src/Storages/MergeTree/MergeTreeSelectProcessor.cpp | 1 + src/Storages/MergeTree/MergeTreeSelectProcessor.h | 3 +++ 7 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index 
0ebf5ec33d7..c91d60c5de7 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -468,7 +468,7 @@ Block MergeTreeBaseSelectProcessor::transformHeader( std::unique_ptr MergeTreeBaseSelectProcessor::getSizePredictor( const MergeTreeData::DataPartPtr & data_part, const MergeTreeReadTaskColumns & task_columns, - const StorageMetadataPtr & metadata_snapshot) + const Block & sample_block) { const auto & required_column_names = task_columns.columns.getNames(); const auto & required_pre_column_names = task_columns.pre_columns.getNames(); @@ -476,7 +476,7 @@ std::unique_ptr MergeTreeBaseSelectProcessor::getSi complete_column_names.insert(required_pre_column_names.begin(), required_pre_column_names.end()); return std::make_unique( - data_part, Names(complete_column_names.begin(), complete_column_names.end()), metadata_snapshot->getSampleBlock()); + data_part, Names(complete_column_names.begin(), complete_column_names.end()), sample_block); } MergeTreeBaseSelectProcessor::~MergeTreeBaseSelectProcessor() = default; diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h index 3f6a841f780..d102e4f07a4 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h @@ -40,7 +40,7 @@ public: static std::unique_ptr getSizePredictor( const MergeTreeData::DataPartPtr & data_part, const MergeTreeReadTaskColumns & task_columns, - const StorageMetadataPtr & metadata_snapshot); + const Block & sample_block); protected: Chunk generate() final; diff --git a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp index b56430003b1..70fea1414e6 100644 --- a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp @@ -18,7 +18,7 @@ try } auto size_predictor = 
(preferred_block_size_bytes == 0) ? nullptr - : getSizePredictor(data_part, task_columns, metadata_snapshot); + : getSizePredictor(data_part, task_columns, sample_block); MarkRanges mark_ranges_for_task; /// If we need to read few rows, set one range per task to reduce number of read data. diff --git a/src/Storages/MergeTree/MergeTreeReadPool.cpp b/src/Storages/MergeTree/MergeTreeReadPool.cpp index 30a6edf9f92..b18f22c3ab1 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -231,7 +231,7 @@ std::vector MergeTreeReadPool::fillPerPartInfo( auto task_columns = getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info, check_columns); auto size_predictor = !predict_block_size_bytes ? nullptr - : MergeTreeBaseSelectProcessor::getSizePredictor(part.data_part, task_columns, metadata_snapshot); + : MergeTreeBaseSelectProcessor::getSizePredictor(part.data_part, task_columns, sample_block); per_part_size_predictor.emplace_back(std::move(size_predictor)); diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp index 7ab71bedb4a..5edf322cd6a 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp @@ -27,7 +27,7 @@ try all_mark_ranges.pop_back(); auto size_predictor = (preferred_block_size_bytes == 0) ? 
nullptr - : getSizePredictor(data_part, task_columns, metadata_snapshot); + : getSizePredictor(data_part, task_columns, sample_block); task = std::make_unique( data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 440b47b12cf..63f3fc639e1 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -31,6 +31,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( reader_settings_, use_uncompressed_cache_, virt_column_names_}, required_columns{std::move(required_columns_)}, data_part{owned_data_part_}, + sample_block(metadata_snapshot_->getSampleBlock()), all_mark_ranges(std::move(mark_ranges_)), part_index_in_query(part_index_in_query_), has_limit_below_one_block(has_limit_below_one_block_), diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 13662c087e8..20a102c6ff6 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -51,6 +51,9 @@ protected: /// Data part will not be removed if the pointer owns it MergeTreeData::DataPartPtr data_part; + /// Cache getSampleBlock call, which might be heavy. 
+ Block sample_block; + /// Mark ranges we should read (in ascending order) MarkRanges all_mark_ranges; /// Value of _part_index virtual column (used only in SelectExecutor) From ad65e3c17bcd9c5e7f2da1e63bbd7089fa1fdcfc Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 3 Aug 2021 15:32:17 +0300 Subject: [PATCH 09/11] slightly more optimize reading in order of key --- src/Processors/QueryPlan/ReadFromMergeTree.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 74482b48f03..f8c12449c7e 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -409,7 +409,6 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( { RangesInDataPart part = parts_with_ranges.back(); parts_with_ranges.pop_back(); - size_t & marks_in_part = info.sum_marks_in_parts.back(); /// We will not take too few rows from a part. @@ -424,8 +423,13 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( MarkRanges ranges_to_get_from_part; + /// We take full part if it contains enough marks or + /// if we know limit and part contains more than 'limit' rows. + bool take_full_part = marks_in_part <= need_marks + || (input_order_info->limit && input_order_info->limit < part.getRowsCount()); + /// We take the whole part if it is small enough.
- if (marks_in_part <= need_marks) + if (take_full_part) { ranges_to_get_from_part = part.ranges; From 915ef50c2db2899cf1cd11e648d50f4734664402 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 4 Aug 2021 18:18:03 +0300 Subject: [PATCH 10/11] optimize reading in order of key more --- src/Interpreters/InterpreterSelectQuery.cpp | 10 ++++++++-- src/Parsers/ASTSelectQuery.h | 2 ++ src/Processors/QueryPlan/FinishSortingStep.cpp | 6 ++++-- src/Processors/QueryPlan/FinishSortingStep.h | 6 ++++-- .../MergeTreeInOrderSelectProcessor.cpp | 7 +++++-- .../MergeTreeReverseSelectProcessor.cpp | 3 +++ .../MergeTree/MergeTreeSelectProcessor.cpp | 17 +++++++++++------ .../MergeTree/MergeTreeSelectProcessor.h | 8 +++++++- 8 files changed, 44 insertions(+), 15 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 1b6361c9037..5180892a966 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1928,7 +1928,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc } /// If we don't have filtration, we can pushdown limit to reading stage for optimizations. - UInt64 limit = (query.where() || query.prewhere() || query.groupBy()) ? 0 : getLimitForSorting(query, context); + UInt64 limit = (query.hasFiltration() || query.groupBy()) ? 
0 : getLimitForSorting(query, context); if (query_info.projection) query_info.projection->input_order_info = query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context, limit); @@ -2291,8 +2291,14 @@ void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, Input { const Settings & settings = context->getSettingsRef(); + const auto & query = getSelectQuery(); auto finish_sorting_step = std::make_unique( - query_plan.getCurrentDataStream(), input_sorting_info->order_key_prefix_descr, output_order_descr, settings.max_block_size, limit); + query_plan.getCurrentDataStream(), + input_sorting_info->order_key_prefix_descr, + output_order_descr, + settings.max_block_size, + limit, + query.hasFiltration()); query_plan.addStep(std::move(finish_sorting_step)); } diff --git a/src/Parsers/ASTSelectQuery.h b/src/Parsers/ASTSelectQuery.h index db4d7e76320..c382da47539 100644 --- a/src/Parsers/ASTSelectQuery.h +++ b/src/Parsers/ASTSelectQuery.h @@ -69,6 +69,8 @@ public: const ASTPtr limitLength() const { return getExpression(Expression::LIMIT_LENGTH); } const ASTPtr settings() const { return getExpression(Expression::SETTINGS); } + bool hasFiltration() const { return where() || prewhere() || having(); } + /// Set/Reset/Remove expression. 
void setExpression(Expression expr, ASTPtr && ast); diff --git a/src/Processors/QueryPlan/FinishSortingStep.cpp b/src/Processors/QueryPlan/FinishSortingStep.cpp index d3eac99c8ac..718eeb96cd8 100644 --- a/src/Processors/QueryPlan/FinishSortingStep.cpp +++ b/src/Processors/QueryPlan/FinishSortingStep.cpp @@ -31,12 +31,14 @@ FinishSortingStep::FinishSortingStep( SortDescription prefix_description_, SortDescription result_description_, size_t max_block_size_, - UInt64 limit_) + UInt64 limit_, + bool has_filtration_) : ITransformingStep(input_stream_, input_stream_.header, getTraits(limit_)) , prefix_description(std::move(prefix_description_)) , result_description(std::move(result_description_)) , max_block_size(max_block_size_) , limit(limit_) + , has_filtration(has_filtration_) { /// TODO: check input_stream is sorted by prefix_description. output_stream->sort_description = result_description; @@ -58,7 +60,7 @@ void FinishSortingStep::transformPipeline(QueryPipeline & pipeline, const BuildQ if (pipeline.getNumStreams() > 1) { UInt64 limit_for_merging = (need_finish_sorting ? 
0 : limit); - bool has_limit_below_one_block = limit_for_merging && limit_for_merging < max_block_size; + bool has_limit_below_one_block = !has_filtration && limit_for_merging && limit_for_merging < max_block_size; auto transform = std::make_shared( pipeline.getHeader(), pipeline.getNumStreams(), diff --git a/src/Processors/QueryPlan/FinishSortingStep.h b/src/Processors/QueryPlan/FinishSortingStep.h index 9fe031e792d..5ea3a6d91b5 100644 --- a/src/Processors/QueryPlan/FinishSortingStep.h +++ b/src/Processors/QueryPlan/FinishSortingStep.h @@ -13,8 +13,9 @@ public: const DataStream & input_stream_, SortDescription prefix_description_, SortDescription result_description_, - size_t max_block_size, - UInt64 limit); + size_t max_block_size_, + UInt64 limit_, + bool has_filtration_); String getName() const override { return "FinishSorting"; } @@ -31,6 +32,7 @@ private: SortDescription result_description; size_t max_block_size; UInt64 limit; + bool has_filtration; }; } diff --git a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp index 70fea1414e6..48a9d62d872 100644 --- a/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeInOrderSelectProcessor.cpp @@ -17,8 +17,8 @@ try return false; } - auto size_predictor = (preferred_block_size_bytes == 0) ? nullptr - : getSizePredictor(data_part, task_columns, sample_block); + if (!reader) + initializeReaders(); MarkRanges mark_ranges_for_task; /// If we need to read few rows, set one range per task to reduce number of read data. @@ -33,6 +33,9 @@ try all_mark_ranges.clear(); } + auto size_predictor = (preferred_block_size_bytes == 0) ? 
nullptr + : getSizePredictor(data_part, task_columns, sample_block); + task = std::make_unique( data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, task_columns.columns, task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column, diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp index 5edf322cd6a..16ce9823ebb 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp @@ -22,6 +22,9 @@ try if (all_mark_ranges.empty()) return true; + if (!reader) + initializeReaders(); + /// Read ranges from right to left. MarkRanges mark_ranges_for_task = { all_mark_ranges.back() }; all_mark_ranges.pop_back(); diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 63f3fc639e1..98077605f89 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -19,7 +19,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( bool use_uncompressed_cache_, const PrewhereInfoPtr & prewhere_info_, ExpressionActionsSettings actions_settings, - bool check_columns, + bool check_columns_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, size_t part_index_in_query_, @@ -35,16 +35,23 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( all_mark_ranges(std::move(mark_ranges_)), part_index_in_query(part_index_in_query_), has_limit_below_one_block(has_limit_below_one_block_), + check_columns(check_columns_), total_rows(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges)) { - /// will be used to distinguish between PREWHERE and WHERE columns when applying filter - const auto & column_names = task_columns.columns.getNames(); - column_name_set = NameSet{column_names.begin(), column_names.end()}; + 
addTotalRowsApprox(total_rows); + ordered_names = header_without_virtual_columns.getNames(); +} +void MergeTreeSelectProcessor::initializeReaders() +{ task_columns = getReadTaskColumns( storage, metadata_snapshot, data_part, required_columns, prewhere_info, check_columns); + /// Will be used to distinguish between PREWHERE and WHERE columns when applying filter + const auto & column_names = task_columns.columns.getNames(); + column_name_set = NameSet{column_names.begin(), column_names.end()}; + if (use_uncompressed_cache) owned_uncompressed_cache = storage.getContext()->getUncompressedCache(); @@ -57,8 +64,6 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings); - addTotalRowsApprox(total_rows); - ordered_names = header_without_virtual_columns.getNames(); } void MergeTreeSelectProcessor::finish() diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 20a102c6ff6..ea4cd349cba 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -28,7 +28,7 @@ public: bool use_uncompressed_cache, const PrewhereInfoPtr & prewhere_info, ExpressionActionsSettings actions_settings, - bool check_columns, + bool check_columns_, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, size_t part_index_in_query_ = 0, @@ -40,6 +40,10 @@ public: void finish(); protected: + /// Defer initialization from constructor, because it may be heavy + /// and it's better to do it lazily in `getNewTask`, which is executing in parallel. + void initializeReaders(); + /// Used by Task Names required_columns; /// Names from header. Used in order to order columns in read blocks. @@ -61,6 +65,8 @@ protected: /// If true, every task will be created only with one range. 
/// It reduces amount of read data for queries with small LIMIT. bool has_limit_below_one_block = false; + + bool check_columns; size_t total_rows = 0; }; From 9586bb7e901455286c717684bf17cd3442c2e0c8 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 4 Aug 2021 20:42:22 +0300 Subject: [PATCH 11/11] fix pvs --- src/Storages/MergeTree/MergeTreeBlockReadUtils.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.h b/src/Storages/MergeTree/MergeTreeBlockReadUtils.h index 31d609e4242..4c4081bd83b 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.h +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.h @@ -70,7 +70,7 @@ struct MergeTreeReadTaskColumns /// column names to read during PREWHERE NamesAndTypesList pre_columns; /// resulting block may require reordering in accordance with `ordered_names` - bool should_reorder; + bool should_reorder = false; }; MergeTreeReadTaskColumns getReadTaskColumns(