ClickHouse/dbms/src/Processors/Transforms/MergingSortedTransform.cpp

#include <Processors/Transforms/MergingSortedTransform.h>
#include <DataStreams/ColumnGathererStream.h>
#include <IO/WriteBuffer.h>
#include <DataStreams/materializeBlock.h>
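
/// MergingSortedTransform merges several sorted input streams into one sorted
/// output stream by doing an N-way merge over a heap of sort cursors.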
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergingSortedTransform::MergingSortedTransform(
const Block & header,
size_t num_inputs,
const SortDescription & description_,
size_t max_block_size_,
UInt64 limit_,
bool quiet_,
bool have_all_inputs_)
: IProcessor(InputPorts(num_inputs, header), {header})
, description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
, have_all_inputs(have_all_inputs_)
, merged_data(header), source_chunks(num_inputs), cursors(num_inputs)
{
auto & sample = outputs.front().getHeader();
/// Replace column names in description to positions.
for (auto & column_description : description)
{
has_collation |= column_description.collator != nullptr;
if (!column_description.column_name.empty())
{
column_description.column_number = sample.getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
}
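
/// Dynamically add one more input port. This is only allowed until setHaveAllInputs() has been called.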
void MergingSortedTransform::addInput()
{
if (have_all_inputs)
        throw Exception("MergingSortedTransform already has all inputs.", ErrorCodes::LOGICAL_ERROR);
inputs.emplace_back(outputs.front().getHeader(), this);
source_chunks.emplace_back();
cursors.emplace_back();
}
void MergingSortedTransform::setHaveAllInputs()
{
if (have_all_inputs)
        throw Exception("MergingSortedTransform already has all inputs.", ErrorCodes::LOGICAL_ERROR);
have_all_inputs = true;
}
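
/// prepare() drives the port state machine: it creates cursors from the first chunk of every input,
/// refills the input requested by merge(), and pushes finished blocks to the output port.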
IProcessor::Status MergingSortedTransform::prepare()
{
if (!have_all_inputs)
return Status::NeedData;
auto & output = outputs.front();
/// Special case for no inputs.
if (inputs.empty())
{
output.finish();
return Status::Finished;
}
/// Check can output.
if (output.isFinished())
{
for (auto & in : inputs)
in.close();
return Status::Finished;
}
    /// Do not disable inputs, so that this works the same way as it did with AsynchronousBlockInputStream before.
bool is_port_full = !output.canPush();
/// Special case for single input.
if (inputs.size() == 1)
{
auto & input = inputs.front();
if (input.isFinished())
{
output.finish();
return Status::Finished;
}
input.setNeeded();
if (input.hasData())
{
if (!is_port_full)
output.push(input.pull());
return Status::PortFull;
}
return Status::NeedData;
}
/// Push if has data.
if (merged_data.mergedRows() && !is_port_full)
output.push(merged_data.pull());
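
    /// Initialization: pull one chunk from every input to create the cursors and build the sorting heap.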
if (!is_initialized)
{
/// Check for inputs we need.
bool all_inputs_has_data = true;
auto it = inputs.begin();
for (size_t i = 0; it != inputs.end(); ++i, ++it)
{
auto & input = *it;
if (input.isFinished())
continue;
if (!cursors[i].empty())
{
// input.setNotNeeded();
continue;
}
input.setNeeded();
if (!input.hasData())
{
all_inputs_has_data = false;
continue;
}
auto chunk = input.pull();
if (!chunk.hasRows())
{
if (!input.isFinished())
all_inputs_has_data = false;
continue;
}
updateCursor(std::move(chunk), i);
}
if (!all_inputs_has_data)
return Status::NeedData;
if (has_collation)
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
else
queue_without_collation = SortingHeap<SortCursor>(cursors);
is_initialized = true;
return Status::Ready;
}
else
{
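        /// Steady state: finish if merge() signalled completion, refill the input it requested, or report that merging can continue.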
if (is_finished)
{
if (is_port_full)
return Status::PortFull;
for (auto & input : inputs)
input.close();
outputs.front().finish();
return Status::Finished;
}
if (need_data)
{
auto & input = *std::next(inputs.begin(), next_input_to_read);
if (!input.isFinished())
{
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
auto chunk = input.pull();
if (!chunk.hasRows() && !input.isFinished())
return Status::NeedData;
updateCursor(std::move(chunk), next_input_to_read);
if (has_collation)
queue_with_collation.push(cursors[next_input_to_read]);
else
queue_without_collation.push(cursors[next_input_to_read]);
}
need_data = false;
}
if (is_port_full)
return Status::PortFull;
return Status::Ready;
}
}
void MergingSortedTransform::work()
{
if (has_collation)
merge(queue_with_collation);
else
merge(queue_without_collation);
}
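
/// Merge rows from the cursors in the heap into merged_data until the output block reaches max_block_size,
/// the row limit is reached, or a cursor runs out of rows and a new chunk has to be requested via prepare().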
template <typename TSortingHeap>
void MergingSortedTransform::merge(TSortingHeap & queue)
{
    /// Returns true if one more row can be merged into the current block;
    /// returns false once the limit or max_block_size has been reached.
auto can_read_another_row = [&, this]()
{
if (limit && merged_data.totalMergedRows() >= limit)
{
//std::cerr << "Limit reached\n";
is_finished = true;
return false;
}
if (merged_data.mergedRows() >= max_block_size)
{
//std::cerr << "max_block_size reached\n";
return false;
}
return true;
};
    /// Take rows in the required order and put them into `merged_data`, until the block grows to `max_block_size` rows.
while (queue.isValid())
{
/// Shouldn't happen at first iteration, but check just in case.
if (!can_read_another_row())
return;
auto current = queue.current();
        /** What if the block of the current cursor is totally less than or equal to the rest?
          * Or is there only one data source left in the queue? Then the whole block of the current cursor can be taken at once.
          */
if (current.impl->isFirst()
&& (queue.size() == 1
|| (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild()))))
{
//std::cerr << "current block is totally less or equals\n";
            /// If there is already data in the current block, return it first. We will get here again on the next call to merge().
if (merged_data.mergedRows() != 0)
{
//std::cerr << "merged rows is non-zero\n";
return;
}
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
size_t source_num = current.impl->order;
insertFromChunk(source_num);
queue.removeTop();
return;
}
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
//std::cerr << "Inserting row\n";
merged_data.insertRow(current->all_columns, current->pos);
if (out_row_sources_buf)
{
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
RowSourcePart row_source(current.impl->order);
out_row_sources_buf->write(row_source.data);
}
if (!current->isLast())
{
// std::cerr << "moving to next row\n";
queue.next();
}
else
{
/// We will get the next block from the corresponding source, if there is one.
queue.removeTop();
// std::cerr << "It was last row, fetching next block\n";
need_data = true;
next_input_to_read = current.impl->order;
if (limit && merged_data.totalMergedRows() >= limit)
is_finished = true;
return;
}
}
is_finished = true;
}
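
/// Append the whole buffered chunk of one source to merged_data at once
/// (used when that chunk is entirely less than or equal to all other cursors), respecting the row limit.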
void MergingSortedTransform::insertFromChunk(size_t source_num)
{
if (source_num >= cursors.size())
        throw Exception("Logical error in MergingSortedTransform", ErrorCodes::LOGICAL_ERROR);
//std::cerr << "copied columns\n";
auto num_rows = source_chunks[source_num].getNumRows();
    /// The limit applies to the total number of merged rows, so account for rows already produced in previous blocks.
    UInt64 total_merged_rows_after_insertion = merged_data.totalMergedRows() + num_rows;

    if (limit && total_merged_rows_after_insertion > limit)
    {
        /// Insert only as many rows as the limit still allows.
        num_rows -= total_merged_rows_after_insertion - limit;
merged_data.insertFromChunk(std::move(source_chunks[source_num]), num_rows);
is_finished = true;
}
else
{
merged_data.insertFromChunk(std::move(source_chunks[source_num]), 0);
need_data = true;
next_input_to_read = source_num;
}
source_chunks[source_num] = Chunk();
if (out_row_sources_buf)
{
RowSourcePart row_source(source_num);
for (size_t i = 0; i < num_rows; ++i)
out_row_sources_buf->write(row_source.data);
}
}
}