Fix not closed ports in DelayedPortsProcessor

This commit is contained in:
Nikolai Kochetov 2021-02-09 19:35:46 +03:00
parent 63038eb911
commit db09d5a990

View File

@ -40,6 +40,11 @@ void DelayedPortsProcessor::finishPair(PortsPair & pair)
{ {
if (!pair.is_finished) if (!pair.is_finished)
{ {
if (pair.output_port)
pair.output_port->finish();
pair.input_port->close();
pair.is_finished = true; pair.is_finished = true;
++num_finished_pairs; ++num_finished_pairs;
@ -52,15 +57,12 @@ bool DelayedPortsProcessor::processPair(PortsPair & pair)
{ {
if (pair.output_port && pair.output_port->isFinished()) if (pair.output_port && pair.output_port->isFinished())
{ {
pair.input_port->close();
finishPair(pair); finishPair(pair);
return false; return false;
} }
if (pair.input_port->isFinished()) if (pair.input_port->isFinished())
{ {
if (pair.output_port)
pair.output_port->finish();
finishPair(pair); finishPair(pair);
return false; return false;
} }
@ -111,12 +113,7 @@ IProcessor::Status DelayedPortsProcessor::prepare(const PortNumbers & updated_in
if (num_finished_outputs == outputs.size()) if (num_finished_outputs == outputs.size())
{ {
for (auto & pair : port_pairs) for (auto & pair : port_pairs)
{ finishPair(pair);
if (pair.output_port)
pair.output_port->finish();
pair.input_port->close();
}
return Status::Finished; return Status::Finished;
} }