diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index 462778bc8f9..b4e1485e27f 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -225,7 +225,8 @@ public: size_t size = cursors.size(); queue.reserve(size); for (size_t i = 0; i < size; ++i) - queue.emplace_back(&cursors[i]); + if (!cursors[i].empty()) + queue.emplace_back(&cursors[i]); std::make_heap(queue.begin(), queue.end()); } @@ -233,6 +234,11 @@ public: Cursor & current() { return queue.front(); } + size_t size() { return queue.size(); } + + Cursor & firstChild() { return queue[1]; } + Cursor & secondChild() { return queue[2]; } + void next() { assert(isValid()); @@ -246,6 +252,18 @@ public: removeTop(); } + void replaceTop(Cursor new_top) + { + current() = new_top; + updateTop(); + } + + void removeTop() + { + std::pop_heap(queue.begin(), queue.end()); + queue.pop_back(); + } + private: using Container = std::vector; Container queue; @@ -300,12 +318,6 @@ private: } while (!(*child_it < top)); *curr_it = std::move(top); } - - void removeTop() - { - std::pop_heap(queue.begin(), queue.end()); - queue.pop_back(); - } }; } diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp index 3607d1f917f..d23d93e7e5c 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp @@ -138,14 +138,14 @@ Block AggregatingSortedBlockInputStream::readImpl() } -void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { size_t merged_rows = 0; /// We take the rows in the correct order and put them in `merged_block`, while the rows are no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); setPrimaryKeyRef(next_key, current); @@ -167,8 +167,6 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s return; } - queue.pop(); - if (key_differs) { current_key.swap(next_key); @@ -202,8 +200,7 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h index 0cf4bd64d87..6ef1259e458 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h @@ -55,7 +55,7 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursor and calls to virtual functions. */ - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /** Extract all states of aggregate functions and merge them with the current group. */ diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index 7e4ad04b806..ef82a6d8c5e 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -105,15 +105,15 @@ Block CollapsingSortedBlockInputStream::readImpl() } -void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { MergeStopCondition stop_condition(average_block_sizes, max_block_size); size_t current_block_granularity; /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size` - for (; !queue.empty(); ++current_pos) + for (; queue.isValid(); ++current_pos) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); current_block_granularity = current->rows; if (current_key.empty()) @@ -131,8 +131,6 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st return; } - queue.pop(); - if (key_differs) { /// We write data for the previous primary key. @@ -185,8 +183,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h index 7e114e614f6..2b528d27339 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h @@ -73,7 +73,7 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursors and calls to virtual functions. */ - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Output to result rows for the current primary key. void insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition); diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp index 340e10df12f..64a0d52c1aa 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp @@ -161,7 +161,7 @@ Block GraphiteRollupSortedBlockInputStream::readImpl() } -void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { const DateLUTImpl & date_lut = DateLUT::instance(); @@ -173,9 +173,9 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns /// contribute towards current output row. /// Variables starting with next_* refer to the row at the top of the queue. - while (!queue.empty()) + while (queue.isValid()) { - SortCursor next_cursor = queue.top(); + SortCursor next_cursor = queue.current(); StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos); bool new_path = is_first || next_path != current_group_path; @@ -253,12 +253,9 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns current_group_path = next_path; } - queue.pop(); - if (!next_cursor->isLast()) { - next_cursor->next(); - queue.push(next_cursor); + queue.next(); } else { diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h index 533c267ff02..0dfdf7c300c 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h @@ -225,7 +225,7 @@ private: UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const; - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Insert the values into the resulting columns, which will not be changed in the future. template diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 73c46c3d0e4..8664b62bf75 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -190,6 +190,8 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) column->reserve(size_to_reserve); } + /// TODO: Optimization when a single block left. + /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; while (queue.isValid()) diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 8c0707e09b0..5f1b5cc927a 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -58,10 +58,10 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) has_collation |= cursors[i].has_collation; } - if (has_collation) - initQueue(queue_with_collation); + if (!has_collation) + queue_without_collation = SortingHeap(cursors); else - initQueue(queue_without_collation); + queue_with_collation = SortingHeap(cursors); } /// Let's check that all source blocks have the same structure. @@ -82,15 +82,6 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) } -template -void MergingSortedBlockInputStream::initQueue(std::priority_queue & queue) -{ - for (size_t i = 0; i < cursors.size(); ++i) - if (!cursors[i].empty()) - queue.push(TSortCursor(&cursors[i])); -} - - Block MergingSortedBlockInputStream::readImpl() { if (finished) @@ -115,7 +106,7 @@ Block MergingSortedBlockInputStream::readImpl() template -void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue & queue) +void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, SortingHeap & queue) { size_t order = current->order; size_t size = cursors.size(); @@ -125,15 +116,19 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, while (true) { - source_blocks[order] = new detail::SharedBlock(children[order]->read()); + source_blocks[order] = new detail::SharedBlock(children[order]->read()); /// intrusive ptr if (!*source_blocks[order]) + { + queue.removeTop(); break; + } if (source_blocks[order]->rows()) { cursors[order].reset(*source_blocks[order]); - queue.push(TSortCursor(&cursors[order])); + queue.replaceTop(&cursors[order]); + source_blocks[order]->all_columns = cursors[order].all_columns; source_blocks[order]->sort_columns = cursors[order].sort_columns; break; @@ -154,19 +149,14 @@ bool MergingSortedBlockInputStream::MergeStopCondition::checkStop() const return sum_rows_count >= average; } -template -void MergingSortedBlockInputStream::fetchNextBlock(const SortCursor & current, std::priority_queue & queue); -template -void MergingSortedBlockInputStream::fetchNextBlock(const SortCursorWithCollation & current, std::priority_queue & queue); - - -template -void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +template +void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSortingHeap & queue) { size_t merged_rows = 0; MergeStopCondition stop_condition(average_block_sizes, max_block_size); + /** Increase row counters. * Return true if it's time to finish generating the current data block. */ @@ -186,123 +176,101 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: return stop_condition.checkStop(); }; - /// Take rows in required order and put them into `merged_columns`, while the rows are no more than `max_block_size` - while (!queue.empty()) + /// Take rows in required order and put them into `merged_columns`, while the number of rows are no more than `max_block_size` + while (queue.isValid()) { - TSortCursor current = queue.top(); + auto current = queue.current(); size_t current_block_granularity = current->rows; - queue.pop(); - while (true) + /** And what if the block is totally less or equal than the rest for the current cursor? + * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. + */ + if (current->isFirst() + && (queue.size() == 1 + || (queue.size() == 2 && current.totallyLessOrEquals(queue.firstChild())) + || (queue.size() == 3 && current.totallyLessOrEquals(queue.firstChild()) && current.totallyLessOrEquals(queue.secondChild())))) { - /** And what if the block is totally less or equal than the rest for the current cursor? - * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. - */ - if (current->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top()))) +// std::cerr << "current block is totally less or equals\n"; + + /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. + if (merged_rows != 0) { - // std::cerr << "current block is totally less or equals\n"; - - /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. - if (merged_rows != 0) - { - //std::cerr << "merged rows is non-zero\n"; - queue.push(current); - return; - } - - /// Actually, current->order stores source number (i.e. cursors[current->order] == current) - size_t source_num = current->order; - - if (source_num >= cursors.size()) - throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); - - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate(); - - // std::cerr << "copied columns\n"; - - merged_rows = merged_columns.at(0)->size(); - - /// Limit output - if (limit && total_merged_rows + merged_rows > limit) - { - merged_rows = limit - total_merged_rows; - for (size_t i = 0; i < num_columns; ++i) - { - auto & column = merged_columns[i]; - column = (*column->cut(0, merged_rows)).mutate(); - } - - cancel(false); - finished = true; - } - - /// Write order of rows for other columns - /// this data will be used in grather stream - if (out_row_sources_buf) - { - RowSourcePart row_source(source_num); - for (size_t i = 0; i < merged_rows; ++i) - out_row_sources_buf->write(row_source.data); - } - - //std::cerr << "fetching next block\n"; - - total_merged_rows += merged_rows; - fetchNextBlock(current, queue); + //std::cerr << "merged rows is non-zero\n"; return; } - // std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; - // std::cerr << "Inserting row\n"; - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); + /// Actually, current->order stores source number (i.e. cursors[current->order] == current) + size_t source_num = current->order; + if (source_num >= cursors.size()) + throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); + + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate(); + +// std::cerr << "copied columns\n"; + + merged_rows = merged_columns.at(0)->size(); + + /// Limit output + if (limit && total_merged_rows + merged_rows > limit) + { + merged_rows = limit - total_merged_rows; + for (size_t i = 0; i < num_columns; ++i) + { + auto & column = merged_columns[i]; + column = (*column->cut(0, merged_rows)).mutate(); + } + + cancel(false); + finished = true; + } + + /// Write order of rows for other columns + /// this data will be used in grather stream if (out_row_sources_buf) { - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - RowSourcePart row_source(current->order); - out_row_sources_buf->write(row_source.data); + RowSourcePart row_source(source_num); + for (size_t i = 0; i < merged_rows; ++i) + out_row_sources_buf->write(row_source.data); } - if (!current->isLast()) - { - // std::cerr << "moving to next row\n"; - current->next(); + //std::cerr << "fetching next block\n"; - if (queue.empty() || !(current.greater(queue.top()))) - { - if (count_row_and_check_limit(current_block_granularity)) - { - // std::cerr << "pushing back to queue\n"; - queue.push(current); - return; - } + total_merged_rows += merged_rows; + fetchNextBlock(current, queue); + return; + } - /// Do not put the cursor back in the queue, but continue to work with the current cursor. - // std::cerr << "current is still on top, using current row\n"; - continue; - } - else - { - // std::cerr << "next row is not least, pushing back to queue\n"; - queue.push(current); - } - } - else - { - /// We get the next block from the corresponding source, if there is one. - // std::cerr << "It was last row, fetching next block\n"; - fetchNextBlock(current, queue); - } +// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; +// std::cerr << "Inserting row\n"; + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); - break; + if (out_row_sources_buf) + { + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + RowSourcePart row_source(current->order); + out_row_sources_buf->write(row_source.data); + } + + if (!current->isLast()) + { +// std::cerr << "moving to next row\n"; + queue.next(); + } + else + { + /// We get the next block from the corresponding source, if there is one. +// std::cerr << "It was last row, fetching next block\n"; + fetchNextBlock(current, queue); } if (count_row_and_check_limit(current_block_granularity)) return; } + /// We have read all data. Ask childs to cancel providing more data. cancel(false); finished = true; } diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index 9f641fb7599..e6c2b257013 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -1,7 +1,5 @@ #pragma once -#include - #include #include @@ -87,7 +85,7 @@ protected: /// Gets the next block from the source corresponding to the `current`. template - void fetchNextBlock(const TSortCursor & current, std::priority_queue & queue); + void fetchNextBlock(const TSortCursor & current, SortingHeap & queue); Block header; @@ -111,11 +109,8 @@ protected: SortCursorImpls cursors; - using Queue = std::priority_queue; - Queue queue_without_collation; - - using QueueWithCollation = std::priority_queue; - QueueWithCollation queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_with_collation; /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step) /// If it is not nullptr then it should be populated during execution @@ -176,13 +171,10 @@ protected: private: /** We support two different cursors - with Collation and without. - * Templates are used instead of polymorphic SortCursor and calls to virtual functions. - */ - template - void initQueue(std::priority_queue & queue); - - template - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + * Templates are used instead of polymorphic SortCursor and calls to virtual functions. + */ + template + void merge(MutableColumns & merged_columns, TSortingHeap & queue); Logger * log = &Logger::get("MergingSortedBlockInputStream"); diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp index e2e99815b93..967b4ebb046 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp @@ -48,13 +48,14 @@ Block ReplacingSortedBlockInputStream::readImpl() } -void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { MergeStopCondition stop_condition(average_block_sizes, max_block_size); + /// Take the rows in needed order and put them into `merged_columns` until rows no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); size_t current_block_granularity = current->rows; if (current_key.empty()) @@ -68,8 +69,6 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std if (key_differs && stop_condition.checkStop()) return; - queue.pop(); - if (key_differs) { /// Write the data for the previous primary key. @@ -98,8 +97,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h index 7d85542520d..22920c2eb20 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h @@ -52,7 +52,7 @@ private: /// Sources of rows with the current primary key. PODArray current_row_sources; - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Output into result the rows for current primary key. void insertRow(MutableColumns & merged_columns); diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index 9ac7d6a3397..fe29dc55916 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -314,14 +314,14 @@ Block SummingSortedBlockInputStream::readImpl() } -void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { merged_rows = 0; /// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); setPrimaryKeyRef(next_key, current); @@ -383,12 +383,9 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: current_row_is_zero = false; } - queue.pop(); - if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.h b/dbms/src/DataStreams/SummingSortedBlockInputStream.h index 4412e5529f8..fc02d36d3fd 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -140,7 +142,7 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursor and calls to virtual functions. */ - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero". void insertCurrentRowIfNeeded(MutableColumns & merged_columns); diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp index 4dda97597bd..de6f7027243 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp @@ -82,21 +82,18 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl() } -void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { MergeStopCondition stop_condition(average_block_sizes, max_block_size); auto update_queue = [this, & queue](SortCursor & cursor) { - queue.pop(); - if (out_row_sources_buf) current_row_sources.emplace(cursor->order, true); if (!cursor->isLast()) { - cursor->next(); - queue.push(cursor); + queue.next(); } else { @@ -106,9 +103,9 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co }; /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); size_t current_block_granularity = current->rows; SharedBlockRowRef next_key; diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h index f79b564063d..c64972d9266 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h @@ -5,7 +5,7 @@ #include #include -#include +#include namespace DB @@ -204,7 +204,7 @@ private: /// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys. std::queue current_row_sources; - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Output to result row for the current primary key. void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns); diff --git a/dbms/src/Processors/Transforms/SortingTransform.cpp b/dbms/src/Processors/Transforms/SortingTransform.cpp index bccd91897e0..4b075ca0ae9 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.cpp +++ b/dbms/src/Processors/Transforms/SortingTransform.cpp @@ -80,6 +80,8 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue) column->reserve(size_to_reserve); } + /// TODO: Optimization when a single block left. + /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; while (queue.isValid())