diff --git a/dbms/src/Processors/ResizeProcessor.cpp b/dbms/src/Processors/ResizeProcessor.cpp index 59d1f0db75e..09e483176f3 100644 --- a/dbms/src/Processors/ResizeProcessor.cpp +++ b/dbms/src/Processors/ResizeProcessor.cpp @@ -160,10 +160,10 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, initialized = true; for (auto & input : inputs) - { - input.setNeeded(); input_ports.push_back({.port = &input, .status = InputStatus::NotActive}); - } + + for (UInt64 i = 0; i < input_ports.size(); ++i) + disabled_input_ports.push(i); for (auto & output : outputs) output_ports.push_back({.port = &output, .status = OutputStatus::NotActive}); @@ -201,6 +201,8 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, return Status::Finished; } + std::queue inputs_with_data; + for (auto & input_number : updated_inputs) { auto & input = input_ports[input_number]; @@ -216,9 +218,9 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, if (input.port->hasData()) { - if (input.status != InputStatus::HasData) + if (input.status != InputStatus::NotActive) { - input.status = InputStatus::HasData; + input.status = InputStatus::NotActive; inputs_with_data.push(input_number); } } @@ -229,11 +231,11 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, auto & waiting_output = output_ports[waiting_outputs.front()]; waiting_outputs.pop(); - auto & input_with_data = input_ports[inputs_with_data.front()]; + auto input_number = inputs_with_data.front(); + auto & input_with_data = input_ports[input_number]; inputs_with_data.pop(); - waiting_output.port->pushData(input_with_data.port->pullData()); - input_with_data.status = InputStatus::NotActive; + waiting_output.port->pushData(input_with_data.port->pullData(/* set_not_deeded = */ true)); waiting_output.status = OutputStatus::NotActive; if (input_with_data.port->isFinished()) @@ -241,6 +243,22 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, input_with_data.status = InputStatus::Finished; ++num_finished_inputs; } + else + disabled_input_ports.push(input_number); + } + + if (!inputs_with_data.empty()) + throw Exception("Has input with data, but no outputs which need data were found.", ErrorCodes::LOGICAL_ERROR); + + /// Enable more inputs if needed. + while (!disabled_input_ports.empty() + && (inputs.size() - disabled_input_ports.size()) < waiting_outputs.size()) + { + auto & input = input_ports[disabled_input_ports.front()]; + disabled_input_ports.pop(); + + input.port->setNeeded(); + input.status = InputStatus::NeedData; } if (num_finished_inputs == inputs.size()) diff --git a/dbms/src/Processors/ResizeProcessor.h b/dbms/src/Processors/ResizeProcessor.h index 3a9c906ecbd..0dfa4762188 100644 --- a/dbms/src/Processors/ResizeProcessor.h +++ b/dbms/src/Processors/ResizeProcessor.h @@ -41,7 +41,7 @@ private: size_t num_finished_inputs = 0; size_t num_finished_outputs = 0; std::queue waiting_outputs; - std::queue inputs_with_data; + std::queue disabled_input_ports; bool initialized = false; enum class OutputStatus @@ -54,7 +54,7 @@ private: enum class InputStatus { NotActive, - HasData, + NeedData, Finished, };