grouping sets: fix 'Port already connected' error

This commit is contained in:
MaxTheHuman 2021-05-17 15:03:07 +03:00 committed by Dmitry Novik
parent e7711d953a
commit fff23945fa
2 changed files with 14 additions and 12 deletions

View File

@ -102,7 +102,7 @@ SRCS(
QueryPlan/FillingStep.cpp
QueryPlan/FilterStep.cpp
QueryPlan/FinishSortingStep.cpp
QueryPlan/GroupingSetsStep.cpp
QueryPlan/IQueryPlanStep.cpp
QueryPlan/ISourceStep.cpp
QueryPlan/ITransformingStep.cpp

View File

@ -439,21 +439,21 @@ void Pipe::addParallelTransforms(Processors transforms)
if (output_ports.empty())
throw Exception("Cannot add parallel transforms to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
std::vector<InputPort> inputs;
std::vector<OutputPort> outputs;
std::vector<InputPort *> inputs;
std::vector<OutputPort *> outputs;
for (const auto & transform : transforms)
{
auto current_transform_inputs = transform->getInputs();
auto & current_transform_inputs = transform->getInputs();
if (current_transform_inputs.size() != 1)
throw Exception("Each parallel transform should have one input port", ErrorCodes::LOGICAL_ERROR);
inputs.push_back(current_transform_inputs.front());
inputs.push_back(&(current_transform_inputs.front()));
auto current_transform_outputs = transform->getOutputs();
auto & current_transform_outputs = transform->getOutputs();
if (current_transform_outputs.size() != 1)
throw Exception("Each parallel transform should have one output port", ErrorCodes::LOGICAL_ERROR);
outputs.push_back(current_transform_outputs.front());
outputs.push_back(&(current_transform_outputs.front()));
LOG_DEBUG(log, "addParallelTransforms, added inputs and outputs for processor {}", transform->getName());
}
@ -463,9 +463,11 @@ void Pipe::addParallelTransforms(Processors transforms)
"but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
size_t next_output = 0;
for (auto & input : inputs)
for (auto * input : inputs)
{
connect(*output_ports[next_output], input);
LOG_DEBUG(log, "is input connected {}", input->isConnected());
LOG_DEBUG(log, "is output connected {}", output_ports[next_output]->isConnected());
connect(*output_ports[next_output], *input);
++next_output;
LOG_DEBUG(log, "addParallelTransforms connect current output to new input {}", next_output);
}
@ -473,11 +475,11 @@ void Pipe::addParallelTransforms(Processors transforms)
output_ports.clear();
output_ports.reserve(outputs.size());
for (auto & output : outputs)
for (auto * output : outputs)
{
LOG_DEBUG(log, "addParallelTransforms change outputs to new output");
LOG_DEBUG(log, "addParallelTransforms is output connected: {}", output.isConnected());
output_ports.emplace_back(&output);
LOG_DEBUG(log, "addParallelTransforms is output connected: {}", output->isConnected());
output_ports.emplace_back(std::move(output));
}
/// do not check output formats because they are different in case of parallel aggregations