mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-25 11:10:49 +00:00
Optimization of priority queue
This commit is contained in:
parent
eb9d6983c8
commit
9f3afed5ff
@ -225,7 +225,8 @@ public:
|
|||||||
size_t size = cursors.size();
|
size_t size = cursors.size();
|
||||||
queue.reserve(size);
|
queue.reserve(size);
|
||||||
for (size_t i = 0; i < size; ++i)
|
for (size_t i = 0; i < size; ++i)
|
||||||
queue.emplace_back(&cursors[i]);
|
if (!cursors[i].empty())
|
||||||
|
queue.emplace_back(&cursors[i]);
|
||||||
std::make_heap(queue.begin(), queue.end());
|
std::make_heap(queue.begin(), queue.end());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,6 +234,11 @@ public:
|
|||||||
|
|
||||||
Cursor & current() { return queue.front(); }
|
Cursor & current() { return queue.front(); }
|
||||||
|
|
||||||
|
size_t size() { return queue.size(); }
|
||||||
|
|
||||||
|
Cursor & firstChild() { return queue[1]; }
|
||||||
|
Cursor & secondChild() { return queue[2]; }
|
||||||
|
|
||||||
void next()
|
void next()
|
||||||
{
|
{
|
||||||
assert(isValid());
|
assert(isValid());
|
||||||
@ -246,6 +252,18 @@ public:
|
|||||||
removeTop();
|
removeTop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void replaceTop(Cursor new_top)
|
||||||
|
{
|
||||||
|
current() = new_top;
|
||||||
|
updateTop();
|
||||||
|
}
|
||||||
|
|
||||||
|
void removeTop()
|
||||||
|
{
|
||||||
|
std::pop_heap(queue.begin(), queue.end());
|
||||||
|
queue.pop_back();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using Container = std::vector<Cursor>;
|
using Container = std::vector<Cursor>;
|
||||||
Container queue;
|
Container queue;
|
||||||
@ -300,12 +318,6 @@ private:
|
|||||||
} while (!(*child_it < top));
|
} while (!(*child_it < top));
|
||||||
*curr_it = std::move(top);
|
*curr_it = std::move(top);
|
||||||
}
|
}
|
||||||
|
|
||||||
void removeTop()
|
|
||||||
{
|
|
||||||
std::pop_heap(queue.begin(), queue.end());
|
|
||||||
queue.pop_back();
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -138,14 +138,14 @@ Block AggregatingSortedBlockInputStream::readImpl()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||||
{
|
{
|
||||||
size_t merged_rows = 0;
|
size_t merged_rows = 0;
|
||||||
|
|
||||||
/// We take the rows in the correct order and put them in `merged_block`, while the rows are no more than `max_block_size`
|
/// We take the rows in the correct order and put them in `merged_block`, while the rows are no more than `max_block_size`
|
||||||
while (!queue.empty())
|
while (queue.isValid())
|
||||||
{
|
{
|
||||||
SortCursor current = queue.top();
|
SortCursor current = queue.current();
|
||||||
|
|
||||||
setPrimaryKeyRef(next_key, current);
|
setPrimaryKeyRef(next_key, current);
|
||||||
|
|
||||||
@ -167,8 +167,6 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
queue.pop();
|
|
||||||
|
|
||||||
if (key_differs)
|
if (key_differs)
|
||||||
{
|
{
|
||||||
current_key.swap(next_key);
|
current_key.swap(next_key);
|
||||||
@ -202,8 +200,7 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s
|
|||||||
|
|
||||||
if (!current->isLast())
|
if (!current->isLast())
|
||||||
{
|
{
|
||||||
current->next();
|
queue.next();
|
||||||
queue.push(current);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -55,7 +55,7 @@ private:
|
|||||||
/** We support two different cursors - with Collation and without.
|
/** We support two different cursors - with Collation and without.
|
||||||
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
|
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
|
||||||
*/
|
*/
|
||||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||||
|
|
||||||
/** Extract all states of aggregate functions and merge them with the current group.
|
/** Extract all states of aggregate functions and merge them with the current group.
|
||||||
*/
|
*/
|
||||||
|
@ -105,15 +105,15 @@ Block CollapsingSortedBlockInputStream::readImpl()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||||
{
|
{
|
||||||
|
|
||||||
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
||||||
size_t current_block_granularity;
|
size_t current_block_granularity;
|
||||||
/// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
|
/// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
|
||||||
for (; !queue.empty(); ++current_pos)
|
for (; queue.isValid(); ++current_pos)
|
||||||
{
|
{
|
||||||
SortCursor current = queue.top();
|
SortCursor current = queue.current();
|
||||||
current_block_granularity = current->rows;
|
current_block_granularity = current->rows;
|
||||||
|
|
||||||
if (current_key.empty())
|
if (current_key.empty())
|
||||||
@ -131,8 +131,6 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
queue.pop();
|
|
||||||
|
|
||||||
if (key_differs)
|
if (key_differs)
|
||||||
{
|
{
|
||||||
/// We write data for the previous primary key.
|
/// We write data for the previous primary key.
|
||||||
@ -185,8 +183,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
|
|||||||
|
|
||||||
if (!current->isLast())
|
if (!current->isLast())
|
||||||
{
|
{
|
||||||
current->next();
|
queue.next();
|
||||||
queue.push(current);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -73,7 +73,7 @@ private:
|
|||||||
/** We support two different cursors - with Collation and without.
|
/** We support two different cursors - with Collation and without.
|
||||||
* Templates are used instead of polymorphic SortCursors and calls to virtual functions.
|
* Templates are used instead of polymorphic SortCursors and calls to virtual functions.
|
||||||
*/
|
*/
|
||||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||||
|
|
||||||
/// Output to result rows for the current primary key.
|
/// Output to result rows for the current primary key.
|
||||||
void insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition);
|
void insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition);
|
||||||
|
@ -161,7 +161,7 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||||
{
|
{
|
||||||
const DateLUTImpl & date_lut = DateLUT::instance();
|
const DateLUTImpl & date_lut = DateLUT::instance();
|
||||||
|
|
||||||
@ -173,9 +173,9 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
|
|||||||
/// contribute towards current output row.
|
/// contribute towards current output row.
|
||||||
/// Variables starting with next_* refer to the row at the top of the queue.
|
/// Variables starting with next_* refer to the row at the top of the queue.
|
||||||
|
|
||||||
while (!queue.empty())
|
while (queue.isValid())
|
||||||
{
|
{
|
||||||
SortCursor next_cursor = queue.top();
|
SortCursor next_cursor = queue.current();
|
||||||
|
|
||||||
StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos);
|
StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos);
|
||||||
bool new_path = is_first || next_path != current_group_path;
|
bool new_path = is_first || next_path != current_group_path;
|
||||||
@ -253,12 +253,9 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
|
|||||||
current_group_path = next_path;
|
current_group_path = next_path;
|
||||||
}
|
}
|
||||||
|
|
||||||
queue.pop();
|
|
||||||
|
|
||||||
if (!next_cursor->isLast())
|
if (!next_cursor->isLast())
|
||||||
{
|
{
|
||||||
next_cursor->next();
|
queue.next();
|
||||||
queue.push(next_cursor);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -225,7 +225,7 @@ private:
|
|||||||
UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
|
UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
|
||||||
|
|
||||||
|
|
||||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||||
|
|
||||||
/// Insert the values into the resulting columns, which will not be changed in the future.
|
/// Insert the values into the resulting columns, which will not be changed in the future.
|
||||||
template <typename TSortCursor>
|
template <typename TSortCursor>
|
||||||
|
@ -190,6 +190,8 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue)
|
|||||||
column->reserve(size_to_reserve);
|
column->reserve(size_to_reserve);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// TODO: Optimization when a single block left.
|
||||||
|
|
||||||
/// Take rows from queue in right order and push to 'merged'.
|
/// Take rows from queue in right order and push to 'merged'.
|
||||||
size_t merged_rows = 0;
|
size_t merged_rows = 0;
|
||||||
while (queue.isValid())
|
while (queue.isValid())
|
||||||
|
@ -58,10 +58,10 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
|
|||||||
has_collation |= cursors[i].has_collation;
|
has_collation |= cursors[i].has_collation;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (has_collation)
|
if (!has_collation)
|
||||||
initQueue(queue_with_collation);
|
queue_without_collation = SortingHeap<SortCursor>(cursors);
|
||||||
else
|
else
|
||||||
initQueue(queue_without_collation);
|
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Let's check that all source blocks have the same structure.
|
/// Let's check that all source blocks have the same structure.
|
||||||
@ -82,15 +82,6 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
template <typename TSortCursor>
|
|
||||||
void MergingSortedBlockInputStream::initQueue(std::priority_queue<TSortCursor> & queue)
|
|
||||||
{
|
|
||||||
for (size_t i = 0; i < cursors.size(); ++i)
|
|
||||||
if (!cursors[i].empty())
|
|
||||||
queue.push(TSortCursor(&cursors[i]));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
Block MergingSortedBlockInputStream::readImpl()
|
Block MergingSortedBlockInputStream::readImpl()
|
||||||
{
|
{
|
||||||
if (finished)
|
if (finished)
|
||||||
@ -115,7 +106,7 @@ Block MergingSortedBlockInputStream::readImpl()
|
|||||||
|
|
||||||
|
|
||||||
template <typename TSortCursor>
|
template <typename TSortCursor>
|
||||||
void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue)
|
void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, SortingHeap<TSortCursor> & queue)
|
||||||
{
|
{
|
||||||
size_t order = current->order;
|
size_t order = current->order;
|
||||||
size_t size = cursors.size();
|
size_t size = cursors.size();
|
||||||
@ -125,15 +116,19 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current,
|
|||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
source_blocks[order] = new detail::SharedBlock(children[order]->read());
|
source_blocks[order] = new detail::SharedBlock(children[order]->read()); /// intrusive ptr
|
||||||
|
|
||||||
if (!*source_blocks[order])
|
if (!*source_blocks[order])
|
||||||
|
{
|
||||||
|
queue.removeTop();
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (source_blocks[order]->rows())
|
if (source_blocks[order]->rows())
|
||||||
{
|
{
|
||||||
cursors[order].reset(*source_blocks[order]);
|
cursors[order].reset(*source_blocks[order]);
|
||||||
queue.push(TSortCursor(&cursors[order]));
|
queue.replaceTop(&cursors[order]);
|
||||||
|
|
||||||
source_blocks[order]->all_columns = cursors[order].all_columns;
|
source_blocks[order]->all_columns = cursors[order].all_columns;
|
||||||
source_blocks[order]->sort_columns = cursors[order].sort_columns;
|
source_blocks[order]->sort_columns = cursors[order].sort_columns;
|
||||||
break;
|
break;
|
||||||
@ -154,19 +149,14 @@ bool MergingSortedBlockInputStream::MergeStopCondition::checkStop() const
|
|||||||
return sum_rows_count >= average;
|
return sum_rows_count >= average;
|
||||||
}
|
}
|
||||||
|
|
||||||
template
|
|
||||||
void MergingSortedBlockInputStream::fetchNextBlock<SortCursor>(const SortCursor & current, std::priority_queue<SortCursor> & queue);
|
|
||||||
|
|
||||||
template
|
template <typename TSortingHeap>
|
||||||
void MergingSortedBlockInputStream::fetchNextBlock<SortCursorWithCollation>(const SortCursorWithCollation & current, std::priority_queue<SortCursorWithCollation> & queue);
|
void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSortingHeap & queue)
|
||||||
|
|
||||||
|
|
||||||
template <typename TSortCursor>
|
|
||||||
void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
|
|
||||||
{
|
{
|
||||||
size_t merged_rows = 0;
|
size_t merged_rows = 0;
|
||||||
|
|
||||||
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
||||||
|
|
||||||
/** Increase row counters.
|
/** Increase row counters.
|
||||||
* Return true if it's time to finish generating the current data block.
|
* Return true if it's time to finish generating the current data block.
|
||||||
*/
|
*/
|
||||||
@ -186,123 +176,101 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
|
|||||||
return stop_condition.checkStop();
|
return stop_condition.checkStop();
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Take rows in required order and put them into `merged_columns`, while the rows are no more than `max_block_size`
|
/// Take rows in required order and put them into `merged_columns`, while the number of rows are no more than `max_block_size`
|
||||||
while (!queue.empty())
|
while (queue.isValid())
|
||||||
{
|
{
|
||||||
TSortCursor current = queue.top();
|
auto current = queue.current();
|
||||||
size_t current_block_granularity = current->rows;
|
size_t current_block_granularity = current->rows;
|
||||||
queue.pop();
|
|
||||||
|
|
||||||
while (true)
|
/** And what if the block is totally less or equal than the rest for the current cursor?
|
||||||
|
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
|
||||||
|
*/
|
||||||
|
if (current->isFirst()
|
||||||
|
&& (queue.size() == 1
|
||||||
|
|| (queue.size() == 2 && current.totallyLessOrEquals(queue.firstChild()))
|
||||||
|
|| (queue.size() == 3 && current.totallyLessOrEquals(queue.firstChild()) && current.totallyLessOrEquals(queue.secondChild()))))
|
||||||
{
|
{
|
||||||
/** And what if the block is totally less or equal than the rest for the current cursor?
|
// std::cerr << "current block is totally less or equals\n";
|
||||||
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
|
|
||||||
*/
|
/// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
|
||||||
if (current->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
|
if (merged_rows != 0)
|
||||||
{
|
{
|
||||||
// std::cerr << "current block is totally less or equals\n";
|
//std::cerr << "merged rows is non-zero\n";
|
||||||
|
|
||||||
/// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
|
|
||||||
if (merged_rows != 0)
|
|
||||||
{
|
|
||||||
//std::cerr << "merged rows is non-zero\n";
|
|
||||||
queue.push(current);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Actually, current->order stores source number (i.e. cursors[current->order] == current)
|
|
||||||
size_t source_num = current->order;
|
|
||||||
|
|
||||||
if (source_num >= cursors.size())
|
|
||||||
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
for (size_t i = 0; i < num_columns; ++i)
|
|
||||||
merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate();
|
|
||||||
|
|
||||||
// std::cerr << "copied columns\n";
|
|
||||||
|
|
||||||
merged_rows = merged_columns.at(0)->size();
|
|
||||||
|
|
||||||
/// Limit output
|
|
||||||
if (limit && total_merged_rows + merged_rows > limit)
|
|
||||||
{
|
|
||||||
merged_rows = limit - total_merged_rows;
|
|
||||||
for (size_t i = 0; i < num_columns; ++i)
|
|
||||||
{
|
|
||||||
auto & column = merged_columns[i];
|
|
||||||
column = (*column->cut(0, merged_rows)).mutate();
|
|
||||||
}
|
|
||||||
|
|
||||||
cancel(false);
|
|
||||||
finished = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write order of rows for other columns
|
|
||||||
/// this data will be used in grather stream
|
|
||||||
if (out_row_sources_buf)
|
|
||||||
{
|
|
||||||
RowSourcePart row_source(source_num);
|
|
||||||
for (size_t i = 0; i < merged_rows; ++i)
|
|
||||||
out_row_sources_buf->write(row_source.data);
|
|
||||||
}
|
|
||||||
|
|
||||||
//std::cerr << "fetching next block\n";
|
|
||||||
|
|
||||||
total_merged_rows += merged_rows;
|
|
||||||
fetchNextBlock(current, queue);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
|
/// Actually, current->order stores source number (i.e. cursors[current->order] == current)
|
||||||
// std::cerr << "Inserting row\n";
|
size_t source_num = current->order;
|
||||||
for (size_t i = 0; i < num_columns; ++i)
|
|
||||||
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
|
|
||||||
|
|
||||||
|
if (source_num >= cursors.size())
|
||||||
|
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < num_columns; ++i)
|
||||||
|
merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate();
|
||||||
|
|
||||||
|
// std::cerr << "copied columns\n";
|
||||||
|
|
||||||
|
merged_rows = merged_columns.at(0)->size();
|
||||||
|
|
||||||
|
/// Limit output
|
||||||
|
if (limit && total_merged_rows + merged_rows > limit)
|
||||||
|
{
|
||||||
|
merged_rows = limit - total_merged_rows;
|
||||||
|
for (size_t i = 0; i < num_columns; ++i)
|
||||||
|
{
|
||||||
|
auto & column = merged_columns[i];
|
||||||
|
column = (*column->cut(0, merged_rows)).mutate();
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel(false);
|
||||||
|
finished = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write order of rows for other columns
|
||||||
|
/// this data will be used in grather stream
|
||||||
if (out_row_sources_buf)
|
if (out_row_sources_buf)
|
||||||
{
|
{
|
||||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
RowSourcePart row_source(source_num);
|
||||||
RowSourcePart row_source(current->order);
|
for (size_t i = 0; i < merged_rows; ++i)
|
||||||
out_row_sources_buf->write(row_source.data);
|
out_row_sources_buf->write(row_source.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!current->isLast())
|
//std::cerr << "fetching next block\n";
|
||||||
{
|
|
||||||
// std::cerr << "moving to next row\n";
|
|
||||||
current->next();
|
|
||||||
|
|
||||||
if (queue.empty() || !(current.greater(queue.top())))
|
total_merged_rows += merged_rows;
|
||||||
{
|
fetchNextBlock(current, queue);
|
||||||
if (count_row_and_check_limit(current_block_granularity))
|
return;
|
||||||
{
|
}
|
||||||
// std::cerr << "pushing back to queue\n";
|
|
||||||
queue.push(current);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Do not put the cursor back in the queue, but continue to work with the current cursor.
|
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
|
||||||
// std::cerr << "current is still on top, using current row\n";
|
// std::cerr << "Inserting row\n";
|
||||||
continue;
|
for (size_t i = 0; i < num_columns; ++i)
|
||||||
}
|
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
|
||||||
else
|
|
||||||
{
|
|
||||||
// std::cerr << "next row is not least, pushing back to queue\n";
|
|
||||||
queue.push(current);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/// We get the next block from the corresponding source, if there is one.
|
|
||||||
// std::cerr << "It was last row, fetching next block\n";
|
|
||||||
fetchNextBlock(current, queue);
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
if (out_row_sources_buf)
|
||||||
|
{
|
||||||
|
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||||
|
RowSourcePart row_source(current->order);
|
||||||
|
out_row_sources_buf->write(row_source.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!current->isLast())
|
||||||
|
{
|
||||||
|
// std::cerr << "moving to next row\n";
|
||||||
|
queue.next();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/// We get the next block from the corresponding source, if there is one.
|
||||||
|
// std::cerr << "It was last row, fetching next block\n";
|
||||||
|
fetchNextBlock(current, queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (count_row_and_check_limit(current_block_granularity))
|
if (count_row_and_check_limit(current_block_granularity))
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// We have read all data. Ask childs to cancel providing more data.
|
||||||
cancel(false);
|
cancel(false);
|
||||||
finished = true;
|
finished = true;
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <queue>
|
|
||||||
|
|
||||||
#include <boost/smart_ptr/intrusive_ptr.hpp>
|
#include <boost/smart_ptr/intrusive_ptr.hpp>
|
||||||
|
|
||||||
#include <common/logger_useful.h>
|
#include <common/logger_useful.h>
|
||||||
@ -87,7 +85,7 @@ protected:
|
|||||||
|
|
||||||
/// Gets the next block from the source corresponding to the `current`.
|
/// Gets the next block from the source corresponding to the `current`.
|
||||||
template <typename TSortCursor>
|
template <typename TSortCursor>
|
||||||
void fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue);
|
void fetchNextBlock(const TSortCursor & current, SortingHeap<TSortCursor> & queue);
|
||||||
|
|
||||||
|
|
||||||
Block header;
|
Block header;
|
||||||
@ -111,11 +109,8 @@ protected:
|
|||||||
|
|
||||||
SortCursorImpls cursors;
|
SortCursorImpls cursors;
|
||||||
|
|
||||||
using Queue = std::priority_queue<SortCursor>;
|
SortingHeap<SortCursor> queue_without_collation;
|
||||||
Queue queue_without_collation;
|
SortingHeap<SortCursorWithCollation> queue_with_collation;
|
||||||
|
|
||||||
using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
|
|
||||||
QueueWithCollation queue_with_collation;
|
|
||||||
|
|
||||||
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
|
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
|
||||||
/// If it is not nullptr then it should be populated during execution
|
/// If it is not nullptr then it should be populated during execution
|
||||||
@ -176,13 +171,10 @@ protected:
|
|||||||
private:
|
private:
|
||||||
|
|
||||||
/** We support two different cursors - with Collation and without.
|
/** We support two different cursors - with Collation and without.
|
||||||
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
|
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
|
||||||
*/
|
*/
|
||||||
template <typename TSortCursor>
|
template <typename TSortingHeap>
|
||||||
void initQueue(std::priority_queue<TSortCursor> & queue);
|
void merge(MutableColumns & merged_columns, TSortingHeap & queue);
|
||||||
|
|
||||||
template <typename TSortCursor>
|
|
||||||
void merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue);
|
|
||||||
|
|
||||||
Logger * log = &Logger::get("MergingSortedBlockInputStream");
|
Logger * log = &Logger::get("MergingSortedBlockInputStream");
|
||||||
|
|
||||||
|
@ -48,13 +48,14 @@ Block ReplacingSortedBlockInputStream::readImpl()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||||
{
|
{
|
||||||
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
||||||
|
|
||||||
/// Take the rows in needed order and put them into `merged_columns` until rows no more than `max_block_size`
|
/// Take the rows in needed order and put them into `merged_columns` until rows no more than `max_block_size`
|
||||||
while (!queue.empty())
|
while (queue.isValid())
|
||||||
{
|
{
|
||||||
SortCursor current = queue.top();
|
SortCursor current = queue.current();
|
||||||
size_t current_block_granularity = current->rows;
|
size_t current_block_granularity = current->rows;
|
||||||
|
|
||||||
if (current_key.empty())
|
if (current_key.empty())
|
||||||
@ -68,8 +69,6 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
|
|||||||
if (key_differs && stop_condition.checkStop())
|
if (key_differs && stop_condition.checkStop())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
queue.pop();
|
|
||||||
|
|
||||||
if (key_differs)
|
if (key_differs)
|
||||||
{
|
{
|
||||||
/// Write the data for the previous primary key.
|
/// Write the data for the previous primary key.
|
||||||
@ -98,8 +97,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
|
|||||||
|
|
||||||
if (!current->isLast())
|
if (!current->isLast())
|
||||||
{
|
{
|
||||||
current->next();
|
queue.next();
|
||||||
queue.push(current);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -52,7 +52,7 @@ private:
|
|||||||
/// Sources of rows with the current primary key.
|
/// Sources of rows with the current primary key.
|
||||||
PODArray<RowSourcePart> current_row_sources;
|
PODArray<RowSourcePart> current_row_sources;
|
||||||
|
|
||||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||||
|
|
||||||
/// Output into result the rows for current primary key.
|
/// Output into result the rows for current primary key.
|
||||||
void insertRow(MutableColumns & merged_columns);
|
void insertRow(MutableColumns & merged_columns);
|
||||||
|
@ -314,14 +314,14 @@ Block SummingSortedBlockInputStream::readImpl()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||||
{
|
{
|
||||||
merged_rows = 0;
|
merged_rows = 0;
|
||||||
|
|
||||||
/// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size`
|
/// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size`
|
||||||
while (!queue.empty())
|
while (queue.isValid())
|
||||||
{
|
{
|
||||||
SortCursor current = queue.top();
|
SortCursor current = queue.current();
|
||||||
|
|
||||||
setPrimaryKeyRef(next_key, current);
|
setPrimaryKeyRef(next_key, current);
|
||||||
|
|
||||||
@ -383,12 +383,9 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
|
|||||||
current_row_is_zero = false;
|
current_row_is_zero = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
queue.pop();
|
|
||||||
|
|
||||||
if (!current->isLast())
|
if (!current->isLast())
|
||||||
{
|
{
|
||||||
current->next();
|
queue.next();
|
||||||
queue.push(current);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <queue>
|
||||||
|
|
||||||
#include <Core/Row.h>
|
#include <Core/Row.h>
|
||||||
#include <Core/ColumnNumbers.h>
|
#include <Core/ColumnNumbers.h>
|
||||||
#include <Common/AlignedBuffer.h>
|
#include <Common/AlignedBuffer.h>
|
||||||
@ -140,7 +142,7 @@ private:
|
|||||||
/** We support two different cursors - with Collation and without.
|
/** We support two different cursors - with Collation and without.
|
||||||
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
|
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
|
||||||
*/
|
*/
|
||||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||||
|
|
||||||
/// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero".
|
/// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero".
|
||||||
void insertCurrentRowIfNeeded(MutableColumns & merged_columns);
|
void insertCurrentRowIfNeeded(MutableColumns & merged_columns);
|
||||||
|
@ -82,21 +82,18 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
|
void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue)
|
||||||
{
|
{
|
||||||
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
|
||||||
|
|
||||||
auto update_queue = [this, & queue](SortCursor & cursor)
|
auto update_queue = [this, & queue](SortCursor & cursor)
|
||||||
{
|
{
|
||||||
queue.pop();
|
|
||||||
|
|
||||||
if (out_row_sources_buf)
|
if (out_row_sources_buf)
|
||||||
current_row_sources.emplace(cursor->order, true);
|
current_row_sources.emplace(cursor->order, true);
|
||||||
|
|
||||||
if (!cursor->isLast())
|
if (!cursor->isLast())
|
||||||
{
|
{
|
||||||
cursor->next();
|
queue.next();
|
||||||
queue.push(cursor);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -106,9 +103,9 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co
|
|||||||
};
|
};
|
||||||
|
|
||||||
/// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
|
/// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
|
||||||
while (!queue.empty())
|
while (queue.isValid())
|
||||||
{
|
{
|
||||||
SortCursor current = queue.top();
|
SortCursor current = queue.current();
|
||||||
size_t current_block_granularity = current->rows;
|
size_t current_block_granularity = current->rows;
|
||||||
|
|
||||||
SharedBlockRowRef next_key;
|
SharedBlockRowRef next_key;
|
||||||
|
@ -5,7 +5,7 @@
|
|||||||
#include <DataStreams/MergingSortedBlockInputStream.h>
|
#include <DataStreams/MergingSortedBlockInputStream.h>
|
||||||
#include <DataStreams/ColumnGathererStream.h>
|
#include <DataStreams/ColumnGathererStream.h>
|
||||||
|
|
||||||
#include <deque>
|
#include <queue>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -204,7 +204,7 @@ private:
|
|||||||
/// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys.
|
/// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys.
|
||||||
std::queue<RowSourcePart> current_row_sources;
|
std::queue<RowSourcePart> current_row_sources;
|
||||||
|
|
||||||
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
|
void merge(MutableColumns & merged_columns, SortingHeap<SortCursor> & queue);
|
||||||
|
|
||||||
/// Output to result row for the current primary key.
|
/// Output to result row for the current primary key.
|
||||||
void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns);
|
void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns);
|
||||||
|
@ -80,6 +80,8 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue)
|
|||||||
column->reserve(size_to_reserve);
|
column->reserve(size_to_reserve);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// TODO: Optimization when a single block left.
|
||||||
|
|
||||||
/// Take rows from queue in right order and push to 'merged'.
|
/// Take rows from queue in right order and push to 'merged'.
|
||||||
size_t merged_rows = 0;
|
size_t merged_rows = 0;
|
||||||
while (queue.isValid())
|
while (queue.isValid())
|
||||||
|
Loading…
Reference in New Issue
Block a user