2018-05-23 20:19:33 +00:00
|
|
|
#include <Processors/ResizeProcessor.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
ResizeProcessor::Status ResizeProcessor::prepare()
|
|
|
|
{
|
2019-04-26 11:34:51 +00:00
|
|
|
bool is_first_output = true;
|
|
|
|
auto output_end = current_output;
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
bool all_outs_full_or_unneeded = true;
|
|
|
|
bool all_outs_finished = true;
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2019-04-26 11:34:51 +00:00
|
|
|
bool is_first_input = true;
|
|
|
|
auto input_end = current_input;
|
|
|
|
|
|
|
|
bool all_inputs_finished = true;
|
|
|
|
|
|
|
|
auto is_end_input = [&]() { return !is_first_input && current_input == input_end; };
|
|
|
|
auto is_end_output = [&]() { return !is_first_output && current_output == output_end; };
|
|
|
|
|
|
|
|
auto inc_current_input = [&]()
|
|
|
|
{
|
|
|
|
is_first_input = false;
|
|
|
|
++current_input;
|
|
|
|
|
|
|
|
if (current_input == inputs.end())
|
|
|
|
current_input = inputs.begin();
|
|
|
|
};
|
|
|
|
|
|
|
|
auto inc_current_output = [&]()
|
|
|
|
{
|
|
|
|
is_first_output = false;
|
|
|
|
++current_output;
|
|
|
|
|
|
|
|
if (current_output == outputs.end())
|
|
|
|
current_output = outputs.begin();
|
|
|
|
};
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
/// Find next output where can push.
|
|
|
|
auto get_next_out = [&, this]() -> OutputPorts::iterator
|
|
|
|
{
|
2019-04-26 11:34:51 +00:00
|
|
|
while (!is_end_output())
|
2018-05-24 02:39:22 +00:00
|
|
|
{
|
2019-04-26 11:34:51 +00:00
|
|
|
if (!current_output->isFinished())
|
2019-02-07 18:51:53 +00:00
|
|
|
{
|
|
|
|
all_outs_finished = false;
|
|
|
|
|
2019-04-26 11:34:51 +00:00
|
|
|
if (current_output->canPush())
|
2019-02-07 18:51:53 +00:00
|
|
|
{
|
|
|
|
all_outs_full_or_unneeded = false;
|
2019-04-26 11:34:51 +00:00
|
|
|
auto res_output = current_output;
|
|
|
|
inc_current_output();
|
|
|
|
return res_output;
|
2019-02-07 18:51:53 +00:00
|
|
|
}
|
|
|
|
}
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2019-04-26 11:34:51 +00:00
|
|
|
inc_current_output();
|
2018-05-24 02:39:22 +00:00
|
|
|
}
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2019-04-26 11:40:35 +00:00
|
|
|
return outputs.end();
|
2019-02-07 18:51:53 +00:00
|
|
|
};
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
/// Find next input from where can pull.
|
|
|
|
auto get_next_input = [&, this]() -> InputPorts::iterator
|
|
|
|
{
|
2019-04-26 11:34:51 +00:00
|
|
|
while (!is_end_input())
|
2018-05-24 02:39:22 +00:00
|
|
|
{
|
2019-04-26 11:34:51 +00:00
|
|
|
if (!current_input->isFinished())
|
2019-02-07 18:51:53 +00:00
|
|
|
{
|
|
|
|
all_inputs_finished = false;
|
2018-05-24 02:39:22 +00:00
|
|
|
|
2019-04-26 11:34:51 +00:00
|
|
|
current_input->setNeeded();
|
|
|
|
if (current_input->hasData())
|
2019-02-07 18:51:53 +00:00
|
|
|
{
|
2019-04-26 11:34:51 +00:00
|
|
|
auto res_input = current_input;
|
|
|
|
inc_current_input();
|
|
|
|
return res_input;
|
2019-02-07 18:51:53 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-04-26 11:34:51 +00:00
|
|
|
inc_current_input();
|
2018-05-24 02:39:22 +00:00
|
|
|
}
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2019-04-26 11:40:35 +00:00
|
|
|
return inputs.end();
|
2019-02-07 18:51:53 +00:00
|
|
|
};
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
auto get_status_if_no_outputs = [&]() -> Status
|
|
|
|
{
|
|
|
|
if (all_outs_finished)
|
2018-05-23 20:19:33 +00:00
|
|
|
{
|
2019-02-07 18:51:53 +00:00
|
|
|
for (auto & in : inputs)
|
|
|
|
in.close();
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
return Status::Finished;
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
if (all_outs_full_or_unneeded)
|
2018-05-24 02:39:22 +00:00
|
|
|
{
|
2019-02-07 18:51:53 +00:00
|
|
|
for (auto & in : inputs)
|
|
|
|
in.setNotNeeded();
|
|
|
|
|
|
|
|
return Status::PortFull;
|
2018-05-24 02:39:22 +00:00
|
|
|
}
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
/// Now, we pushed to output, and it must be full.
|
|
|
|
return Status::PortFull;
|
|
|
|
};
|
|
|
|
|
|
|
|
auto get_status_if_no_inputs = [&]() -> Status
|
|
|
|
{
|
|
|
|
if (all_inputs_finished)
|
2018-05-23 20:19:33 +00:00
|
|
|
{
|
2019-02-07 18:51:53 +00:00
|
|
|
for (auto & out : outputs)
|
|
|
|
out.finish();
|
|
|
|
|
|
|
|
return Status::Finished;
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
2019-02-07 18:51:53 +00:00
|
|
|
|
|
|
|
return Status::NeedData;
|
|
|
|
};
|
|
|
|
|
2019-04-26 11:15:33 +00:00
|
|
|
/// Set all inputs needed in order to evenly process them.
|
|
|
|
/// Otherwise, in case num_outputs < num_inputs and chunks are consumed faster than produced,
|
|
|
|
/// some inputs can be skipped.
|
2019-04-26 11:34:51 +00:00
|
|
|
// auto set_all_unprocessed_inputs_needed = [&]()
|
|
|
|
// {
|
|
|
|
// for (; cur_input != inputs.end(); ++cur_input)
|
|
|
|
// if (!cur_input->isFinished())
|
|
|
|
// cur_input->setNeeded();
|
|
|
|
// };
|
|
|
|
|
|
|
|
while (!is_end_input() && !is_end_output())
|
2019-02-07 18:51:53 +00:00
|
|
|
{
|
|
|
|
auto output = get_next_out();
|
2019-07-09 17:47:13 +00:00
|
|
|
auto input = get_next_input();
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
if (output == outputs.end())
|
|
|
|
return get_status_if_no_outputs();
|
|
|
|
|
2019-07-09 17:47:13 +00:00
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
if (input == inputs.end())
|
|
|
|
return get_status_if_no_inputs();
|
|
|
|
|
|
|
|
output->push(input->pull());
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
2019-02-07 18:51:53 +00:00
|
|
|
|
2019-04-26 11:34:51 +00:00
|
|
|
if (is_end_input())
|
2019-02-07 18:51:53 +00:00
|
|
|
return get_status_if_no_outputs();
|
|
|
|
|
|
|
|
/// cur_input == inputs_end()
|
|
|
|
return get_status_if_no_inputs();
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|