From 8613b90d84b34b25e5fc219b9902222685ef5273 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 17:22:42 +0300 Subject: [PATCH] Using SortingHeap in MergingSortedTransform --- dbms/src/Core/SortCursor.h | 7 ++ .../Transforms/MergingSortedTransform.cpp | 110 ++++++++---------- .../Transforms/MergingSortedTransform.h | 29 +---- 3 files changed, 62 insertions(+), 84 deletions(-) diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index 7c13ee3b058..a5d7dbd1f05 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -281,6 +281,13 @@ public: queue.pop_back(); } + void push(SortCursorImpl & cursor) + { + queue.emplace_back(&cursor); + std::push_heap(queue.begin(), queue.end()); + next_idx = 0; + } + private: using Container = std::vector; Container queue; diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp index d8f06a7fe4a..85f54d23fe3 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp @@ -145,9 +145,9 @@ IProcessor::Status MergingSortedTransform::prepare() return Status::NeedData; if (has_collation) - initQueue(queue_with_collation); + queue_with_collation = SortingHeap(cursors); else - initQueue(queue_without_collation); + queue_without_collation = SortingHeap(cursors); is_initialized = true; return Status::Ready; @@ -166,7 +166,6 @@ IProcessor::Status MergingSortedTransform::prepare() if (need_data) { - auto & input = *std::next(inputs.begin(), next_input_to_read); if (!input.isFinished()) { @@ -180,7 +179,10 @@ IProcessor::Status MergingSortedTransform::prepare() return Status::NeedData; updateCursor(std::move(chunk), next_input_to_read); - pushToQueue(next_input_to_read); + if (has_collation) + queue_with_collation.push(cursors[next_input_to_read]); + else + queue_without_collation.push(cursors[next_input_to_read]); need_data = false; } } @@ -197,8 +199,8 @@ void MergingSortedTransform::work() merge(queue_without_collation); } -template -void MergingSortedTransform::merge(std::priority_queue & queue) +template +void MergingSortedTransform::merge(TSortingHeap & queue) { /// Returns MergeStatus which we should return if we are going to finish now. auto can_read_another_row = [&, this]() @@ -220,77 +222,65 @@ void MergingSortedTransform::merge(std::priority_queue & queue) }; /// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { /// Shouldn't happen at first iteration, but check just in case. if (!can_read_another_row()) return; - TSortCursor current = queue.top(); - queue.pop(); - bool first_iteration = true; + auto current = queue.current(); - while (true) + /** And what if the block is totally less or equal than the rest for the current cursor? + * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. + */ + if (current.impl->isFirst() + && (queue.size() == 1 + || (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild())))) { - if (!first_iteration && !can_read_another_row()) + //std::cerr << "current block is totally less or equals\n"; + + /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. + if (merged_data.mergedRows() != 0) { - queue.push(current); - return; - } - first_iteration = false; - - /** And what if the block is totally less or equal than the rest for the current cursor? - * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. - */ - if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top()))) - { - //std::cerr << "current block is totally less or equals\n"; - - /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. - if (merged_data.mergedRows() != 0) - { - //std::cerr << "merged rows is non-zero\n"; - queue.push(current); - return; - } - - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - size_t source_num = current.impl->order; - insertFromChunk(source_num); + //std::cerr << "merged rows is non-zero\n"; return; } - //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; - //std::cerr << "Inserting row\n"; - merged_data.insertRow(current->all_columns, current->pos); + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + size_t source_num = current.impl->order; + insertFromChunk(source_num); + return; + } - if (out_row_sources_buf) - { - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - RowSourcePart row_source(current.impl->order); - out_row_sources_buf->write(row_source.data); - } + //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; + //std::cerr << "Inserting row\n"; + merged_data.insertRow(current->all_columns, current->pos); - if (current->isLast()) - { - need_data = true; - next_input_to_read = current.impl->order; + if (out_row_sources_buf) + { + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + RowSourcePart row_source(current.impl->order); + out_row_sources_buf->write(row_source.data); + } - if (limit && merged_data.totalMergedRows() >= limit) - is_finished = true; + if (!current->isLast()) + { +// std::cerr << "moving to next row\n"; + queue.next(); + } + else + { + /// We will get the next block from the corresponding source, if there is one. + queue.removeTop(); - return; - } +// std::cerr << "It was last row, fetching next block\n"; + need_data = true; + next_input_to_read = current.impl->order; - //std::cerr << "moving to next row\n"; - current->next(); + if (limit && merged_data.totalMergedRows() >= limit) + is_finished = true; - if (!queue.empty() && current.greater(queue.top())) - { - //std::cerr << "next row is not least, pushing back to queue\n"; - queue.push(current); - break; - } + return; } } is_finished = true; diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.h b/dbms/src/Processors/Transforms/MergingSortedTransform.h index 594da6819db..aa88fb09623 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.h +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.h @@ -1,10 +1,10 @@ #pragma once + #include #include #include #include -#include namespace DB { @@ -113,11 +113,8 @@ protected: SortCursorImpls cursors; - using Queue = std::priority_queue; - Queue queue_without_collation; - - using QueueWithCollation = std::priority_queue; - QueueWithCollation queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_with_collation; private: @@ -127,8 +124,8 @@ private: bool need_data = false; size_t next_input_to_read = 0; - template - void merge(std::priority_queue & queue); + template + void merge(TSortingHeap & queue); void insertFromChunk(size_t source_num); @@ -158,22 +155,6 @@ private: shared_chunk_ptr->all_columns = cursors[source_num].all_columns; shared_chunk_ptr->sort_columns = cursors[source_num].sort_columns; } - - void pushToQueue(size_t source_num) - { - if (has_collation) - queue_with_collation.push(SortCursorWithCollation(&cursors[source_num])); - else - queue_without_collation.push(SortCursor(&cursors[source_num])); - } - - template - void initQueue(std::priority_queue & queue) - { - for (auto & cursor : cursors) - if (!cursor.empty()) - queue.push(TSortCursor(&cursor)); - } }; }