From 8ff361eda4663b3827bfc728b5dd37cb3dcadac8 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 21 Dec 2019 23:28:51 +0300 Subject: [PATCH] Optimization of sorting heap --- dbms/src/Core/SortCursor.h | 26 ++++++++++++++----- .../MergeSortingBlockInputStream.cpp | 24 ++++++++--------- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index 3753984f4ae..5f9f269c7e8 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -210,6 +211,8 @@ struct SortCursorWithCollation }; +/** Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams). + */ template class SortingHeap { @@ -232,13 +235,15 @@ public: void next() { - if (current()->isValid()) + assert(isValid()); + + if (!current()->isLast()) { current()->next(); - updateTopHeap(queue.begin(), queue.end()); + updateTop(); } else - queue.erase(queue.begin()); + removeTop(); } private: @@ -246,15 +251,17 @@ private: Container queue; /// This is adapted version of the function __sift_down from libc++. - template - static void updateTopHeap(It begin, It end) + /// Why cannot simply use std::priority_queue? + /// - because it doesn't support updating the top element and requires pop and push instead. + void updateTop() { - size_t size = end - begin; + size_t size = queue.size(); if (size < 2) return; size_t child_idx = 1; - It child_it = begin + 1; + auto begin = queue.begin(); + auto child_it = begin + 1; /// Right child exists and is greater than left child. if (size > 2 && *child_it < *(child_it + 1)) @@ -293,6 +300,11 @@ private: } while (!(*child_it < top)); *curr_it = std::move(top); } + + void removeTop() + { + std::pop_heap(queue.begin(), queue.end()); + } }; } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index eda0c586127..1c50316fc3f 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -178,9 +178,9 @@ Block MergeSortingBlocksBlockInputStream::readImpl() template Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) { - size_t num_columns = blocks[0].columns(); + size_t num_columns = header.columns(); - MutableColumns merged_columns = blocks[0].cloneEmptyColumns(); + MutableColumns merged_columns = header.cloneEmptyColumns(); /// TODO: reserve (in each column) /// Take rows from queue in right order and push to 'merged'. @@ -189,31 +189,31 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) { auto current = queue.current(); + /// Append a row from queue. for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); - if (queue.isValid()) - queue.next(); - else - break; - ++total_merged_rows; + ++merged_rows; + + /// We don't need more rows because of limit has reached. if (limit && total_merged_rows == limit) { - auto res = blocks[0].cloneWithColumns(std::move(merged_columns)); blocks.clear(); - return res; + break; } - ++merged_rows; + queue.next(); + + /// It's enough for current output block but we will continue. if (merged_rows == max_merged_block_size) - return blocks[0].cloneWithColumns(std::move(merged_columns)); + break; } if (merged_rows == 0) return {}; - return blocks[0].cloneWithColumns(std::move(merged_columns)); + return header.cloneWithColumns(std::move(merged_columns)); }