diff --git a/src/Processors/ya.make b/src/Processors/ya.make index a1386acb6d0..a6f8e274e9e 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -102,7 +102,7 @@ SRCS( QueryPlan/FillingStep.cpp QueryPlan/FilterStep.cpp QueryPlan/FinishSortingStep.cpp - QueryPlan/GroupingSetsStep.cpp + QueryPlan/IQueryPlanStep.cpp QueryPlan/ISourceStep.cpp QueryPlan/ITransformingStep.cpp diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp index d9917b8636d..25cdf2c8fa9 100644 --- a/src/QueryPipeline/Pipe.cpp +++ b/src/QueryPipeline/Pipe.cpp @@ -439,21 +439,21 @@ void Pipe::addParallelTransforms(Processors transforms) if (output_ports.empty()) throw Exception("Cannot add parallel transforms to empty Pipe.", ErrorCodes::LOGICAL_ERROR); - std::vector inputs; - std::vector outputs; + std::vector inputs; + std::vector outputs; for (const auto & transform : transforms) { - auto current_transform_inputs = transform->getInputs(); + auto & current_transform_inputs = transform->getInputs(); if (current_transform_inputs.size() != 1) throw Exception("Each parallel transform should have one input port", ErrorCodes::LOGICAL_ERROR); - inputs.push_back(current_transform_inputs.front()); + inputs.push_back(&(current_transform_inputs.front())); - auto current_transform_outputs = transform->getOutputs(); + auto & current_transform_outputs = transform->getOutputs(); if (current_transform_outputs.size() != 1) throw Exception("Each parallel transform should have one output port", ErrorCodes::LOGICAL_ERROR); - outputs.push_back(current_transform_outputs.front()); + outputs.push_back(&(current_transform_outputs.front())); LOG_DEBUG(log, "addParallelTransforms, added inputs and outputs for processor {}", transform->getName()); } @@ -463,9 +463,11 @@ void Pipe::addParallelTransforms(Processors transforms) "but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR); size_t next_output = 0; - for (auto & input : inputs) + for (auto * input : inputs) { - connect(*output_ports[next_output], input); + LOG_DEBUG(log, "is input connected {}", input->isConnected()); + LOG_DEBUG(log, "is output connected {}", output_ports[next_output]->isConnected()); + connect(*output_ports[next_output], *input); ++next_output; LOG_DEBUG(log, "addParallelTransforms connect current output to new input {}", next_output); } @@ -473,11 +475,11 @@ void Pipe::addParallelTransforms(Processors transforms) output_ports.clear(); output_ports.reserve(outputs.size()); - for (auto & output : outputs) + for (auto * output : outputs) { LOG_DEBUG(log, "addParallelTransforms change outputs to new output"); - LOG_DEBUG(log, "addParallelTransforms is output connected: {}", output.isConnected()); - output_ports.emplace_back(&output); + LOG_DEBUG(log, "addParallelTransforms is output connected: {}", output->isConnected()); + output_ports.emplace_back(std::move(output)); } /// do not check output formats because they are different in case of parallel aggregations