Update QueryPipeline::addDelayingPipeline

This commit is contained in:
Nikolai Kochetov 2020-09-16 13:25:01 +03:00
parent a8e671f015
commit 8be45dd3cc

View File

@ -264,16 +264,22 @@ QueryPipeline QueryPipeline::unitePipelines(
void QueryPipeline::addDelayingPipeline(QueryPipeline pipeline)
{
pipeline.resize(1);
checkInitializedAndNotCompleted();
assertBlocksHaveEqualStructure(getHeader(), pipeline.getHeader(), "QueryPipeline");
IProcessor::PortNumbers delayed_streams(pipe.numOutputPorts());
for (size_t i = 0; i < delayed_streams.size(); ++i)
delayed_streams[i] = i;
auto * collected_processors = pipe.collected_processors;
Pipes pipes;
pipes.emplace_back(QueryPipeline::getPipe(std::move(pipeline)));
pipes.emplace_back(std::move(pipe));
pipes.emplace_back(QueryPipeline::getPipe(std::move(pipeline)));
pipe = Pipe::unitePipes(std::move(pipes), collected_processors);
pipe.addTransform(std::make_shared<ConcatProcessor>(getHeader(), 2));
auto processor = std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_streams);
addTransform(std::move(processor));
}
void QueryPipeline::setProgressCallback(const ProgressCallback & callback)