ClickHouse/src/Processors/DelayedPortsProcessor.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

183 lines
4.8 KiB
C++
Raw Normal View History

2020-01-09 10:37:27 +00:00
#include <Processors/DelayedPortsProcessor.h>
2022-01-30 19:49:48 +00:00
#include <base/sort.h>
2020-01-09 10:37:27 +00:00
namespace DB
{
2020-12-27 13:53:42 +00:00
/// Error codes referenced in this file. The constant is only declared here (`extern`);
/// its definition lives in another translation unit.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Builds the list of `num_ports` input ports for DelayedPortsProcessor.
///
/// If `assert_main_ports_empty` is false, every port is created with `header`.
/// Otherwise only the ports whose number is listed in `delayed_ports` get `header`;
/// all remaining ports are created with an empty Block.
///
/// `delayed_ports` is taken by value because it is sorted locally.
InputPorts createInputPorts(
    const Block & header,
    size_t num_ports,
    IProcessor::PortNumbers delayed_ports,
    bool assert_main_ports_empty)
{
    if (!assert_main_ports_empty)
        return InputPorts(num_ports, header);

    /// Sorted order lets a single forward cursor be matched against increasing port numbers.
    ::sort(delayed_ports.begin(), delayed_ports.end());

    InputPorts ports;
    auto delayed_it = delayed_ports.begin();
    const auto delayed_end = delayed_ports.end();

    for (size_t port_number = 0; port_number != num_ports; ++port_number)
    {
        const bool is_delayed = delayed_it != delayed_end && *delayed_it == port_number;
        if (is_delayed)
        {
            ports.emplace_back(header);
            ++delayed_it;
        }
        else
        {
            ports.emplace_back(Block());
        }
    }

    return ports;
}
2020-12-27 11:02:21 +00:00
/// Creates `num_ports` port pairs and marks the ones listed in `delayed_ports` as delayed.
///
/// If `assert_main_ports_empty` is false, there are `num_ports` output ports and every pair gets one.
/// If it is true, there are `delayed_ports.size()` output ports and only delayed pairs get one;
/// main pairs are left without an output port.
///
/// `output_to_pair[k]` stores the index of the pair that owns the k-th output port.
///
/// Throws LOGICAL_ERROR if `delayed_ports` contains a number that is not less than `num_ports`,
/// or contains the same number twice.
DelayedPortsProcessor::DelayedPortsProcessor(
    const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_main_ports_empty)
    : IProcessor(createInputPorts(header, num_ports, delayed_ports, assert_main_ports_empty),
                 OutputPorts((assert_main_ports_empty ? delayed_ports.size() : num_ports), header))
    , num_delayed_ports(delayed_ports.size())
{
    port_pairs.resize(num_ports);
    output_to_pair.reserve(outputs.size());

    for (const auto & delayed : delayed_ports)
    {
        /// `port_pairs[delayed]` is unchecked, so an out-of-range number would write past the vector.
        if (delayed >= num_ports)
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                "Delayed port number for DelayedPortsProcessor is out of range");

        /// `num_delayed_ports` and the number of output ports are both taken from `delayed_ports.size()`,
        /// so a repeated number would make them disagree with the pairs actually marked as delayed.
        if (port_pairs[delayed].is_delayed)
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                "Delayed port number for DelayedPortsProcessor is specified more than once");

