Better priority_queue performance, part 1

This commit is contained in:
Alexey Milovidov 2019-12-21 02:38:07 +03:00
parent c2bde1f4eb
commit dba9766575
2 changed files with 70 additions and 14 deletions

View File

@ -1,4 +1,4 @@
#include <Poco/Version.h>
#include <algorithm>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
@ -151,15 +151,20 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
blocks.swap(nonempty_blocks);
size_t size = cursors.size();
if (!has_collation)
{
for (size_t i = 0; i < cursors.size(); ++i)
queue_without_collation.push(SortCursor(&cursors[i]));
queue_without_collation.reserve(size);
for (size_t i = 0; i < size; ++i)
queue_without_collation.emplace_back(&cursors[i]);
std::make_heap(queue_without_collation.begin(), queue_without_collation.end());
}
else
{
for (size_t i = 0; i < cursors.size(); ++i)
queue_with_collation.push(SortCursorWithCollation(&cursors[i]));
queue_with_collation.reserve(size);
for (size_t i = 0; i < size; ++i)
queue_with_collation.emplace_back(&cursors[i]);
std::make_heap(queue_with_collation.begin(), queue_with_collation.end());
}
}
@ -182,8 +187,58 @@ Block MergeSortingBlocksBlockInputStream::readImpl()
}
/// This is adapted version of the function __sift_down from libc++.
template <typename It>
void updateTopHeap(It begin, It end)
{
size_t size = end - begin;
if (size < 2)
return;
size_t child_idx = 1;
It child_it = begin + 1;
/// Right child exists and is greater than left child.
if (size > 2 && *child_it < *(child_it + 1))
{
++child_it;
++child_idx;
}
/// Check if we are in order.
if (*child_it < *begin)
return;
auto curr_it = begin;
auto top(std::move(*begin));
do
{
/// We are not in heap-order, swap the parent with it's largest child.
*curr_it = std::move(*child_it);
curr_it = child_it;
if ((size - 2) / 2 < child_idx)
break;
// recompute the child based off of the updated parent
child_idx = 2 * child_idx + 1;
child_it = begin + child_idx;
if ((child_idx + 1) < size && *child_it < *(child_it + 1))
{
/// Right child exists and is greater than left child.
++child_it;
++child_idx;
}
/// Check if we are in order.
} while (!(*child_it < top));
*curr_it = std::move(top);
}
template <typename TSortCursor>
Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCursor> & queue)
Block MergeSortingBlocksBlockInputStream::mergeImpl(std::vector<TSortCursor> & queue)
{
size_t num_columns = blocks[0].columns();
@ -194,17 +249,18 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
size_t merged_rows = 0;
while (!queue.empty())
{
TSortCursor current = queue.top();
queue.pop();
TSortCursor current = queue.front();
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
if (!current->isLast())
if (current->isValid())
{
current->next();
queue.push(current);
updateTopHeap(queue.begin(), queue.end());
}
else
queue.erase(queue.begin());
++total_merged_rows;
if (limit && total_merged_rows == limit)

View File

@ -1,5 +1,6 @@
#pragma once
#include <vector>
#include <queue>
#include <common/logger_useful.h>
@ -56,19 +57,18 @@ private:
UInt64 limit;
size_t total_merged_rows = 0;
using CursorImpls = std::vector<SortCursorImpl>;
CursorImpls cursors;
bool has_collation = false;
std::priority_queue<SortCursor> queue_without_collation;
std::priority_queue<SortCursorWithCollation> queue_with_collation;
std::vector<SortCursor> queue_without_collation;
std::vector<SortCursorWithCollation> queue_with_collation;
/** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
*/
template <typename TSortCursor>
Block mergeImpl(std::priority_queue<TSortCursor> & queue);
Block mergeImpl(std::vector<TSortCursor> & queue);
};