diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp index 6aa10b87328..25c5f7c0781 100644 --- a/src/QueryPipeline/Pipe.cpp +++ b/src/QueryPipeline/Pipe.cpp @@ -432,55 +432,6 @@ void Pipe::addTransform(ProcessorPtr transform) addTransform(std::move(transform), static_cast(nullptr), static_cast(nullptr)); } -void Pipe::addParallelTransforms(Processors transforms) -{ - LOG_DEBUG(log, "Begin addParallelTransforms, have {} transforms", transforms.size()); - - if (output_ports.empty()) - throw Exception("Cannot add parallel transforms to empty Pipe.", ErrorCodes::LOGICAL_ERROR); - - std::vector inputs; - std::vector outputs; - for (const auto & transform : transforms) - { - auto & current_transform_inputs = transform->getInputs(); - if (current_transform_inputs.size() != 1) - throw Exception("Each parallel transform should have one input port", ErrorCodes::LOGICAL_ERROR); - - inputs.push_back(&(current_transform_inputs.front())); - - auto & current_transform_outputs = transform->getOutputs(); - if (current_transform_outputs.size() != 1) - throw Exception("Each parallel transform should have one output port", ErrorCodes::LOGICAL_ERROR); - - outputs.push_back(&(current_transform_outputs.front())); - LOG_DEBUG(log, "addParallelTransforms, added inputs and outputs for processor {}", transform->getName()); - LOG_DEBUG(log, "output structure: {}", transform->getOutputs().front().getHeader().dumpStructure()); - } - - if (inputs.size() != output_ports.size()) - throw Exception("Cannot add parallel transforms to Pipes because " + - std::to_string(transforms.size()) + " transforms were passed, " - "but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR); - - for (size_t i = 0; i < inputs.size(); ++i) - connect(*output_ports[i], *inputs[i]); - - output_ports = std::move(outputs); - - /// do not check output formats because they are different in case of parallel aggregations - LOG_DEBUG(log, "addParallelTransforms do not check format"); - - if (collected_processors) - collected_processors->insert(collected_processors->end(), transforms.begin(), transforms.end()); - - for (auto & transform_ptr : transforms) - processors.emplace_back(std::move(transform_ptr)); - - /// Should not change streams number, so maybe not need max_parallel_streams update - max_parallel_streams = std::max(max_parallel_streams, output_ports.size()); -} - void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes) { if (output_ports.empty()) diff --git a/src/QueryPipeline/Pipe.h b/src/QueryPipeline/Pipe.h index 21e8fbfe039..01a5d8bb961 100644 --- a/src/QueryPipeline/Pipe.h +++ b/src/QueryPipeline/Pipe.h @@ -64,7 +64,6 @@ public: /// Output ports should have same headers. /// If totals or extremes are not empty, transform shouldn't change header. void addTransform(ProcessorPtr transform); - void addParallelTransforms(Processors transform); void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes); void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes); diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index 1377db34daf..40c64046560 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -155,12 +155,6 @@ void QueryPipelineBuilder::transform(const Transformer & transformer) pipe.transform(transformer); } -void QueryPipelineBuilder::addParallelTransforms(Processors transforms) -{ - checkInitializedAndNotCompleted(); - pipe.addParallelTransforms(transforms); -} - void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter) { checkInitializedAndNotCompleted(); diff --git a/src/QueryPipeline/QueryPipelineBuilder.h b/src/QueryPipeline/QueryPipelineBuilder.h index d920f8a7e81..9e198f45e98 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.h +++ b/src/QueryPipeline/QueryPipelineBuilder.h @@ -67,8 +67,6 @@ public: /// Transform pipeline in general way. void transform(const Transformer & transformer); - /// Add transforms and connect it to outputs streams - void addParallelTransforms(Processors transform); /// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port. void addTotalsHavingTransform(ProcessorPtr transform); /// Add transform which calculates extremes. This transform adds extremes port and doesn't change inputs number.