        port_pairs[delayed].is_delayed = true;
    }

    auto input_it = inputs.begin();
    auto output_it = outputs.begin();
    for (size_t i = 0; i < num_ports; ++i)
    {
        port_pairs[i].input_port = &*input_it;
        ++input_it;

        /// An output port is consumed only for pairs that are supposed to have one.
        if (port_pairs[i].is_delayed || !assert_main_ports_empty)
        {
            port_pairs[i].output_port = &*output_it;
            output_to_pair.push_back(i);
            ++output_it;
        }
    }
}
/// Marks `pair` as finished: finishes its output port (if it has one), closes its input port
/// and updates the finished-port counters. Does nothing if the pair is already finished,
/// so each pair is counted at most once.
void DelayedPortsProcessor::finishPair(PortsPair & pair)
{
    if (pair.is_finished)
        return;

    const bool has_output = pair.output_port != nullptr;

    if (has_output)
        pair.output_port->finish();
    pair.input_port->close();
    pair.is_finished = true;

    ++num_finished_inputs;
    if (has_output)
        ++num_finished_outputs;
    /// Main (non-delayed) inputs are counted separately; shouldSkipDelayed() relies on this counter.
    if (!pair.is_delayed)
        ++num_finished_main_inputs;
}
2020-01-09 10:37:27 +00:00
/// Makes one step for a single pair of ports.
///
/// Returns false if the pair got finished (its output or input port is finished) or if its
/// output port cannot accept data right now. Otherwise marks the input port as needed, moves
/// data from input to output if the input has any, and returns true.
///
/// Throws LOGICAL_ERROR if the input has data but the pair has no output port.
bool DelayedPortsProcessor::processPair(PortsPair & pair)
{
    auto * output = pair.output_port;
    auto * input = pair.input_port;

    /// The output side is checked first; the input is queried only if the output is not finished.
    const bool pair_is_over = (output && output->isFinished()) || input->isFinished();
    if (pair_is_over)
    {
        finishPair(pair);
        return false;
    }

    if (output && !output->canPush())
        return false;

    input->setNeeded();
    if (!input->hasData())
        return true;

    if (!output)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Input port for DelayedPortsProcessor is assumed to have no data, but it has one");

    output->pushData(input->pullData(true));
    return true;
}
bool DelayedPortsProcessor::shouldSkipDelayed() const
{
2023-08-21 13:08:32 +00:00
return num_finished_main_inputs + num_delayed_ports < port_pairs.size();
}
2020-01-09 10:37:27 +00:00
/// Processes the pairs whose ports were reported as updated and computes the processor status.
///
/// Delayed pairs are not processed while shouldSkipDelayed() is true. If it turns false during
/// this call, all delayed pairs are processed right away.
///
/// Returns Finished if all output ports or all input ports are finished, NeedData if any
/// processPair() call returned true, and PortFull otherwise.
IProcessor::Status DelayedPortsProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
{
    /// Evaluated once up front; compared with a fresh value near the end of the function.
    const bool delayed_were_skipped = shouldSkipDelayed();
    bool need_data = false;

    auto is_allowed = [&](const PortsPair & pair) { return !(delayed_were_skipped && pair.is_delayed); };
    auto process = [&](PortsPair & pair)
    {
        if (processPair(pair))
            need_data = true;
    };

    if (!are_inputs_initialized && !updated_outputs.empty())
    {
        /// Activate inputs with no output.
        for (const auto & pair : port_pairs)
        {
            if (!pair.output_port)
                pair.input_port->setNeeded();
        }

        are_inputs_initialized = true;
    }

    for (const auto & output_number : updated_outputs)
    {
        auto & pair = port_pairs[output_to_pair[output_number]];

        /// Finish pair of ports earlier if possible.
        if (!pair.is_finished && pair.output_port && pair.output_port->isFinished())
            finishPair(pair);
        else if (is_allowed(pair))
            process(pair);
    }

    /// Do not wait for delayed ports if all output ports are finished.
    if (num_finished_outputs == outputs.size())
    {
        for (auto & pair : port_pairs)
            finishPair(pair);

        return Status::Finished;
    }

    for (const auto & input_number : updated_inputs)
    {
        auto & pair = port_pairs[input_number];
        if (is_allowed(pair))
            process(pair);
    }

    /// In case if main streams are finished at current iteration, start processing delayed streams.
    if (delayed_were_skipped && !shouldSkipDelayed())
    {
        for (auto & pair : port_pairs)
        {
            if (pair.is_delayed)
                process(pair);
        }
    }

    if (num_finished_inputs == port_pairs.size())
        return Status::Finished;

    return need_data ? Status::NeedData : Status::PortFull;
}
}