Updates in AggregatingTransform and MergingSortedTransform.

This commit is contained in:
Nikolai Kochetov 2020-01-16 13:45:34 +03:00
parent 4b86ef22b5
commit 5aa503beb3
2 changed files with 17 additions and 14 deletions

View File

@ -433,9 +433,11 @@ IProcessor::Status AggregatingTransform::prepare()
}
}
input.setNeeded();
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_chunk = input.pull(/*set_not_needed = */ !is_consume_finished);
read_current_chunk = true;

View File

@ -74,16 +74,8 @@ IProcessor::Status MergingSortedTransform::prepare()
return Status::Finished;
}
if (!output.isNeeded())
{
for (auto & in : inputs)
in.setNotNeeded();
return Status::PortFull;
}
if (output.hasData())
return Status::PortFull;
/// Do not disable inputs, so it will work in the same way as with AsynchronousBlockInputStream, like before.
bool is_port_full = !output.canPush();
/// Special case for single input.
if (inputs.size() == 1)
@ -96,14 +88,20 @@ IProcessor::Status MergingSortedTransform::prepare()
}
input.setNeeded();
if (input.hasData())
{
if (!is_port_full)
output.push(input.pull());
return Status::PortFull;
}
return Status::NeedData;
}
/// Push if has data.
if (merged_data.mergedRows())
if (merged_data.mergedRows() && !is_port_full)
output.push(merged_data.pull());
if (!is_initialized)
@ -119,7 +117,7 @@ IProcessor::Status MergingSortedTransform::prepare()
if (!cursors[i].empty())
{
input.setNotNeeded();
// input.setNotNeeded();
continue;
}
@ -192,6 +190,9 @@ IProcessor::Status MergingSortedTransform::prepare()
need_data = false;
}
if (is_port_full)
return Status::PortFull;
return Status::Ready;
}
}