diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 1c50316fc3f..73c46c3d0e4 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -179,9 +179,16 @@ template Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) { size_t num_columns = header.columns(); - MutableColumns merged_columns = header.cloneEmptyColumns(); - /// TODO: reserve (in each column) + + /// Reserve + if (queue.isValid() && !blocks.empty()) + { + /// The expected size of output block is the same as input block + size_t size_to_reserve = blocks[0].rows(); + for (auto & column : merged_columns) + column->reserve(size_to_reserve); + } /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; @@ -210,6 +217,9 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) break; } + if (!queue.isValid()) + blocks.clear(); + if (merged_rows == 0) return {}; diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index beb3c7afc52..9f641fb7599 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -109,8 +109,7 @@ protected: size_t num_columns = 0; std::vector source_blocks; - using CursorImpls = std::vector; - CursorImpls cursors; + SortCursorImpls cursors; using Queue = std::priority_queue; Queue queue_without_collation; diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.h b/dbms/src/Processors/Transforms/MergingSortedTransform.h index b32dd076c5f..594da6819db 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.h +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.h @@ -111,8 +111,7 @@ protected: /// Chunks currently being merged. std::vector source_chunks; - using CursorImpls = std::vector; - CursorImpls cursors; + SortCursorImpls cursors; using Queue = std::priority_queue; Queue queue_without_collation; diff --git a/dbms/src/Processors/Transforms/SortingTransform.cpp b/dbms/src/Processors/Transforms/SortingTransform.cpp index ab87591c0d6..bccd91897e0 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.cpp +++ b/dbms/src/Processors/Transforms/SortingTransform.cpp @@ -41,15 +41,9 @@ MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t chunks.swap(nonempty_chunks); if (!has_collation) - { - for (auto & cursor : cursors) - queue_without_collation.push(SortCursor(&cursor)); - } + queue_without_collation = SortingHeap(cursors); else - { - for (auto & cursor : cursors) - queue_with_collation.push(SortCursorWithCollation(&cursor)); - } + queue_with_collation = SortingHeap(cursors); } @@ -66,49 +60,55 @@ Chunk MergeSorter::read() } return !has_collation - ? mergeImpl(queue_without_collation) - : mergeImpl(queue_with_collation); + ? mergeImpl(queue_without_collation) + : mergeImpl(queue_with_collation); } -template -Chunk MergeSorter::mergeImpl(std::priority_queue & queue) +template +Chunk MergeSorter::mergeImpl(TSortingHeap & queue) { size_t num_columns = chunks[0].getNumColumns(); - MutableColumns merged_columns = chunks[0].cloneEmptyColumns(); - /// TODO: reserve (in each column) + + /// Reserve + if (queue.isValid()) + { + /// The expected size of output block is the same as input block + size_t size_to_reserve = chunks[0].getNumRows(); + for (auto & column : merged_columns) + column->reserve(size_to_reserve); + } /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; - while (!queue.empty()) + while (queue.isValid()) { - TSortCursor current = queue.top(); - queue.pop(); + auto current = queue.current(); + /// Append a row from queue. for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); ++total_merged_rows; ++merged_rows; - if (!current->isLast()) - { - current->next(); - queue.push(current); - } - + /// We don't need more rows because of limit has reached. if (limit && total_merged_rows == limit) { chunks.clear(); - return Chunk(std::move(merged_columns), merged_rows); + break; } + queue.next(); + + /// It's enough for current output block but we will continue. if (merged_rows == max_merged_block_size) - return Chunk(std::move(merged_columns), merged_rows); + break; } - chunks.clear(); + if (!queue.isValid()) + chunks.clear(); if (merged_rows == 0) return {}; diff --git a/dbms/src/Processors/Transforms/SortingTransform.h b/dbms/src/Processors/Transforms/SortingTransform.h index 2703501c81a..66ae1759f17 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.h +++ b/dbms/src/Processors/Transforms/SortingTransform.h @@ -1,10 +1,10 @@ #pragma once + #include #include #include #include #include -#include namespace DB @@ -27,19 +27,18 @@ private: UInt64 limit; size_t total_merged_rows = 0; - using CursorImpls = std::vector; - CursorImpls cursors; + SortCursorImpls cursors; bool has_collation = false; - std::priority_queue queue_without_collation; - std::priority_queue queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_with_collation; /** Two different cursors are supported - with and without Collation. * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. */ - template - Chunk mergeImpl(std::priority_queue & queue); + template + Chunk mergeImpl(TSortingHeap & queue); };