From 05fe9ef179a24d7bead4773c31ab9b2f3bc239c3 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 27 Dec 2019 21:42:47 +0300 Subject: [PATCH] Add sort improvement to processors. --- .../Transforms/SortingTransform.cpp | 34 +++++++------------ .../Processors/Transforms/SortingTransform.h | 8 ++--- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/dbms/src/Processors/Transforms/SortingTransform.cpp b/dbms/src/Processors/Transforms/SortingTransform.cpp index ab87591c0d6..1062beb7dbe 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.cpp +++ b/dbms/src/Processors/Transforms/SortingTransform.cpp @@ -41,15 +41,9 @@ MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t chunks.swap(nonempty_chunks); if (!has_collation) - { - for (auto & cursor : cursors) - queue_without_collation.push(SortCursor(&cursor)); - } + queue_without_collation = SortingHeap(cursors); else - { - for (auto & cursor : cursors) - queue_with_collation.push(SortCursorWithCollation(&cursor)); - } + queue_with_collation = SortingHeap(cursors); } @@ -66,13 +60,13 @@ Chunk MergeSorter::read() } return !has_collation - ? mergeImpl(queue_without_collation) - : mergeImpl(queue_with_collation); + ? mergeImpl(queue_without_collation) + : mergeImpl(queue_with_collation); } -template -Chunk MergeSorter::mergeImpl(std::priority_queue & queue) +template +Chunk MergeSorter::mergeImpl(TSortingHeap & queue) { size_t num_columns = chunks[0].getNumColumns(); @@ -81,29 +75,27 @@ Chunk MergeSorter::mergeImpl(std::priority_queue & queue) /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; - while (!queue.empty()) + while (queue.isValid()) { - TSortCursor current = queue.top(); - queue.pop(); + auto current = queue.current(); + /// Append a row from queue. for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); ++total_merged_rows; ++merged_rows; - if (!current->isLast()) - { - current->next(); - queue.push(current); - } - + /// We don't need more rows because of limit has reached. if (limit && total_merged_rows == limit) { chunks.clear(); return Chunk(std::move(merged_columns), merged_rows); } + queue.next(); + + /// It's enough for current output block but we will continue. if (merged_rows == max_merged_block_size) return Chunk(std::move(merged_columns), merged_rows); } diff --git a/dbms/src/Processors/Transforms/SortingTransform.h b/dbms/src/Processors/Transforms/SortingTransform.h index 2703501c81a..207f670acf3 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.h +++ b/dbms/src/Processors/Transforms/SortingTransform.h @@ -32,14 +32,14 @@ private: bool has_collation = false; - std::priority_queue queue_without_collation; - std::priority_queue queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_with_collation; /** Two different cursors are supported - with and without Collation. * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. */ - template - Chunk mergeImpl(std::priority_queue & queue); + template + Chunk mergeImpl(TSortingHeap & queue); };