From 0ffbd7cb1f352a579986ff49242770eadb29b56e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 20 May 2018 08:21:20 +0300 Subject: [PATCH] Processors: experimental [#CLICKHOUSE-2948] --- dbms/src/Processors/Processor.h | 89 ++++++++++++++++++- dbms/src/Processors/tests/processors_test.cpp | 24 +++-- 2 files changed, 102 insertions(+), 11 deletions(-) diff --git a/dbms/src/Processors/Processor.h b/dbms/src/Processors/Processor.h index 44056340eaa..5702c316671 100644 --- a/dbms/src/Processors/Processor.h +++ b/dbms/src/Processors/Processor.h @@ -199,6 +199,10 @@ public: }; +using InputPorts = std::vector; +using OutputPorts = std::vector; + + inline void connect(OutputPort & output, InputPort & input) { input.output_port = &output; @@ -253,13 +257,13 @@ public: class IProcessor { protected: - std::list inputs; - std::list outputs; + InputPorts inputs; + OutputPorts outputs; public: IProcessor() {} - IProcessor(std::list && inputs_, std::list && outputs_) + IProcessor(InputPorts && inputs_, OutputPorts && outputs_) : inputs(std::move(inputs_)), outputs(std::move(outputs_)) { for (auto & port : inputs) @@ -328,6 +332,8 @@ class SequentialPipelineExecutor : IProcessor private: std::list processors; + /// Look for first Ready or Async processor by depth-first search in needed input ports. + /// NOTE: Pipeline must not have cycles. template void traverse(IProcessor & processor, Visit && visit, Finish && finish) { @@ -565,6 +571,83 @@ public: }; +class ResizeProcessor : public IProcessor +{ +public: + using IProcessor::IProcessor; + + String getName() const override { return "Resize"; } + + Status prepare() override + { + bool all_outputs_full = true; + bool all_outputs_unneeded = true; + + for (const auto & output : outputs) + { + if (!output.hasData()) + all_outputs_full = false; + + if (output.isNeeded()) + all_outputs_unneeded = false; + } + + if (all_outputs_full) + return Status::PortFull; + + if (all_outputs_unneeded) + { + for (auto & input : inputs) + input.setNotNeeded(); + + return Status::Unneeded; + } + + bool all_inputs_finished = true; + bool all_inputs_have_no_data = true; + + for (auto & input : inputs) + { + if (!input.isFinished()) + { + all_inputs_finished = false; + + input.setNeeded(); + if (input.hasData()) + all_inputs_have_no_data = false; + } + } + + if (all_inputs_finished) + return Status::Finished; + + if (all_inputs_have_no_data) + return Status::NeedData; + + return Status::Ready; + } + + void work() override + { + for (auto & input : inputs) + { + if (input.hasData()) + { + for (auto & output : outputs) + { + if (!output.hasData()) + { + output.push(input.pull()); + break; + } + } + break; + } + } + } +}; + + /*class AsynchronousProcessor : public IProcessor { diff --git a/dbms/src/Processors/tests/processors_test.cpp b/dbms/src/Processors/tests/processors_test.cpp index e9cdec1ffce..946d237789e 100644 --- a/dbms/src/Processors/tests/processors_test.cpp +++ b/dbms/src/Processors/tests/processors_test.cpp @@ -43,8 +43,8 @@ protected: public: String getName() const override { return "SleepyNumbers"; } - SleepyNumbersSource() - : IProcessor({}, {std::move(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared(), "number" }}))}), output(outputs.front()) + SleepyNumbersSource(UInt64 start_number, unsigned sleep_useconds) + : IProcessor({}, {std::move(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared(), "number" }}))}), output(outputs.front()), current_number(start_number), sleep_useconds(sleep_useconds) { } @@ -75,7 +75,7 @@ public: active = true; pool.schedule([&watch, this] { - usleep(100000); + usleep(sleep_useconds); current_block = generate(); active = false; watch.notify(); @@ -90,6 +90,7 @@ private: std::atomic_bool active {false}; UInt64 current_number = 0; + unsigned sleep_useconds; Block generate() { @@ -138,14 +139,21 @@ private: int main(int, char **) try { - auto source = std::make_shared(); - auto sink = std::make_shared(); - auto limit = std::make_shared(source->getPort().getHeader(), 100, 0); + auto source1 = std::make_shared(0, 100000); + auto source2 = std::make_shared(1000, 200000); - connect(source->getPort(), limit->getInputPort()); + auto header = source1->getPort().getHeader(); + + auto resize = std::make_shared(InputPorts{Block(header), Block(header)}, OutputPorts{Block(header)}); + auto limit = std::make_shared(Block(header), 100, 0); + auto sink = std::make_shared(); + + connect(source1->getPort(), resize->getInputs()[0]); + connect(source2->getPort(), resize->getInputs()[1]); + connect(resize->getOutputs()[0], limit->getInputPort()); connect(limit->getOutputPort(), sink->getPort()); - SequentialPipelineExecutor executor({source, limit, sink}); + SequentialPipelineExecutor executor({source1, source2, resize, limit, sink}); EventCounter watch; while (true)