ClickHouse/src/Processors/DelayedPortsProcessor.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

43 lines
1.3 KiB
C++
Raw Normal View History

2020-01-09 10:37:27 +00:00
#pragma once
#include <Processors/IProcessor.h>
namespace DB
{
/// Processor with N inputs and N outputs. Only moves data from i-th input to i-th output as is.
/// Some ports are delayed. Delayed ports are processed after other outputs are all finished.
/// Data between ports is not mixed. It is important because this processor can be used before MergingSortedTransform.
/// Delayed ports appear after joins, when some non-matched data needs to be processed at the end.
class DelayedPortsProcessor : public IProcessor
{
public:
2020-12-27 11:16:40 +00:00
DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_main_ports_empty = false);
2020-01-09 10:37:27 +00:00
String getName() const override { return "DelayedPorts"; }
Status prepare(const PortNumbers &, const PortNumbers &) override;
private:
struct PortsPair
{
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
bool is_delayed = false;
bool is_finished = false;
};
std::vector<PortsPair> port_pairs;
const size_t num_delayed_ports;
size_t num_finished_pairs = 0;
size_t num_finished_outputs = 0;
2020-01-09 10:37:27 +00:00
2020-12-27 11:13:17 +00:00
std::vector<size_t> output_to_pair;
2020-12-27 11:25:43 +00:00
bool are_inputs_initialized = false;
2020-12-27 11:13:17 +00:00
2020-01-09 10:37:27 +00:00
bool processPair(PortsPair & pair);
void finishPair(PortsPair & pair);
2020-01-09 10:37:27 +00:00
};
}