2019-02-26 18:40:08 +00:00
|
|
|
#include <Processors/Transforms/MergingSortedTransform.h>
|
|
|
|
#include <DataStreams/ColumnGathererStream.h>
|
|
|
|
#include <IO/WriteBuffer.h>
|
2019-08-21 16:38:27 +00:00
|
|
|
#include <DataStreams/materializeBlock.h>
|
2019-02-26 18:40:08 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/** Merging-sorted processor: reads sorted chunks from `num_inputs` ports and
  * produces a single globally sorted stream on one output port.
  *
  * @param header          Common header of all input ports; output header is the materialized form of it.
  * @param num_inputs      Number of input ports (one sorted stream each).
  * @param description_    Sort description; column names are resolved to positions below and then cleared.
  * @param max_block_size_ Row count at which an output chunk is flushed.
  * @param limit_          Stop after this many total merged rows (0 = no limit).
  * @param quiet_          Stored flag; its effect is not visible in this translation unit.
  * @param have_all_inputs_ If false, more inputs may be attached later via addInput().
  */
MergingSortedTransform::MergingSortedTransform(
    const Block & header,
    size_t num_inputs,
    const SortDescription & description_,
    size_t max_block_size_,
    UInt64 limit_,
    bool quiet_,
    bool have_all_inputs_)
    /// Output header is materialized so that downstream always sees full (non-const) columns.
    : IProcessor(InputPorts(num_inputs, header), {materializeBlock(header)})
    , description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
    , have_all_inputs(have_all_inputs_)
    /// One source chunk slot and one cursor per input; they must stay index-aligned with `inputs`.
    , merged_data(header), source_chunks(num_inputs), cursors(num_inputs)
{
    auto & sample = outputs.front().getHeader();
    /// Replace column names in description to positions.
    for (auto & column_description : description)
    {
        /// Any column with a collator forces the collation-aware queue/merge path.
        has_collation |= column_description.collator != nullptr;
        if (!column_description.column_name.empty())
        {
            column_description.column_number = sample.getPositionByName(column_description.column_name);
            /// Name is cleared so later code uses only the resolved position.
            column_description.column_name.clear();
        }
    }
}
|
|
|
|
|
2019-04-16 11:40:15 +00:00
|
|
|
void MergingSortedTransform::addInput()
|
|
|
|
{
|
|
|
|
if (have_all_inputs)
|
|
|
|
throw Exception("MergingSortedTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
inputs.emplace_back(outputs.front().getHeader(), this);
|
|
|
|
source_chunks.emplace_back();
|
|
|
|
cursors.emplace_back();
|
|
|
|
}
|
|
|
|
|
|
|
|
void MergingSortedTransform::setHaveAllInputs()
|
|
|
|
{
|
|
|
|
if (have_all_inputs)
|
|
|
|
throw Exception("MergingSortedTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
have_all_inputs = true;
|
|
|
|
}
|
|
|
|
|
2019-02-26 18:40:08 +00:00
|
|
|
/** Port-level state machine. Decides whether the processor can run work(),
  * needs more input, or is finished. No merging happens here — only moving
  * chunks between ports and the internal cursors/queue.
  */
IProcessor::Status MergingSortedTransform::prepare()
{
    /// Can't do anything until the set of inputs is sealed.
    if (!have_all_inputs)
        return Status::NeedData;

    auto & output = outputs.front();

    /// Special case for no inputs.
    if (inputs.empty())
    {
        output.finish();
        return Status::Finished;
    }

    /// Check can output.

    if (output.isFinished())
    {
        /// Downstream no longer wants data — propagate cancellation upstream.
        for (auto & in : inputs)
            in.close();

        return Status::Finished;
    }

    if (!output.isNeeded())
    {
        for (auto & in : inputs)
            in.setNotNeeded();

        return Status::PortFull;
    }

    if (output.hasData())
        return Status::PortFull;

    /// Special case for single input: pass chunks through unchanged, no merge needed.
    if (inputs.size() == 1)
    {
        auto & input = inputs.front();
        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        input.setNeeded();
        if (input.hasData())
            output.push(input.pull());

        return Status::NeedData;
    }

    /// Push if has data.
    if (merged_data.mergedRows())
        output.push(merged_data.pull());

    if (!is_initialized)
    {
        /// Initialization phase: gather the first non-empty chunk from every
        /// live input so the priority queue can be built.
        /// Check for inputs we need.
        bool all_inputs_has_data = true;
        auto it = inputs.begin();
        for (size_t i = 0; it != inputs.end(); ++i, ++it)
        {
            auto & input = *it;
            if (input.isFinished())
                continue;

            /// A cursor that already holds data does not need another chunk yet.
            if (!cursors[i].empty())
            {
                input.setNotNeeded();
                continue;
            }

            input.setNeeded();

            if (!input.hasData())
            {
                all_inputs_has_data = false;
                continue;
            }

            auto chunk = input.pull();
            /// Empty chunks carry no rows to merge; keep waiting on this input.
            if (chunk.hasNoRows())
            {
                all_inputs_has_data = false;
                continue;
            }

            updateCursor(std::move(chunk), i);
        }

        if (!all_inputs_has_data)
            return Status::NeedData;

        /// Build the queue matching the cursor comparison mode chosen in the constructor.
        if (has_collation)
            initQueue(queue_with_collation);
        else
            initQueue(queue_without_collation);

        is_initialized = true;
        return Status::Ready;
    }
    else
    {
        if (is_finished)
        {
            for (auto & input : inputs)
                input.close();

            outputs.front().finish();

            return Status::Finished;
        }

        /// Steady state: merge() exhausted one source and recorded which input
        /// must be refilled (`next_input_to_read`) before merging can continue.
        if (need_data)
        {
            auto & input = *std::next(inputs.begin(), next_input_to_read);
            if (!input.isFinished())
            {
                input.setNeeded();

                if (!input.hasData())
                    return Status::NeedData;

                auto chunk = input.pull();
                if (!chunk.hasRows())
                    return Status::NeedData;

                updateCursor(std::move(chunk), next_input_to_read);
                /// Re-insert the refreshed cursor into the priority queue.
                pushToQueue(next_input_to_read);
                need_data = false;
            }
            /// NOTE(review): if the input is finished we fall through with the
            /// cursor absent from the queue — presumably merge() then proceeds
            /// with the remaining sources; confirm against merge()'s invariants.
        }

        return Status::Ready;
    }
}
|
|
|
|
|
|
|
|
void MergingSortedTransform::work()
|
|
|
|
{
|
|
|
|
if (has_collation)
|
|
|
|
merge(queue_with_collation);
|
|
|
|
else
|
|
|
|
merge(queue_without_collation);
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Core k-way merge loop over the priority queue of sort cursors.
  * Moves rows into `merged_data` in sorted order until the queue is drained,
  * `max_block_size` rows are accumulated, `limit` is reached, or a source runs
  * out of rows (then sets `need_data`/`next_input_to_read` and returns so that
  * prepare() can refill that input).
  */
template <typename TSortCursor>
void MergingSortedTransform::merge(std::priority_queue<TSortCursor> & queue)
{
    /// Returns MergeStatus which we should return if we are going to finish now.
    auto can_read_another_row = [&, this]()
    {
        if (limit && merged_data.totalMergedRows() >= limit)
        {
            //std::cerr << "Limit reached\n";
            is_finished = true;
            return false;
        }

        if (merged_data.mergedRows() >= max_block_size)
        {
            //std::cerr << "max_block_size reached\n";
            return false;
        }

        return true;
    };

    /// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size`
    while (!queue.empty())
    {
        /// Shouldn't happen at first iteration, but check just in case.
        if (!can_read_another_row())
            return;

        /// Cursor with the smallest current row.
        TSortCursor current = queue.top();
        queue.pop();
        bool first_iteration = true;

        /// Inner loop: keep consuming from `current` while it stays the minimum,
        /// avoiding a queue push/pop per row.
        while (true)
        {
            if (!first_iteration && !can_read_another_row())
            {
                /// Return the cursor so the next merge() call resumes from it.
                queue.push(current);
                return;
            }
            first_iteration = false;

            /** And what if the block is totally less or equal than the rest for the current cursor?
              * Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
              */
            if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
            {
                //std::cerr << "current block is totally less or equals\n";

                /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
                if (merged_data.mergedRows() != 0)
                {
                    //std::cerr << "merged rows is non-zero\n";
                    queue.push(current);
                    return;
                }

                /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
                size_t source_num = current.impl->order;
                insertFromChunk(source_num);
                return;
            }

            //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
            //std::cerr << "Inserting row\n";
            merged_data.insertRow(current->all_columns, current->pos);

            if (out_row_sources_buf)
            {
                /// Record which input this row came from (used by vertical merge).
                /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
                RowSourcePart row_source(current.impl->order);
                out_row_sources_buf->write(row_source.data);
            }

            if (current->isLast())
            {
                /// Source chunk exhausted — ask prepare() to refill this input.
                need_data = true;
                next_input_to_read = current.impl->order;

                if (limit && merged_data.totalMergedRows() >= limit)
                    is_finished = true;

                return;
            }

            //std::cerr << "moving to next row\n";
            current->next();

            /// If the advanced cursor is no longer the minimum, push it back and
            /// let the outer loop pick the new top.
            if (!queue.empty() && current.greater(queue.top()))
            {
                //std::cerr << "next row is not least, pushing back to queue\n";
                queue.push(current);
                break;
            }
        }
    }
    /// Queue drained: every input is fully consumed.
    is_finished = true;
}
|
|
|
|
|
|
|
|
/** Fast path: append the whole remaining chunk of one source to `merged_data`
  * (used by merge() when that chunk is known to be <= everything else).
  *
  * @param source_num Index of the source whose chunk is appended.
  * @throws Exception(LOGICAL_ERROR) if `source_num` is out of range.
  *
  * Honors `limit`: inserts only as many rows as remain before the limit and
  * marks the transform finished; otherwise requests a refill of this input.
  */
void MergingSortedTransform::insertFromChunk(size_t source_num)
{
    if (source_num >= cursors.size())
        throw Exception("Logical error in MergingSortedTransform", ErrorCodes::LOGICAL_ERROR);

    //std::cerr << "copied columns\n";

    auto num_rows = source_chunks[source_num]->getNumRows();

    UInt64 total_merged_rows_after_insertion = merged_data.mergedRows() + num_rows;
    if (limit && total_merged_rows_after_insertion > limit)
    {
        /// Bug fix: the second argument of insertFromChunk is the number of rows
        /// to KEEP, so subtract the excess instead of passing the excess itself
        /// (the old code inserted the wrong number of rows when limit was hit).
        /// Assumes merged_data.mergedRows() < limit here (merge() checks the
        /// limit before calling us), so num_rows stays positive.
        num_rows -= total_merged_rows_after_insertion - limit;
        merged_data.insertFromChunk(std::move(*source_chunks[source_num]), num_rows);
        is_finished = true;
    }
    else
    {
        merged_data.insertFromChunk(std::move(*source_chunks[source_num]), 0);
        /// Whole chunk consumed — ask prepare() to refill this input.
        need_data = true;
        next_input_to_read = source_num;
    }

    if (out_row_sources_buf)
    {
        /// One row-source mark per actually inserted row (num_rows was adjusted above).
        RowSourcePart row_source(source_num);
        for (size_t i = 0; i < num_rows; ++i)
            out_row_sources_buf->write(row_source.data);
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|