diff --git a/src/Processors/DelayedPortsProcessor.cpp b/src/Processors/DelayedPortsProcessor.cpp index d740ef08e5a..ae4ba4659aa 100644 --- a/src/Processors/DelayedPortsProcessor.cpp +++ b/src/Processors/DelayedPortsProcessor.cpp @@ -40,6 +40,11 @@ void DelayedPortsProcessor::finishPair(PortsPair & pair) { if (!pair.is_finished) { + if (pair.output_port) + pair.output_port->finish(); + + pair.input_port->close(); + pair.is_finished = true; ++num_finished_pairs; @@ -52,15 +57,12 @@ bool DelayedPortsProcessor::processPair(PortsPair & pair) { if (pair.output_port && pair.output_port->isFinished()) { - pair.input_port->close(); finishPair(pair); return false; } if (pair.input_port->isFinished()) { - if (pair.output_port) - pair.output_port->finish(); finishPair(pair); return false; } @@ -111,12 +113,7 @@ IProcessor::Status DelayedPortsProcessor::prepare(const PortNumbers & updated_in if (num_finished_outputs == outputs.size()) { for (auto & pair : port_pairs) - { - if (pair.output_port) - pair.output_port->finish(); - - pair.input_port->close(); - } + finishPair(pair); return Status::Finished; }