diff --git a/src/Processors/DelayedPortsProcessor.cpp b/src/Processors/DelayedPortsProcessor.cpp index 5a6297413e1..a634f348771 100644 --- a/src/Processors/DelayedPortsProcessor.cpp +++ b/src/Processors/DelayedPortsProcessor.cpp @@ -3,8 +3,10 @@ namespace DB { -DelayedPortsProcessor::DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports) - : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)) +DelayedPortsProcessor::DelayedPortsProcessor( + const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_delayed_ports_empty) + : IProcessor(InputPorts(num_ports, header), + OutputPorts(num_ports - (assert_delayed_ports_empty ? delayed_ports.size() : 0), header)) , num_delayed(delayed_ports.size()) { port_pairs.resize(num_ports); @@ -14,9 +16,13 @@ DelayedPortsProcessor::DelayedPortsProcessor(const Block & header, size_t num_po for (size_t i = 0; i < num_ports; ++i) { port_pairs[i].input_port = &*input_it; - port_pairs[i].output_port = &*output_it; ++input_it; - ++output_it; + + if (output_it != outputs.end()) + { + port_pairs[i].output_port = &*output_it; + ++output_it; + } } for (const auto & delayed : delayed_ports) @@ -34,7 +40,7 @@ bool DelayedPortsProcessor::processPair(PortsPair & pair) } }; - if (pair.output_port->isFinished()) + if (pair.output_port && pair.output_port->isFinished()) { pair.input_port->close(); finish(); @@ -43,17 +49,24 @@ bool DelayedPortsProcessor::processPair(PortsPair & pair) if (pair.input_port->isFinished()) { - pair.output_port->finish(); + if (pair.output_port) + pair.output_port->finish(); finish(); return false; } - if (!pair.output_port->canPush()) + if (pair.output_port && !pair.output_port->canPush()) return false; pair.input_port->setNeeded(); if (pair.input_port->hasData()) + { + if (!pair.output_port) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Input port for DelayedPortsProcessor is assumed to have no data, but it has one"); + pair.output_port->pushData(pair.input_port->pullData()); + } return true; } diff --git a/src/Processors/DelayedPortsProcessor.h b/src/Processors/DelayedPortsProcessor.h index 44dd632f8a8..77409332d8e 100644 --- a/src/Processors/DelayedPortsProcessor.h +++ b/src/Processors/DelayedPortsProcessor.h @@ -11,7 +11,7 @@ namespace DB class DelayedPortsProcessor : public IProcessor { public: - DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports); + DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_delayed_ports_empty = false); String getName() const override { return "DelayedPorts"; } diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index ffcab2e7a8d..ff086fd5b11 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -298,7 +298,7 @@ void QueryPipeline::addPipelineBefore(QueryPipeline pipeline) pipes.emplace_back(QueryPipeline::getPipe(std::move(pipeline))); pipe = Pipe::unitePipes(std::move(pipes), collected_processors); - auto processor = std::make_shared(getHeader(), pipe.numOutputPorts(), delayed_streams); + auto processor = std::make_shared(getHeader(), pipe.numOutputPorts(), delayed_streams, true); addTransform(std::move(processor)); }