mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-25 03:00:49 +00:00
Merge pull request #8335 from ClickHouse/better-priority-queue
Optimization of ORDER BY and merging
This commit is contained in:
commit
35770403fc
@ -22,8 +22,8 @@ namespace DB
|
||||
*/
|
||||
struct SortCursorImpl
|
||||
{
|
||||
ColumnRawPtrs all_columns;
|
||||
ColumnRawPtrs sort_columns;
|
||||
ColumnRawPtrs all_columns;
|
||||
SortDescription desc;
|
||||
size_t sort_columns_size = 0;
|
||||
size_t pos = 0;
|
||||
@ -110,21 +110,52 @@ using SortCursorImpls = std::vector<SortCursorImpl>;
|
||||
|
||||
|
||||
/// For easy copying.
|
||||
struct SortCursor
|
||||
template <typename Derived>
|
||||
struct SortCursorHelper
|
||||
{
|
||||
SortCursorImpl * impl;
|
||||
|
||||
SortCursor(SortCursorImpl * impl_) : impl(impl_) {}
|
||||
const Derived & derived() const { return static_cast<const Derived &>(*this); }
|
||||
|
||||
SortCursorHelper(SortCursorImpl * impl_) : impl(impl_) {}
|
||||
SortCursorImpl * operator-> () { return impl; }
|
||||
const SortCursorImpl * operator-> () const { return impl; }
|
||||
|
||||
bool greater(const SortCursorHelper & rhs) const
|
||||
{
|
||||
return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos);
|
||||
}
|
||||
|
||||
/// Inverted so that the priority queue elements are removed in ascending order.
|
||||
bool operator< (const SortCursorHelper & rhs) const
|
||||
{
|
||||
return derived().greater(rhs.derived());
|
||||
}
|
||||
|
||||
/// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor.
|
||||
bool totallyLessOrEquals(const SortCursorHelper & rhs) const
|
||||
{
|
||||
if (impl->rows == 0 || rhs.impl->rows == 0)
|
||||
return false;
|
||||
|
||||
/// The last row of this cursor is no larger than the first row of the another cursor.
|
||||
return !derived().greaterAt(rhs.derived(), impl->rows - 1, 0);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
struct SortCursor : SortCursorHelper<SortCursor>
|
||||
{
|
||||
using SortCursorHelper<SortCursor>::SortCursorHelper;
|
||||
|
||||
/// The specified row of this cursor is greater than the specified row of another cursor.
|
||||
bool greaterAt(const SortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
|
||||
{
|
||||
for (size_t i = 0; i < impl->sort_columns_size; ++i)
|
||||
{
|
||||
int direction = impl->desc[i].direction;
|
||||
int nulls_direction = impl->desc[i].nulls_direction;
|
||||
const auto & desc = impl->desc[i];
|
||||
int direction = desc.direction;
|
||||
int nulls_direction = desc.nulls_direction;
|
||||
int res = direction * impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction);
|
||||
if (res > 0)
|
||||
return true;
|
||||
@ -133,45 +164,37 @@ struct SortCursor
|
||||
}
|
||||
return impl->order > rhs.impl->order;
|
||||
}
|
||||
};
|
||||
|
||||
/// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor.
|
||||
bool totallyLessOrEquals(const SortCursor & rhs) const
|
||||
|
||||
/// For the case with a single column and when there is no order between different cursors.
|
||||
struct SimpleSortCursor : SortCursorHelper<SimpleSortCursor>
|
||||
{
|
||||
using SortCursorHelper<SimpleSortCursor>::SortCursorHelper;
|
||||
|
||||
bool greaterAt(const SimpleSortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
|
||||
{
|
||||
if (impl->rows == 0 || rhs.impl->rows == 0)
|
||||
return false;
|
||||
|
||||
/// The last row of this cursor is no larger than the first row of the another cursor.
|
||||
return !greaterAt(rhs, impl->rows - 1, 0);
|
||||
}
|
||||
|
||||
bool greater(const SortCursor & rhs) const
|
||||
{
|
||||
return greaterAt(rhs, impl->pos, rhs.impl->pos);
|
||||
}
|
||||
|
||||
/// Inverted so that the priority queue elements are removed in ascending order.
|
||||
bool operator< (const SortCursor & rhs) const
|
||||
{
|
||||
return greater(rhs);
|
||||
const auto & desc = impl->desc[0];
|
||||
int direction = desc.direction;
|
||||
int nulls_direction = desc.nulls_direction;
|
||||
int res = impl->sort_columns[0]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[0]), nulls_direction);
|
||||
return res != 0 && ((res > 0) == (direction > 0));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// Separate comparator for locale-sensitive string comparisons
|
||||
struct SortCursorWithCollation
|
||||
struct SortCursorWithCollation : SortCursorHelper<SortCursorWithCollation>
|
||||
{
|
||||
SortCursorImpl * impl;
|
||||
|
||||
SortCursorWithCollation(SortCursorImpl * impl_) : impl(impl_) {}
|
||||
SortCursorImpl * operator-> () { return impl; }
|
||||
const SortCursorImpl * operator-> () const { return impl; }
|
||||
using SortCursorHelper<SortCursorWithCollation>::SortCursorHelper;
|
||||
|
||||
bool greaterAt(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const
|
||||
{
|
||||
for (size_t i = 0; i < impl->sort_columns_size; ++i)
|
||||
{
|
||||
int direction = impl->desc[i].direction;
|
||||
int nulls_direction = impl->desc[i].nulls_direction;
|
||||
const auto & desc = impl->desc[i];
|
||||
int direction = desc.direction;
|
||||
int nulls_direction = desc.nulls_direction;
|
||||
int res;
|
||||
if (impl->need_collation[i])
|
||||
{
|
||||
@ -189,29 +212,11 @@ struct SortCursorWithCollation
|
||||
}
|
||||
return impl->order > rhs.impl->order;
|
||||
}
|
||||
|
||||
bool totallyLessOrEquals(const SortCursorWithCollation & rhs) const
|
||||
{
|
||||
if (impl->rows == 0 || rhs.impl->rows == 0)
|
||||
return false;
|
||||
|
||||
/// The last row of this cursor is no larger than the first row of the another cursor.
|
||||
return !greaterAt(rhs, impl->rows - 1, 0);
|
||||
}
|
||||
|
||||
bool greater(const SortCursorWithCollation & rhs) const
|
||||
{
|
||||
return greaterAt(rhs, impl->pos, rhs.impl->pos);
|
||||
}
|
||||
|
||||
bool operator< (const SortCursorWithCollation & rhs) const
|
||||
{
|
||||
return greater(rhs);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/** Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams).
|
||||
* TODO: Replace with "Loser Tree", see https://en.wikipedia.org/wiki/K-way_merge_algorithm
|
||||
*/
|
||||
template <typename Cursor>
|
||||
class SortingHeap
|
||||
@ -225,7 +230,8 @@ public:
|
||||
size_t size = cursors.size();
|
||||
queue.reserve(size);
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
queue.emplace_back(&cursors[i]);
|
||||
if (!cursors[i].empty())
|
||||
queue.emplace_back(&cursors[i]);
|
||||
std::make_heap(queue.begin(), queue.end());
|
||||
}
|
||||
|
||||
@ -233,6 +239,10 @@ public:
|
||||
|
||||
Cursor & current() { return queue.front(); }
|
||||
|
||||
size_t size() { return queue.size(); }
|
||||
|
||||
Cursor & nextChild() { return queue[nextChildIndex()]; }
|
||||
|
||||
void next()
|
||||
{
|
||||
assert(isValid());
|
||||
@ -246,34 +256,67 @@ public:
|
||||
removeTop();
|
||||
}
|
||||
|
||||
void replaceTop(Cursor new_top)
|
||||
{
|
||||
current() = new_top;
|
||||
updateTop();
|
||||
}
|
||||
|
||||
void removeTop()
|
||||
{
|
||||
std::pop_heap(queue.begin(), queue.end());
|
||||
queue.pop_back();
|
||||
next_idx = 0;
|
||||
}
|
||||
|
||||
void push(SortCursorImpl & cursor)
|
||||
{
|
||||
queue.emplace_back(&cursor);
|
||||
std::push_heap(queue.begin(), queue.end());
|
||||
next_idx = 0;
|
||||
}
|
||||
|
||||
private:
|
||||
using Container = std::vector<Cursor>;
|
||||
Container queue;
|
||||
|
||||
/// Cache comparison between first and second child if the order in queue has not been changed.
|
||||
size_t next_idx = 0;
|
||||
|
||||
size_t nextChildIndex()
|
||||
{
|
||||
if (next_idx == 0)
|
||||
{
|
||||
next_idx = 1;
|
||||
|
||||
if (queue.size() > 2 && queue[1] < queue[2])
|
||||
++next_idx;
|
||||
}
|
||||
|
||||
return next_idx;
|
||||
}
|
||||
|
||||
/// This is adapted version of the function __sift_down from libc++.
|
||||
/// Why cannot simply use std::priority_queue?
|
||||
/// - because it doesn't support updating the top element and requires pop and push instead.
|
||||
/// Also look at "Boost.Heap" library.
|
||||
void updateTop()
|
||||
{
|
||||
size_t size = queue.size();
|
||||
if (size < 2)
|
||||
return;
|
||||
|
||||
size_t child_idx = 1;
|
||||
auto begin = queue.begin();
|
||||
auto child_it = begin + 1;
|
||||
|
||||
/// Right child exists and is greater than left child.
|
||||
if (size > 2 && *child_it < *(child_it + 1))
|
||||
{
|
||||
++child_it;
|
||||
++child_idx;
|
||||
}
|
||||
size_t child_idx = nextChildIndex();
|
||||
auto child_it = begin + child_idx;
|
||||
|
||||
/// Check if we are in order.
|
||||
if (*child_it < *begin)
|
||||
return;
|
||||
|
||||
next_idx = 0;
|
||||
|
||||
auto curr_it = begin;
|
||||
auto top(std::move(*begin));
|
||||
do
|
||||
@ -282,11 +325,12 @@ private:
|
||||
*curr_it = std::move(*child_it);
|
||||
curr_it = child_it;
|
||||
|
||||
if ((size - 2) / 2 < child_idx)
|
||||
break;
|
||||
|
||||
// recompute the child based off of the updated parent
|
||||
child_idx = 2 * child_idx + 1;
|
||||
|
||||
if (child_idx >= size)
|
||||
break;
|
||||
|
||||
child_it = begin + child_idx;
|
||||
|
||||
if ((child_idx + 1) < size && *child_it < *(child_it + 1))
|
||||
@ -300,12 +344,6 @@ private:
|
||||
} while (!(*child_it < top));
|
||||
*curr_it = std::move(top);
|
||||
}
|
||||
|
||||
void removeTop()
|
||||
{
|
||||
std::pop_heap(queue.begin(), queue.end());
|
||||
queue.pop_back();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -138,14 +138,14 @@ Block AggregatingSortedBlockInputStream::readImpl()
|
||||
}
|
||||
|
||||
|
||||
void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
||||
void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||
{
|
||||
size_t merged_rows = 0;
|
||||
|
||||
/// We take the rows in the correct order and put them in `merged_block`, while the rows are no more than `max_block_size`
|
||||
while (!queue.empty())
|
||||
while (queue.isValid())
|
||||
{
|
||||
SortCursor current = queue.top();
|
||||
SortCursor current = queue.current();
|
||||
|
||||
setPrimaryKeyRef(next_key, current);
|
||||
|
||||
@ -167,8 +167,6 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s
|
||||
return;
|
||||
}
|
||||
|
||||
queue.pop();
|
||||
|
||||
if (key_differs)
|
||||
{
|
||||
current_key.swap(next_key);
|
||||
@ -202,8 +200,7 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s
|
||||
|
||||
if (!current->isLast())
|
||||
{
|
||||
current->next();
|
||||
queue.push(current);
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -55,7 +55,7 @@ private:
|
||||
/** We support two different cursors - with Collation and without.
|
||||
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
|
||||
*/
|
||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
||||
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||
|
||||
/** Extract all states of aggregate functions and merge them with the current group.
|
||||
*/
|
||||
|
@ -105,15 +105,15 @@ Block CollapsingSortedBlockInputStream::readImpl()
|
||||
}
|
||||
|
||||
|
||||
void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
||||
void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||
{
|
||||
|
||||
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
||||
size_t current_block_granularity;
|
||||
/// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
|
||||
for (; !queue.empty(); ++current_pos)
|
||||
for (; queue.isValid(); ++current_pos)
|
||||
{
|
||||
SortCursor current = queue.top();
|
||||
SortCursor current = queue.current();
|
||||
current_block_granularity = current->rows;
|
||||
|
||||
if (current_key.empty())
|
||||
@ -131,8 +131,6 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
|
||||
return;
|
||||
}
|
||||
|
||||
queue.pop();
|
||||
|
||||
if (key_differs)
|
||||
{
|
||||
/// We write data for the previous primary key.
|
||||
@ -185,8 +183,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
|
||||
|
||||
if (!current->isLast())
|
||||
{
|
||||
current->next();
|
||||
queue.push(current);
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -73,7 +73,7 @@ private:
|
||||
/** We support two different cursors - with Collation and without.
|
||||
* Templates are used instead of polymorphic SortCursors and calls to virtual functions.
|
||||
*/
|
||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
||||
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||
|
||||
/// Output to result rows for the current primary key.
|
||||
void insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition);
|
||||
|
@ -161,7 +161,7 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
|
||||
}
|
||||
|
||||
|
||||
void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
||||
void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||
{
|
||||
const DateLUTImpl & date_lut = DateLUT::instance();
|
||||
|
||||
@ -173,9 +173,9 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
|
||||
/// contribute towards current output row.
|
||||
/// Variables starting with next_* refer to the row at the top of the queue.
|
||||
|
||||
while (!queue.empty())
|
||||
while (queue.isValid())
|
||||
{
|
||||
SortCursor next_cursor = queue.top();
|
||||
SortCursor next_cursor = queue.current();
|
||||
|
||||
StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos);
|
||||
bool new_path = is_first || next_path != current_group_path;
|
||||
@ -253,12 +253,9 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
|
||||
current_group_path = next_path;
|
||||
}
|
||||
|
||||
queue.pop();
|
||||
|
||||
if (!next_cursor->isLast())
|
||||
{
|
||||
next_cursor->next();
|
||||
queue.push(next_cursor);
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -225,7 +225,7 @@ private:
|
||||
UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
|
||||
|
||||
|
||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
||||
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||
|
||||
/// Insert the values into the resulting columns, which will not be changed in the future.
|
||||
template <typename TSortCursor>
|
||||
|
@ -150,10 +150,12 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
|
||||
|
||||
blocks.swap(nonempty_blocks);
|
||||
|
||||
if (!has_collation)
|
||||
if (has_collation)
|
||||
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
|
||||
else if (description.size() > 1)
|
||||
queue_without_collation = SortingHeap<SortCursor>(cursors);
|
||||
else
|
||||
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
|
||||
queue_simple = SortingHeap<SimpleSortCursor>(cursors);
|
||||
}
|
||||
|
||||
|
||||
@ -169,9 +171,12 @@ Block MergeSortingBlocksBlockInputStream::readImpl()
|
||||
return res;
|
||||
}
|
||||
|
||||
return !has_collation
|
||||
? mergeImpl(queue_without_collation)
|
||||
: mergeImpl(queue_with_collation);
|
||||
if (has_collation)
|
||||
return mergeImpl(queue_with_collation);
|
||||
else if (description.size() > 1)
|
||||
return mergeImpl(queue_without_collation);
|
||||
else
|
||||
return mergeImpl(queue_simple);
|
||||
}
|
||||
|
||||
|
||||
@ -179,9 +184,18 @@ template <typename TSortingHeap>
|
||||
Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue)
|
||||
{
|
||||
size_t num_columns = header.columns();
|
||||
|
||||
MutableColumns merged_columns = header.cloneEmptyColumns();
|
||||
/// TODO: reserve (in each column)
|
||||
|
||||
/// Reserve
|
||||
if (queue.isValid() && !blocks.empty())
|
||||
{
|
||||
/// The expected size of output block is the same as input block
|
||||
size_t size_to_reserve = blocks[0].rows();
|
||||
for (auto & column : merged_columns)
|
||||
column->reserve(size_to_reserve);
|
||||
}
|
||||
|
||||
/// TODO: Optimization when a single block left.
|
||||
|
||||
/// Take rows from queue in right order and push to 'merged'.
|
||||
size_t merged_rows = 0;
|
||||
@ -210,6 +224,9 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue)
|
||||
break;
|
||||
}
|
||||
|
||||
if (!queue.isValid())
|
||||
blocks.clear();
|
||||
|
||||
if (merged_rows == 0)
|
||||
return {};
|
||||
|
||||
|
@ -59,6 +59,7 @@ private:
|
||||
bool has_collation = false;
|
||||
|
||||
SortingHeap<SortCursor> queue_without_collation;
|
||||
SortingHeap<SimpleSortCursor> queue_simple;
|
||||
SortingHeap<SortCursorWithCollation> queue_with_collation;
|
||||
|
||||
/** Two different cursors are supported - with and without Collation.
|
||||
|
@ -59,9 +59,9 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
|
||||
}
|
||||
|
||||
if (has_collation)
|
||||
initQueue(queue_with_collation);
|
||||
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
|
||||
else
|
||||
initQueue(queue_without_collation);
|
||||
queue_without_collation = SortingHeap<SortCursor>(cursors);
|
||||
}
|
||||
|
||||
/// Let's check that all source blocks have the same structure.
|
||||
@ -82,15 +82,6 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
|
||||
}
|
||||
|
||||
|
||||
template <typename TSortCursor>
|
||||
void MergingSortedBlockInputStream::initQueue(std::priority_queue<TSortCursor> & queue)
|
||||
{
|
||||
for (size_t i = 0; i < cursors.size(); ++i)
|
||||
if (!cursors[i].empty())
|
||||
queue.push(TSortCursor(&cursors[i]));
|
||||
}
|
||||
|
||||
|
||||
Block MergingSortedBlockInputStream::readImpl()
|
||||
{
|
||||
if (finished)
|
||||
@ -115,7 +106,7 @@ Block MergingSortedBlockInputStream::readImpl()
|
||||
|
||||
|
||||
template <typename TSortCursor>
|
||||
void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue)
|
||||
void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, SortingHeap<TSortCursor> & queue)
|
||||
{
|
||||
size_t order = current->order;
|
||||
size_t size = cursors.size();
|
||||
@ -125,15 +116,19 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current,
|
||||
|
||||
while (true)
|
||||
{
|
||||
source_blocks[order] = new detail::SharedBlock(children[order]->read());
|
||||
source_blocks[order] = new detail::SharedBlock(children[order]->read()); /// intrusive ptr
|
||||
|
||||
if (!*source_blocks[order])
|
||||
{
|
||||
queue.removeTop();
|
||||
break;
|
||||
}
|
||||
|
||||
if (source_blocks[order]->rows())
|
||||
{
|
||||
cursors[order].reset(*source_blocks[order]);
|
||||
queue.push(TSortCursor(&cursors[order]));
|
||||
queue.replaceTop(&cursors[order]);
|
||||
|
||||
source_blocks[order]->all_columns = cursors[order].all_columns;
|
||||
source_blocks[order]->sort_columns = cursors[order].sort_columns;
|
||||
break;
|
||||
@ -154,19 +149,14 @@ bool MergingSortedBlockInputStream::MergeStopCondition::checkStop() const
|
||||
return sum_rows_count >= average;
|
||||
}
|
||||
|
||||
template
|
||||
void MergingSortedBlockInputStream::fetchNextBlock<SortCursor>(const SortCursor & current, std::priority_queue<SortCursor> & queue);
|
||||
|
||||
template
|
||||
void MergingSortedBlockInputStream::fetchNextBlock<SortCursorWithCollation>(const SortCursorWithCollation & current, std::priority_queue<SortCursorWithCollation> & queue);
|
||||
|
||||
|
||||
template <typename TSortCursor>
|
||||
void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
|
||||
template <typename TSortingHeap>
|
||||
void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSortingHeap & queue)
|
||||
{
|
||||
size_t merged_rows = 0;
|
||||
|
||||
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
||||
|
||||
/** Increase row counters.
|
||||
* Return true if it's time to finish generating the current data block.
|
||||
*/
|
||||
@ -186,123 +176,100 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
|
||||
return stop_condition.checkStop();
|
||||
};
|
||||
|
||||
/// Take rows in required order and put them into `merged_columns`, while the rows are no more than `max_block_size`
|
||||
while (!queue.empty())
|
||||
/// Take rows in required order and put them into `merged_columns`, while the number of rows are no more than `max_block_size`
|
||||
while (queue.isValid())
|
||||
{
|
||||
TSortCursor current = queue.top();
|
||||
auto current = queue.current();
|
||||
size_t current_block_granularity = current->rows;
|
||||
queue.pop();
|
||||
|
||||
while (true)
|
||||
/** And what if the block is totally less or equal than the rest for the current cursor?
|
||||
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
|
||||
*/
|
||||
if (current->isFirst()
|
||||
&& (queue.size() == 1
|
||||
|| (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild()))))
|
||||
{
|
||||
/** And what if the block is totally less or equal than the rest for the current cursor?
|
||||
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
|
||||
*/
|
||||
if (current->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
|
||||
// std::cerr << "current block is totally less or equals\n";
|
||||
|
||||
/// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
|
||||
if (merged_rows != 0)
|
||||
{
|
||||
// std::cerr << "current block is totally less or equals\n";
|
||||
|
||||
/// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
|
||||
if (merged_rows != 0)
|
||||
{
|
||||
//std::cerr << "merged rows is non-zero\n";
|
||||
queue.push(current);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Actually, current->order stores source number (i.e. cursors[current->order] == current)
|
||||
size_t source_num = current->order;
|
||||
|
||||
if (source_num >= cursors.size())
|
||||
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate();
|
||||
|
||||
// std::cerr << "copied columns\n";
|
||||
|
||||
merged_rows = merged_columns.at(0)->size();
|
||||
|
||||
/// Limit output
|
||||
if (limit && total_merged_rows + merged_rows > limit)
|
||||
{
|
||||
merged_rows = limit - total_merged_rows;
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
auto & column = merged_columns[i];
|
||||
column = (*column->cut(0, merged_rows)).mutate();
|
||||
}
|
||||
|
||||
cancel(false);
|
||||
finished = true;
|
||||
}
|
||||
|
||||
/// Write order of rows for other columns
|
||||
/// this data will be used in grather stream
|
||||
if (out_row_sources_buf)
|
||||
{
|
||||
RowSourcePart row_source(source_num);
|
||||
for (size_t i = 0; i < merged_rows; ++i)
|
||||
out_row_sources_buf->write(row_source.data);
|
||||
}
|
||||
|
||||
//std::cerr << "fetching next block\n";
|
||||
|
||||
total_merged_rows += merged_rows;
|
||||
fetchNextBlock(current, queue);
|
||||
//std::cerr << "merged rows is non-zero\n";
|
||||
return;
|
||||
}
|
||||
|
||||
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
|
||||
// std::cerr << "Inserting row\n";
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
|
||||
/// Actually, current->order stores source number (i.e. cursors[current->order] == current)
|
||||
size_t source_num = current->order;
|
||||
|
||||
if (source_num >= cursors.size())
|
||||
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate();
|
||||
|
||||
// std::cerr << "copied columns\n";
|
||||
|
||||
merged_rows = merged_columns.at(0)->size();
|
||||
|
||||
/// Limit output
|
||||
if (limit && total_merged_rows + merged_rows > limit)
|
||||
{
|
||||
merged_rows = limit - total_merged_rows;
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
auto & column = merged_columns[i];
|
||||
column = (*column->cut(0, merged_rows)).mutate();
|
||||
}
|
||||
|
||||
cancel(false);
|
||||
finished = true;
|
||||
}
|
||||
|
||||
/// Write order of rows for other columns
|
||||
/// this data will be used in grather stream
|
||||
if (out_row_sources_buf)
|
||||
{
|
||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||
RowSourcePart row_source(current->order);
|
||||
out_row_sources_buf->write(row_source.data);
|
||||
RowSourcePart row_source(source_num);
|
||||
for (size_t i = 0; i < merged_rows; ++i)
|
||||
out_row_sources_buf->write(row_source.data);
|
||||
}
|
||||
|
||||
if (!current->isLast())
|
||||
{
|
||||
// std::cerr << "moving to next row\n";
|
||||
current->next();
|
||||
//std::cerr << "fetching next block\n";
|
||||
|
||||
if (queue.empty() || !(current.greater(queue.top())))
|
||||
{
|
||||
if (count_row_and_check_limit(current_block_granularity))
|
||||
{
|
||||
// std::cerr << "pushing back to queue\n";
|
||||
queue.push(current);
|
||||
return;
|
||||
}
|
||||
total_merged_rows += merged_rows;
|
||||
fetchNextBlock(current, queue);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Do not put the cursor back in the queue, but continue to work with the current cursor.
|
||||
// std::cerr << "current is still on top, using current row\n";
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
// std::cerr << "next row is not least, pushing back to queue\n";
|
||||
queue.push(current);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We get the next block from the corresponding source, if there is one.
|
||||
// std::cerr << "It was last row, fetching next block\n";
|
||||
fetchNextBlock(current, queue);
|
||||
}
|
||||
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
|
||||
// std::cerr << "Inserting row\n";
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
|
||||
|
||||
break;
|
||||
if (out_row_sources_buf)
|
||||
{
|
||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||
RowSourcePart row_source(current->order);
|
||||
out_row_sources_buf->write(row_source.data);
|
||||
}
|
||||
|
||||
if (!current->isLast())
|
||||
{
|
||||
// std::cerr << "moving to next row\n";
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We get the next block from the corresponding source, if there is one.
|
||||
// std::cerr << "It was last row, fetching next block\n";
|
||||
fetchNextBlock(current, queue);
|
||||
}
|
||||
|
||||
if (count_row_and_check_limit(current_block_granularity))
|
||||
return;
|
||||
}
|
||||
|
||||
/// We have read all data. Ask childs to cancel providing more data.
|
||||
cancel(false);
|
||||
finished = true;
|
||||
}
|
||||
|
@ -1,7 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <queue>
|
||||
|
||||
#include <boost/smart_ptr/intrusive_ptr.hpp>
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
@ -87,7 +85,7 @@ protected:
|
||||
|
||||
/// Gets the next block from the source corresponding to the `current`.
|
||||
template <typename TSortCursor>
|
||||
void fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue);
|
||||
void fetchNextBlock(const TSortCursor & current, SortingHeap<TSortCursor> & queue);
|
||||
|
||||
|
||||
Block header;
|
||||
@ -109,14 +107,10 @@ protected:
|
||||
size_t num_columns = 0;
|
||||
std::vector<SharedBlockPtr> source_blocks;
|
||||
|
||||
using CursorImpls = std::vector<SortCursorImpl>;
|
||||
CursorImpls cursors;
|
||||
SortCursorImpls cursors;
|
||||
|
||||
using Queue = std::priority_queue<SortCursor>;
|
||||
Queue queue_without_collation;
|
||||
|
||||
using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
|
||||
QueueWithCollation queue_with_collation;
|
||||
SortingHeap<SortCursor> queue_without_collation;
|
||||
SortingHeap<SortCursorWithCollation> queue_with_collation;
|
||||
|
||||
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
|
||||
/// If it is not nullptr then it should be populated during execution
|
||||
@ -177,13 +171,10 @@ protected:
|
||||
private:
|
||||
|
||||
/** We support two different cursors - with Collation and without.
|
||||
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
|
||||
*/
|
||||
template <typename TSortCursor>
|
||||
void initQueue(std::priority_queue<TSortCursor> & queue);
|
||||
|
||||
template <typename TSortCursor>
|
||||
void merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue);
|
||||
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
|
||||
*/
|
||||
template <typename TSortingHeap>
|
||||
void merge(MutableColumns & merged_columns, TSortingHeap & queue);
|
||||
|
||||
Logger * log = &Logger::get("MergingSortedBlockInputStream");
|
||||
|
||||
|
@ -48,13 +48,14 @@ Block ReplacingSortedBlockInputStream::readImpl()
|
||||
}
|
||||
|
||||
|
||||
void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
||||
void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||
{
|
||||
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
||||
|
||||
/// Take the rows in needed order and put them into `merged_columns` until rows no more than `max_block_size`
|
||||
while (!queue.empty())
|
||||
while (queue.isValid())
|
||||
{
|
||||
SortCursor current = queue.top();
|
||||
SortCursor current = queue.current();
|
||||
size_t current_block_granularity = current->rows;
|
||||
|
||||
if (current_key.empty())
|
||||
@ -68,8 +69,6 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
|
||||
if (key_differs && stop_condition.checkStop())
|
||||
return;
|
||||
|
||||
queue.pop();
|
||||
|
||||
if (key_differs)
|
||||
{
|
||||
/// Write the data for the previous primary key.
|
||||
@ -98,8 +97,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
|
||||
|
||||
if (!current->isLast())
|
||||
{
|
||||
current->next();
|
||||
queue.push(current);
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -52,7 +52,7 @@ private:
|
||||
/// Sources of rows with the current primary key.
|
||||
PODArray<RowSourcePart> current_row_sources;
|
||||
|
||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
||||
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||
|
||||
/// Output into result the rows for current primary key.
|
||||
void insertRow(MutableColumns & merged_columns);
|
||||
|
@ -314,14 +314,14 @@ Block SummingSortedBlockInputStream::readImpl()
|
||||
}
|
||||
|
||||
|
||||
void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
||||
void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||
{
|
||||
merged_rows = 0;
|
||||
|
||||
/// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size`
|
||||
while (!queue.empty())
|
||||
while (queue.isValid())
|
||||
{
|
||||
SortCursor current = queue.top();
|
||||
SortCursor current = queue.current();
|
||||
|
||||
setPrimaryKeyRef(next_key, current);
|
||||
|
||||
@ -383,12 +383,9 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
|
||||
current_row_is_zero = false;
|
||||
}
|
||||
|
||||
queue.pop();
|
||||
|
||||
if (!current->isLast())
|
||||
{
|
||||
current->next();
|
||||
queue.push(current);
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -1,5 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <queue>
|
||||
|
||||
#include <Core/Row.h>
|
||||
#include <Core/ColumnNumbers.h>
|
||||
#include <Common/AlignedBuffer.h>
|
||||
@ -140,7 +142,7 @@ private:
|
||||
/** We support two different cursors - with Collation and without.
|
||||
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
|
||||
*/
|
||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
||||
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||
|
||||
/// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero".
|
||||
void insertCurrentRowIfNeeded(MutableColumns & merged_columns);
|
||||
|
@ -82,21 +82,18 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl()
|
||||
}
|
||||
|
||||
|
||||
void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
||||
void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||
{
|
||||
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
||||
|
||||
auto update_queue = [this, & queue](SortCursor & cursor)
|
||||
{
|
||||
queue.pop();
|
||||
|
||||
if (out_row_sources_buf)
|
||||
current_row_sources.emplace(cursor->order, true);
|
||||
|
||||
if (!cursor->isLast())
|
||||
{
|
||||
cursor->next();
|
||||
queue.push(cursor);
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -106,9 +103,9 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co
|
||||
};
|
||||
|
||||
/// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
|
||||
while (!queue.empty())
|
||||
while (queue.isValid())
|
||||
{
|
||||
SortCursor current = queue.top();
|
||||
SortCursor current = queue.current();
|
||||
size_t current_block_granularity = current->rows;
|
||||
|
||||
SharedBlockRowRef next_key;
|
||||
|
@ -5,7 +5,7 @@
|
||||
#include <DataStreams/MergingSortedBlockInputStream.h>
|
||||
#include <DataStreams/ColumnGathererStream.h>
|
||||
|
||||
#include <deque>
|
||||
#include <queue>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -204,7 +204,7 @@ private:
|
||||
/// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys.
|
||||
std::queue<RowSourcePart> current_row_sources;
|
||||
|
||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
||||
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||
|
||||
/// Output to result row for the current primary key.
|
||||
void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns);
|
||||
|
@ -148,9 +148,9 @@ IProcessor::Status MergingSortedTransform::prepare()
|
||||
return Status::NeedData;
|
||||
|
||||
if (has_collation)
|
||||
initQueue(queue_with_collation);
|
||||
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
|
||||
else
|
||||
initQueue(queue_without_collation);
|
||||
queue_without_collation = SortingHeap<SortCursor>(cursors);
|
||||
|
||||
is_initialized = true;
|
||||
return Status::Ready;
|
||||
@ -169,7 +169,6 @@ IProcessor::Status MergingSortedTransform::prepare()
|
||||
|
||||
if (need_data)
|
||||
{
|
||||
|
||||
auto & input = *std::next(inputs.begin(), next_input_to_read);
|
||||
if (!input.isFinished())
|
||||
{
|
||||
@ -183,7 +182,11 @@ IProcessor::Status MergingSortedTransform::prepare()
|
||||
return Status::NeedData;
|
||||
|
||||
updateCursor(std::move(chunk), next_input_to_read);
|
||||
pushToQueue(next_input_to_read);
|
||||
|
||||
if (has_collation)
|
||||
queue_with_collation.push(cursors[next_input_to_read]);
|
||||
else
|
||||
queue_without_collation.push(cursors[next_input_to_read]);
|
||||
}
|
||||
|
||||
need_data = false;
|
||||
@ -201,8 +204,8 @@ void MergingSortedTransform::work()
|
||||
merge(queue_without_collation);
|
||||
}
|
||||
|
||||
template <typename TSortCursor>
|
||||
void MergingSortedTransform::merge(std::priority_queue<TSortCursor> & queue)
|
||||
template <typename TSortingHeap>
|
||||
void MergingSortedTransform::merge(TSortingHeap & queue)
|
||||
{
|
||||
/// Returns MergeStatus which we should return if we are going to finish now.
|
||||
auto can_read_another_row = [&, this]()
|
||||
@ -224,77 +227,66 @@ void MergingSortedTransform::merge(std::priority_queue<TSortCursor> & queue)
|
||||
};
|
||||
|
||||
/// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size`
|
||||
while (!queue.empty())
|
||||
while (queue.isValid())
|
||||
{
|
||||
/// Shouldn't happen at first iteration, but check just in case.
|
||||
if (!can_read_another_row())
|
||||
return;
|
||||
|
||||
TSortCursor current = queue.top();
|
||||
queue.pop();
|
||||
bool first_iteration = true;
|
||||
auto current = queue.current();
|
||||
|
||||
while (true)
|
||||
/** And what if the block is totally less or equal than the rest for the current cursor?
|
||||
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
|
||||
*/
|
||||
if (current.impl->isFirst()
|
||||
&& (queue.size() == 1
|
||||
|| (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild()))))
|
||||
{
|
||||
if (!first_iteration && !can_read_another_row())
|
||||
//std::cerr << "current block is totally less or equals\n";
|
||||
|
||||
/// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
|
||||
if (merged_data.mergedRows() != 0)
|
||||
{
|
||||
queue.push(current);
|
||||
return;
|
||||
}
|
||||
first_iteration = false;
|
||||
|
||||
/** And what if the block is totally less or equal than the rest for the current cursor?
|
||||
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
|
||||
*/
|
||||
if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
|
||||
{
|
||||
//std::cerr << "current block is totally less or equals\n";
|
||||
|
||||
/// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
|
||||
if (merged_data.mergedRows() != 0)
|
||||
{
|
||||
//std::cerr << "merged rows is non-zero\n";
|
||||
queue.push(current);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||
size_t source_num = current.impl->order;
|
||||
insertFromChunk(source_num);
|
||||
//std::cerr << "merged rows is non-zero\n";
|
||||
return;
|
||||
}
|
||||
|
||||
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
|
||||
//std::cerr << "Inserting row\n";
|
||||
merged_data.insertRow(current->all_columns, current->pos);
|
||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||
size_t source_num = current.impl->order;
|
||||
insertFromChunk(source_num);
|
||||
queue.removeTop();
|
||||
return;
|
||||
}
|
||||
|
||||
if (out_row_sources_buf)
|
||||
{
|
||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||
RowSourcePart row_source(current.impl->order);
|
||||
out_row_sources_buf->write(row_source.data);
|
||||
}
|
||||
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
|
||||
//std::cerr << "Inserting row\n";
|
||||
merged_data.insertRow(current->all_columns, current->pos);
|
||||
|
||||
if (current->isLast())
|
||||
{
|
||||
need_data = true;
|
||||
next_input_to_read = current.impl->order;
|
||||
if (out_row_sources_buf)
|
||||
{
|
||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||
RowSourcePart row_source(current.impl->order);
|
||||
out_row_sources_buf->write(row_source.data);
|
||||
}
|
||||
|
||||
if (limit && merged_data.totalMergedRows() >= limit)
|
||||
is_finished = true;
|
||||
if (!current->isLast())
|
||||
{
|
||||
// std::cerr << "moving to next row\n";
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We will get the next block from the corresponding source, if there is one.
|
||||
queue.removeTop();
|
||||
|
||||
return;
|
||||
}
|
||||
// std::cerr << "It was last row, fetching next block\n";
|
||||
need_data = true;
|
||||
next_input_to_read = current.impl->order;
|
||||
|
||||
//std::cerr << "moving to next row\n";
|
||||
current->next();
|
||||
if (limit && merged_data.totalMergedRows() >= limit)
|
||||
is_finished = true;
|
||||
|
||||
if (!queue.empty() && current.greater(queue.top()))
|
||||
{
|
||||
//std::cerr << "next row is not least, pushing back to queue\n";
|
||||
queue.push(current);
|
||||
break;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
is_finished = true;
|
||||
|
@ -1,10 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <Processors/IProcessor.h>
|
||||
#include <Core/SortDescription.h>
|
||||
#include <Core/SortCursor.h>
|
||||
#include <Processors/SharedChunk.h>
|
||||
|
||||
#include <queue>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -111,14 +111,10 @@ protected:
|
||||
/// Chunks currently being merged.
|
||||
std::vector<SharedChunkPtr> source_chunks;
|
||||
|
||||
using CursorImpls = std::vector<SortCursorImpl>;
|
||||
CursorImpls cursors;
|
||||
SortCursorImpls cursors;
|
||||
|
||||
using Queue = std::priority_queue<SortCursor>;
|
||||
Queue queue_without_collation;
|
||||
|
||||
using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
|
||||
QueueWithCollation queue_with_collation;
|
||||
SortingHeap<SortCursor> queue_without_collation;
|
||||
SortingHeap<SortCursorWithCollation> queue_with_collation;
|
||||
|
||||
private:
|
||||
|
||||
@ -128,8 +124,8 @@ private:
|
||||
bool need_data = false;
|
||||
size_t next_input_to_read = 0;
|
||||
|
||||
template <typename TSortCursor>
|
||||
void merge(std::priority_queue<TSortCursor> & queue);
|
||||
template <typename TSortingHeap>
|
||||
void merge(TSortingHeap & queue);
|
||||
|
||||
void insertFromChunk(size_t source_num);
|
||||
|
||||
@ -159,22 +155,6 @@ private:
|
||||
shared_chunk_ptr->all_columns = cursors[source_num].all_columns;
|
||||
shared_chunk_ptr->sort_columns = cursors[source_num].sort_columns;
|
||||
}
|
||||
|
||||
void pushToQueue(size_t source_num)
|
||||
{
|
||||
if (has_collation)
|
||||
queue_with_collation.push(SortCursorWithCollation(&cursors[source_num]));
|
||||
else
|
||||
queue_without_collation.push(SortCursor(&cursors[source_num]));
|
||||
}
|
||||
|
||||
template <typename TSortCursor>
|
||||
void initQueue(std::priority_queue<TSortCursor> & queue)
|
||||
{
|
||||
for (auto & cursor : cursors)
|
||||
if (!cursor.empty())
|
||||
queue.push(TSortCursor(&cursor));
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -40,16 +40,12 @@ MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t
|
||||
|
||||
chunks.swap(nonempty_chunks);
|
||||
|
||||
if (!has_collation)
|
||||
{
|
||||
for (auto & cursor : cursors)
|
||||
queue_without_collation.push(SortCursor(&cursor));
|
||||
}
|
||||
if (has_collation)
|
||||
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
|
||||
else if (description.size() > 1)
|
||||
queue_without_collation = SortingHeap<SortCursor>(cursors);
|
||||
else
|
||||
{
|
||||
for (auto & cursor : cursors)
|
||||
queue_with_collation.push(SortCursorWithCollation(&cursor));
|
||||
}
|
||||
queue_simple = SortingHeap<SimpleSortCursor>(cursors);
|
||||
}
|
||||
|
||||
|
||||
@ -65,50 +61,61 @@ Chunk MergeSorter::read()
|
||||
return res;
|
||||
}
|
||||
|
||||
return !has_collation
|
||||
? mergeImpl<SortCursor>(queue_without_collation)
|
||||
: mergeImpl<SortCursorWithCollation>(queue_with_collation);
|
||||
if (has_collation)
|
||||
return mergeImpl(queue_with_collation);
|
||||
else if (description.size() > 1)
|
||||
return mergeImpl(queue_without_collation);
|
||||
else
|
||||
return mergeImpl(queue_simple);
|
||||
}
|
||||
|
||||
|
||||
template <typename TSortCursor>
|
||||
Chunk MergeSorter::mergeImpl(std::priority_queue<TSortCursor> & queue)
|
||||
template <typename TSortingHeap>
|
||||
Chunk MergeSorter::mergeImpl(TSortingHeap & queue)
|
||||
{
|
||||
size_t num_columns = chunks[0].getNumColumns();
|
||||
|
||||
MutableColumns merged_columns = chunks[0].cloneEmptyColumns();
|
||||
/// TODO: reserve (in each column)
|
||||
|
||||
/// Reserve
|
||||
if (queue.isValid())
|
||||
{
|
||||
/// The expected size of output block is the same as input block
|
||||
size_t size_to_reserve = chunks[0].getNumRows();
|
||||
for (auto & column : merged_columns)
|
||||
column->reserve(size_to_reserve);
|
||||
}
|
||||
|
||||
/// TODO: Optimization when a single block left.
|
||||
|
||||
/// Take rows from queue in right order and push to 'merged'.
|
||||
size_t merged_rows = 0;
|
||||
while (!queue.empty())
|
||||
while (queue.isValid())
|
||||
{
|
||||
TSortCursor current = queue.top();
|
||||
queue.pop();
|
||||
auto current = queue.current();
|
||||
|
||||
/// Append a row from queue.
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
|
||||
|
||||
++total_merged_rows;
|
||||
++merged_rows;
|
||||
|
||||
if (!current->isLast())
|
||||
{
|
||||
current->next();
|
||||
queue.push(current);
|
||||
}
|
||||
|
||||
/// We don't need more rows because of limit has reached.
|
||||
if (limit && total_merged_rows == limit)
|
||||
{
|
||||
chunks.clear();
|
||||
return Chunk(std::move(merged_columns), merged_rows);
|
||||
break;
|
||||
}
|
||||
|
||||
queue.next();
|
||||
|
||||
/// It's enough for current output block but we will continue.
|
||||
if (merged_rows == max_merged_block_size)
|
||||
return Chunk(std::move(merged_columns), merged_rows);
|
||||
break;
|
||||
}
|
||||
|
||||
chunks.clear();
|
||||
if (!queue.isValid())
|
||||
chunks.clear();
|
||||
|
||||
if (merged_rows == 0)
|
||||
return {};
|
||||
|
@ -1,10 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <Processors/IProcessor.h>
|
||||
#include <Core/SortDescription.h>
|
||||
#include <Core/SortCursor.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Processors/ISource.h>
|
||||
#include <queue>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -27,19 +27,19 @@ private:
|
||||
UInt64 limit;
|
||||
size_t total_merged_rows = 0;
|
||||
|
||||
using CursorImpls = std::vector<SortCursorImpl>;
|
||||
CursorImpls cursors;
|
||||
SortCursorImpls cursors;
|
||||
|
||||
bool has_collation = false;
|
||||
|
||||
std::priority_queue<SortCursor> queue_without_collation;
|
||||
std::priority_queue<SortCursorWithCollation> queue_with_collation;
|
||||
SortingHeap<SortCursor> queue_without_collation;
|
||||
SortingHeap<SimpleSortCursor> queue_simple;
|
||||
SortingHeap<SortCursorWithCollation> queue_with_collation;
|
||||
|
||||
/** Two different cursors are supported - with and without Collation.
|
||||
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
|
||||
*/
|
||||
template <typename TSortCursor>
|
||||
Chunk mergeImpl(std::priority_queue<TSortCursor> & queue);
|
||||
template <typename TSortingHeap>
|
||||
Chunk mergeImpl(TSortingHeap & queue);
|
||||
};
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user