mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 18:32:29 +00:00
422 lines
11 KiB
C++
422 lines
11 KiB
C++
#include <Processors/ResizeProcessor.h>
|
|
#include <iostream>
|
|
|
|
namespace DB
|
|
{
|
|
namespace ErrorCodes
|
|
{
|
|
extern const int LOGICAL_ERROR;
|
|
}
|
|
|
|
ResizeProcessor::Status ResizeProcessor::prepare()
|
|
{
|
|
bool is_first_output = true;
|
|
auto output_end = current_output;
|
|
|
|
bool all_outs_full_or_unneeded = true;
|
|
bool all_outs_finished = true;
|
|
|
|
bool is_first_input = true;
|
|
auto input_end = current_input;
|
|
|
|
bool all_inputs_finished = true;
|
|
|
|
auto is_end_input = [&]() { return !is_first_input && current_input == input_end; };
|
|
auto is_end_output = [&]() { return !is_first_output && current_output == output_end; };
|
|
|
|
auto inc_current_input = [&]()
|
|
{
|
|
is_first_input = false;
|
|
++current_input;
|
|
|
|
if (current_input == inputs.end())
|
|
current_input = inputs.begin();
|
|
};
|
|
|
|
auto inc_current_output = [&]()
|
|
{
|
|
is_first_output = false;
|
|
++current_output;
|
|
|
|
if (current_output == outputs.end())
|
|
current_output = outputs.begin();
|
|
};
|
|
|
|
/// Find next output where can push.
|
|
auto get_next_out = [&, this]() -> OutputPorts::iterator
|
|
{
|
|
while (!is_end_output())
|
|
{
|
|
if (!current_output->isFinished())
|
|
{
|
|
all_outs_finished = false;
|
|
|
|
if (current_output->canPush())
|
|
{
|
|
all_outs_full_or_unneeded = false;
|
|
auto res_output = current_output;
|
|
inc_current_output();
|
|
return res_output;
|
|
}
|
|
}
|
|
|
|
inc_current_output();
|
|
}
|
|
|
|
return outputs.end();
|
|
};
|
|
|
|
/// Find next input from where can pull.
|
|
auto get_next_input = [&, this]() -> InputPorts::iterator
|
|
{
|
|
while (!is_end_input())
|
|
{
|
|
if (!current_input->isFinished())
|
|
{
|
|
all_inputs_finished = false;
|
|
|
|
current_input->setNeeded();
|
|
if (current_input->hasData())
|
|
{
|
|
auto res_input = current_input;
|
|
inc_current_input();
|
|
return res_input;
|
|
}
|
|
}
|
|
|
|
inc_current_input();
|
|
}
|
|
|
|
return inputs.end();
|
|
};
|
|
|
|
auto get_status_if_no_outputs = [&]() -> Status
|
|
{
|
|
if (all_outs_finished)
|
|
{
|
|
for (auto & in : inputs)
|
|
in.close();
|
|
|
|
return Status::Finished;
|
|
}
|
|
|
|
if (all_outs_full_or_unneeded)
|
|
{
|
|
for (auto & in : inputs)
|
|
in.setNotNeeded();
|
|
|
|
return Status::PortFull;
|
|
}
|
|
|
|
/// Now, we pushed to output, and it must be full.
|
|
return Status::PortFull;
|
|
};
|
|
|
|
auto get_status_if_no_inputs = [&]() -> Status
|
|
{
|
|
if (all_inputs_finished)
|
|
{
|
|
for (auto & out : outputs)
|
|
out.finish();
|
|
|
|
return Status::Finished;
|
|
}
|
|
|
|
return Status::NeedData;
|
|
};
|
|
|
|
/// Set all inputs needed in order to evenly process them.
|
|
/// Otherwise, in case num_outputs < num_inputs and chunks are consumed faster than produced,
|
|
/// some inputs can be skipped.
|
|
// auto set_all_unprocessed_inputs_needed = [&]()
|
|
// {
|
|
// for (; cur_input != inputs.end(); ++cur_input)
|
|
// if (!cur_input->isFinished())
|
|
// cur_input->setNeeded();
|
|
// };
|
|
|
|
while (!is_end_input() && !is_end_output())
|
|
{
|
|
auto output = get_next_out();
|
|
auto input = get_next_input();
|
|
|
|
if (output == outputs.end())
|
|
return get_status_if_no_outputs();
|
|
|
|
|
|
if (input == inputs.end())
|
|
return get_status_if_no_inputs();
|
|
|
|
output->push(input->pull());
|
|
}
|
|
|
|
if (is_end_input())
|
|
return get_status_if_no_outputs();
|
|
|
|
/// cur_input == inputs_end()
|
|
return get_status_if_no_inputs();
|
|
}
|
|
|
|
IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
|
|
{
|
|
if (!initialized)
|
|
{
|
|
initialized = true;
|
|
|
|
for (auto & input : inputs)
|
|
{
|
|
input.setNeeded();
|
|
input_ports.push_back({.port = &input, .status = InputStatus::NotActive});
|
|
}
|
|
|
|
for (auto & output : outputs)
|
|
output_ports.push_back({.port = &output, .status = OutputStatus::NotActive});
|
|
}
|
|
|
|
for (const auto & output_number : updated_outputs)
|
|
{
|
|
auto & output = output_ports[output_number];
|
|
if (output.port->isFinished())
|
|
{
|
|
if (output.status != OutputStatus::Finished)
|
|
{
|
|
++num_finished_outputs;
|
|
output.status = OutputStatus::Finished;
|
|
}
|
|
|
|
continue;
|
|
}
|
|
|
|
if (output.port->canPush())
|
|
{
|
|
if (output.status != OutputStatus::NeedData)
|
|
{
|
|
output.status = OutputStatus::NeedData;
|
|
waiting_outputs.push(output_number);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (num_finished_outputs == outputs.size())
|
|
{
|
|
for (auto & input : inputs)
|
|
input.close();
|
|
|
|
return Status::Finished;
|
|
}
|
|
|
|
for (const auto & input_number : updated_inputs)
|
|
{
|
|
auto & input = input_ports[input_number];
|
|
if (input.port->isFinished())
|
|
{
|
|
if (input.status != InputStatus::Finished)
|
|
{
|
|
input.status = InputStatus::Finished;
|
|
++num_finished_inputs;
|
|
}
|
|
continue;
|
|
}
|
|
|
|
if (input.port->hasData())
|
|
{
|
|
if (input.status != InputStatus::HasData)
|
|
{
|
|
input.status = InputStatus::HasData;
|
|
inputs_with_data.push(input_number);
|
|
}
|
|
}
|
|
}
|
|
|
|
while (!waiting_outputs.empty() && !inputs_with_data.empty())
|
|
{
|
|
auto & waiting_output = output_ports[waiting_outputs.front()];
|
|
waiting_outputs.pop();
|
|
|
|
auto & input_with_data = input_ports[inputs_with_data.front()];
|
|
inputs_with_data.pop();
|
|
|
|
waiting_output.port->pushData(input_with_data.port->pullData());
|
|
input_with_data.status = InputStatus::NotActive;
|
|
waiting_output.status = OutputStatus::NotActive;
|
|
|
|
if (input_with_data.port->isFinished())
|
|
{
|
|
input_with_data.status = InputStatus::Finished;
|
|
++num_finished_inputs;
|
|
}
|
|
}
|
|
|
|
if (num_finished_inputs == inputs.size())
|
|
{
|
|
for (auto & output : outputs)
|
|
output.finish();
|
|
|
|
return Status::Finished;
|
|
}
|
|
|
|
if (!waiting_outputs.empty())
|
|
return Status::NeedData;
|
|
|
|
return Status::PortFull;
|
|
}
|
|
|
|
IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
|
|
{
|
|
if (!initialized)
|
|
{
|
|
initialized = true;
|
|
|
|
for (auto & input : inputs)
|
|
input_ports.push_back({.port = &input, .status = InputStatus::NotActive, .waiting_output = -1});
|
|
|
|
for (UInt64 i = 0; i < input_ports.size(); ++i)
|
|
disabled_input_ports.push(i);
|
|
|
|
for (auto & output : outputs)
|
|
output_ports.push_back({.port = &output, .status = OutputStatus::NotActive});
|
|
}
|
|
|
|
for (const auto & output_number : updated_outputs)
|
|
{
|
|
auto & output = output_ports[output_number];
|
|
if (output.port->isFinished())
|
|
{
|
|
if (output.status != OutputStatus::Finished)
|
|
{
|
|
++num_finished_outputs;
|
|
output.status = OutputStatus::Finished;
|
|
}
|
|
|
|
continue;
|
|
}
|
|
|
|
if (output.port->canPush())
|
|
{
|
|
if (output.status != OutputStatus::NeedData)
|
|
{
|
|
output.status = OutputStatus::NeedData;
|
|
waiting_outputs.push(output_number);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (num_finished_outputs == outputs.size())
|
|
{
|
|
for (auto & input : inputs)
|
|
input.close();
|
|
|
|
return Status::Finished;
|
|
}
|
|
|
|
std::queue<UInt64> inputs_with_data;
|
|
|
|
for (const auto & input_number : updated_inputs)
|
|
{
|
|
auto & input = input_ports[input_number];
|
|
if (input.port->isFinished())
|
|
{
|
|
if (input.status != InputStatus::Finished)
|
|
{
|
|
input.status = InputStatus::Finished;
|
|
++num_finished_inputs;
|
|
|
|
waiting_outputs.push(input.waiting_output);
|
|
}
|
|
continue;
|
|
}
|
|
|
|
if (input.port->hasData())
|
|
{
|
|
if (input.status != InputStatus::NotActive)
|
|
{
|
|
input.status = InputStatus::NotActive;
|
|
inputs_with_data.push(input_number);
|
|
}
|
|
}
|
|
}
|
|
|
|
while (!inputs_with_data.empty())
|
|
{
|
|
auto input_number = inputs_with_data.front();
|
|
auto & input_with_data = input_ports[input_number];
|
|
inputs_with_data.pop();
|
|
|
|
if (input_with_data.waiting_output == -1)
|
|
throw Exception("No associated output for input with data.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
auto & waiting_output = output_ports[input_with_data.waiting_output];
|
|
|
|
if (waiting_output.status == OutputStatus::NotActive)
|
|
throw Exception("Invalid status NotActive for associated output.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
if (waiting_output.status != OutputStatus::Finished)
|
|
{
|
|
waiting_output.port->pushData(input_with_data.port->pullData(/* set_not_needed = */ true));
|
|
waiting_output.status = OutputStatus::NotActive;
|
|
}
|
|
else
|
|
abandoned_chunks.emplace_back(input_with_data.port->pullData(/* set_not_needed = */ true));
|
|
|
|
if (input_with_data.port->isFinished())
|
|
{
|
|
input_with_data.status = InputStatus::Finished;
|
|
++num_finished_inputs;
|
|
}
|
|
else
|
|
disabled_input_ports.push(input_number);
|
|
}
|
|
|
|
if (num_finished_inputs == inputs.size())
|
|
{
|
|
for (auto & output : outputs)
|
|
output.finish();
|
|
|
|
return Status::Finished;
|
|
}
|
|
|
|
/// Process abandoned chunks if any.
|
|
while (!abandoned_chunks.empty() && !waiting_outputs.empty())
|
|
{
|
|
auto & waiting_output = output_ports[waiting_outputs.front()];
|
|
waiting_outputs.pop();
|
|
|
|
waiting_output.port->pushData(std::move(abandoned_chunks.back()));
|
|
abandoned_chunks.pop_back();
|
|
|
|
waiting_output.status = OutputStatus::NotActive;
|
|
}
|
|
|
|
/// Enable more inputs if needed.
|
|
while (!disabled_input_ports.empty() && !waiting_outputs.empty())
|
|
{
|
|
auto & input = input_ports[disabled_input_ports.front()];
|
|
disabled_input_ports.pop();
|
|
|
|
input.port->setNeeded();
|
|
input.status = InputStatus::NeedData;
|
|
input.waiting_output = waiting_outputs.front();
|
|
|
|
waiting_outputs.pop();
|
|
}
|
|
|
|
/// Close all other waiting for data outputs (there is no corresponding input for them).
|
|
while (!waiting_outputs.empty())
|
|
{
|
|
auto & output = output_ports[waiting_outputs.front()];
|
|
waiting_outputs.pop();
|
|
|
|
output.status = OutputStatus::Finished;
|
|
output.port->finish();
|
|
++num_finished_outputs;
|
|
}
|
|
|
|
if (disabled_input_ports.empty())
|
|
return Status::NeedData;
|
|
|
|
return Status::PortFull;
|
|
}
|
|
|
|
}
|
|
|