2020-01-09 10:37:27 +00:00
|
|
|
#include <Processors/DelayedPortsProcessor.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
DelayedPortsProcessor::DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports)
|
|
|
|
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header))
|
|
|
|
, num_delayed(delayed_ports.size())
|
|
|
|
{
|
|
|
|
port_pairs.resize(num_ports);
|
|
|
|
|
|
|
|
auto input_it = inputs.begin();
|
|
|
|
auto output_it = outputs.begin();
|
|
|
|
for (size_t i = 0; i < num_ports; ++i)
|
|
|
|
{
|
|
|
|
port_pairs[i].input_port = &*input_it;
|
|
|
|
port_pairs[i].output_port = &*output_it;
|
|
|
|
++input_it;
|
|
|
|
++output_it;
|
|
|
|
}
|
|
|
|
|
|
|
|
for (auto & delayed : delayed_ports)
|
|
|
|
port_pairs[delayed].is_delayed = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool DelayedPortsProcessor::processPair(PortsPair & pair)
|
|
|
|
{
|
|
|
|
auto finish = [&]()
|
|
|
|
{
|
|
|
|
if (!pair.is_finished)
|
|
|
|
{
|
|
|
|
pair.is_finished = true;
|
|
|
|
++num_finished;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if (pair.output_port->isFinished())
|
|
|
|
{
|
2020-01-09 12:24:56 +00:00
|
|
|
pair.input_port->close();
|
2020-01-09 10:37:27 +00:00
|
|
|
finish();
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (pair.input_port->isFinished())
|
|
|
|
{
|
2020-01-09 12:24:56 +00:00
|
|
|
pair.output_port->finish();
|
2020-01-09 10:37:27 +00:00
|
|
|
finish();
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!pair.output_port->canPush())
|
|
|
|
return false;
|
|
|
|
|
|
|
|
pair.input_port->setNeeded();
|
|
|
|
if (pair.input_port->hasData())
|
|
|
|
pair.output_port->pushData(pair.input_port->pullData());
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
IProcessor::Status DelayedPortsProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
|
|
|
|
{
|
|
|
|
bool skip_delayed = (num_finished + num_delayed) < port_pairs.size();
|
|
|
|
bool need_data = false;
|
|
|
|
|
|
|
|
for (auto & output_number : updated_outputs)
|
|
|
|
{
|
|
|
|
if (!skip_delayed || !port_pairs[output_number].is_delayed)
|
|
|
|
need_data = processPair(port_pairs[output_number]) || need_data;
|
|
|
|
}
|
|
|
|
|
|
|
|
for (auto & input_number : updated_inputs)
|
|
|
|
{
|
|
|
|
if (!skip_delayed || !port_pairs[input_number].is_delayed)
|
|
|
|
need_data = processPair(port_pairs[input_number]) || need_data;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// In case if main streams are finished at current iteration, start processing delayed streams.
|
|
|
|
if (skip_delayed && (num_finished + num_delayed) >= port_pairs.size())
|
|
|
|
{
|
|
|
|
for (auto & pair : port_pairs)
|
|
|
|
if (pair.is_delayed)
|
|
|
|
need_data = processPair(pair) || need_data;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (num_finished == port_pairs.size())
|
|
|
|
return Status::Finished;
|
|
|
|
|
|
|
|
if (need_data)
|
|
|
|
return Status::NeedData;
|
|
|
|
|
|
|
|
return Status::PortFull;
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|