Merge pull request #20251 from ClickHouse/fix-20242

Fix not closed ports in DelayedPortsProcessor
This commit is contained in:
Nikolai Kochetov 2021-02-10 12:28:28 +03:00 committed by GitHub
commit 253c72afc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -40,6 +40,11 @@ void DelayedPortsProcessor::finishPair(PortsPair & pair)
{
if (!pair.is_finished)
{
if (pair.output_port)
pair.output_port->finish();
pair.input_port->close();
pair.is_finished = true;
++num_finished_pairs;
@ -52,15 +57,12 @@ bool DelayedPortsProcessor::processPair(PortsPair & pair)
{
if (pair.output_port && pair.output_port->isFinished())
{
pair.input_port->close();
finishPair(pair);
return false;
}
if (pair.input_port->isFinished())
{
if (pair.output_port)
pair.output_port->finish();
finishPair(pair);
return false;
}
@ -111,12 +113,7 @@ IProcessor::Status DelayedPortsProcessor::prepare(const PortNumbers & updated_in
if (num_finished_outputs == outputs.size())
{
for (auto & pair : port_pairs)
{
if (pair.output_port)
pair.output_port->finish();
pair.input_port->close();
}
finishPair(pair);
return Status::Finished;
}