Try fix Pipeline stuck

This commit is contained in:
Nikolai Kochetov 2020-12-27 14:02:21 +03:00
parent d6f501f497
commit a85cfa81db
3 changed files with 22 additions and 9 deletions

View File

@ -3,8 +3,10 @@
namespace DB
{
DelayedPortsProcessor::DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports)
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header))
DelayedPortsProcessor::DelayedPortsProcessor(
const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_delayed_ports_empty)
: IProcessor(InputPorts(num_ports, header),
OutputPorts(num_ports - (assert_delayed_ports_empty ? delayed_ports.size() : 0), header))
, num_delayed(delayed_ports.size())
{
port_pairs.resize(num_ports);
@ -14,10 +16,14 @@ DelayedPortsProcessor::DelayedPortsProcessor(const Block & header, size_t num_po
for (size_t i = 0; i < num_ports; ++i)
{
port_pairs[i].input_port = &*input_it;
port_pairs[i].output_port = &*output_it;
++input_it;
if (output_it != outputs.end())
{
port_pairs[i].output_port = &*output_it;
++output_it;
}
}
for (const auto & delayed : delayed_ports)
port_pairs[delayed].is_delayed = true;
@ -34,7 +40,7 @@ bool DelayedPortsProcessor::processPair(PortsPair & pair)
}
};
if (pair.output_port->isFinished())
if (pair.output_port && pair.output_port->isFinished())
{
pair.input_port->close();
finish();
@ -43,17 +49,24 @@ bool DelayedPortsProcessor::processPair(PortsPair & pair)
if (pair.input_port->isFinished())
{
if (pair.output_port)
pair.output_port->finish();
finish();
return false;
}
if (!pair.output_port->canPush())
if (pair.output_port && !pair.output_port->canPush())
return false;
pair.input_port->setNeeded();
if (pair.input_port->hasData())
{
if (!pair.output_port)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Input port for DelayedPortsProcessor is assumed to have no data, but it has one");
pair.output_port->pushData(pair.input_port->pullData());
}
return true;
}

View File

@ -11,7 +11,7 @@ namespace DB
class DelayedPortsProcessor : public IProcessor
{
public:
DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports);
DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_delayed_ports_empty = false);
String getName() const override { return "DelayedPorts"; }

View File

@ -298,7 +298,7 @@ void QueryPipeline::addPipelineBefore(QueryPipeline pipeline)
pipes.emplace_back(QueryPipeline::getPipe(std::move(pipeline)));
pipe = Pipe::unitePipes(std::move(pipes), collected_processors);
auto processor = std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_streams);
auto processor = std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_streams, true);
addTransform(std::move(processor));
}