Optimization of sorting heap

This commit is contained in:
Alexey Milovidov 2019-12-21 02:57:57 +03:00
parent dba9766575
commit 9960a79b36
3 changed files with 108 additions and 83 deletions

View File

@ -1,5 +1,8 @@
#pragma once
#include <vector>
#include <algorithm>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Core/SortDescription.h>
@ -98,9 +101,12 @@ struct SortCursorImpl
bool isFirst() const { return pos == 0; }
bool isLast() const { return pos + 1 >= rows; }
bool isValid() const { return pos < rows; }
void next() { ++pos; }
};
using SortCursorImpls = std::vector<SortCursorImpl>;
/// For easy copying.
struct SortCursor
@ -203,4 +209,90 @@ struct SortCursorWithCollation
}
};
template <typename Cursor>
class SortingHeap
{
public:
SortingHeap() = default;
template <typename Cursors>
SortingHeap(const Cursors & cursors)
{
size_t size = cursors.size();
queue.reserve(size);
for (size_t i = 0; i < size; ++i)
queue.emplace_back(&cursors[i]);
std::make_heap(queue.begin(), queue.end());
}
bool isValid() const { return !queue.empty(); }
Cursor & current() { return queue.front(); }
void next()
{
if (current()->isValid())
{
current()->next();
updateTopHeap(queue.begin(), queue.end());
}
else
queue.erase(queue.begin());
}
private:
using Container = std::vector<Cursor>;
Container queue;
/// This is adapted version of the function __sift_down from libc++.
template <typename It>
static void updateTopHeap(It begin, It end)
{
size_t size = end - begin;
if (size < 2)
return;
size_t child_idx = 1;
It child_it = begin + 1;
/// Right child exists and is greater than left child.
if (size > 2 && *child_it < *(child_it + 1))
{
++child_it;
++child_idx;
}
/// Check if we are in order.
if (*child_it < *begin)
return;
auto curr_it = begin;
auto top(std::move(*begin));
do
{
/// We are not in heap-order, swap the parent with it's largest child.
*curr_it = std::move(*child_it);
curr_it = child_it;
if ((size - 2) / 2 < child_idx)
break;
// recompute the child based off of the updated parent
child_idx = 2 * child_idx + 1;
child_it = begin + child_idx;
if ((child_idx + 1) < size && *child_it < *(child_it + 1))
{
/// Right child exists and is greater than left child.
++child_it;
++child_idx;
}
/// Check if we are in order.
} while (!(*child_it < top));
*curr_it = std::move(top);
}
};
}

View File

@ -1,4 +1,3 @@
#include <algorithm>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
@ -153,19 +152,9 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
size_t size = cursors.size();
if (!has_collation)
{
queue_without_collation.reserve(size);
for (size_t i = 0; i < size; ++i)
queue_without_collation.emplace_back(&cursors[i]);
std::make_heap(queue_without_collation.begin(), queue_without_collation.end());
}
queue_without_collation = SortingHeap<SortCursor>(cursors);
else
{
queue_with_collation.reserve(size);
for (size_t i = 0; i < size; ++i)
queue_with_collation.emplace_back(&cursors[i]);
std::make_heap(queue_with_collation.begin(), queue_with_collation.end());
}
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
}
@ -182,63 +171,13 @@ Block MergeSortingBlocksBlockInputStream::readImpl()
}
return !has_collation
? mergeImpl<SortCursor>(queue_without_collation)
: mergeImpl<SortCursorWithCollation>(queue_with_collation);
? mergeImpl(queue_without_collation)
: mergeImpl(queue_with_collation);
}
/// This is adapted version of the function __sift_down from libc++.
template <typename It>
void updateTopHeap(It begin, It end)
{
size_t size = end - begin;
if (size < 2)
return;
size_t child_idx = 1;
It child_it = begin + 1;
/// Right child exists and is greater than left child.
if (size > 2 && *child_it < *(child_it + 1))
{
++child_it;
++child_idx;
}
/// Check if we are in order.
if (*child_it < *begin)
return;
auto curr_it = begin;
auto top(std::move(*begin));
do
{
/// We are not in heap-order, swap the parent with it's largest child.
*curr_it = std::move(*child_it);
curr_it = child_it;
if ((size - 2) / 2 < child_idx)
break;
// recompute the child based off of the updated parent
child_idx = 2 * child_idx + 1;
child_it = begin + child_idx;
if ((child_idx + 1) < size && *child_it < *(child_it + 1))
{
/// Right child exists and is greater than left child.
++child_it;
++child_idx;
}
/// Check if we are in order.
} while (!(*child_it < top));
*curr_it = std::move(top);
}
template <typename TSortCursor>
Block MergeSortingBlocksBlockInputStream::mergeImpl(std::vector<TSortCursor> & queue)
template <typename TSortingHeap>
Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue)
{
size_t num_columns = blocks[0].columns();
@ -247,20 +186,17 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::vector<TSortCursor> & q
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
while (!queue.empty())
while (queue.isValid())
{
TSortCursor current = queue.front();
auto current = queue.current();
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
if (current->isValid())
{
current->next();
updateTopHeap(queue.begin(), queue.end());
}
if (queue.isValid())
queue.next();
else
queue.erase(queue.begin());
break;
++total_merged_rows;
if (limit && total_merged_rows == limit)

View File

@ -1,8 +1,5 @@
#pragma once
#include <vector>
#include <queue>
#include <common/logger_useful.h>
#include <Common/filesystemHelpers.h>
@ -57,18 +54,18 @@ private:
UInt64 limit;
size_t total_merged_rows = 0;
CursorImpls cursors;
SortCursorImpls cursors;
bool has_collation = false;
std::vector<SortCursor> queue_without_collation;
std::vector<SortCursorWithCollation> queue_with_collation;
SortingHeap<SortCursor> queue_without_collation;
SortingHeap<SortCursorWithCollation> queue_with_collation;
/** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
*/
template <typename TSortCursor>
Block mergeImpl(std::vector<TSortCursor> & queue);
template <typename TSortingHeap>
Block mergeImpl(TSortingHeap & queue);
};