Add sort improvement to processors.

This commit is contained in:
Nikolai Kochetov 2019-12-27 21:42:47 +03:00
parent 330676737a
commit 05fe9ef179
2 changed files with 17 additions and 25 deletions

View File

@ -41,15 +41,9 @@ MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t
chunks.swap(nonempty_chunks); chunks.swap(nonempty_chunks);
if (!has_collation) if (!has_collation)
{ queue_without_collation = SortingHeap<SortCursor>(cursors);
for (auto & cursor : cursors)
queue_without_collation.push(SortCursor(&cursor));
}
else else
{ queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
for (auto & cursor : cursors)
queue_with_collation.push(SortCursorWithCollation(&cursor));
}
} }
@ -66,13 +60,13 @@ Chunk MergeSorter::read()
} }
return !has_collation return !has_collation
? mergeImpl<SortCursor>(queue_without_collation) ? mergeImpl(queue_without_collation)
: mergeImpl<SortCursorWithCollation>(queue_with_collation); : mergeImpl(queue_with_collation);
} }
template <typename TSortCursor> template <typename TSortingHeap>
Chunk MergeSorter::mergeImpl(std::priority_queue<TSortCursor> & queue) Chunk MergeSorter::mergeImpl(TSortingHeap & queue)
{ {
size_t num_columns = chunks[0].getNumColumns(); size_t num_columns = chunks[0].getNumColumns();
@ -81,29 +75,27 @@ Chunk MergeSorter::mergeImpl(std::priority_queue<TSortCursor> & queue)
/// Take rows from queue in right order and push to 'merged'. /// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0; size_t merged_rows = 0;
while (!queue.empty()) while (queue.isValid())
{ {
TSortCursor current = queue.top(); auto current = queue.current();
queue.pop();
/// Append a row from queue.
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
++total_merged_rows; ++total_merged_rows;
++merged_rows; ++merged_rows;
if (!current->isLast()) /// We don't need more rows because of limit has reached.
{
current->next();
queue.push(current);
}
if (limit && total_merged_rows == limit) if (limit && total_merged_rows == limit)
{ {
chunks.clear(); chunks.clear();
return Chunk(std::move(merged_columns), merged_rows); return Chunk(std::move(merged_columns), merged_rows);
} }
queue.next();
/// It's enough for current output block but we will continue.
if (merged_rows == max_merged_block_size) if (merged_rows == max_merged_block_size)
return Chunk(std::move(merged_columns), merged_rows); return Chunk(std::move(merged_columns), merged_rows);
} }

View File

@ -32,14 +32,14 @@ private:
bool has_collation = false; bool has_collation = false;
std::priority_queue<SortCursor> queue_without_collation; SortingHeap<SortCursor> queue_without_collation;
std::priority_queue<SortCursorWithCollation> queue_with_collation; SortingHeap<SortCursorWithCollation> queue_with_collation;
/** Two different cursors are supported - with and without Collation. /** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead. * Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
*/ */
template <typename TSortCursor> template <typename TSortingHeap>
Chunk mergeImpl(std::priority_queue<TSortCursor> & queue); Chunk mergeImpl(TSortingHeap & queue);
}; };