diff --git a/dbms/src/Processors/ResizeProcessor.cpp b/dbms/src/Processors/ResizeProcessor.cpp index 188a944e3f1..b0d5d9e7d44 100644 --- a/dbms/src/Processors/ResizeProcessor.cpp +++ b/dbms/src/Processors/ResizeProcessor.cpp @@ -6,57 +6,84 @@ namespace DB ResizeProcessor::Status ResizeProcessor::prepare() { - auto cur_output = outputs.begin(); + bool is_first_output = true; + auto output_end = current_output; + bool all_outs_full_or_unneeded = true; bool all_outs_finished = true; + bool is_first_input = true; + auto input_end = current_input; + + bool all_inputs_finished = true; + + auto is_end_input = [&]() { return !is_first_input && current_input == input_end; }; + auto is_end_output = [&]() { return !is_first_output && current_output == output_end; }; + + auto inc_current_input = [&]() + { + is_first_input = false; + ++current_input; + + if (current_input == inputs.end()) + current_input = inputs.begin(); + }; + + auto inc_current_output = [&]() + { + is_first_output = false; + ++current_output; + + if (current_output == outputs.end()) + current_output = outputs.begin(); + }; + /// Find next output where can push. auto get_next_out = [&, this]() -> OutputPorts::iterator { - while (cur_output != outputs.end()) + while (!is_end_output()) { - if (!cur_output->isFinished()) + if (!current_output->isFinished()) { all_outs_finished = false; - if (cur_output->canPush()) + if (current_output->canPush()) { all_outs_full_or_unneeded = false; - ++cur_output; - return std::prev(cur_output); + auto res_output = current_output; + inc_current_output(); + return res_output; } } - ++cur_output; + inc_current_output(); } - return cur_output; + return current_output; }; - auto cur_input = inputs.begin(); - bool all_inputs_finished = true; - /// Find next input from where can pull. auto get_next_input = [&, this]() -> InputPorts::iterator { - while (cur_input != inputs.end()) + while (!is_end_input()) { - if (!cur_input->isFinished()) + if (!current_input->isFinished()) { all_inputs_finished = false; - cur_input->setNeeded(); - if (cur_input->hasData()) + current_input->setNeeded(); + if (current_input->hasData()) { - ++cur_input; - return std::prev(cur_input); + auto res_input = current_input; + inc_current_input(); + return res_input; } } - ++cur_input; + inc_current_input(); } - return cur_input; + return current_input; }; auto get_status_if_no_outputs = [&]() -> Status @@ -97,21 +124,18 @@ ResizeProcessor::Status ResizeProcessor::prepare() /// Set all inputs needed in order to evenly process them. /// Otherwise, in case num_outputs < num_inputs and chunks are consumed faster than produced, /// some inputs can be skipped. - auto set_all_unprocessed_inputs_needed = [&]() - { - for (; cur_input != inputs.end(); ++cur_input) - if (!cur_input->isFinished()) - cur_input->setNeeded(); - }; +// auto set_all_unprocessed_inputs_needed = [&]() +// { +// for (; cur_input != inputs.end(); ++cur_input) +// if (!cur_input->isFinished()) +// cur_input->setNeeded(); +// }; - while (cur_input != inputs.end() && cur_output != outputs.end()) + while (!is_end_input() && !is_end_output()) { auto output = get_next_out(); if (output == outputs.end()) - { - set_all_unprocessed_inputs_needed(); return get_status_if_no_outputs(); - } auto input = get_next_input(); if (input == inputs.end()) @@ -120,11 +144,8 @@ ResizeProcessor::Status ResizeProcessor::prepare() output->push(input->pull()); } - if (cur_output == outputs.end()) - { - set_all_unprocessed_inputs_needed(); + if (is_end_input()) return get_status_if_no_outputs(); - } /// cur_input == inputs_end() return get_status_if_no_inputs(); diff --git a/dbms/src/Processors/ResizeProcessor.h b/dbms/src/Processors/ResizeProcessor.h index b09d25be552..67574c384a1 100644 --- a/dbms/src/Processors/ResizeProcessor.h +++ b/dbms/src/Processors/ResizeProcessor.h @@ -21,14 +21,20 @@ class ResizeProcessor : public IProcessor { public: /// TODO Check that there is non zero number of inputs and outputs. - ResizeProcessor(Block header, size_t num_inputs, size_t num_outputs) + ResizeProcessor(const Block & header, size_t num_inputs, size_t num_outputs) : IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header)) + , current_input(inputs.begin()) + , current_output(outputs.begin()) { } String getName() const override { return "Resize"; } Status prepare() override; + +private: + InputPorts::iterator current_input; + OutputPorts::iterator current_output; }; }