// ClickHouse/dbms/src/Processors/ResizeProcessor.cpp

#include <Processors/ResizeProcessor.h>
namespace DB
{
/// Moves data from inputs that have it to outputs that can accept it, visiting both
/// port lists in round-robin order.
///
/// Each scan starts at `current_input` / `current_output` (not declared in this function;
/// presumably members that persist between calls) and wraps around its list at most once
/// per call.
///
/// Returns:
///  - Status::Finished when every output is finished (all inputs are then closed),
///    or when every input is finished (all outputs are then finished);
///  - Status::PortFull when no output can currently accept data;
///  - Status::NeedData when no input currently has data.
ResizeProcessor::Status ResizeProcessor::prepare()
{
    /// The scan of each list ends when the iterator returns to where it started.
    /// The `is_first_*` flags distinguish "have not moved yet" from "made a full circle".
    bool is_first_output = true;
    auto output_end = current_output;

    bool all_outs_full_or_unneeded = true;
    bool all_outs_finished = true;

    bool is_first_input = true;
    auto input_end = current_input;

    bool all_inputs_finished = true;

    auto is_end_input = [&]() { return !is_first_input && current_input == input_end; };
    auto is_end_output = [&]() { return !is_first_output && current_output == output_end; };

    /// Advance the input position, wrapping from the end of the list to its beginning.
    auto inc_current_input = [&]()
    {
        is_first_input = false;
        ++current_input;

        if (current_input == inputs.end())
            current_input = inputs.begin();
    };

    /// Advance the output position, wrapping from the end of the list to its beginning.
    auto inc_current_output = [&]()
    {
        is_first_output = false;
        ++current_output;

        if (current_output == outputs.end())
            current_output = outputs.begin();
    };

    /// Find next output where can push.
    /// Returns outputs.end() if the scan wrapped around without finding one.
    auto get_next_out = [&, this]() -> OutputPorts::iterator
    {
        while (!is_end_output())
        {
            if (!current_output->isFinished())
            {
                all_outs_finished = false;

                if (current_output->canPush())
                {
                    all_outs_full_or_unneeded = false;
                    auto res_output = current_output;
                    inc_current_output();
                    return res_output;
                }
            }

            inc_current_output();
        }

        return outputs.end();
    };

    /// Find next input from where can pull.
    /// Every unfinished input visited on the way is marked as needed.
    /// Returns inputs.end() if the scan wrapped around without finding one.
    auto get_next_input = [&, this]() -> InputPorts::iterator
    {
        while (!is_end_input())
        {
            if (!current_input->isFinished())
            {
                all_inputs_finished = false;

                current_input->setNeeded();
                if (current_input->hasData())
                {
                    auto res_input = current_input;
                    inc_current_input();
                    return res_input;
                }
            }

            inc_current_input();
        }

        return inputs.end();
    };

    /// Status to report when the scan of outputs is exhausted.
    auto get_status_if_no_outputs = [&]() -> Status
    {
        if (all_outs_finished)
        {
            for (auto & in : inputs)
                in.close();

            return Status::Finished;
        }

        if (all_outs_full_or_unneeded)
        {
            for (auto & in : inputs)
                in.setNotNeeded();

            return Status::PortFull;
        }

        /// Now, we pushed to output, and it must be full.
        return Status::PortFull;
    };

    /// Status to report when the scan of inputs is exhausted.
    auto get_status_if_no_inputs = [&]() -> Status
    {
        if (all_inputs_finished)
        {
            for (auto & out : outputs)
                out.finish();

            return Status::Finished;
        }

        return Status::NeedData;
    };

    /// Set all inputs needed in order to evenly process them.
    /// Otherwise, in case num_outputs < num_inputs and chunks are consumed faster than produced,
    /// some inputs can be skipped.
    /// (Disabled; the helper below refers to a `cur_input` variable that no longer exists here.)
    // auto set_all_unprocessed_inputs_needed = [&]()
    // {
    //     for (; cur_input != inputs.end(); ++cur_input)
    //         if (!cur_input->isFinished())
    //             cur_input->setNeeded();
    // };

    while (!is_end_input() && !is_end_output())
    {
        /// Both scans are advanced before either result is checked.
        auto output = get_next_out();
        auto input = get_next_input();

        if (output == outputs.end())
            return get_status_if_no_outputs();

        if (input == inputs.end())
            return get_status_if_no_inputs();

        output->push(input->pull());
    }

    /// The loop ended right after a successful push because one of the scans made a full circle.
    /// Report on the side that was actually exhausted. The original code tested is_end_input()
    /// here, which selected the output-side status when the inputs ran out and vice versa.
    if (is_end_output())
        return get_status_if_no_outputs();

    /// Only the scan of inputs is exhausted.
    return get_status_if_no_inputs();
}
}