From b9fc9b4569323b9fa4353cd45a4b0e28a443c6b3 Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Sat, 8 Feb 2020 00:07:18 +0300 Subject: [PATCH 1/3] fix order of ranges in MergeTreeDataSelectExecutor --- .../MergeTree/MergeTreeBlockReadUtils.cpp | 5 ++++- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 5 ++--- .../Storages/MergeTree/MergeTreeReadPool.cpp | 21 +++++++------------ .../MergeTree/MergeTreeSelectProcessor.cpp | 7 +------ ...01078_merge_tree_read_one_thread.reference | 1 + .../01078_merge_tree_read_one_thread.sql | 16 ++++++++++++++ 6 files changed, 31 insertions(+), 24 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/01078_merge_tree_read_one_thread.reference create mode 100644 dbms/tests/queries/0_stateless/01078_merge_tree_read_one_thread.sql diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index 920697f3c32..7f71bfc28c1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -73,7 +73,10 @@ MergeTreeReadTask::MergeTreeReadTask( : data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_}, ordered_names{ordered_names_}, column_name_set{column_name_set_}, columns{columns_}, pre_columns{pre_columns_}, remove_prewhere_column{remove_prewhere_column_}, should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)} -{} +{ + /// We need to save marks in reverse order. + std::reverse(mark_ranges.begin(), mark_ranges.end()); +} MergeTreeReadTask::~MergeTreeReadTask() = default; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 33b5ac4999d..6fc592bdc2c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -718,9 +718,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( for (size_t i = 0; i < parts.size(); ++i) { total_rows += parts[i].getRowsCount(); - /// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`. - std::reverse(parts[i].ranges.begin(), parts[i].ranges.end()); - sum_marks_in_parts[i] = parts[i].getMarksCount(); sum_marks += sum_marks_in_parts[i]; @@ -897,6 +894,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( } new_ranges.emplace_back(range.begin, range.end); } + + /// Restore left-to-right order. std::reverse(new_ranges.begin(), new_ranges.end()); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp b/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp index a70dfc2d78c..2003b2f0af1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -28,11 +28,6 @@ MergeTreeReadPool::MergeTreeReadPool( column_names{column_names_}, do_not_steal_tasks{do_not_steal_tasks_}, predict_block_size_bytes{preferred_block_size_bytes_ > 0}, prewhere_info{prewhere_info_}, parts_ranges{parts_} { - /// reverse from right-to-left to left-to-right - /// because 'reverse' was done in MergeTreeDataSelectExecutor - for (auto & part_ranges : parts_ranges) - std::reverse(std::begin(part_ranges.ranges), std::end(part_ranges.ranges)); - /// parts don't contain duplicate MergeTreeDataPart's. const auto per_part_sum_marks = fillPerPartInfo(parts_, check_columns_); fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_, min_marks_for_concurrent_read_); @@ -79,10 +74,9 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, { const auto marks_to_get_from_range = marks_in_part; - /** Ranges are in right-to-left order, because 'reverse' was done in MergeTreeDataSelectExecutor - * and that order is supported in 'fillPerThreadInfo'. - */ + /// Ranges are in right-to-left order, because 'reverse' was done in 'fillPerThreadInfo'. ranges_to_get_from_part = thread_task.ranges; + std::reverse(ranges_to_get_from_part.begin(), ranges_to_get_from_part.end()); marks_in_part -= marks_to_get_from_range; @@ -113,11 +107,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, marks_in_part -= marks_to_get_from_range; need_marks -= marks_to_get_from_range; } - - /** Change order to right-to-left, for MergeTreeThreadSelectBlockInputStream to get ranges with .pop_back() - * (order was changed to left-to-right due to .pop_back() above). - */ - std::reverse(std::begin(ranges_to_get_from_part), std::end(ranges_to_get_from_part)); + /// Order of ranges was changed to left-to-right due to .pop_back() above. } auto curr_task_size_predictor = !per_part_size_predictor[part_idx] ? nullptr @@ -211,7 +201,6 @@ std::vector MergeTreeReadPool::fillPerPartInfo( /// Read marks for every data part. size_t sum_marks = 0; - /// Ranges are in right-to-left order, due to 'reverse' in MergeTreeDataSelectExecutor. for (const auto & range : part.ranges) sum_marks += range.end - range.begin; @@ -251,6 +240,10 @@ void MergeTreeReadPool::fillPerThreadInfo( { threads_tasks.resize(threads); + /// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`. + for (auto & part : parts) + std::reverse(part.ranges.begin(), part.ranges.end()); + const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1; for (size_t i = 0; i < threads && !parts.empty(); ++i) diff --git a/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index dac42859eef..0f0822cd88c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -95,11 +95,6 @@ try task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns); - /** @note you could simply swap `reverse` in if and else branches of MergeTreeDataSelectExecutor, - * and remove this reverse. */ - MarkRanges remaining_mark_ranges = all_mark_ranges; - std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end()); - auto size_predictor = (preferred_block_size_bytes == 0) ? nullptr : std::make_unique(data_part, ordered_names, data_part->storage.getSampleBlock()); @@ -109,7 +104,7 @@ try column_name_set = NameSet{column_names.begin(), column_names.end()}; task = std::make_unique( - data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns, + data_part, all_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns, task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column, task_columns.should_reorder, std::move(size_predictor)); diff --git a/dbms/tests/queries/0_stateless/01078_merge_tree_read_one_thread.reference b/dbms/tests/queries/0_stateless/01078_merge_tree_read_one_thread.reference new file mode 100644 index 00000000000..697cb3a26d7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01078_merge_tree_read_one_thread.reference @@ -0,0 +1 @@ +300 diff --git a/dbms/tests/queries/0_stateless/01078_merge_tree_read_one_thread.sql b/dbms/tests/queries/0_stateless/01078_merge_tree_read_one_thread.sql new file mode 100644 index 00000000000..41fcbe81709 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01078_merge_tree_read_one_thread.sql @@ -0,0 +1,16 @@ +drop table if exists t; + +create table t (a Int, b Int) engine = MergeTree order by (a, b) settings index_granularity = 400; + +insert into t select 0, 0 from numbers(50); +insert into t select 0, 1 from numbers(350); +insert into t select 1, 2 from numbers(400); +insert into t select 2, 2 from numbers(400); +insert into t select 3, 0 from numbers(100); + +select sleep(1) format Null; -- sleep a bit to wait possible merges after insert + +set max_threads = 1; +optimize table t final; + +select sum(a) from t where a in (0, 3) and b = 0; From 852772364e3b202286f0e2669d3fd0dc258ea3bf Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Mon, 10 Feb 2020 15:36:01 +0300 Subject: [PATCH 2/3] replace std::vector to std::deque for MarkRanges --- dbms/src/Storages/MergeTree/MarkRange.h | 4 ++-- .../MergeTree/MergeTreeBlockReadUtils.cpp | 2 -- .../MergeTree/MergeTreeBlockReadUtils.h | 3 +-- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 19 ++++----------- .../MergeTree/MergeTreeRangeReader.cpp | 6 ++--- .../Storages/MergeTree/MergeTreeReadPool.cpp | 24 ++++--------------- dbms/src/Storages/MergeTree/MergeTreeReader.h | 2 +- 7 files changed, 17 insertions(+), 43 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MarkRange.h b/dbms/src/Storages/MergeTree/MarkRange.h index 657ffe32f78..8380914a455 100644 --- a/dbms/src/Storages/MergeTree/MarkRange.h +++ b/dbms/src/Storages/MergeTree/MarkRange.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include namespace DB @@ -19,7 +19,7 @@ struct MarkRange MarkRange(const size_t begin_, const size_t end_) : begin{begin_}, end{end_} {} }; -using MarkRanges = std::vector; +using MarkRanges = std::deque; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index 7f71bfc28c1..e1c0305abfd 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -74,8 +74,6 @@ MergeTreeReadTask::MergeTreeReadTask( ordered_names{ordered_names_}, column_name_set{column_name_set_}, columns{columns_}, pre_columns{pre_columns_}, remove_prewhere_column{remove_prewhere_column_}, should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)} { - /// We need to save marks in reverse order. - std::reverse(mark_ranges.begin(), mark_ranges.end()); } MergeTreeReadTask::~MergeTreeReadTask() = default; diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h index 19c6adbd9c7..108742e1101 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h @@ -30,8 +30,7 @@ struct MergeTreeReadTask { /// data part which should be read while performing this task MergeTreeData::DataPartPtr data_part; - /** Ranges to read from `data_part`. - * Specified in reverse order for MergeTreeThreadSelectBlockInputStream's convenience of calling .pop_back(). */ + /// Ranges to read from `data_part`. MarkRanges mark_ranges; /// for virtual `part_index` virtual column size_t part_index_in_query; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 6fc592bdc2c..28e8eb486d3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -823,9 +823,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( sum_marks_in_parts[i] = parts[i].getMarksCount(); sum_marks += sum_marks_in_parts[i]; - /// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`. - std::reverse(parts[i].ranges.begin(), parts[i].ranges.end()); - if (parts[i].data_part->index_granularity_info.is_adaptive) adaptive_parts++; } @@ -888,15 +885,12 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( auto range = *it; while (range.begin + marks_in_range < range.end) { - new_ranges.emplace_back(range.end - marks_in_range, range.end); + new_ranges.emplace_front(range.end - marks_in_range, range.end); range.end -= marks_in_range; marks_in_range = std::min(marks_in_range * 2, max_marks_in_range); } - new_ranges.emplace_back(range.begin, range.end); + new_ranges.emplace_front(range.begin, range.end); } - - /// Restore left-to-right order. - std::reverse(new_ranges.begin(), new_ranges.end()); } return new_ranges; @@ -918,7 +912,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( RangesInDataPart part = parts.back(); parts.pop_back(); - size_t & marks_in_part = sum_marks_in_parts.back(); + size_t & marks_in_part = sum_marks_in_parts.front(); /// We will not take too few rows from a part. if (marks_in_part >= min_marks_for_concurrent_read && @@ -935,9 +929,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( /// We take the whole part if it is small enough. if (marks_in_part <= need_marks) { - /// Restore the order of segments. - std::reverse(part.ranges.begin(), part.ranges.end()); - ranges_to_get_from_part = part.ranges; need_marks -= marks_in_part; @@ -951,7 +942,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( if (part.ranges.empty()) throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR); - MarkRange & range = part.ranges.back(); + MarkRange & range = part.ranges.front(); const size_t marks_in_range = range.end - range.begin; const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks); @@ -961,7 +952,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( marks_in_part -= marks_to_get_from_range; need_marks -= marks_to_get_from_range; if (range.begin == range.end) - part.ranges.pop_back(); + part.ranges.pop_front(); } parts.emplace_back(part); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp index a0bd2567fe5..56e5cfdb3d2 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -690,9 +690,9 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t if (stream.isFinished()) { result.addRows(stream.finalize(result.columns)); - stream = Stream(ranges.back().begin, ranges.back().end, merge_tree_reader); - result.addRange(ranges.back()); - ranges.pop_back(); + stream = Stream(ranges.front().begin, ranges.front().end, merge_tree_reader); + result.addRange(ranges.front()); + ranges.pop_front(); } auto rows_to_read = std::min(space_left, stream.numPendingRowsInCurrentGranule()); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp b/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp index 2003b2f0af1..fc63cca809a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -73,10 +73,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, if (marks_in_part <= need_marks) { const auto marks_to_get_from_range = marks_in_part; - - /// Ranges are in right-to-left order, because 'reverse' was done in 'fillPerThreadInfo'. ranges_to_get_from_part = thread_task.ranges; - std::reverse(ranges_to_get_from_part.begin(), ranges_to_get_from_part.end()); marks_in_part -= marks_to_get_from_range; @@ -91,7 +88,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, /// Loop through part ranges. while (need_marks > 0 && !thread_task.ranges.empty()) { - auto & range = thread_task.ranges.back(); + auto & range = thread_task.ranges.front(); const size_t marks_in_range = range.end - range.begin; const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks); @@ -100,14 +97,13 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, range.begin += marks_to_get_from_range; if (range.begin == range.end) { - std::swap(range, thread_task.ranges.back()); - thread_task.ranges.pop_back(); + std::swap(range, thread_task.ranges.front()); + thread_task.ranges.pop_front(); } marks_in_part -= marks_to_get_from_range; need_marks -= marks_to_get_from_range; } - /// Order of ranges was changed to left-to-right due to .pop_back() above. } auto curr_task_size_predictor = !per_part_size_predictor[part_idx] ? nullptr @@ -240,10 +236,6 @@ void MergeTreeReadPool::fillPerThreadInfo( { threads_tasks.resize(threads); - /// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`. - for (auto & part : parts) - std::reverse(part.ranges.begin(), part.ranges.end()); - const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1; for (size_t i = 0; i < threads && !parts.empty(); ++i) @@ -272,7 +264,6 @@ void MergeTreeReadPool::fillPerThreadInfo( /// Get whole part to read if it is small enough. if (marks_in_part <= need_marks) { - /// Leave ranges in right-to-left order for convenience to use .pop_back() in .getTask() ranges_to_get_from_part = part.ranges; marks_in_ranges = marks_in_part; @@ -288,7 +279,7 @@ void MergeTreeReadPool::fillPerThreadInfo( if (part.ranges.empty()) throw Exception("Unexpected end of ranges while spreading marks among threads", ErrorCodes::LOGICAL_ERROR); - MarkRange & range = part.ranges.back(); + MarkRange & range = part.ranges.front(); const size_t marks_in_range = range.end - range.begin; const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks); @@ -298,13 +289,8 @@ void MergeTreeReadPool::fillPerThreadInfo( marks_in_part -= marks_to_get_from_range; need_marks -= marks_to_get_from_range; if (range.begin == range.end) - part.ranges.pop_back(); + part.ranges.pop_front(); } - - /** Change order to right-to-left, for getTask() to get ranges with .pop_back() - * (order was changed to left-to-right due to .pop_back() above). - */ - std::reverse(std::begin(ranges_to_get_from_part), std::end(ranges_to_get_from_part)); } threads_tasks[i].parts_and_ranges.push_back({ part_idx, ranges_to_get_from_part }); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.h b/dbms/src/Storages/MergeTree/MergeTreeReader.h index b0642c06108..65485f950c6 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.h @@ -57,7 +57,7 @@ public: size_t getFirstMarkToRead() const { - return all_mark_ranges.back().begin; + return all_mark_ranges.front().begin; } private: using FileStreams = std::map>; From 90a121cc1e2b52b09e7a36a1769b732405292eb1 Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Mon, 10 Feb 2020 19:34:47 +0300 Subject: [PATCH 3/3] remove useless line --- dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp b/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp index fc63cca809a..15d87e60e24 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -96,10 +96,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range); range.begin += marks_to_get_from_range; if (range.begin == range.end) - { - std::swap(range, thread_task.ranges.front()); thread_task.ranges.pop_front(); - } marks_in_part -= marks_to_get_from_range; need_marks -= marks_to_get_from_range;