// ClickHouse/src/Processors/PingPongProcessor.h
// (107 lines, 3.4 KiB, C++)

#pragma once
#include <Processors/IProcessor.h>
#include <base/unit.h>
#include <Processors/Chunk.h>
#include <Common/logger_useful.h>
namespace DB
{
/*
 * Processor with N inputs and N outputs. Moves data from the i-th input to the i-th output as is.
 * It has a pair of auxiliary ports used to notify another instance by sending an empty chunk once some condition holds.
 * You should use this processor as a pair of instances and connect their auxiliary ports crosswise.
 *
 *     +-------------------+         +-------------------+
 *     |                   |-- aux ->|                   |
 *     | PingPongProcessor |         | PingPongProcessor |
 *     |                   |<- aux --|                   |
 *     +-------------------+         +-------------------+
 *
 * One of the processors starts processing data, and the other waits for a notification.
 * When `isReady` returns true, the first one stops processing, sends a ping to the other and waits for a notification.
 * After that, the second one also processes data until `isReady` returns true, then sends a notification back to the first one.
 * After this round trip, both processors simply pass data from regular inputs to outputs.
 */
class PingPongProcessor : public IProcessor
{
public:
enum class Order : uint8_t
{
/// Processor that starts processing data.
First,
/// Processor that waits for notification.
Second,
};
using enum Order;
/// The `aux_header` is a header from another instance of procssor.
/// It's required because all outputs should have the same structure.
/// We don't care about structure of another processor, because we send just empty chunk, but need to follow the contract.
PingPongProcessor(const Block & header, const Block & aux_header, size_t num_ports, Order order_);
String getName() const override { return "PingPongProcessor"; }
2022-08-03 11:26:47 +00:00
Status prepare() override;
std::pair<InputPort *, OutputPort *> getAuxPorts();
virtual bool isReady(const Chunk & chunk) = 0;
protected:
struct PortsPair
{
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
bool is_finished = false;
};
bool sendPing();
bool recievePing();
bool canSend() const;
bool isPairsFinished() const;
bool processPair(PortsPair & pair);
void finishPair(PortsPair & pair);
2022-08-03 11:26:47 +00:00
Status processRegularPorts();
std::vector<PortsPair> port_pairs;
size_t num_finished_pairs = 0;
InputPort & aux_in_port;
OutputPort & aux_out_port;
bool is_send = false;
bool is_recieved = false;
bool ready_to_send = false;
2022-08-03 11:26:47 +00:00
/// Used to set 'needed' flag once for auxiliary input at first `prepare` call.
bool set_needed_once = false;
Order order;
};
/// Reads first N rows from two streams evenly.
class ReadHeadBalancedProcessor : public PingPongProcessor
{
public:
ReadHeadBalancedProcessor(const Block & header, const Block & aux_header, size_t num_ports, size_t size_to_wait_, Order order_)
2022-08-03 11:26:47 +00:00
: PingPongProcessor(header, aux_header, num_ports, order_) , data_consumed(0) , size_to_wait(size_to_wait_)
{}
bool isReady(const Chunk & chunk) override
{
data_consumed += chunk.getNumRows();
2022-08-03 11:26:47 +00:00
return data_consumed > size_to_wait;
}
private:
2022-08-03 11:26:47 +00:00
size_t data_consumed;
size_t size_to_wait;
};
}