From de4d7a52974efd43a51d9e9d768849f25148867c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 01:02:12 +0300 Subject: [PATCH 01/15] Optimization of sorting heap: minor modification that seems to be better --- dbms/src/Core/SortCursor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index 5b4db43024f..462778bc8f9 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -22,8 +22,8 @@ namespace DB */ struct SortCursorImpl { - ColumnRawPtrs all_columns; ColumnRawPtrs sort_columns; + ColumnRawPtrs all_columns; SortDescription desc; size_t sort_columns_size = 0; size_t pos = 0; From eb9d6983c8c3657c9d5a63c1d2ca130f2f1e4c67 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 01:16:39 +0300 Subject: [PATCH 02/15] Added optimization for Processors; added reserve. --- .../MergeSortingBlockInputStream.cpp | 14 ++++- .../MergingSortedBlockInputStream.h | 3 +- .../Transforms/MergingSortedTransform.h | 3 +- .../Transforms/SortingTransform.cpp | 52 +++++++++---------- .../Processors/Transforms/SortingTransform.h | 13 +++-- 5 files changed, 46 insertions(+), 39 deletions(-) diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 1c50316fc3f..73c46c3d0e4 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -179,9 +179,16 @@ template Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) { size_t num_columns = header.columns(); - MutableColumns merged_columns = header.cloneEmptyColumns(); - /// TODO: reserve (in each column) + + /// Reserve + if (queue.isValid() && !blocks.empty()) + { + /// The expected size of output block is the same as input block + size_t size_to_reserve = blocks[0].rows(); + for (auto & column : merged_columns) + column->reserve(size_to_reserve); + } /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; @@ -210,6 +217,9 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) break; } + if (!queue.isValid()) + blocks.clear(); + if (merged_rows == 0) return {}; diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index beb3c7afc52..9f641fb7599 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -109,8 +109,7 @@ protected: size_t num_columns = 0; std::vector source_blocks; - using CursorImpls = std::vector; - CursorImpls cursors; + SortCursorImpls cursors; using Queue = std::priority_queue; Queue queue_without_collation; diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.h b/dbms/src/Processors/Transforms/MergingSortedTransform.h index b32dd076c5f..594da6819db 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.h +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.h @@ -111,8 +111,7 @@ protected: /// Chunks currently being merged. std::vector source_chunks; - using CursorImpls = std::vector; - CursorImpls cursors; + SortCursorImpls cursors; using Queue = std::priority_queue; Queue queue_without_collation; diff --git a/dbms/src/Processors/Transforms/SortingTransform.cpp b/dbms/src/Processors/Transforms/SortingTransform.cpp index ab87591c0d6..bccd91897e0 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.cpp +++ b/dbms/src/Processors/Transforms/SortingTransform.cpp @@ -41,15 +41,9 @@ MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t chunks.swap(nonempty_chunks); if (!has_collation) - { - for (auto & cursor : cursors) - queue_without_collation.push(SortCursor(&cursor)); - } + queue_without_collation = SortingHeap(cursors); else - { - for (auto & cursor : cursors) - queue_with_collation.push(SortCursorWithCollation(&cursor)); - } + queue_with_collation = SortingHeap(cursors); } @@ -66,49 +60,55 @@ Chunk MergeSorter::read() } return !has_collation - ? mergeImpl(queue_without_collation) - : mergeImpl(queue_with_collation); + ? mergeImpl(queue_without_collation) + : mergeImpl(queue_with_collation); } -template -Chunk MergeSorter::mergeImpl(std::priority_queue & queue) +template +Chunk MergeSorter::mergeImpl(TSortingHeap & queue) { size_t num_columns = chunks[0].getNumColumns(); - MutableColumns merged_columns = chunks[0].cloneEmptyColumns(); - /// TODO: reserve (in each column) + + /// Reserve + if (queue.isValid()) + { + /// The expected size of output block is the same as input block + size_t size_to_reserve = chunks[0].getNumRows(); + for (auto & column : merged_columns) + column->reserve(size_to_reserve); + } /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; - while (!queue.empty()) + while (queue.isValid()) { - TSortCursor current = queue.top(); - queue.pop(); + auto current = queue.current(); + /// Append a row from queue. for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); ++total_merged_rows; ++merged_rows; - if (!current->isLast()) - { - current->next(); - queue.push(current); - } - + /// We don't need more rows because of limit has reached. if (limit && total_merged_rows == limit) { chunks.clear(); - return Chunk(std::move(merged_columns), merged_rows); + break; } + queue.next(); + + /// It's enough for current output block but we will continue. if (merged_rows == max_merged_block_size) - return Chunk(std::move(merged_columns), merged_rows); + break; } - chunks.clear(); + if (!queue.isValid()) + chunks.clear(); if (merged_rows == 0) return {}; diff --git a/dbms/src/Processors/Transforms/SortingTransform.h b/dbms/src/Processors/Transforms/SortingTransform.h index 2703501c81a..66ae1759f17 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.h +++ b/dbms/src/Processors/Transforms/SortingTransform.h @@ -1,10 +1,10 @@ #pragma once + #include #include #include #include #include -#include namespace DB @@ -27,19 +27,18 @@ private: UInt64 limit; size_t total_merged_rows = 0; - using CursorImpls = std::vector; - CursorImpls cursors; + SortCursorImpls cursors; bool has_collation = false; - std::priority_queue queue_without_collation; - std::priority_queue queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_with_collation; /** Two different cursors are supported - with and without Collation. * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. */ - template - Chunk mergeImpl(std::priority_queue & queue); + template + Chunk mergeImpl(TSortingHeap & queue); }; From 9f3afed5ffd623daa32a13213f717541e7278c83 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 03:19:07 +0300 Subject: [PATCH 03/15] Optimization of priority queue --- dbms/src/Core/SortCursor.h | 26 ++- .../AggregatingSortedBlockInputStream.cpp | 11 +- .../AggregatingSortedBlockInputStream.h | 2 +- .../CollapsingSortedBlockInputStream.cpp | 11 +- .../CollapsingSortedBlockInputStream.h | 2 +- .../GraphiteRollupSortedBlockInputStream.cpp | 11 +- .../GraphiteRollupSortedBlockInputStream.h | 2 +- .../MergeSortingBlockInputStream.cpp | 2 + .../MergingSortedBlockInputStream.cpp | 204 ++++++++---------- .../MergingSortedBlockInputStream.h | 22 +- .../ReplacingSortedBlockInputStream.cpp | 12 +- .../ReplacingSortedBlockInputStream.h | 2 +- .../SummingSortedBlockInputStream.cpp | 11 +- .../SummingSortedBlockInputStream.h | 4 +- ...sionedCollapsingSortedBlockInputStream.cpp | 11 +- ...ersionedCollapsingSortedBlockInputStream.h | 4 +- .../Transforms/SortingTransform.cpp | 2 + 17 files changed, 150 insertions(+), 189 deletions(-) diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index 462778bc8f9..b4e1485e27f 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -225,7 +225,8 @@ public: size_t size = cursors.size(); queue.reserve(size); for (size_t i = 0; i < size; ++i) - queue.emplace_back(&cursors[i]); + if (!cursors[i].empty()) + queue.emplace_back(&cursors[i]); std::make_heap(queue.begin(), queue.end()); } @@ -233,6 +234,11 @@ public: Cursor & current() { return queue.front(); } + size_t size() { return queue.size(); } + + Cursor & firstChild() { return queue[1]; } + Cursor & secondChild() { return queue[2]; } + void next() { assert(isValid()); @@ -246,6 +252,18 @@ public: removeTop(); } + void replaceTop(Cursor new_top) + { + current() = new_top; + updateTop(); + } + + void removeTop() + { + std::pop_heap(queue.begin(), queue.end()); + queue.pop_back(); + } + private: using Container = std::vector; Container queue; @@ -300,12 +318,6 @@ private: } while (!(*child_it < top)); *curr_it = std::move(top); } - - void removeTop() - { - std::pop_heap(queue.begin(), queue.end()); - queue.pop_back(); - } }; } diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp index 3607d1f917f..d23d93e7e5c 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp @@ -138,14 +138,14 @@ Block AggregatingSortedBlockInputStream::readImpl() } -void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { size_t merged_rows = 0; /// We take the rows in the correct order and put them in `merged_block`, while the rows are no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); setPrimaryKeyRef(next_key, current); @@ -167,8 +167,6 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s return; } - queue.pop(); - if (key_differs) { current_key.swap(next_key); @@ -202,8 +200,7 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h index 0cf4bd64d87..6ef1259e458 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h @@ -55,7 +55,7 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursor and calls to virtual functions. */ - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /** Extract all states of aggregate functions and merge them with the current group. */ diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index 7e4ad04b806..ef82a6d8c5e 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -105,15 +105,15 @@ Block CollapsingSortedBlockInputStream::readImpl() } -void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { MergeStopCondition stop_condition(average_block_sizes, max_block_size); size_t current_block_granularity; /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size` - for (; !queue.empty(); ++current_pos) + for (; queue.isValid(); ++current_pos) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); current_block_granularity = current->rows; if (current_key.empty()) @@ -131,8 +131,6 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st return; } - queue.pop(); - if (key_differs) { /// We write data for the previous primary key. @@ -185,8 +183,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h index 7e114e614f6..2b528d27339 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h @@ -73,7 +73,7 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursors and calls to virtual functions. */ - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Output to result rows for the current primary key. void insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition); diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp index 340e10df12f..64a0d52c1aa 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp @@ -161,7 +161,7 @@ Block GraphiteRollupSortedBlockInputStream::readImpl() } -void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { const DateLUTImpl & date_lut = DateLUT::instance(); @@ -173,9 +173,9 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns /// contribute towards current output row. /// Variables starting with next_* refer to the row at the top of the queue. - while (!queue.empty()) + while (queue.isValid()) { - SortCursor next_cursor = queue.top(); + SortCursor next_cursor = queue.current(); StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos); bool new_path = is_first || next_path != current_group_path; @@ -253,12 +253,9 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns current_group_path = next_path; } - queue.pop(); - if (!next_cursor->isLast()) { - next_cursor->next(); - queue.push(next_cursor); + queue.next(); } else { diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h index 533c267ff02..0dfdf7c300c 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h @@ -225,7 +225,7 @@ private: UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const; - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Insert the values into the resulting columns, which will not be changed in the future. template diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 73c46c3d0e4..8664b62bf75 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -190,6 +190,8 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) column->reserve(size_to_reserve); } + /// TODO: Optimization when a single block left. + /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; while (queue.isValid()) diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 8c0707e09b0..5f1b5cc927a 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -58,10 +58,10 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) has_collation |= cursors[i].has_collation; } - if (has_collation) - initQueue(queue_with_collation); + if (!has_collation) + queue_without_collation = SortingHeap(cursors); else - initQueue(queue_without_collation); + queue_with_collation = SortingHeap(cursors); } /// Let's check that all source blocks have the same structure. @@ -82,15 +82,6 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) } -template -void MergingSortedBlockInputStream::initQueue(std::priority_queue & queue) -{ - for (size_t i = 0; i < cursors.size(); ++i) - if (!cursors[i].empty()) - queue.push(TSortCursor(&cursors[i])); -} - - Block MergingSortedBlockInputStream::readImpl() { if (finished) @@ -115,7 +106,7 @@ Block MergingSortedBlockInputStream::readImpl() template -void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue & queue) +void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, SortingHeap & queue) { size_t order = current->order; size_t size = cursors.size(); @@ -125,15 +116,19 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, while (true) { - source_blocks[order] = new detail::SharedBlock(children[order]->read()); + source_blocks[order] = new detail::SharedBlock(children[order]->read()); /// intrusive ptr if (!*source_blocks[order]) + { + queue.removeTop(); break; + } if (source_blocks[order]->rows()) { cursors[order].reset(*source_blocks[order]); - queue.push(TSortCursor(&cursors[order])); + queue.replaceTop(&cursors[order]); + source_blocks[order]->all_columns = cursors[order].all_columns; source_blocks[order]->sort_columns = cursors[order].sort_columns; break; @@ -154,19 +149,14 @@ bool MergingSortedBlockInputStream::MergeStopCondition::checkStop() const return sum_rows_count >= average; } -template -void MergingSortedBlockInputStream::fetchNextBlock(const SortCursor & current, std::priority_queue & queue); -template -void MergingSortedBlockInputStream::fetchNextBlock(const SortCursorWithCollation & current, std::priority_queue & queue); - - -template -void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +template +void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSortingHeap & queue) { size_t merged_rows = 0; MergeStopCondition stop_condition(average_block_sizes, max_block_size); + /** Increase row counters. * Return true if it's time to finish generating the current data block. */ @@ -186,123 +176,101 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: return stop_condition.checkStop(); }; - /// Take rows in required order and put them into `merged_columns`, while the rows are no more than `max_block_size` - while (!queue.empty()) + /// Take rows in required order and put them into `merged_columns`, while the number of rows are no more than `max_block_size` + while (queue.isValid()) { - TSortCursor current = queue.top(); + auto current = queue.current(); size_t current_block_granularity = current->rows; - queue.pop(); - while (true) + /** And what if the block is totally less or equal than the rest for the current cursor? + * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. + */ + if (current->isFirst() + && (queue.size() == 1 + || (queue.size() == 2 && current.totallyLessOrEquals(queue.firstChild())) + || (queue.size() == 3 && current.totallyLessOrEquals(queue.firstChild()) && current.totallyLessOrEquals(queue.secondChild())))) { - /** And what if the block is totally less or equal than the rest for the current cursor? - * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. - */ - if (current->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top()))) +// std::cerr << "current block is totally less or equals\n"; + + /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. + if (merged_rows != 0) { - // std::cerr << "current block is totally less or equals\n"; - - /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. - if (merged_rows != 0) - { - //std::cerr << "merged rows is non-zero\n"; - queue.push(current); - return; - } - - /// Actually, current->order stores source number (i.e. cursors[current->order] == current) - size_t source_num = current->order; - - if (source_num >= cursors.size()) - throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); - - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate(); - - // std::cerr << "copied columns\n"; - - merged_rows = merged_columns.at(0)->size(); - - /// Limit output - if (limit && total_merged_rows + merged_rows > limit) - { - merged_rows = limit - total_merged_rows; - for (size_t i = 0; i < num_columns; ++i) - { - auto & column = merged_columns[i]; - column = (*column->cut(0, merged_rows)).mutate(); - } - - cancel(false); - finished = true; - } - - /// Write order of rows for other columns - /// this data will be used in grather stream - if (out_row_sources_buf) - { - RowSourcePart row_source(source_num); - for (size_t i = 0; i < merged_rows; ++i) - out_row_sources_buf->write(row_source.data); - } - - //std::cerr << "fetching next block\n"; - - total_merged_rows += merged_rows; - fetchNextBlock(current, queue); + //std::cerr << "merged rows is non-zero\n"; return; } - // std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; - // std::cerr << "Inserting row\n"; - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); + /// Actually, current->order stores source number (i.e. cursors[current->order] == current) + size_t source_num = current->order; + if (source_num >= cursors.size()) + throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); + + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate(); + +// std::cerr << "copied columns\n"; + + merged_rows = merged_columns.at(0)->size(); + + /// Limit output + if (limit && total_merged_rows + merged_rows > limit) + { + merged_rows = limit - total_merged_rows; + for (size_t i = 0; i < num_columns; ++i) + { + auto & column = merged_columns[i]; + column = (*column->cut(0, merged_rows)).mutate(); + } + + cancel(false); + finished = true; + } + + /// Write order of rows for other columns + /// this data will be used in grather stream if (out_row_sources_buf) { - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - RowSourcePart row_source(current->order); - out_row_sources_buf->write(row_source.data); + RowSourcePart row_source(source_num); + for (size_t i = 0; i < merged_rows; ++i) + out_row_sources_buf->write(row_source.data); } - if (!current->isLast()) - { - // std::cerr << "moving to next row\n"; - current->next(); + //std::cerr << "fetching next block\n"; - if (queue.empty() || !(current.greater(queue.top()))) - { - if (count_row_and_check_limit(current_block_granularity)) - { - // std::cerr << "pushing back to queue\n"; - queue.push(current); - return; - } + total_merged_rows += merged_rows; + fetchNextBlock(current, queue); + return; + } - /// Do not put the cursor back in the queue, but continue to work with the current cursor. - // std::cerr << "current is still on top, using current row\n"; - continue; - } - else - { - // std::cerr << "next row is not least, pushing back to queue\n"; - queue.push(current); - } - } - else - { - /// We get the next block from the corresponding source, if there is one. - // std::cerr << "It was last row, fetching next block\n"; - fetchNextBlock(current, queue); - } +// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; +// std::cerr << "Inserting row\n"; + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); - break; + if (out_row_sources_buf) + { + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + RowSourcePart row_source(current->order); + out_row_sources_buf->write(row_source.data); + } + + if (!current->isLast()) + { +// std::cerr << "moving to next row\n"; + queue.next(); + } + else + { + /// We get the next block from the corresponding source, if there is one. +// std::cerr << "It was last row, fetching next block\n"; + fetchNextBlock(current, queue); } if (count_row_and_check_limit(current_block_granularity)) return; } + /// We have read all data. Ask childs to cancel providing more data. cancel(false); finished = true; } diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index 9f641fb7599..e6c2b257013 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -1,7 +1,5 @@ #pragma once -#include - #include #include @@ -87,7 +85,7 @@ protected: /// Gets the next block from the source corresponding to the `current`. template - void fetchNextBlock(const TSortCursor & current, std::priority_queue & queue); + void fetchNextBlock(const TSortCursor & current, SortingHeap & queue); Block header; @@ -111,11 +109,8 @@ protected: SortCursorImpls cursors; - using Queue = std::priority_queue; - Queue queue_without_collation; - - using QueueWithCollation = std::priority_queue; - QueueWithCollation queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_with_collation; /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step) /// If it is not nullptr then it should be populated during execution @@ -176,13 +171,10 @@ protected: private: /** We support two different cursors - with Collation and without. - * Templates are used instead of polymorphic SortCursor and calls to virtual functions. - */ - template - void initQueue(std::priority_queue & queue); - - template - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + * Templates are used instead of polymorphic SortCursor and calls to virtual functions. + */ + template + void merge(MutableColumns & merged_columns, TSortingHeap & queue); Logger * log = &Logger::get("MergingSortedBlockInputStream"); diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp index e2e99815b93..967b4ebb046 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp @@ -48,13 +48,14 @@ Block ReplacingSortedBlockInputStream::readImpl() } -void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { MergeStopCondition stop_condition(average_block_sizes, max_block_size); + /// Take the rows in needed order and put them into `merged_columns` until rows no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); size_t current_block_granularity = current->rows; if (current_key.empty()) @@ -68,8 +69,6 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std if (key_differs && stop_condition.checkStop()) return; - queue.pop(); - if (key_differs) { /// Write the data for the previous primary key. @@ -98,8 +97,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h index 7d85542520d..22920c2eb20 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h @@ -52,7 +52,7 @@ private: /// Sources of rows with the current primary key. PODArray current_row_sources; - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Output into result the rows for current primary key. void insertRow(MutableColumns & merged_columns); diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index 9ac7d6a3397..fe29dc55916 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -314,14 +314,14 @@ Block SummingSortedBlockInputStream::readImpl() } -void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { merged_rows = 0; /// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); setPrimaryKeyRef(next_key, current); @@ -383,12 +383,9 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: current_row_is_zero = false; } - queue.pop(); - if (!current->isLast()) { - current->next(); - queue.push(current); + queue.next(); } else { diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.h b/dbms/src/DataStreams/SummingSortedBlockInputStream.h index 4412e5529f8..fc02d36d3fd 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -140,7 +142,7 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursor and calls to virtual functions. */ - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero". void insertCurrentRowIfNeeded(MutableColumns & merged_columns); diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp index 4dda97597bd..de6f7027243 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp @@ -82,21 +82,18 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl() } -void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) +void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap & queue) { MergeStopCondition stop_condition(average_block_sizes, max_block_size); auto update_queue = [this, & queue](SortCursor & cursor) { - queue.pop(); - if (out_row_sources_buf) current_row_sources.emplace(cursor->order, true); if (!cursor->isLast()) { - cursor->next(); - queue.push(cursor); + queue.next(); } else { @@ -106,9 +103,9 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co }; /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { - SortCursor current = queue.top(); + SortCursor current = queue.current(); size_t current_block_granularity = current->rows; SharedBlockRowRef next_key; diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h index f79b564063d..c64972d9266 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h @@ -5,7 +5,7 @@ #include #include -#include +#include namespace DB @@ -204,7 +204,7 @@ private: /// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys. std::queue current_row_sources; - void merge(MutableColumns & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, SortingHeap & queue); /// Output to result row for the current primary key. void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns); diff --git a/dbms/src/Processors/Transforms/SortingTransform.cpp b/dbms/src/Processors/Transforms/SortingTransform.cpp index bccd91897e0..4b075ca0ae9 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.cpp +++ b/dbms/src/Processors/Transforms/SortingTransform.cpp @@ -80,6 +80,8 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue) column->reserve(size_to_reserve); } + /// TODO: Optimization when a single block left. + /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; while (queue.isValid()) From 401c5eef811c9bd255b7503d3a29d2074e2bfe57 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 04:37:56 +0300 Subject: [PATCH 04/15] Attempt to optimize merging sorted blocks --- dbms/src/Core/SortCursor.h | 103 +++++++++--------- .../MergingSortedBlockInputStream.cpp | 8 +- .../MergingSortedBlockInputStream.h | 1 + 3 files changed, 61 insertions(+), 51 deletions(-) diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index b4e1485e27f..dcc36df8292 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -110,21 +110,52 @@ using SortCursorImpls = std::vector; /// For easy copying. -struct SortCursor +template +struct SortCursorHelper { SortCursorImpl * impl; - SortCursor(SortCursorImpl * impl_) : impl(impl_) {} + const Derived & derived() const { return static_cast(*this); } + + SortCursorHelper(SortCursorImpl * impl_) : impl(impl_) {} SortCursorImpl * operator-> () { return impl; } const SortCursorImpl * operator-> () const { return impl; } + bool greater(const SortCursorHelper & rhs) const + { + return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos); + } + + /// Inverted so that the priority queue elements are removed in ascending order. + bool operator< (const SortCursorHelper & rhs) const + { + return derived().greater(rhs.derived()); + } + + /// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor. + bool totallyLessOrEquals(const SortCursorHelper & rhs) const + { + if (impl->rows == 0 || rhs.impl->rows == 0) + return false; + + /// The last row of this cursor is no larger than the first row of the another cursor. + return !derived().greaterAt(rhs.derived(), impl->rows - 1, 0); + } +}; + + +struct SortCursor : SortCursorHelper +{ + using SortCursorHelper::SortCursorHelper; + /// The specified row of this cursor is greater than the specified row of another cursor. bool greaterAt(const SortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const { for (size_t i = 0; i < impl->sort_columns_size; ++i) { - int direction = impl->desc[i].direction; - int nulls_direction = impl->desc[i].nulls_direction; + const auto & desc = impl->desc[i]; + int direction = desc.direction; + int nulls_direction = desc.nulls_direction; int res = direction * impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction); if (res > 0) return true; @@ -133,45 +164,37 @@ struct SortCursor } return impl->order > rhs.impl->order; } +}; - /// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor. - bool totallyLessOrEquals(const SortCursor & rhs) const + +/// For the case with a single column and when there is no order between different cursors. +struct SimpleSortCursor : SortCursorHelper +{ + using SortCursorHelper::SortCursorHelper; + + bool greaterAt(const SimpleSortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const { - if (impl->rows == 0 || rhs.impl->rows == 0) - return false; - - /// The last row of this cursor is no larger than the first row of the another cursor. - return !greaterAt(rhs, impl->rows - 1, 0); - } - - bool greater(const SortCursor & rhs) const - { - return greaterAt(rhs, impl->pos, rhs.impl->pos); - } - - /// Inverted so that the priority queue elements are removed in ascending order. - bool operator< (const SortCursor & rhs) const - { - return greater(rhs); + const auto & desc = impl->desc[0]; + int direction = desc.direction; + int nulls_direction = desc.nulls_direction; + int res = impl->sort_columns[0]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[0]), nulls_direction); + return (res > 0) ^ (direction > 0); } }; /// Separate comparator for locale-sensitive string comparisons -struct SortCursorWithCollation +struct SortCursorWithCollation : SortCursorHelper { - SortCursorImpl * impl; - - SortCursorWithCollation(SortCursorImpl * impl_) : impl(impl_) {} - SortCursorImpl * operator-> () { return impl; } - const SortCursorImpl * operator-> () const { return impl; } + using SortCursorHelper::SortCursorHelper; bool greaterAt(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const { for (size_t i = 0; i < impl->sort_columns_size; ++i) { - int direction = impl->desc[i].direction; - int nulls_direction = impl->desc[i].nulls_direction; + const auto & desc = impl->desc[i]; + int direction = desc.direction; + int nulls_direction = desc.nulls_direction; int res; if (impl->need_collation[i]) { @@ -189,29 +212,11 @@ struct SortCursorWithCollation } return impl->order > rhs.impl->order; } - - bool totallyLessOrEquals(const SortCursorWithCollation & rhs) const - { - if (impl->rows == 0 || rhs.impl->rows == 0) - return false; - - /// The last row of this cursor is no larger than the first row of the another cursor. - return !greaterAt(rhs, impl->rows - 1, 0); - } - - bool greater(const SortCursorWithCollation & rhs) const - { - return greaterAt(rhs, impl->pos, rhs.impl->pos); - } - - bool operator< (const SortCursorWithCollation & rhs) const - { - return greater(rhs); - } }; /** Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams). + * TODO: Replace with "Loser Tree", see https://en.wikipedia.org/wiki/K-way_merge_algorithm */ template class SortingHeap diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 5f1b5cc927a..57742fe1e35 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -60,8 +60,10 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) if (!has_collation) queue_without_collation = SortingHeap(cursors); - else + else if (description.size() > 1) queue_with_collation = SortingHeap(cursors); + else + queue_simple = SortingHeap(cursors); } /// Let's check that all source blocks have the same structure. @@ -98,8 +100,10 @@ Block MergingSortedBlockInputStream::readImpl() if (has_collation) merge(merged_columns, queue_with_collation); - else + else if (description.size() > 1) merge(merged_columns, queue_without_collation); + else + merge(merged_columns, queue_simple); return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index e6c2b257013..6dd8c2d6cab 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -110,6 +110,7 @@ protected: SortCursorImpls cursors; SortingHeap queue_without_collation; + SortingHeap queue_simple; SortingHeap queue_with_collation; /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step) From 48505446af4947f1f146d649b122bfbf63d6fe3d Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 04:49:38 +0300 Subject: [PATCH 05/15] Addition to prev. revision --- .../DataStreams/MergeSortingBlockInputStream.cpp | 15 ++++++++++----- .../DataStreams/MergeSortingBlockInputStream.h | 1 + .../DataStreams/MergingSortedBlockInputStream.cpp | 6 +++--- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 8664b62bf75..52f85f1349c 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -150,10 +150,12 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( blocks.swap(nonempty_blocks); - if (!has_collation) + if (has_collation) + queue_with_collation = SortingHeap(cursors); + else if (description.size() > 1) queue_without_collation = SortingHeap(cursors); else - queue_with_collation = SortingHeap(cursors); + queue_simple = SortingHeap(cursors); } @@ -169,9 +171,12 @@ Block MergeSortingBlocksBlockInputStream::readImpl() return res; } - return !has_collation - ? mergeImpl(queue_without_collation) - : mergeImpl(queue_with_collation); + if (has_collation) + return mergeImpl(queue_with_collation); + else if (description.size() > 1) + return mergeImpl(queue_without_collation); + else + return mergeImpl(queue_simple); } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index 9492bdb074b..ce82f6bb120 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -59,6 +59,7 @@ private: bool has_collation = false; SortingHeap queue_without_collation; + SortingHeap queue_simple; SortingHeap queue_with_collation; /** Two different cursors are supported - with and without Collation. diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 57742fe1e35..7f6bf6a0d90 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -58,10 +58,10 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) has_collation |= cursors[i].has_collation; } - if (!has_collation) - queue_without_collation = SortingHeap(cursors); - else if (description.size() > 1) + if (has_collation) queue_with_collation = SortingHeap(cursors); + else if (description.size() > 1) + queue_without_collation = SortingHeap(cursors); else queue_simple = SortingHeap(cursors); } From b4ba1becd409f528ac6309f086d15a70ac290c64 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 12:53:31 +0300 Subject: [PATCH 06/15] Removed wrong code from MergingSortedBlockInputStream --- dbms/src/DataStreams/MergingSortedBlockInputStream.cpp | 8 ++------ dbms/src/DataStreams/MergingSortedBlockInputStream.h | 1 - 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 7f6bf6a0d90..367c9c83697 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -60,10 +60,8 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) if (has_collation) queue_with_collation = SortingHeap(cursors); - else if (description.size() > 1) - queue_without_collation = SortingHeap(cursors); else - queue_simple = SortingHeap(cursors); + queue_without_collation = SortingHeap(cursors); } /// Let's check that all source blocks have the same structure. @@ -100,10 +98,8 @@ Block MergingSortedBlockInputStream::readImpl() if (has_collation) merge(merged_columns, queue_with_collation); - else if (description.size() > 1) - merge(merged_columns, queue_without_collation); else - merge(merged_columns, queue_simple); + merge(merged_columns, queue_without_collation); return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index 6dd8c2d6cab..e6c2b257013 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -110,7 +110,6 @@ protected: SortCursorImpls cursors; SortingHeap queue_without_collation; - SortingHeap queue_simple; SortingHeap queue_with_collation; /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step) From 53606c6e8f11203438b83f50c08407c7bf6c7ec8 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 12:55:43 +0300 Subject: [PATCH 07/15] Using optimization of single column sort in Processors --- .../Processors/Transforms/SortingTransform.cpp | 15 ++++++++++----- dbms/src/Processors/Transforms/SortingTransform.h | 1 + 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/dbms/src/Processors/Transforms/SortingTransform.cpp b/dbms/src/Processors/Transforms/SortingTransform.cpp index 4b075ca0ae9..30f53742ec0 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.cpp +++ b/dbms/src/Processors/Transforms/SortingTransform.cpp @@ -40,10 +40,12 @@ MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t chunks.swap(nonempty_chunks); - if (!has_collation) + if (has_collation) + queue_with_collation = SortingHeap(cursors); + else if (description.size() > 1) queue_without_collation = SortingHeap(cursors); else - queue_with_collation = SortingHeap(cursors); + queue_simple = SortingHeap(cursors); } @@ -59,9 +61,12 @@ Chunk MergeSorter::read() return res; } - return !has_collation - ? mergeImpl(queue_without_collation) - : mergeImpl(queue_with_collation); + if (has_collation) + return mergeImpl(queue_with_collation); + else if (description.size() > 1) + return mergeImpl(queue_without_collation); + else + return mergeImpl(queue_simple); } diff --git a/dbms/src/Processors/Transforms/SortingTransform.h b/dbms/src/Processors/Transforms/SortingTransform.h index 66ae1759f17..49bdf303c7f 100644 --- a/dbms/src/Processors/Transforms/SortingTransform.h +++ b/dbms/src/Processors/Transforms/SortingTransform.h @@ -32,6 +32,7 @@ private: bool has_collation = false; SortingHeap queue_without_collation; + SortingHeap queue_simple; SortingHeap queue_with_collation; /** Two different cursors are supported - with and without Collation. From 7ffa78ee95c7414cb7e3ba9f9ef606bc010fceb9 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 14:00:30 +0300 Subject: [PATCH 08/15] Fixed error --- dbms/src/DataStreams/MergingSortedBlockInputStream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 367c9c83697..c36b35ce15c 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -188,7 +188,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort if (current->isFirst() && (queue.size() == 1 || (queue.size() == 2 && current.totallyLessOrEquals(queue.firstChild())) - || (queue.size() == 3 && current.totallyLessOrEquals(queue.firstChild()) && current.totallyLessOrEquals(queue.secondChild())))) + || (queue.size() >= 3 && current.totallyLessOrEquals(queue.firstChild()) && current.totallyLessOrEquals(queue.secondChild())))) { // std::cerr << "current block is totally less or equals\n"; From bd7eed1204b3d453ca11703f0510eb138be68fb0 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 14:13:10 +0300 Subject: [PATCH 09/15] Cache comparison between next elements in heap --- dbms/src/Core/SortCursor.h | 44 +++++++++++++++---- .../MergingSortedBlockInputStream.cpp | 3 +- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index dcc36df8292..7c13ee3b058 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -241,8 +241,20 @@ public: size_t size() { return queue.size(); } - Cursor & firstChild() { return queue[1]; } - Cursor & secondChild() { return queue[2]; } + Cursor & nextChild() + { + if (next_idx) + { + return queue[next_idx]; + } + else + { + next_idx = 1; + if (queue.size() > 2 && queue[1] < queue[2]) + ++next_idx; + return queue[next_idx]; + } + } void next() { @@ -273,30 +285,46 @@ private: using Container = std::vector; Container queue; + /// Cache comparison between first and second child if the order in queue has not been changed. + size_t next_idx = 0; + /// This is adapted version of the function __sift_down from libc++. /// Why cannot simply use std::priority_queue? /// - because it doesn't support updating the top element and requires pop and push instead. + /// Also look at "Boost.Heap" library. void updateTop() { size_t size = queue.size(); if (size < 2) return; - size_t child_idx = 1; auto begin = queue.begin(); - auto child_it = begin + 1; + typename Container::iterator child_it; - /// Right child exists and is greater than left child. - if (size > 2 && *child_it < *(child_it + 1)) + if (next_idx) { - ++child_it; - ++child_idx; + child_it = begin + next_idx; + } + else + { + next_idx = 1; + child_it = begin + next_idx; + + /// Right child exists and is greater than left child. + if (size > 2 && *child_it < *(child_it + 1)) + { + ++child_it; + ++next_idx; + } } /// Check if we are in order. if (*child_it < *begin) return; + size_t child_idx = next_idx; + next_idx = 0; + auto curr_it = begin; auto top(std::move(*begin)); do diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index c36b35ce15c..3614d9c1d66 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -187,8 +187,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort */ if (current->isFirst() && (queue.size() == 1 - || (queue.size() == 2 && current.totallyLessOrEquals(queue.firstChild())) - || (queue.size() >= 3 && current.totallyLessOrEquals(queue.firstChild()) && current.totallyLessOrEquals(queue.secondChild())))) + || (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild())))) { // std::cerr << "current block is totally less or equals\n"; From 8613b90d84b34b25e5fc219b9902222685ef5273 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 22 Dec 2019 17:22:42 +0300 Subject: [PATCH 10/15] Using SortingHeap in MergingSortedTransform --- dbms/src/Core/SortCursor.h | 7 ++ .../Transforms/MergingSortedTransform.cpp | 110 ++++++++---------- .../Transforms/MergingSortedTransform.h | 29 +---- 3 files changed, 62 insertions(+), 84 deletions(-) diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index 7c13ee3b058..a5d7dbd1f05 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -281,6 +281,13 @@ public: queue.pop_back(); } + void push(SortCursorImpl & cursor) + { + queue.emplace_back(&cursor); + std::push_heap(queue.begin(), queue.end()); + next_idx = 0; + } + private: using Container = std::vector; Container queue; diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp index d8f06a7fe4a..85f54d23fe3 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp @@ -145,9 +145,9 @@ IProcessor::Status MergingSortedTransform::prepare() return Status::NeedData; if (has_collation) - initQueue(queue_with_collation); + queue_with_collation = SortingHeap(cursors); else - initQueue(queue_without_collation); + queue_without_collation = SortingHeap(cursors); is_initialized = true; return Status::Ready; @@ -166,7 +166,6 @@ IProcessor::Status MergingSortedTransform::prepare() if (need_data) { - auto & input = *std::next(inputs.begin(), next_input_to_read); if (!input.isFinished()) { @@ -180,7 +179,10 @@ IProcessor::Status MergingSortedTransform::prepare() return Status::NeedData; updateCursor(std::move(chunk), next_input_to_read); - pushToQueue(next_input_to_read); + if (has_collation) + queue_with_collation.push(cursors[next_input_to_read]); + else + queue_without_collation.push(cursors[next_input_to_read]); need_data = false; } } @@ -197,8 +199,8 @@ void MergingSortedTransform::work() merge(queue_without_collation); } -template -void MergingSortedTransform::merge(std::priority_queue & queue) +template +void MergingSortedTransform::merge(TSortingHeap & queue) { /// Returns MergeStatus which we should return if we are going to finish now. auto can_read_another_row = [&, this]() @@ -220,77 +222,65 @@ void MergingSortedTransform::merge(std::priority_queue & queue) }; /// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size` - while (!queue.empty()) + while (queue.isValid()) { /// Shouldn't happen at first iteration, but check just in case. if (!can_read_another_row()) return; - TSortCursor current = queue.top(); - queue.pop(); - bool first_iteration = true; + auto current = queue.current(); - while (true) + /** And what if the block is totally less or equal than the rest for the current cursor? + * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. + */ + if (current.impl->isFirst() + && (queue.size() == 1 + || (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild())))) { - if (!first_iteration && !can_read_another_row()) + //std::cerr << "current block is totally less or equals\n"; + + /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. + if (merged_data.mergedRows() != 0) { - queue.push(current); - return; - } - first_iteration = false; - - /** And what if the block is totally less or equal than the rest for the current cursor? - * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. - */ - if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top()))) - { - //std::cerr << "current block is totally less or equals\n"; - - /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. - if (merged_data.mergedRows() != 0) - { - //std::cerr << "merged rows is non-zero\n"; - queue.push(current); - return; - } - - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - size_t source_num = current.impl->order; - insertFromChunk(source_num); + //std::cerr << "merged rows is non-zero\n"; return; } - //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; - //std::cerr << "Inserting row\n"; - merged_data.insertRow(current->all_columns, current->pos); + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + size_t source_num = current.impl->order; + insertFromChunk(source_num); + return; + } - if (out_row_sources_buf) - { - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - RowSourcePart row_source(current.impl->order); - out_row_sources_buf->write(row_source.data); - } + //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; + //std::cerr << "Inserting row\n"; + merged_data.insertRow(current->all_columns, current->pos); - if (current->isLast()) - { - need_data = true; - next_input_to_read = current.impl->order; + if (out_row_sources_buf) + { + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + RowSourcePart row_source(current.impl->order); + out_row_sources_buf->write(row_source.data); + } - if (limit && merged_data.totalMergedRows() >= limit) - is_finished = true; + if (!current->isLast()) + { +// std::cerr << "moving to next row\n"; + queue.next(); + } + else + { + /// We will get the next block from the corresponding source, if there is one. + queue.removeTop(); - return; - } +// std::cerr << "It was last row, fetching next block\n"; + need_data = true; + next_input_to_read = current.impl->order; - //std::cerr << "moving to next row\n"; - current->next(); + if (limit && merged_data.totalMergedRows() >= limit) + is_finished = true; - if (!queue.empty() && current.greater(queue.top())) - { - //std::cerr << "next row is not least, pushing back to queue\n"; - queue.push(current); - break; - } + return; } } is_finished = true; diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.h b/dbms/src/Processors/Transforms/MergingSortedTransform.h index 594da6819db..aa88fb09623 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.h +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.h @@ -1,10 +1,10 @@ #pragma once + #include #include #include #include -#include namespace DB { @@ -113,11 +113,8 @@ protected: SortCursorImpls cursors; - using Queue = std::priority_queue; - Queue queue_without_collation; - - using QueueWithCollation = std::priority_queue; - QueueWithCollation queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_with_collation; private: @@ -127,8 +124,8 @@ private: bool need_data = false; size_t next_input_to_read = 0; - template - void merge(std::priority_queue & queue); + template + void merge(TSortingHeap & queue); void insertFromChunk(size_t source_num); @@ -158,22 +155,6 @@ private: shared_chunk_ptr->all_columns = cursors[source_num].all_columns; shared_chunk_ptr->sort_columns = cursors[source_num].sort_columns; } - - void pushToQueue(size_t source_num) - { - if (has_collation) - queue_with_collation.push(SortCursorWithCollation(&cursors[source_num])); - else - queue_without_collation.push(SortCursor(&cursors[source_num])); - } - - template - void initQueue(std::priority_queue & queue) - { - for (auto & cursor : cursors) - if (!cursor.empty()) - queue.push(TSortCursor(&cursor)); - } }; } From 24a1ee4f55777c9d1fd5260c412a8293d312d574 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 2 Jan 2020 13:38:11 +0300 Subject: [PATCH 11/15] Fix direction for SimpleSortCursor. --- dbms/src/Core/SortCursor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index a5d7dbd1f05..443e6d7dc0b 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -178,7 +178,7 @@ struct SimpleSortCursor : SortCursorHelper int direction = desc.direction; int nulls_direction = desc.nulls_direction; int res = impl->sort_columns[0]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[0]), nulls_direction); - return (res > 0) ^ (direction > 0); + return (res > 0) == (direction > 0); } }; From a3cdc0b4b871fd10c71ab2aa6e001564d791f4b7 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 2 Jan 2020 14:12:21 +0300 Subject: [PATCH 12/15] Nicer code in SortingHeap. --- dbms/src/Core/SortCursor.h | 56 ++++++++++++++------------------------ 1 file changed, 21 insertions(+), 35 deletions(-) diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index 443e6d7dc0b..a5d6a2afac3 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -241,20 +241,7 @@ public: size_t size() { return queue.size(); } - Cursor & nextChild() - { - if (next_idx) - { - return queue[next_idx]; - } - else - { - next_idx = 1; - if (queue.size() > 2 && queue[1] < queue[2]) - ++next_idx; - return queue[next_idx]; - } - } + Cursor & nextChild() { return queue[nextChildIndex()]; } void next() { @@ -279,6 +266,7 @@ public: { std::pop_heap(queue.begin(), queue.end()); queue.pop_back(); + next_idx = 0; } void push(SortCursorImpl & cursor) @@ -295,6 +283,19 @@ private: /// Cache comparison between first and second child if the order in queue has not been changed. size_t next_idx = 0; + size_t nextChildIndex() + { + if (next_idx == 0) + { + next_idx = 1; + + if (queue.size() > 2 && queue[1] < queue[2]) + ++next_idx; + } + + return next_idx; + } + /// This is adapted version of the function __sift_down from libc++. /// Why cannot simply use std::priority_queue? /// - because it doesn't support updating the top element and requires pop and push instead. @@ -306,30 +307,14 @@ private: return; auto begin = queue.begin(); - typename Container::iterator child_it; - if (next_idx) - { - child_it = begin + next_idx; - } - else - { - next_idx = 1; - child_it = begin + next_idx; - - /// Right child exists and is greater than left child. - if (size > 2 && *child_it < *(child_it + 1)) - { - ++child_it; - ++next_idx; - } - } + size_t child_idx = nextChildIndex(); + auto child_it = begin + child_idx; /// Check if we are in order. if (*child_it < *begin) return; - size_t child_idx = next_idx; next_idx = 0; auto curr_it = begin; @@ -340,11 +325,12 @@ private: *curr_it = std::move(*child_it); curr_it = child_it; - if ((size - 2) / 2 < child_idx) - break; - // recompute the child based off of the updated parent child_idx = 2 * child_idx + 1; + + if (child_idx >= size) + break; + child_it = begin + child_idx; if ((child_idx + 1) < size && *child_it < *(child_it + 1)) From 92baec6488e86b4247e99309e4eee62f3fdcd557 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 2 Jan 2020 23:07:07 +0300 Subject: [PATCH 13/15] Fix MergingSorted --- dbms/src/DataStreams/MergingSortedBlockInputStream.cpp | 1 + dbms/src/Processors/Transforms/MergingSortedTransform.cpp | 1 + 2 files changed, 2 insertions(+) diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 3614d9c1d66..f846ed27682 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -238,6 +238,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort total_merged_rows += merged_rows; fetchNextBlock(current, queue); + queue.removeTop(); return; } diff --git a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp index c8a83a57fd5..ddbd91b38d1 100644 --- a/dbms/src/Processors/Transforms/MergingSortedTransform.cpp +++ b/dbms/src/Processors/Transforms/MergingSortedTransform.cpp @@ -254,6 +254,7 @@ void MergingSortedTransform::merge(TSortingHeap & queue) /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) size_t source_num = current.impl->order; insertFromChunk(source_num); + queue.removeTop(); return; } From 73949f1614740c6c4606a732388aabf4193a4420 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 2 Jan 2020 23:26:27 +0300 Subject: [PATCH 14/15] Fix MergingSorted --- dbms/src/DataStreams/MergingSortedBlockInputStream.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index f846ed27682..3614d9c1d66 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -238,7 +238,6 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort total_merged_rows += merged_rows; fetchNextBlock(current, queue); - queue.removeTop(); return; } From 1b48ccde797e6c6b09e3354e4085fca5ab121b3c Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 3 Jan 2020 10:04:40 +0300 Subject: [PATCH 15/15] Fix comparison for SimpleSortCursor. --- dbms/src/Core/SortCursor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index a5d6a2afac3..00ac39243eb 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -178,7 +178,7 @@ struct SimpleSortCursor : SortCursorHelper int direction = desc.direction; int nulls_direction = desc.nulls_direction; int res = impl->sort_columns[0]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[0]), nulls_direction); - return (res > 0) == (direction > 0); + return res != 0 && ((res > 0) == (direction > 0)); } };