Processors: experimental [#CLICKHOUSE-2948]

This commit is contained in:
Alexey Milovidov 2018-05-20 08:21:20 +03:00
parent b937decb5b
commit 0ffbd7cb1f
2 changed files with 102 additions and 11 deletions

View File

@ -199,6 +199,10 @@ public:
};
using InputPorts = std::vector<InputPort>;
using OutputPorts = std::vector<OutputPort>;
inline void connect(OutputPort & output, InputPort & input)
{
input.output_port = &output;
@ -253,13 +257,13 @@ public:
class IProcessor
{
protected:
std::list<InputPort> inputs;
std::list<OutputPort> outputs;
InputPorts inputs;
OutputPorts outputs;
public:
IProcessor() {}
IProcessor(std::list<InputPort> && inputs_, std::list<OutputPort> && outputs_)
IProcessor(InputPorts && inputs_, OutputPorts && outputs_)
: inputs(std::move(inputs_)), outputs(std::move(outputs_))
{
for (auto & port : inputs)
@ -328,6 +332,8 @@ class SequentialPipelineExecutor : IProcessor
private:
std::list<ProcessorPtr> processors;
/// Look for first Ready or Async processor by depth-first search in needed input ports.
/// NOTE: Pipeline must not have cycles.
template <typename Visit, typename Finish>
void traverse(IProcessor & processor, Visit && visit, Finish && finish)
{
@ -565,6 +571,83 @@ public:
};
class ResizeProcessor : public IProcessor
{
public:
using IProcessor::IProcessor;
String getName() const override { return "Resize"; }
Status prepare() override
{
bool all_outputs_full = true;
bool all_outputs_unneeded = true;
for (const auto & output : outputs)
{
if (!output.hasData())
all_outputs_full = false;
if (output.isNeeded())
all_outputs_unneeded = false;
}
if (all_outputs_full)
return Status::PortFull;
if (all_outputs_unneeded)
{
for (auto & input : inputs)
input.setNotNeeded();
return Status::Unneeded;
}
bool all_inputs_finished = true;
bool all_inputs_have_no_data = true;
for (auto & input : inputs)
{
if (!input.isFinished())
{
all_inputs_finished = false;
input.setNeeded();
if (input.hasData())
all_inputs_have_no_data = false;
}
}
if (all_inputs_finished)
return Status::Finished;
if (all_inputs_have_no_data)
return Status::NeedData;
return Status::Ready;
}
void work() override
{
for (auto & input : inputs)
{
if (input.hasData())
{
for (auto & output : outputs)
{
if (!output.hasData())
{
output.push(input.pull());
break;
}
}
break;
}
}
}
};
/*class AsynchronousProcessor : public IProcessor
{

View File

@ -43,8 +43,8 @@ protected:
public:
String getName() const override { return "SleepyNumbers"; }
SleepyNumbersSource()
: IProcessor({}, {std::move(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}))}), output(outputs.front())
SleepyNumbersSource(UInt64 start_number, unsigned sleep_useconds)
: IProcessor({}, {std::move(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}))}), output(outputs.front()), current_number(start_number), sleep_useconds(sleep_useconds)
{
}
@ -75,7 +75,7 @@ public:
active = true;
pool.schedule([&watch, this]
{
usleep(100000);
usleep(sleep_useconds);
current_block = generate();
active = false;
watch.notify();
@ -90,6 +90,7 @@ private:
std::atomic_bool active {false};
UInt64 current_number = 0;
unsigned sleep_useconds;
Block generate()
{
@ -138,14 +139,21 @@ private:
int main(int, char **)
try
{
auto source = std::make_shared<SleepyNumbersSource>();
auto sink = std::make_shared<PrintSink>();
auto limit = std::make_shared<LimitTransform>(source->getPort().getHeader(), 100, 0);
auto source1 = std::make_shared<SleepyNumbersSource>(0, 100000);
auto source2 = std::make_shared<SleepyNumbersSource>(1000, 200000);
connect(source->getPort(), limit->getInputPort());
auto header = source1->getPort().getHeader();
auto resize = std::make_shared<ResizeProcessor>(InputPorts{Block(header), Block(header)}, OutputPorts{Block(header)});
auto limit = std::make_shared<LimitTransform>(Block(header), 100, 0);
auto sink = std::make_shared<PrintSink>();
connect(source1->getPort(), resize->getInputs()[0]);
connect(source2->getPort(), resize->getInputs()[1]);
connect(resize->getOutputs()[0], limit->getInputPort());
connect(limit->getOutputPort(), sink->getPort());
SequentialPipelineExecutor executor({source, limit, sink});
SequentialPipelineExecutor executor({source1, source2, resize, limit, sink});
EventCounter watch;
while (true)