From 08f943462fa243ef0a18fa27a60be543e25cda74 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 30 Nov 2023 18:23:05 +0000 Subject: [PATCH] Delay totals port for creating sets as well. --- src/Processors/QueryPlan/CreatingSetsStep.cpp | 2 + src/QueryPipeline/Pipe.cpp | 211 ++++++++---------- src/QueryPipeline/Pipe.h | 5 + src/QueryPipeline/QueryPipelineBuilder.cpp | 14 +- 4 files changed, 110 insertions(+), 122 deletions(-) diff --git a/src/Processors/QueryPlan/CreatingSetsStep.cpp b/src/Processors/QueryPlan/CreatingSetsStep.cpp index 3e4dfb0c7d1..37f81ffd160 100644 --- a/src/Processors/QueryPlan/CreatingSetsStep.cpp +++ b/src/Processors/QueryPlan/CreatingSetsStep.cpp @@ -111,6 +111,8 @@ QueryPipelineBuilderPtr CreatingSetsStep::updatePipeline(QueryPipelineBuilders p else delayed_pipeline = std::move(*pipelines.front()); + delayed_pipeline.dropTotalsAndExtremes(); + QueryPipelineProcessorsCollector collector(*main_pipeline, this); main_pipeline->addPipelineBefore(std::move(delayed_pipeline)); auto added_processors = collector.detachProcessors(); diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp index b1c82d7a7e8..fd433638252 100644 --- a/src/QueryPipeline/Pipe.cpp +++ b/src/QueryPipeline/Pipe.cpp @@ -434,68 +434,130 @@ void Pipe::addTransform(ProcessorPtr transform) } void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes) +{ + addTransform(std::move(transform), + static_cast(nullptr), static_cast(nullptr), + totals, extremes); +} + +void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes) +{ + addTransform(std::move(transform), + totals, extremes, + static_cast(nullptr), static_cast(nullptr)); +} + +void Pipe::addTransform( + ProcessorPtr transform, + InputPort * totals_in, InputPort * extremes_in, + OutputPort * totals_out, OutputPort * extremes_out) { if (output_ports.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform to empty Pipe"); + if (totals_in && !totals_port) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform consuming totals to Pipe because Pipe does not have totals"); + + if (extremes_in && !extremes_port) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform consuming extremes to Pipe because Pipe does not have extremes"); + + if (totals_out && !totals_in && totals_port) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform with totals to Pipe because it already has totals"); + + if (extremes_out && !extremes_in && extremes_port) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform with extremes to Pipe because it already has extremes"); + auto & inputs = transform->getInputs(); - if (inputs.size() != output_ports.size()) + auto & outputs = transform->getOutputs(); + + size_t expected_inputs = output_ports.size() + (totals_in ? 1 : 0) + (extremes_in ? 1 : 0); + if (inputs.size() != expected_inputs) throw Exception( ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipe because it has {} input ports, but {} expected", transform->getName(), inputs.size(), - output_ports.size()); + expected_inputs); - if (totals && totals_port) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform with totals to Pipe because it already has totals"); + if (outputs.size() <= (totals_out ? 1 : 0) + (extremes_out ? 1 : 0)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipes because it has no outputs", + transform->getName()); - if (extremes && extremes_port) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform with extremes to Pipe because it already has extremes"); + bool found_totals_in = false; + bool found_extremes_in = false; - if (totals) - totals_port = totals; - if (extremes) - extremes_port = extremes; - - size_t next_output = 0; for (auto & input : inputs) { - connect(*output_ports[next_output], input); - ++next_output; + if (&input == totals_in) + found_totals_in = true; + else if (&input == extremes_in) + found_extremes_in = true; } - auto & outputs = transform->getOutputs(); + if (totals_in && !found_totals_in) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot add transform {} to Pipes because specified totals port does not belong to it", + transform->getName()); - output_ports.clear(); - output_ports.reserve(outputs.size()); + if (extremes_in && !found_extremes_in) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot add transform {} to Pipes because specified extremes port does not belong to it", + transform->getName()); - bool found_totals = false; - bool found_extremes = false; + bool found_totals_out = false; + bool found_extremes_out = false; for (auto & output : outputs) { - if (&output == totals) - found_totals = true; - else if (&output == extremes) - found_extremes = true; - else - output_ports.emplace_back(&output); + if (&output == totals_out) + found_totals_out = true; + else if (&output == extremes_out) + found_extremes_out = true; } - if (totals && !found_totals) + if (totals_out && !found_totals_out) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipes because specified totals port does not belong to it", transform->getName()); - if (extremes && !found_extremes) + if (extremes_out && !found_extremes_out) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipes because specified extremes port does not belong to it", transform->getName()); - if (output_ports.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipes because it has no outputs", - transform->getName()); + if (totals_in) + { + connect(*totals_port, *totals_in); + totals_port = nullptr; + } + if (extremes_in) + { + connect(*extremes_port, *extremes_in); + extremes_port = nullptr; + } + + totals_port = totals_out ? totals_out : totals_port; + extremes_port = extremes_out ? extremes_out : extremes_port; + + size_t next_output = 0; + for (auto & input : inputs) + { + if (&input != totals_in && &input != extremes_in) + { + connect(*output_ports[next_output], input); + ++next_output; + } + } + + output_ports.clear(); + output_ports.reserve(outputs.size()); + for (auto & output : outputs) + { + if (&output != totals_out && &output != extremes_out) + output_ports.emplace_back(&output); + } header = output_ports.front()->getHeader(); for (size_t i = 1; i < output_ports.size(); ++i) @@ -508,100 +570,11 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort if (extremes_port) assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes"); - if (collected_processors) - collected_processors->emplace_back(transform); - processors->emplace_back(std::move(transform)); - max_parallel_streams = std::max(max_parallel_streams, output_ports.size()); -} - -void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes) -{ - if (output_ports.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform to empty Pipe"); - - auto & inputs = transform->getInputs(); - size_t expected_inputs = output_ports.size() + (totals ? 1 : 0) + (extremes ? 1 : 0); - if (inputs.size() != expected_inputs) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Cannot add transform {} to Pipe because it has {} input ports, but {} expected", - transform->getName(), - inputs.size(), - expected_inputs); - - if (totals && !totals_port) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform consuming totals to Pipe because Pipe does not have totals"); - - if (extremes && !extremes_port) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform consuming extremes to Pipe because Pipe does not have extremes"); - - if (totals) - { - connect(*totals_port, *totals); - totals_port = nullptr; - } - if (extremes) - { - connect(*extremes_port, *extremes); - extremes_port = nullptr; - } - - bool found_totals = false; - bool found_extremes = false; - - size_t next_output = 0; - for (auto & input : inputs) - { - if (&input == totals) - found_totals = true; - else if (&input == extremes) - found_extremes = true; - else - { - connect(*output_ports[next_output], input); - ++next_output; - } - } - - if (totals && !found_totals) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Cannot add transform {} to Pipes because specified totals port does not belong to it", - transform->getName()); - - if (extremes && !found_extremes) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Cannot add transform {} to Pipes because specified extremes port does not belong to it", - transform->getName()); - - auto & outputs = transform->getOutputs(); - if (outputs.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add transform {} to Pipes because it has no outputs", transform->getName()); - - output_ports.clear(); - output_ports.reserve(outputs.size()); - - for (auto & output : outputs) - output_ports.emplace_back(&output); - - header = output_ports.front()->getHeader(); - for (size_t i = 1; i < output_ports.size(); ++i) - assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipes"); - - if (totals_port) - assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes"); - - if (extremes_port) - assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes"); - if (collected_processors) collected_processors->emplace_back(transform); - processors->emplace_back(std::move(transform)); - max_parallel_streams = std::max(max_parallel_streams, output_ports.size()); } diff --git a/src/QueryPipeline/Pipe.h b/src/QueryPipeline/Pipe.h index 09931e38578..ec102605677 100644 --- a/src/QueryPipeline/Pipe.h +++ b/src/QueryPipeline/Pipe.h @@ -69,6 +69,11 @@ public: void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes); void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes); + void addTransform( + ProcessorPtr transform, + InputPort * totals_in, InputPort * extremes_in, + OutputPort * totals_out, OutputPort * extremes_out); + enum class StreamType { Main = 0, /// Stream for query data. There may be several streams of this type. diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index f13d1c56d7f..401987d46ba 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -602,7 +602,9 @@ void QueryPipelineBuilder::addPipelineBefore(QueryPipelineBuilder pipeline) throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for CreatingSets should have empty header. Got: {}", pipeline.getHeader().dumpStructure()); - IProcessor::PortNumbers delayed_streams(pipe.numOutputPorts()); + bool has_totals = pipe.getTotalsPort(); + bool has_extremes = pipe.getExtremesPort(); + IProcessor::PortNumbers delayed_streams(pipe.numOutputPorts() + (has_totals ? 1 : 0) + (has_extremes ? 1 : 0)); for (size_t i = 0; i < delayed_streams.size(); ++i) delayed_streams[i] = i; @@ -613,8 +615,14 @@ void QueryPipelineBuilder::addPipelineBefore(QueryPipelineBuilder pipeline) pipes.emplace_back(QueryPipelineBuilder::getPipe(std::move(pipeline), resources)); pipe = Pipe::unitePipes(std::move(pipes), collected_processors, true); - auto processor = std::make_shared(getHeader(), pipe.numOutputPorts(), delayed_streams, true); - addTransform(std::move(processor)); + auto processor = std::make_shared(getHeader(), delayed_streams.size(), delayed_streams, true); + auto in = processor->getInputs().begin(); + auto out = processor->getOutputs().begin(); + InputPort * totals_in = has_totals ? &*(in++) : nullptr; + InputPort * extremes_in = has_extremes ? &*(in++) : nullptr; + OutputPort * totals_out = has_totals ? &*(out++) : nullptr; + OutputPort * extremes_out = has_extremes ? &*(out++) : nullptr; + pipe.addTransform(std::move(processor), totals_in, extremes_in, totals_out, extremes_out); } void QueryPipelineBuilder::setProcessListElement(QueryStatusPtr elem)