Fix MergingSortedTransform. Update PipelineExecutor.

This commit is contained in:
Nikolai Kochetov 2019-03-01 17:41:12 +03:00
parent 49d1899aed
commit c388e57a40
6 changed files with 15 additions and 16 deletions

View File

@@ -119,14 +119,15 @@ void PipelineExecutor::processFinishedExecutionQueueSafe()
bool PipelineExecutor::addProcessorToPrepareQueueIfUpdated(Edge & edge)
{
auto & node = graph[edge.to];
/// Don't add processor if nothing was read from port.
if (edge.version == edge.prev_version)
if (node.status != ExecStatus::New && edge.version == edge.prev_version)
return false;
edge.prev_version = edge.version;
auto & node = graph[edge.to];
if (node.status == ExecStatus::Idle)
if (node.status == ExecStatus::Idle || node.status == ExecStatus::New)
{
prepare_queue.push(edge.to);
node.status = ExecStatus::Preparing;

View File

@@ -26,6 +26,7 @@ private:
enum class ExecStatus
{
New,
Idle,
Preparing,
Executing,
@@ -39,7 +40,7 @@ private:
Edges directEdges;
Edges backEdges;
ExecStatus status = ExecStatus::Idle;
ExecStatus status = ExecStatus::New;
IProcessor::Status last_processor_status;
};

View File

@@ -11,7 +11,7 @@ class ISink : public IProcessor
protected:
InputPort & input;
Chunk current_chunk;
bool has_input;
bool has_input = false;
virtual void consume(Chunk block) = 0;

View File

@@ -29,7 +29,7 @@ LimitTransform::Status LimitTransform::prepare()
}
/// Push block if can.
if (block_processed)
if (has_block && block_processed)
{
output.push(std::move(current_chunk));
has_block = false;

View File

@@ -80,10 +80,7 @@ IProcessor::Status MergingSortedTransform::prepare()
/// Push if has data.
if (merged_data.mergedRows())
{
output.push(merged_data.pull());
return Status::PortFull;
}
if (!is_initialized)
{
@@ -185,7 +182,7 @@ void MergingSortedTransform::merge(std::priority_queue<TSortCursor> & queue)
return false;
}
if (merged_data.totalMergedRows() >= max_block_size)
if (merged_data.mergedRows() >= max_block_size)
{
//std::cerr << "max_block_size reached\n";
return false;
@@ -251,7 +248,7 @@ void MergingSortedTransform::merge(std::priority_queue<TSortCursor> & queue)
need_data = true;
next_input_to_read = current.impl->order;
if (merged_data.totalMergedRows() >= limit)
if (limit && merged_data.totalMergedRows() >= limit)
is_finished = true;
return;

View File

@@ -46,7 +46,7 @@ private:
usleep(sleep_useconds);
MutableColumns columns;
columns.emplace_back(ColumnUInt64::create(block_size));
columns.emplace_back(ColumnUInt64::create());
for (UInt64 i = 0; i < block_size; ++i, current_number += step)
columns.back()->insert(Field(current_number));
@@ -144,13 +144,13 @@ try
auto transform2 = std::make_shared<SleepyTransform>(100000);
auto transform3 = std::make_shared<SleepyTransform>(100000);
auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), 10, 0);
auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), 10, 0);
auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), 10, 0);
auto limit1 = std::make_shared<LimitTransform>(source1->getPort().getHeader(), 20, 0);
auto limit2 = std::make_shared<LimitTransform>(source2->getPort().getHeader(), 20, 0);
auto limit3 = std::make_shared<LimitTransform>(source3->getPort().getHeader(), 20, 0);
SortDescription description = {{0, 1, 1}};
auto merge = std::make_shared<MergingSortedTransform>(source1->getPort().getHeader(), 3, description, 2);
auto limit_fin = std::make_shared<LimitTransform>(source1->getPort().getHeader(), 27, 0);
auto limit_fin = std::make_shared<LimitTransform>(source1->getPort().getHeader(), 54, 0);
auto sink = std::make_shared<PrintSink>("");
connect(source1->getPort(), transform1->getInputPort());