mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
Using SortingHeap in MergingSortedTransform
This commit is contained in:
parent
906bfdbaa7
commit
8613b90d84
@ -281,6 +281,13 @@ public:
|
||||
queue.pop_back();
|
||||
}
|
||||
|
||||
void push(SortCursorImpl & cursor)
|
||||
{
|
||||
queue.emplace_back(&cursor);
|
||||
std::push_heap(queue.begin(), queue.end());
|
||||
next_idx = 0;
|
||||
}
|
||||
|
||||
private:
|
||||
using Container = std::vector<Cursor>;
|
||||
Container queue;
|
||||
|
@ -145,9 +145,9 @@ IProcessor::Status MergingSortedTransform::prepare()
|
||||
return Status::NeedData;
|
||||
|
||||
if (has_collation)
|
||||
initQueue(queue_with_collation);
|
||||
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
|
||||
else
|
||||
initQueue(queue_without_collation);
|
||||
queue_without_collation = SortingHeap<SortCursor>(cursors);
|
||||
|
||||
is_initialized = true;
|
||||
return Status::Ready;
|
||||
@ -166,7 +166,6 @@ IProcessor::Status MergingSortedTransform::prepare()
|
||||
|
||||
if (need_data)
|
||||
{
|
||||
|
||||
auto & input = *std::next(inputs.begin(), next_input_to_read);
|
||||
if (!input.isFinished())
|
||||
{
|
||||
@ -180,7 +179,10 @@ IProcessor::Status MergingSortedTransform::prepare()
|
||||
return Status::NeedData;
|
||||
|
||||
updateCursor(std::move(chunk), next_input_to_read);
|
||||
pushToQueue(next_input_to_read);
|
||||
if (has_collation)
|
||||
queue_with_collation.push(cursors[next_input_to_read]);
|
||||
else
|
||||
queue_without_collation.push(cursors[next_input_to_read]);
|
||||
need_data = false;
|
||||
}
|
||||
}
|
||||
@ -197,8 +199,8 @@ void MergingSortedTransform::work()
|
||||
merge(queue_without_collation);
|
||||
}
|
||||
|
||||
template <typename TSortCursor>
|
||||
void MergingSortedTransform::merge(std::priority_queue<TSortCursor> & queue)
|
||||
template <typename TSortingHeap>
|
||||
void MergingSortedTransform::merge(TSortingHeap & queue)
|
||||
{
|
||||
/// Returns MergeStatus which we should return if we are going to finish now.
|
||||
auto can_read_another_row = [&, this]()
|
||||
@ -220,77 +222,65 @@ void MergingSortedTransform::merge(std::priority_queue<TSortCursor> & queue)
|
||||
};
|
||||
|
||||
/// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size`
|
||||
while (!queue.empty())
|
||||
while (queue.isValid())
|
||||
{
|
||||
/// Shouldn't happen at first iteration, but check just in case.
|
||||
if (!can_read_another_row())
|
||||
return;
|
||||
|
||||
TSortCursor current = queue.top();
|
||||
queue.pop();
|
||||
bool first_iteration = true;
|
||||
auto current = queue.current();
|
||||
|
||||
while (true)
|
||||
/** And what if the block is totally less or equal than the rest for the current cursor?
|
||||
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
|
||||
*/
|
||||
if (current.impl->isFirst()
|
||||
&& (queue.size() == 1
|
||||
|| (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild()))))
|
||||
{
|
||||
if (!first_iteration && !can_read_another_row())
|
||||
//std::cerr << "current block is totally less or equals\n";
|
||||
|
||||
/// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
|
||||
if (merged_data.mergedRows() != 0)
|
||||
{
|
||||
queue.push(current);
|
||||
return;
|
||||
}
|
||||
first_iteration = false;
|
||||
|
||||
/** And what if the block is totally less or equal than the rest for the current cursor?
|
||||
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
|
||||
*/
|
||||
if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
|
||||
{
|
||||
//std::cerr << "current block is totally less or equals\n";
|
||||
|
||||
/// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
|
||||
if (merged_data.mergedRows() != 0)
|
||||
{
|
||||
//std::cerr << "merged rows is non-zero\n";
|
||||
queue.push(current);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||
size_t source_num = current.impl->order;
|
||||
insertFromChunk(source_num);
|
||||
//std::cerr << "merged rows is non-zero\n";
|
||||
return;
|
||||
}
|
||||
|
||||
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
|
||||
//std::cerr << "Inserting row\n";
|
||||
merged_data.insertRow(current->all_columns, current->pos);
|
||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||
size_t source_num = current.impl->order;
|
||||
insertFromChunk(source_num);
|
||||
return;
|
||||
}
|
||||
|
||||
if (out_row_sources_buf)
|
||||
{
|
||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||
RowSourcePart row_source(current.impl->order);
|
||||
out_row_sources_buf->write(row_source.data);
|
||||
}
|
||||
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
|
||||
//std::cerr << "Inserting row\n";
|
||||
merged_data.insertRow(current->all_columns, current->pos);
|
||||
|
||||
if (current->isLast())
|
||||
{
|
||||
need_data = true;
|
||||
next_input_to_read = current.impl->order;
|
||||
if (out_row_sources_buf)
|
||||
{
|
||||
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
|
||||
RowSourcePart row_source(current.impl->order);
|
||||
out_row_sources_buf->write(row_source.data);
|
||||
}
|
||||
|
||||
if (limit && merged_data.totalMergedRows() >= limit)
|
||||
is_finished = true;
|
||||
if (!current->isLast())
|
||||
{
|
||||
// std::cerr << "moving to next row\n";
|
||||
queue.next();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We will get the next block from the corresponding source, if there is one.
|
||||
queue.removeTop();
|
||||
|
||||
return;
|
||||
}
|
||||
// std::cerr << "It was last row, fetching next block\n";
|
||||
need_data = true;
|
||||
next_input_to_read = current.impl->order;
|
||||
|
||||
//std::cerr << "moving to next row\n";
|
||||
current->next();
|
||||
if (limit && merged_data.totalMergedRows() >= limit)
|
||||
is_finished = true;
|
||||
|
||||
if (!queue.empty() && current.greater(queue.top()))
|
||||
{
|
||||
//std::cerr << "next row is not least, pushing back to queue\n";
|
||||
queue.push(current);
|
||||
break;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
is_finished = true;
|
||||
|
@ -1,10 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <Processors/IProcessor.h>
|
||||
#include <Core/SortDescription.h>
|
||||
#include <Core/SortCursor.h>
|
||||
#include <Processors/SharedChunk.h>
|
||||
|
||||
#include <queue>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -113,11 +113,8 @@ protected:
|
||||
|
||||
SortCursorImpls cursors;
|
||||
|
||||
using Queue = std::priority_queue<SortCursor>;
|
||||
Queue queue_without_collation;
|
||||
|
||||
using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
|
||||
QueueWithCollation queue_with_collation;
|
||||
SortingHeap<SortCursor> queue_without_collation;
|
||||
SortingHeap<SortCursorWithCollation> queue_with_collation;
|
||||
|
||||
private:
|
||||
|
||||
@ -127,8 +124,8 @@ private:
|
||||
bool need_data = false;
|
||||
size_t next_input_to_read = 0;
|
||||
|
||||
template <typename TSortCursor>
|
||||
void merge(std::priority_queue<TSortCursor> & queue);
|
||||
template <typename TSortingHeap>
|
||||
void merge(TSortingHeap & queue);
|
||||
|
||||
void insertFromChunk(size_t source_num);
|
||||
|
||||
@ -158,22 +155,6 @@ private:
|
||||
shared_chunk_ptr->all_columns = cursors[source_num].all_columns;
|
||||
shared_chunk_ptr->sort_columns = cursors[source_num].sort_columns;
|
||||
}
|
||||
|
||||
void pushToQueue(size_t source_num)
|
||||
{
|
||||
if (has_collation)
|
||||
queue_with_collation.push(SortCursorWithCollation(&cursors[source_num]));
|
||||
else
|
||||
queue_without_collation.push(SortCursor(&cursors[source_num]));
|
||||
}
|
||||
|
||||
template <typename TSortCursor>
|
||||
void initQueue(std::priority_queue<TSortCursor> & queue)
|
||||
{
|
||||
for (auto & cursor : cursors)
|
||||
if (!cursor.empty())
|
||||
queue.push(TSortCursor(&cursor));
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user