diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 9f6f8173cde..6ec9e054204 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include @@ -151,15 +151,20 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( blocks.swap(nonempty_blocks); + size_t size = cursors.size(); if (!has_collation) { - for (size_t i = 0; i < cursors.size(); ++i) - queue_without_collation.push(SortCursor(&cursors[i])); + queue_without_collation.reserve(size); + for (size_t i = 0; i < size; ++i) + queue_without_collation.emplace_back(&cursors[i]); + std::make_heap(queue_without_collation.begin(), queue_without_collation.end()); } else { - for (size_t i = 0; i < cursors.size(); ++i) - queue_with_collation.push(SortCursorWithCollation(&cursors[i])); + queue_with_collation.reserve(size); + for (size_t i = 0; i < size; ++i) + queue_with_collation.emplace_back(&cursors[i]); + std::make_heap(queue_with_collation.begin(), queue_with_collation.end()); } } @@ -182,8 +187,58 @@ Block MergeSortingBlocksBlockInputStream::readImpl() } +/// This is adapted version of the function __sift_down from libc++. +template +void updateTopHeap(It begin, It end) +{ + size_t size = end - begin; + if (size < 2) + return; + + size_t child_idx = 1; + It child_it = begin + 1; + + /// Right child exists and is greater than left child. + if (size > 2 && *child_it < *(child_it + 1)) + { + ++child_it; + ++child_idx; + } + + /// Check if we are in order. + if (*child_it < *begin) + return; + + auto curr_it = begin; + auto top(std::move(*begin)); + do + { + /// We are not in heap-order, swap the parent with it's largest child. + *curr_it = std::move(*child_it); + curr_it = child_it; + + if ((size - 2) / 2 < child_idx) + break; + + // recompute the child based off of the updated parent + child_idx = 2 * child_idx + 1; + child_it = begin + child_idx; + + if ((child_idx + 1) < size && *child_it < *(child_it + 1)) + { + /// Right child exists and is greater than left child. + ++child_it; + ++child_idx; + } + + /// Check if we are in order. + } while (!(*child_it < top)); + *curr_it = std::move(top); +} + + template -Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue & queue) +Block MergeSortingBlocksBlockInputStream::mergeImpl(std::vector & queue) { size_t num_columns = blocks[0].columns(); @@ -194,17 +249,18 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queueinsertFrom(*current->all_columns[i], current->pos); - if (!current->isLast()) + if (current->isValid()) { current->next(); - queue.push(current); + updateTopHeap(queue.begin(), queue.end()); } + else + queue.erase(queue.begin()); ++total_merged_rows; if (limit && total_merged_rows == limit) diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index a8b8e8cfd3b..8736801f5df 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -56,19 +57,18 @@ private: UInt64 limit; size_t total_merged_rows = 0; - using CursorImpls = std::vector; CursorImpls cursors; bool has_collation = false; - std::priority_queue queue_without_collation; - std::priority_queue queue_with_collation; + std::vector queue_without_collation; + std::vector queue_with_collation; /** Two different cursors are supported - with and without Collation. * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. */ template - Block mergeImpl(std::priority_queue & queue); + Block mergeImpl(std::vector & queue); };