Even task consuming in ResizeProcessor.

This commit is contained in:
Nikolai Kochetov 2019-04-26 14:34:51 +03:00
parent b475fb1d4c
commit 4a9c0a1e56
2 changed files with 61 additions and 34 deletions

View File

@ -6,57 +6,84 @@ namespace DB
ResizeProcessor::Status ResizeProcessor::prepare() ResizeProcessor::Status ResizeProcessor::prepare()
{ {
auto cur_output = outputs.begin(); bool is_first_output = true;
auto output_end = current_output;
bool all_outs_full_or_unneeded = true; bool all_outs_full_or_unneeded = true;
bool all_outs_finished = true; bool all_outs_finished = true;
bool is_first_input = true;
auto input_end = current_input;
bool all_inputs_finished = true;
auto is_end_input = [&]() { return !is_first_input && current_input == input_end; };
auto is_end_output = [&]() { return !is_first_output && current_output == output_end; };
auto inc_current_input = [&]()
{
is_first_input = false;
++current_input;
if (current_input == inputs.end())
current_input = inputs.begin();
};
auto inc_current_output = [&]()
{
is_first_output = false;
++current_output;
if (current_output == outputs.end())
current_output = outputs.begin();
};
/// Find next output where can push. /// Find next output where can push.
auto get_next_out = [&, this]() -> OutputPorts::iterator auto get_next_out = [&, this]() -> OutputPorts::iterator
{ {
while (cur_output != outputs.end()) while (!is_end_output())
{ {
if (!cur_output->isFinished()) if (!current_output->isFinished())
{ {
all_outs_finished = false; all_outs_finished = false;
if (cur_output->canPush()) if (current_output->canPush())
{ {
all_outs_full_or_unneeded = false; all_outs_full_or_unneeded = false;
++cur_output; auto res_output = current_output;
return std::prev(cur_output); inc_current_output();
return res_output;
} }
} }
++cur_output; inc_current_output();
} }
return cur_output; return current_output;
}; };
auto cur_input = inputs.begin();
bool all_inputs_finished = true;
/// Find next input from where can pull. /// Find next input from where can pull.
auto get_next_input = [&, this]() -> InputPorts::iterator auto get_next_input = [&, this]() -> InputPorts::iterator
{ {
while (cur_input != inputs.end()) while (!is_end_input())
{ {
if (!cur_input->isFinished()) if (!current_input->isFinished())
{ {
all_inputs_finished = false; all_inputs_finished = false;
cur_input->setNeeded(); current_input->setNeeded();
if (cur_input->hasData()) if (current_input->hasData())
{ {
++cur_input; auto res_input = current_input;
return std::prev(cur_input); inc_current_input();
return res_input;
} }
} }
++cur_input; inc_current_input();
} }
return cur_input; return current_input;
}; };
auto get_status_if_no_outputs = [&]() -> Status auto get_status_if_no_outputs = [&]() -> Status
@ -97,21 +124,18 @@ ResizeProcessor::Status ResizeProcessor::prepare()
/// Set all inputs needed in order to evenly process them. /// Set all inputs needed in order to evenly process them.
/// Otherwise, in case num_outputs < num_inputs and chunks are consumed faster than produced, /// Otherwise, in case num_outputs < num_inputs and chunks are consumed faster than produced,
/// some inputs can be skipped. /// some inputs can be skipped.
auto set_all_unprocessed_inputs_needed = [&]() // auto set_all_unprocessed_inputs_needed = [&]()
{ // {
for (; cur_input != inputs.end(); ++cur_input) // for (; cur_input != inputs.end(); ++cur_input)
if (!cur_input->isFinished()) // if (!cur_input->isFinished())
cur_input->setNeeded(); // cur_input->setNeeded();
}; // };
while (cur_input != inputs.end() && cur_output != outputs.end()) while (!is_end_input() && !is_end_output())
{ {
auto output = get_next_out(); auto output = get_next_out();
if (output == outputs.end()) if (output == outputs.end())
{
set_all_unprocessed_inputs_needed();
return get_status_if_no_outputs(); return get_status_if_no_outputs();
}
auto input = get_next_input(); auto input = get_next_input();
if (input == inputs.end()) if (input == inputs.end())
@ -120,11 +144,8 @@ ResizeProcessor::Status ResizeProcessor::prepare()
output->push(input->pull()); output->push(input->pull());
} }
if (cur_output == outputs.end()) if (is_end_input())
{
set_all_unprocessed_inputs_needed();
return get_status_if_no_outputs(); return get_status_if_no_outputs();
}
/// cur_input == inputs_end() /// cur_input == inputs_end()
return get_status_if_no_inputs(); return get_status_if_no_inputs();

View File

@ -21,14 +21,20 @@ class ResizeProcessor : public IProcessor
{ {
public: public:
/// TODO Check that there is non zero number of inputs and outputs. /// TODO Check that there is non zero number of inputs and outputs.
ResizeProcessor(Block header, size_t num_inputs, size_t num_outputs) ResizeProcessor(const Block & header, size_t num_inputs, size_t num_outputs)
: IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header)) : IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header))
, current_input(inputs.begin())
, current_output(outputs.begin())
{ {
} }
String getName() const override { return "Resize"; } String getName() const override { return "Resize"; }
Status prepare() override; Status prepare() override;
private:
InputPorts::iterator current_input;
OutputPorts::iterator current_output;
}; };
} }