diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index 5b4db43024f..00ac39243eb 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -22,8 +22,8 @@ namespace DB */ struct SortCursorImpl { - ColumnRawPtrs all_columns; ColumnRawPtrs sort_columns; + ColumnRawPtrs all_columns; SortDescription desc; size_t sort_columns_size = 0; size_t pos = 0; @@ -110,21 +110,52 @@ using SortCursorImpls = std::vector; /// For easy copying. -struct SortCursor +template +struct SortCursorHelper { SortCursorImpl * impl; - SortCursor(SortCursorImpl * impl_) : impl(impl_) {} + const Derived & derived() const { return static_cast(*this); } + + SortCursorHelper(SortCursorImpl * impl_) : impl(impl_) {} SortCursorImpl * operator-> () { return impl; } const SortCursorImpl * operator-> () const { return impl; } + bool greater(const SortCursorHelper & rhs) const + { + return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos); + } + + /// Inverted so that the priority queue elements are removed in ascending order. + bool operator< (const SortCursorHelper & rhs) const + { + return derived().greater(rhs.derived()); + } + + /// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor. + bool totallyLessOrEquals(const SortCursorHelper & rhs) const + { + if (impl->rows == 0 || rhs.impl->rows == 0) + return false; + + /// The last row of this cursor is no larger than the first row of the another cursor. + return !derived().greaterAt(rhs.derived(), impl->rows - 1, 0); + } +}; + + +struct SortCursor : SortCursorHelper +{ + using SortCursorHelper::SortCursorHelper; + /// The specified row of this cursor is greater than the specified row of another cursor. bool greaterAt(const SortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const { for (size_t i = 0; i < impl->sort_columns_size; ++i) { - int direction = impl->desc[i].direction; - int nulls_direction = impl->desc[i].nulls_direction; + const auto & desc = impl->desc[i]; + int direction = desc.direction; + int nulls_direction = desc.nulls_direction; int res = direction * impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction); if (res > 0) return true; @@ -133,45 +164,37 @@ struct SortCursor } return impl->order > rhs.impl->order; } +}; - /// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor. - bool totallyLessOrEquals(const SortCursor & rhs) const + +/// For the case with a single column and when there is no order between different cursors. +struct SimpleSortCursor : SortCursorHelper +{ + using SortCursorHelper::SortCursorHelper; + + bool greaterAt(const SimpleSortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const { - if (impl->rows == 0 || rhs.impl->rows == 0) - return false; - - /// The last row of this cursor is no larger than the first row of the another cursor. - return !greaterAt(rhs, impl->rows - 1, 0); - } - - bool greater(const SortCursor & rhs) const - { - return greaterAt(rhs, impl->pos, rhs.impl->pos); - } - - /// Inverted so that the priority queue elements are removed in ascending order. - bool operator< (const SortCursor & rhs) const - { - return greater(rhs); + const auto & desc = impl->desc[0]; + int direction = desc.direction; + int nulls_direction = desc.nulls_direction; + int res = impl->sort_columns[0]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[0]), nulls_direction); + return res != 0 && ((res > 0) == (direction > 0)); } }; /// Separate comparator for locale-sensitive string comparisons -struct SortCursorWithCollation +struct SortCursorWithCollation : SortCursorHelper { - SortCursorImpl * impl; - - SortCursorWithCollation(SortCursorImpl * impl_) : impl(impl_) {} - SortCursorImpl * operator-> () { return impl; } - const SortCursorImpl * operator-> () const { return impl; } + using SortCursorHelper::SortCursorHelper; bool greaterAt(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const { for (size_t i = 0; i < impl->sort_columns_size; ++i) { - int direction = impl->desc[i].direction; - int nulls_direction = impl->desc[i].nulls_direction; + const auto & desc = impl->desc[i]; + int direction = desc.direction; + int nulls_direction = desc.nulls_direction; int res; if (impl->need_collation[i]) { @@ -189,29 +212,11 @@ struct SortCursorWithCollation } return impl->order > rhs.impl->order; } - - bool totallyLessOrEquals(const SortCursorWithCollation & rhs) const - { - if (impl->rows == 0 || rhs.impl->rows == 0) - return false; - - /// The last row of this cursor is no larger than the first row of the another cursor. - return !greaterAt(rhs, impl->rows - 1, 0); - } - - bool greater(const SortCursorWithCollation & rhs) const - { - return greaterAt(rhs, impl->pos, rhs.impl->pos); - } - - bool operator< (const SortCursorWithCollation & rhs) const - { - return greater(rhs); - } }; /** Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams). + * TODO: Replace with "Loser Tree", see https://en.wikipedia.org/wiki/K-way_merge_algorithm */ template class SortingHeap @@ -225,7 +230,8 @@ public: size_t size = cursors.size(); queue.reserve(size); for (size_t i = 0; i < size; ++i) - queue.emplace_back(&cursors[i]); + if (!cursors[i].empty()) + queue.emplace_back(&cursors[i]); std::make_heap(queue.begin(), queue.end()); } @@ -233,6 +239,10 @@ public: Cursor & current() { return queue.front(); } + size_t size() { return queue.size(); } + + Cursor & nextChild() { return queue[nextChildIndex()]; } + void next() { assert(isValid()); @@ -246,34 +256,67 @@ public: removeTop(); } + void replaceTop(Cursor new_top) + { + current() = new_top; + updateTop(); + } + + void removeTop() + { + std::pop_heap(queue.begin(), queue.end()); + queue.pop_back(); + next_idx = 0; + } + + void push(SortCursorImpl & cursor) + { + queue.emplace_back(&cursor); + std::push_heap(queue.begin(), queue.end()); + next_idx = 0; + } + private: using Container = std::vector; Container queue; + /// Cache comparison between first and second child if the order in queue has not been changed. + size_t next_idx = 0; + + size_t nextChildIndex() + { + if (next_idx == 0) + { + next_idx = 1; + + if (queue.size() > 2 && queue[1] < queue[2]) + ++next_idx; + } + + return next_idx; + } + /// This is adapted version of the function __sift_down from libc++. /// Why cannot simply use std::priority_queue? /// - because it doesn't support updating the top element and requires pop and push instead. + /// Also look at "Boost.Heap" library. void updateTop() { size_t size = queue.size(); if (size < 2) return; - size_t child_idx = 1; auto begin = queue.begin(); - auto child_it = begin + 1; - /// Right child exists and is greater than left child. - if (size > 2 && *child_it < *(child_it + 1)) - { - ++child_it; - ++child_idx; - } + size_t child_idx = nextChildIndex(); + auto child_it = begin + child_idx; /// Check if we are in order. if (*child_it < *begin) return; + next_idx = 0; + auto curr_it = begin; auto top(std::move(*begin)); do @@ -282,11 +325,12 @@ private: *curr_it = std::move(*child_it); curr_it = child_it; - if ((size - 2) / 2 < child_idx) - break; - // recompute the child based off of the updated parent child_idx = 2 * child_idx + 1; + + if (child_idx >= size) + break; + child_it = begin + child_idx; if ((child_idx + 1) < size && *child_it < *(child_it + 1)) @@ -300,12 +344,6 @@ private: } while (!(*child_it < top)); *curr_it = std::move(top); } - - void removeTop() - { - std::pop_heap(queue.begin(), queue.end()); - queue.pop_back(); - } }; } diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp index 3607d1f917f..d23d93e7e5c 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp @@ -138,14 +138,14 @@ Block AggregatingSortedBlockInputStream::readImpl() } -void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { size_t merged_rows = 0; /// We take the rows in the correct order and put them in `merged_block`, while the rows are no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); setPrimaryKeyRef(next_key, current); @@ -167,8 +167,6 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s return; } - queue.pop(); - if (key_differs) { current_key.swap(next_key); @@ -202,8 +200,7 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h index 0cf4bd64d87..6ef1259e458 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h @@ -55,7 +55,7 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursor and calls to virtual functions. */ - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /** Extract all states of aggregate functions and merge them with the current group. */ diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index 7e4ad04b806..ef82a6d8c5e 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -105,15 +105,15 @@ Block CollapsingSortedBlockInputStream::readImpl() } -void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { MergeStopCondition stop_condition(average_block_sizes, max_block_size); size_t current_block_granularity; /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size` - for (; !queue.empty(); ++current_pos) + for (; queue.isValid(); ++current_pos) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); current_block_granularity = current->rows; if (current_key.empty()) @@ -131,8 +131,6 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st return; } - queue.pop(); - if (key_differs) { /// We write data for the previous primary key. @@ -185,8 +183,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h index 7e114e614f6..2b528d27339 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h @@ -73,7 +73,7 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursors and calls to virtual functions. */ - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Output to result rows for the current primary key. void insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition); diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp index 340e10df12f..64a0d52c1aa 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp @@ -161,7 +161,7 @@ Block GraphiteRollupSortedBlockInputStream::readImpl() } -void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { const DateLUTImpl & date_lut = DateLUT::instance(); @@ -173,9 +173,9 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns /// contribute towards current output row. /// Variables starting with next_* refer to the row at the top of the queue. - while (!queue.empty()) + while (queue.isValid()) { - SortCursor next_cursor = queue.top(); + SortCursor next_cursor = queue.current(); StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos); bool new_path = is_first || next_path != current_group_path; @@ -253,12 +253,9 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns current_group_path = next_path; } - queue.pop(); - if (!next_cursor->isLast()) { - next_cursor->next(); - queue.push(next_cursor); + queue.next(); } else { diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h index 533c267ff02..0dfdf7c300c 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h @@ -225,7 +225,7 @@ private: UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const; - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Insert the values into the resulting columns, which will not be changed in the future. template diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 1c50316fc3f..52f85f1349c 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -150,10 +150,12 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( blocks.swap(nonempty_blocks); - if (!has_collation) + if (has_collation) + queue_with_collation = SortingHeap(cursors); + else if (description.size() > 1) queue_without_collation = SortingHeap(cursors); else - queue_with_collation = SortingHeap(cursors); + queue_simple = SortingHeap(cursors); } @@ -169,9 +171,12 @@ Block MergeSortingBlocksBlockInputStream::readImpl() return res; } - return !has_collation - ? mergeImpl(queue_without_collation) - : mergeImpl(queue_with_collation); + if (has_collation) + return mergeImpl(queue_with_collation); + else if (description.size() > 1) + return mergeImpl(queue_without_collation); + else + return mergeImpl(queue_simple); } @@ -179,9 +184,18 @@ template Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) { size_t num_columns = header.columns(); - MutableColumns merged_columns = header.cloneEmptyColumns(); - /// TODO: reserve (in each column) + + /// Reserve + if (queue.isValid() && !blocks.empty()) + { + /// The expected size of output block is the same as input block + size_t size_to_reserve = blocks[0].rows(); + for (auto & column : merged_columns) + column->reserve(size_to_reserve); + } + + /// TODO: Optimization when a single block left. /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; @@ -210,6 +224,9 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) break; } + if (!queue.isValid()) + blocks.clear(); + if (merged_rows == 0) return {}; diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index 9492bdb074b..ce82f6bb120 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -59,6 +59,7 @@ private: bool has_collation = false; SortingHeap queue_without_collation; + SortingHeap queue_simple; SortingHeap queue_with_collation; /** Two different cursors are supported - with and without Collation. diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 8c0707e09b0..3614d9c1d66 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -59,9 +59,9 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) } if (has_collation) - initQueue(queue_with_collation); + queue_with_collation = SortingHeap(cursors); else - initQueue(queue_without_collation); + queue_without_collation = SortingHeap(cursors); } /// Let's check that all source blocks have the same structure. @@ -82,15 +82,6 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) } -template -void MergingSortedBlockInputStream::initQueue(std::priority_queue & queue) -{ - for (size_t i = 0; i < cursors.size(); ++i) - if (!cursors[i].empty()) - queue.push(TSortCursor(&cursors[i])); -} - - Block MergingSortedBlockInputStream::readImpl() { if (finished) @@ -115,7 +106,7 @@ Block MergingSortedBlockInputStream::readImpl() template -void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue & queue) +void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, SortingHeap & queue) { size_t order = current->order; size_t size = cursors.size(); @@ -125,15 +116,19 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, while (true) { - source_blocks[order] = new detail::SharedBlock(children[order]->read()); + source_blocks[order] = new detail::SharedBlock(children[order]->read()); /// intrusive ptr if (!*source_blocks[order]) + { + queue.removeTop(); break; + } if (source_blocks[order]->rows()) { cursors[order].reset(*source_blocks[order]); - queue.push(TSortCursor(&cursors[order])); + queue.replaceTop(&cursors[order]); + source_blocks[order]->all_columns = cursors[order].all_columns; source_blocks[order]->sort_columns = cursors[order].sort_columns; break; @@ -154,19 +149,14 @@ bool MergingSortedBlockInputStream::MergeStopCondition::checkStop() const return sum_rows_count >= average; } -template -void MergingSortedBlockInputStream::fetchNextBlock(const SortCursor & current, std::priority_queue & queue); -template -void MergingSortedBlockInputStream::fetchNextBlock(const SortCursorWithCollation & current, std::priority_queue & queue); - - -template -void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +template +void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSortingHeap & queue) { size_t merged_rows = 0; MergeStopCondition stop_condition(average_block_sizes, max_block_size); + /** Increase row counters. * Return true if it's time to finish generating the current data block. */ @@ -186,123 +176,100 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: return stop_condition.checkStop(); }; - /// Take rows in required order and put them into `merged_columns`, while the rows are no more than `max_block_size` - while (!queue.empty()) + /// Take rows in required order and put them into `merged_columns`, while the number of rows are no more than `max_block_size` + while (queue.isValid()) { - TSortCursor current = queue.top(); + auto current = queue.current(); size_t current_block_granularity = current->rows; - queue.pop(); - while (true) + /** And what if the block is totally less or equal than the rest for the current cursor? + * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. + */ + if (current->isFirst() + && (queue.size() == 1 + || (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild())))) { - /** And what if the block is totally less or equal than the rest for the current cursor? - * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. - */ - if (current->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top()))) +// std::cerr << "current block is totally less or equals\n"; + + /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. + if (merged_rows != 0) { - // std::cerr << "current block is totally less or equals\n"; - - /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. - if (merged_rows != 0) - { - //std::cerr << "merged rows is non-zero\n"; - queue.push(current); - return; - } - - /// Actually, current->order stores source number (i.e. cursors[current->order] == current) - size_t source_num = current->order; - - if (source_num >= cursors.size()) - throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); - - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate(); - - // std::cerr << "copied columns\n"; - - merged_rows = merged_columns.at(0)->size(); - - /// Limit output - if (limit && total_merged_rows + merged_rows > limit) - { - merged_rows = limit - total_merged_rows; - for (size_t i = 0; i < num_columns; ++i) - { - auto & column = merged_columns[i]; - column = (*column->cut(0, merged_rows)).mutate(); - } - - cancel(false); - finished = true; - } - - /// Write order of rows for other columns - /// this data will be used in grather stream - if (out_row_sources_buf) - { - RowSourcePart row_source(source_num); - for (size_t i = 0; i < merged_rows; ++i) - out_row_sources_buf->write(row_source.data); - } - - //std::cerr << "fetching next block\n"; - - total_merged_rows += merged_rows; - fetchNextBlock(current, queue); + //std::cerr << "merged rows is non-zero\n"; return; } - // std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; - // std::cerr << "Inserting row\n"; - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); + /// Actually, current->order stores source number (i.e. cursors[current->order] == current) + size_t source_num = current->order; + if (source_num >= cursors.size()) + throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); + + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate(); + +// std::cerr << "copied columns\n"; + + merged_rows = merged_columns.at(0)->size(); + + /// Limit output + if (limit && total_merged_rows + merged_rows > limit) + { + merged_rows = limit - total_merged_rows; + for (size_t i = 0; i < num_columns; ++i) + { + auto & column = merged_columns[i]; + column = (*column->cut(0, merged_rows)).mutate(); + } + + cancel(false); + finished = true; + } + + /// Write order of rows for other columns + /// this data will be used in grather stream if (out_row_sources_buf) { - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - RowSourcePart row_source(current->order); - out_row_sources_buf->write(row_source.data); + RowSourcePart row_source(source_num); + for (size_t i = 0; i < merged_rows; ++i) + out_row_sources_buf->write(row_source.data); } - if (!current->isLast()) - { - // std::cerr << "moving to next row\n"; - current->next(); + //std::cerr << "fetching next block\n"; - if (queue.empty() || !(current.greater(queue.top()))) - { - if (count_row_and_check_limit(current_block_granularity)) - { - // std::cerr << "pushing back to queue\n"; - queue.push(current); - return; - } + total_merged_rows += merged_rows; + fetchNextBlock(current, queue); + return; + } - /// Do not put the cursor back in the queue, but continue to work with the current cursor. - // std::cerr << "current is still on top, using current row\n"; - continue; - } - else - { - // std::cerr << "next row is not least, pushing back to queue\n"; - queue.push(current); - } - } - else - { - /// We get the next block from the corresponding source, if there is one. - // std::cerr << "It was last row, fetching next block\n"; - fetchNextBlock(current, queue); - } +// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; +// std::cerr << "Inserting row\n"; + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); - break; + if (out_row_sources_buf) + { + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + RowSourcePart row_source(current->order); + out_row_sources_buf->write(row_source.data); + } + + if (!current->isLast()) + { +// std::cerr << "moving to next row\n"; + queue.next(); + } + else + { + /// We get the next block from the corresponding source, if there is one. +// std::cerr << "It was last row, fetching next block\n"; + fetchNextBlock(current, queue); } if (count_row_and_check_limit(current_block_granularity)) return; } + /// We have read all data. Ask childs to cancel providing more data. cancel(false); finished = true; } diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index beb3c7afc52..e6c2b257013 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -1,7 +1,5 @@ #pragma once -#include - #include #include @@ -87,7 +85,7 @@ protected: /// Gets the next block from the source corresponding to the `current`. template - void fetchNextBlock(const TSortCursor & current, std::priority_queue & queue); + void fetchNextBlock(const TSortCursor & current, SortingHeap & queue); Block header; @@ -109,14 +107,10 @@ protected: size_t num_columns = 0; std::vector source_blocks; - using CursorImpls = std::vector; - CursorImpls cursors; + SortCursorImpls cursors; - using Queue = std::priority_queue; - Queue queue_without_collation; - - using QueueWithCollation = std::priority_queue; - QueueWithCollation queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_with_collation; /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step) /// If it is not nullptr then it should be populated during execution @@ -177,13 +171,10 @@ protected: private: /** We support two different cursors - with Collation and without. - * Templates are used instead of polymorphic SortCursor and calls to virtual functions. - */ - template - void initQueue(std::priority_queue & queue); - - template - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + * Templates are used instead of polymorphic SortCursor and calls to virtual functions. + */ + template + void merge(MutableColumns & merged_columns, TSortingHeap & queue); Logger * log = &Logger::get("MergingSortedBlockInputStream"); diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp index e2e99815b93..967b4ebb046 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp @@ -48,13 +48,14 @@ Block ReplacingSortedBlockInputStream::readImpl() } -void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { MergeStopCondition stop_condition(average_block_sizes, max_block_size); + /// Take the rows in needed order and put them into `merged_columns` until rows no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); size_t current_block_granularity = current->rows; if (current_key.empty()) @@ -68,8 +69,6 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std if (key_differs && stop_condition.checkStop()) return; - queue.pop(); - if (key_differs) { /// Write the data for the previous primary key. @@ -98,8 +97,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h index 7d85542520d..22920c2eb20 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h @@ -52,7 +52,7 @@ private: /// Sources of rows with the current primary key. PODArray current_row_sources; - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Output into result the rows for current primary key. void insertRow(MutableColumns & merged_columns); diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index 9ac7d6a3397..fe29dc55916 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -314,14 +314,14 @@ Block SummingSortedBlockInputStream::readImpl() } -void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { merged_rows = 0; /// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); setPrimaryKeyRef(next_key, current); @@ -383,12 +383,9 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: current_row_is_zero = false; } - queue.pop(); - if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.h b/dbms/src/DataStreams/SummingSortedBlockInputStream.h index 4412e5529f8..fc02d36d3fd 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -140,7 +142,7 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursor and calls to virtual functions. */ - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero". void insertCurrentRowIfNeeded(MutableColumns & merged_columns); diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp index 4dda97597bd..de6f7027243 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp @@ -82,21 +82,18 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl() } -void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { MergeStopCondition stop_condition(average_block_sizes, max_block_size); auto update_queue = [this, & queue](SortCursor & cursor) { - queue.pop(); - if (out_row_sources_buf) current_row_sources.emplace(cursor->order, true); if (!cursor->isLast()) { - cursor->next(); - queue.push(cursor); + queue.next(); } else { @@ -106,9 +103,9 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co }; /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); size_t current_block_granularity = current->rows; SharedBlockRowRef next_key; diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h index f79b564063d..c64972d9266 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h @@ -5,7 +5,7 @@ #include #include -#include +#include namespace DB @@ -204,7 +204,7 @@ private: /// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys. std::queue current_row_sources; - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Output to result row for the current primary key. void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns); diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp index 705116ca081..ddbd91b38d1 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp @@ -148,9 +148,9 @@ IProcessor::Status MergingSortedTransform::prepare() return Status::NeedData; if (has_collation) - initQueue(queue_with_collation); + queue_with_collation = SortingHeap(cursors); else - initQueue(queue_without_collation); + queue_without_collation = SortingHeap(cursors); is_initialized = true; return Status::Ready; @@ -169,7 +169,6 @@ IProcessor::Status MergingSortedTransform::prepare() if (need_data) { - auto & input = *std::next(inputs.begin(), next_input_to_read); if (!input.isFinished()) { @@ -183,7 +182,11 @@ IProcessor::Status MergingSortedTransform::prepare() return Status::NeedData; updateCursor(std::move(chunk), next_input_to_read); - pushToQueue(next_input_to_read); + + if (has_collation) + queue_with_collation.push(cursors[next_input_to_read]); + else + queue_without_collation.push(cursors[next_input_to_read]); } need_data = false; @@ -201,8 +204,8 @@ void MergingSortedTransform::work() merge(queue_without_collation); } -template -void MergingSortedTransform::merge(std::priority_queue & queue) +template +void MergingSortedTransform::merge(TSortingHeap & queue) { /// Returns MergeStatus which we should return if we are going to finish now. auto can_read_another_row = [&, this]() @@ -224,77 +227,66 @@ void MergingSortedTransform::merge(std::priority_queue & queue) }; /// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { /// Shouldn't happen at first iteration, but check just in case. if (!can_read_another_row()) return; - TSortCursor current = queue.top(); - queue.pop(); - bool first_iteration = true; + auto current = queue.current(); - while (true) + /** And what if the block is totally less or equal than the rest for the current cursor? + * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. + */ + if (current.impl->isFirst() + && (queue.size() == 1 + || (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild())))) { - if (!first_iteration && !can_read_another_row()) + //std::cerr << "current block is totally less or equals\n"; + + /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. + if (merged_data.mergedRows() != 0) { - queue.push(current); - return; - } - first_iteration = false; - - /** And what if the block is totally less or equal than the rest for the current cursor? - * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. - */ - if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top()))) - { - //std::cerr << "current block is totally less or equals\n"; - - /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. - if (merged_data.mergedRows() != 0) - { - //std::cerr << "merged rows is non-zero\n"; - queue.push(current); - return; - } - - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - size_t source_num = current.impl->order; - insertFromChunk(source_num); + //std::cerr << "merged rows is non-zero\n"; return; } - //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; - //std::cerr << "Inserting row\n"; - merged_data.insertRow(current->all_columns, current->pos); + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + size_t source_num = current.impl->order; + insertFromChunk(source_num); + queue.removeTop(); + return; + } - if (out_row_sources_buf) - { - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - RowSourcePart row_source(current.impl->order); - out_row_sources_buf->write(row_source.data); - } + //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; + //std::cerr << "Inserting row\n"; + merged_data.insertRow(current->all_columns, current->pos); - if (current->isLast()) - { - need_data = true; - next_input_to_read = current.impl->order; + if (out_row_sources_buf) + { + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + RowSourcePart row_source(current.impl->order); + out_row_sources_buf->write(row_source.data); + } - if (limit && merged_data.totalMergedRows() >= limit) - is_finished = true; + if (!current->isLast()) + { +// std::cerr << "moving to next row\n"; + queue.next(); + } + else + { + /// We will get the next block from the corresponding source, if there is one. + queue.removeTop(); - return; - } +// std::cerr << "It was last row, fetching next block\n"; + need_data = true; + next_input_to_read = current.impl->order; - //std::cerr << "moving to next row\n"; - current->next(); + if (limit && merged_data.totalMergedRows() >= limit) + is_finished = true; - if (!queue.empty() && current.greater(queue.top())) - { - //std::cerr << "next row is not least, pushing back to queue\n"; - queue.push(current); - break; - } + return; } } is_finished = true; diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.h b/dbms/src/Processors/Transforms/MergingSortedTransform.h index b32dd076c5f..aa88fb09623 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.h +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.h @@ -1,10 +1,10 @@ #pragma once + #include #include #include #include -#include namespace DB { @@ -111,14 +111,10 @@ protected: /// Chunks currently being merged. std::vector source_chunks; - using CursorImpls = std::vector; - CursorImpls cursors; + SortCursorImpls cursors; - using Queue = std::priority_queue; - Queue queue_without_collation; - - using QueueWithCollation = std::priority_queue; - QueueWithCollation queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_with_collation; private: @@ -128,8 +124,8 @@ private: bool need_data = false; size_t next_input_to_read = 0; - template - void merge(std::priority_queue & queue); + template + void merge(TSortingHeap & queue); void insertFromChunk(size_t source_num); @@ -159,22 +155,6 @@ private: shared_chunk_ptr->all_columns = cursors[source_num].all_columns; shared_chunk_ptr->sort_columns = cursors[source_num].sort_columns; } - - void pushToQueue(size_t source_num) - { - if (has_collation) - queue_with_collation.push(SortCursorWithCollation(&cursors[source_num])); - else - queue_without_collation.push(SortCursor(&cursors[source_num])); - } - - template - void initQueue(std::priority_queue & queue) - { - for (auto & cursor : cursors) - if (!cursor.empty()) - queue.push(TSortCursor(&cursor)); - } }; } diff --git a/dbms/src/Processors/Transforms/SortingTransform.cpp b/dbms/src/Processors/Transforms/SortingTransform.cpp index ab87591c0d6..30f53742ec0 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.cpp +++ b/dbms/src/Processors/Transforms/SortingTransform.cpp @@ -40,16 +40,12 @@ MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t chunks.swap(nonempty_chunks); - if (!has_collation) - { - for (auto & cursor : cursors) - queue_without_collation.push(SortCursor(&cursor)); - } + if (has_collation) + queue_with_collation = SortingHeap(cursors); + else if (description.size() > 1) + queue_without_collation = SortingHeap(cursors); else - { - for (auto & cursor : cursors) - queue_with_collation.push(SortCursorWithCollation(&cursor)); - } + queue_simple = SortingHeap(cursors); } @@ -65,50 +61,61 @@ Chunk MergeSorter::read() return res; } - return !has_collation - ? mergeImpl(queue_without_collation) - : mergeImpl(queue_with_collation); + if (has_collation) + return mergeImpl(queue_with_collation); + else if (description.size() > 1) + return mergeImpl(queue_without_collation); + else + return mergeImpl(queue_simple); } -template -Chunk MergeSorter::mergeImpl(std::priority_queue & queue) +template +Chunk MergeSorter::mergeImpl(TSortingHeap & queue) { size_t num_columns = chunks[0].getNumColumns(); - MutableColumns merged_columns = chunks[0].cloneEmptyColumns(); - /// TODO: reserve (in each column) + + /// Reserve + if (queue.isValid()) + { + /// The expected size of output block is the same as input block + size_t size_to_reserve = chunks[0].getNumRows(); + for (auto & column : merged_columns) + column->reserve(size_to_reserve); + } + + /// TODO: Optimization when a single block left. /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; - while (!queue.empty()) + while (queue.isValid()) { - TSortCursor current = queue.top(); - queue.pop(); + auto current = queue.current(); + /// Append a row from queue. for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); ++total_merged_rows; ++merged_rows; - if (!current->isLast()) - { - current->next(); - queue.push(current); - } - + /// We don't need more rows because of limit has reached. if (limit && total_merged_rows == limit) { chunks.clear(); - return Chunk(std::move(merged_columns), merged_rows); + break; } + queue.next(); + + /// It's enough for current output block but we will continue. if (merged_rows == max_merged_block_size) - return Chunk(std::move(merged_columns), merged_rows); + break; } - chunks.clear(); + if (!queue.isValid()) + chunks.clear(); if (merged_rows == 0) return {}; diff --git a/dbms/src/Processors/Transforms/SortingTransform.h b/dbms/src/Processors/Transforms/SortingTransform.h index 2703501c81a..49bdf303c7f 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.h +++ b/dbms/src/Processors/Transforms/SortingTransform.h @@ -1,10 +1,10 @@ #pragma once + #include #include #include #include #include -#include namespace DB @@ -27,19 +27,19 @@ private: UInt64 limit; size_t total_merged_rows = 0; - using CursorImpls = std::vector; - CursorImpls cursors; + SortCursorImpls cursors; bool has_collation = false; - std::priority_queue queue_without_collation; - std::priority_queue queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_simple; + SortingHeap queue_with_collation; /** Two different cursors are supported - with and without Collation. * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. */ - template - Chunk mergeImpl(std::priority_queue & queue); + template + Chunk mergeImpl(TSortingHeap & queue); };