Update ResizeProcessor.

This commit is contained in:
Nikolai Kochetov 2020-01-13 13:32:55 +03:00
parent dffbd21cba
commit b3c18638dc
2 changed files with 28 additions and 10 deletions

View File

@ -160,10 +160,10 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs,
initialized = true;
for (auto & input : inputs)
{
input.setNeeded();
input_ports.push_back({.port = &input, .status = InputStatus::NotActive});
}
for (UInt64 i = 0; i < input_ports.size(); ++i)
disabled_input_ports.push(i);
for (auto & output : outputs)
output_ports.push_back({.port = &output, .status = OutputStatus::NotActive});
@ -201,6 +201,8 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs,
return Status::Finished;
}
std::queue<UInt64> inputs_with_data;
for (auto & input_number : updated_inputs)
{
auto & input = input_ports[input_number];
@ -216,9 +218,9 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs,
if (input.port->hasData())
{
if (input.status != InputStatus::HasData)
if (input.status != InputStatus::NotActive)
{
input.status = InputStatus::HasData;
input.status = InputStatus::NotActive;
inputs_with_data.push(input_number);
}
}
@ -229,11 +231,11 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs,
auto & waiting_output = output_ports[waiting_outputs.front()];
waiting_outputs.pop();
auto & input_with_data = input_ports[inputs_with_data.front()];
auto input_number = inputs_with_data.front();
auto & input_with_data = input_ports[input_number];
inputs_with_data.pop();
waiting_output.port->pushData(input_with_data.port->pullData());
input_with_data.status = InputStatus::NotActive;
waiting_output.port->pushData(input_with_data.port->pullData(/* set_not_deeded = */ true));
waiting_output.status = OutputStatus::NotActive;
if (input_with_data.port->isFinished())
@ -241,6 +243,22 @@ IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs,
input_with_data.status = InputStatus::Finished;
++num_finished_inputs;
}
else
disabled_input_ports.push(input_number);
}
if (!inputs_with_data.empty())
throw Exception("Has input with data, but no outputs which need data were found.", ErrorCodes::LOGICAL_ERROR);
/// Enable more inputs if needed.
while (!disabled_input_ports.empty()
&& (inputs.size() - disabled_input_ports.size()) < waiting_outputs.size())
{
auto & input = input_ports[disabled_input_ports.front()];
disabled_input_ports.pop();
input.port->setNeeded();
input.status = InputStatus::NeedData;
}
if (num_finished_inputs == inputs.size())

View File

@ -41,7 +41,7 @@ private:
size_t num_finished_inputs = 0;
size_t num_finished_outputs = 0;
std::queue<UInt64> waiting_outputs;
std::queue<UInt64> inputs_with_data;
std::queue<UInt64> disabled_input_ports;
bool initialized = false;
enum class OutputStatus
@ -54,7 +54,7 @@ private:
enum class InputStatus
{
NotActive,
HasData,
NeedData,
Finished,
};