ClickHouse/src/Processors/ResizeProcessor.h
2023-02-02 15:59:39 +00:00

144 lines
3.7 KiB
C++

#pragma once
#include <Processors/IProcessor.h>
#include <queue>
namespace DB
{
/** Has arbitrary non zero number of inputs and arbitrary non zero number of outputs.
* All of them have the same structure.
*
* Pulls data from arbitrary input (whenever it is ready) and pushes it to arbitrary output (whenever is is not full).
* Doesn't do any heavy calculations.
* Doesn't preserve an order of data.
*
* Examples:
* - union data from multiple inputs to single output - to serialize data that was processed in parallel.
* - split data from single input to multiple outputs - to allow further parallel processing.
*/
class ResizeProcessor final : public IProcessor
{
public:
/// TODO Check that there is non zero number of inputs and outputs.
ResizeProcessor(const Block & header, size_t num_inputs, size_t num_outputs)
: IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header))
, current_input(inputs.begin())
, current_output(outputs.begin())
{
}
String getName() const override { return "Resize"; }
Status prepare() override;
Status prepare(const PortNumbers &, const PortNumbers &) override;
private:
InputPorts::iterator current_input;
OutputPorts::iterator current_output;
size_t num_finished_inputs = 0;
size_t num_finished_outputs = 0;
std::queue<UInt64> waiting_outputs;
std::queue<UInt64> inputs_with_data;
bool initialized = false;
enum class OutputStatus
{
NotActive,
NeedData,
Finished,
};
enum class InputStatus
{
NotActive,
HasData,
Finished,
};
struct InputPortWithStatus
{
InputPort * port;
InputStatus status;
};
struct OutputPortWithStatus
{
OutputPort * port;
OutputStatus status;
};
std::vector<InputPortWithStatus> input_ports;
std::vector<OutputPortWithStatus> output_ports;
};
class StrictResizeProcessor : public IProcessor
{
public:
/// TODO Check that there is non zero number of inputs and outputs.
StrictResizeProcessor(const Block & header, size_t num_inputs, size_t num_outputs)
: IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header))
, current_input(inputs.begin())
, current_output(outputs.begin())
{
}
StrictResizeProcessor(InputPorts inputs_, OutputPorts outputs_)
: IProcessor(inputs_, outputs_)
, current_input(inputs.begin())
, current_output(outputs.begin())
{
}
String getName() const override { return "StrictResize"; }
Status prepare(const PortNumbers &, const PortNumbers &) override;
private:
InputPorts::iterator current_input;
OutputPorts::iterator current_output;
size_t num_finished_inputs = 0;
size_t num_finished_outputs = 0;
std::queue<UInt64> disabled_input_ports;
std::queue<UInt64> waiting_outputs;
bool initialized = false;
enum class OutputStatus
{
NotActive,
NeedData,
Finished,
};
enum class InputStatus
{
NotActive,
NeedData,
Finished,
};
struct InputPortWithStatus
{
InputPort * port;
InputStatus status;
ssize_t waiting_output;
};
struct OutputPortWithStatus
{
OutputPort * port;
OutputStatus status;
};
std::vector<InputPortWithStatus> input_ports;
std::vector<OutputPortWithStatus> output_ports;
/// This field contained chunks which were read for output which had became finished while reading was happening.
/// They will be pushed to any next waiting output.
std::vector<Port::Data> abandoned_chunks;
};
}