Processors: experimental [#CLICKHOUSE-2948]

This commit is contained in:
Alexey Milovidov 2018-05-24 05:52:21 +03:00
parent 661197bf99
commit 62296fc763
7 changed files with 34 additions and 7 deletions

View File

@ -37,7 +37,10 @@ ConcatProcessor::Status ConcatProcessor::prepare()
++current_input;
if (current_input == inputs.end())
{
output.setFinished();
return Status::Finished;
}
current_input->setNeeded();
}

View File

@ -33,6 +33,8 @@ ForkProcessor::Status ForkProcessor::prepare()
if (input.isFinished())
{
input.setNotNeeded();
for (auto & output : outputs)
output.setFinished();
return Status::Finished;
}
else

View File

@ -19,7 +19,7 @@ class ForkProcessor : public IProcessor
{
public:
ForkProcessor(Block header, size_t num_outputs)
: IProcessor(InputPorts{header}, OutputPorts(num_inputs, header))
: IProcessor(InputPorts{header}, OutputPorts(num_outputs, header))
{
}

View File

@ -33,7 +33,10 @@ ISimpleTransform::Status ISimpleTransform::prepare()
}
if (input.isFinished())
{
output.setFinished();
return Status::Finished;
}
input.setNeeded();
return Status::NeedData;

View File

@ -12,7 +12,10 @@ ISource::ISource(Block header)
ISource::Status ISource::prepare()
{
if (finished)
{
output.setFinished();
return Status::Finished;
}
if (output.hasData())
return Status::PortFull;

View File

@ -49,7 +49,11 @@ ResizeProcessor::Status ResizeProcessor::prepare()
if (all_inputs_have_no_data)
{
if (all_inputs_finished)
{
for (auto & output : outputs)
output.setFinished();
return Status::Finished;
}
else
return Status::NeedData;
}

View File

@ -6,6 +6,7 @@
#include <Processors/ISink.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/ForkProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/QueueBuffer.h>
#include <Processors/Executors/SequentialPipelineExecutor.h>
@ -118,12 +119,14 @@ class PrintSink : public ISink
public:
String getName() const override { return "Print"; }
PrintSink()
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}))
PrintSink(String prefix)
: ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }})),
prefix(std::move(prefix))
{
}
private:
String prefix;
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
void consume(Block block) override
@ -133,6 +136,7 @@ private:
for (size_t row_num = 0; row_num < rows; ++row_num)
{
writeString(prefix, out);
for (size_t column_num = 0; column_num < columns; ++column_num)
{
if (column_num != 0)
@ -178,25 +182,33 @@ try
connect(limit3->getOutputPort(), concat->getInputs()[0]);
connect(limit4->getOutputPort(), concat->getInputs()[1]);
auto fork = std::make_shared<ForkProcessor>(header, 2);
connect(concat->getOutputPort(), fork->getInputPort());
auto print_after_concat = std::make_shared<PrintSink>("---------- ");
connect(fork->getOutputs()[1], print_after_concat->getPort());
auto resize = std::make_shared<ResizeProcessor>(header, 4, 1);
connect(queue->getOutputPort(), resize->getInputs()[0]);
connect(source1->getPort(), resize->getInputs()[1]);
connect(source2->getPort(), resize->getInputs()[2]);
connect(concat->getOutputPort(), resize->getInputs()[3]);
connect(fork->getOutputs()[0], resize->getInputs()[3]);
auto limit = std::make_shared<LimitTransform>(header, 100, 0);
connect(resize->getOutputs()[0], limit->getInputPort());
auto sink = std::make_shared<PrintSink>();
auto sink = std::make_shared<PrintSink>("");
connect(limit->getOutputPort(), sink->getPort());
printPipeline({source0, source1, source2, source3, source4, limit0, limit3, limit4, limit, queue, concat, resize, sink});
printPipeline({source0, source1, source2, source3, source4, limit0, limit3, limit4, limit, queue, concat, fork, print_after_concat, resize, sink});
ThreadPool pool(4, 10);
ParallelPipelineExecutor executor({sink}, pool);
ParallelPipelineExecutor executor({sink, print_after_concat}, pool);
//SequentialPipelineExecutor executor({sink});
EventCounter watch;