Merge remote-tracking branch 'origin/better-priority-queue' into gcc-libcxx

This commit is contained in:
Alexey Milovidov 2019-12-21 23:29:40 +03:00
commit 2c63baa52a
2 changed files with 31 additions and 19 deletions

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <cassert>
#include <vector> #include <vector>
#include <algorithm> #include <algorithm>
@ -210,6 +211,8 @@ struct SortCursorWithCollation
}; };
/** Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams).
*/
template <typename Cursor> template <typename Cursor>
class SortingHeap class SortingHeap
{ {
@ -232,13 +235,15 @@ public:
void next() void next()
{ {
if (current()->isValid()) assert(isValid());
if (!current()->isLast())
{ {
current()->next(); current()->next();
updateTopHeap(queue.begin(), queue.end()); updateTop();
} }
else else
queue.erase(queue.begin()); removeTop();
} }
private: private:
@ -246,15 +251,17 @@ private:
Container queue; Container queue;
/// This is adapted version of the function __sift_down from libc++. /// This is adapted version of the function __sift_down from libc++.
template <typename It> /// Why cannot simply use std::priority_queue?
static void updateTopHeap(It begin, It end) /// - because it doesn't support updating the top element and requires pop and push instead.
void updateTop()
{ {
size_t size = end - begin; size_t size = queue.size();
if (size < 2) if (size < 2)
return; return;
size_t child_idx = 1; size_t child_idx = 1;
It child_it = begin + 1; auto begin = queue.begin();
auto child_it = begin + 1;
/// Right child exists and is greater than left child. /// Right child exists and is greater than left child.
if (size > 2 && *child_it < *(child_it + 1)) if (size > 2 && *child_it < *(child_it + 1))
@ -293,6 +300,11 @@ private:
} while (!(*child_it < top)); } while (!(*child_it < top));
*curr_it = std::move(top); *curr_it = std::move(top);
} }
void removeTop()
{
std::pop_heap(queue.begin(), queue.end());
}
}; };
} }

View File

@ -178,9 +178,9 @@ Block MergeSortingBlocksBlockInputStream::readImpl()
template <typename TSortingHeap> template <typename TSortingHeap>
Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue)
{ {
size_t num_columns = blocks[0].columns(); size_t num_columns = header.columns();
MutableColumns merged_columns = blocks[0].cloneEmptyColumns(); MutableColumns merged_columns = header.cloneEmptyColumns();
/// TODO: reserve (in each column) /// TODO: reserve (in each column)
/// Take rows from queue in right order and push to 'merged'. /// Take rows from queue in right order and push to 'merged'.
@ -189,31 +189,31 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue)
{ {
auto current = queue.current(); auto current = queue.current();
/// Append a row from queue.
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
if (queue.isValid())
queue.next();
else
break;
++total_merged_rows; ++total_merged_rows;
++merged_rows;
/// We don't need more rows because of limit has reached.
if (limit && total_merged_rows == limit) if (limit && total_merged_rows == limit)
{ {
auto res = blocks[0].cloneWithColumns(std::move(merged_columns));
blocks.clear(); blocks.clear();
return res; break;
} }
++merged_rows; queue.next();
/// It's enough for current output block but we will continue.
if (merged_rows == max_merged_block_size) if (merged_rows == max_merged_block_size)
return blocks[0].cloneWithColumns(std::move(merged_columns)); break;
} }
if (merged_rows == 0) if (merged_rows == 0)
return {}; return {};
return blocks[0].cloneWithColumns(std::move(merged_columns)); return header.cloneWithColumns(std::move(merged_columns));
} }