Processors: experimental [#CLICKHOUSE-2948]

This commit is contained in:
Alexey Milovidov 2018-05-22 07:44:56 +03:00
parent 81177dc264
commit dabc6c8105
2 changed files with 146 additions and 28 deletions

View File

@ -212,6 +212,9 @@ using OutputPorts = std::vector<OutputPort>;
inline void connect(OutputPort & output, InputPort & input)
{
if (input.state || output.state)
throw Exception("Port is already connected");
input.output_port = &output;
output.input_port = &input;
input.state = std::make_shared<Port::State>();
@ -370,32 +373,27 @@ using ProcessorPtr = std::shared_ptr<IProcessor>;
/// Look for first Ready or Async processor by depth-first search in needed input ports and full output ports.
/// NOTE: Pipeline must not have cycles.
template <typename Visit, typename Finish>
void traverse(IProcessor & processor, Visit && visit, Finish && finish)
template <typename Visit>
void traverse(IProcessor & processor, Visit && visit)
{
IProcessor::Status status;
do
{
status = processor.prepare();
status = visit(processor);
} while (status == IProcessor::Status::Again);
// processor.dump();
// std::cerr << "status: " << static_cast<int>(status) << "\n\n";
visit(processor, status);
if (status == IProcessor::Status::Ready || status == IProcessor::Status::Async)
return finish(processor, status);
return;
if (status == IProcessor::Status::NeedData)
for (auto & input : processor.getInputs())
if (input.isNeeded())
traverse(input.getOutputPort().getProcessor(), std::forward<Visit>(visit), std::forward<Finish>(finish));
traverse(input.getOutputPort().getProcessor(), std::forward<Visit>(visit));
if (status == IProcessor::Status::PortFull)
for (auto & output : processor.getOutputs())
if (output.hasData())
traverse(output.getInputPort().getProcessor(), std::forward<Visit>(visit), std::forward<Finish>(finish));
traverse(output.getInputPort().getProcessor(), std::forward<Visit>(visit));
}
@ -429,15 +427,20 @@ public:
for (auto & element : processors)
{
traverse(*element,
[&] (IProcessor &, Status status)
[&] (IProcessor & processor)
{
Status status = processor.prepare();
if (status == Status::Wait)
has_someone_to_wait = true;
},
[&] (IProcessor & processor, Status status)
if (status == Status::Ready || status == Status::Async)
{
current_processor = &processor;
found_status = status;
}
return status;
});
if (current_processor)
@ -512,23 +515,29 @@ public:
for (auto & element : processors)
{
traverse(*element,
[&] (IProcessor &, Status status)
[&] (IProcessor & processor)
{
if (status == Status::Wait)
has_someone_to_wait = true;
},
[&] (IProcessor & processor, Status status)
{
std::lock_guard lock(mutex);
if (active_processors.count(&processor))
{
has_someone_to_wait = true;
return Status::Wait;
}
else
}
Status status = processor.prepare();
if (status == Status::Wait)
has_someone_to_wait = true;
if (status == Status::Ready || status == Status::Async)
{
current_processor = &processor;
current_status = status;
}
return status;
});
if (current_processor)
@ -730,6 +739,112 @@ public:
};
/** Has one input and one output.
* Pulls all blocks from input, and only then produce output.
* Examples: ORDER BY, GROUP BY.
*/
class IAccumulatingTransform : public IProcessor
{
protected:
InputPort & input;
OutputPort & output;
Block current_input_block;
Block current_output_block;
bool finished = false;
virtual void consume(Block block) = 0;
virtual Block generate() = 0;
public:
IAccumulatingTransform(Block input_header, Block output_header)
: IProcessor({std::move(input_header)}, {std::move(output_header)}),
input(inputs.front()), output(outputs.front())
{
}
Status prepare() override
{
if (!output.isNeeded())
return Status::Unneeded;
if (current_input_block)
return Status::Ready;
if (current_output_block)
{
if (output.hasData())
return Status::PortFull;
else
output.push(std::move(current_output_block));
}
if (input.hasData())
{
current_input_block = input.pull();
return Status::Ready;
}
if (input.isFinished())
{
if (finished)
return Status::Finished;
return Status::Ready;
}
input.setNeeded();
return Status::NeedData;
}
void work() override
{
if (current_input_block)
{
consume(std::move(current_input_block));
}
else
{
current_output_block = generate();
if (!current_output_block)
finished = true;
}
}
InputPort & getInputPort() { return input; }
OutputPort & getOutputPort() { return output; }
};
class QueueBuffer : public IAccumulatingTransform
{
private:
std::queue<Block> blocks;
public:
String getName() const override { return "QueueBuffer"; }
QueueBuffer(Block header)
: IAccumulatingTransform(header, header)
{
}
void consume(Block block) override
{
blocks.push(std::move(block));
}
Block generate() override
{
if (blocks.empty())
return {};
Block res = std::move(blocks.front());
blocks.pop();
return std::move(res);
}
};
class ResizeProcessor : public IProcessor
{
public:

View File

@ -146,12 +146,15 @@ try
auto source1 = std::make_shared<SleepyNumbersSource>(100, 100000);
auto source2 = std::make_shared<SleepyNumbersSource>(1000, 200000);
auto queue = std::make_shared<QueueBuffer>(header);
auto resize = std::make_shared<ResizeProcessor>(InputPorts{Block(header), Block(header), Block(header)}, OutputPorts{Block(header)});
auto limit = std::make_shared<LimitTransform>(Block(header), 100, 0);
auto sink = std::make_shared<PrintSink>();
connect(source0->getPort(), limit0->getInputPort());
connect(limit0->getOutputPort(), resize->getInputs()[0]);
connect(limit0->getOutputPort(), queue->getInputPort());
connect(queue->getOutputPort(), resize->getInputs()[0]);
connect(source1->getPort(), resize->getInputs()[1]);
connect(source2->getPort(), resize->getInputs()[2]);
connect(resize->getOutputs()[0], limit->getInputPort());