diff --git a/dbms/src/Processors/Processor.h b/dbms/src/Processors/Processor.h index c154ed3835e..e568c11d862 100644 --- a/dbms/src/Processors/Processor.h +++ b/dbms/src/Processors/Processor.h @@ -9,6 +9,8 @@ #include #include +#include + /** Processor is an element of query execution pipeline. * It has zero or more input ports and zero or more output ports. @@ -180,7 +182,7 @@ private: public: using Port::Port; - void push(Block && block) + void push(Block block) { if (hasData()) throw Exception("Port already has data"); @@ -266,7 +268,7 @@ protected: public: IProcessor() {} - IProcessor(InputPorts && inputs_, OutputPorts && outputs_) + IProcessor(InputPorts inputs_, OutputPorts outputs_) : inputs(std::move(inputs_)), outputs(std::move(outputs_)) { for (auto & port : inputs) @@ -299,19 +301,44 @@ public: /// You may call 'schedule' method and processor will initiate some background work. Async, - /// Processor is doing some work in background and you have to wait. - Wait + /// Processor is doing some work in background. + /// You may wait for next event or do something else and then you should call 'prepare' again. + Wait, + + /// Call 'prepare' again. + Again, }; + /** Method 'prepare' is responsible for all cheap ("instantenous": O(1) of data volume, no wait) calculations. + * + * It may access input and output ports, + * indicate the need for work by another processor by returning NeedData or PortFull, + * or indicate the absense of work by returning Finished or Unneeded, + * it may pull data from input ports and push data to output ports. + * + * The method is not thread-safe and must be called from a single thread in one moment of time, + * even for different connected processors. + * + * Instead of all long work (CPU calculations or waiting) it should just prepare all required data and return Ready or Async. + */ virtual Status prepare() = 0; - /// You may call this method if 'status' returned Ready. + /** You may call this method if 'prepare' returned Ready. + * This method cannot access any ports. It should use only data that was prepared by 'prepare' method. + * + * Method work can be executed in parallel for different processors. + */ virtual void work() { throw Exception("Method 'work' is not implemented for " + getName() + " processor"); } - /// You may call this method if 'status' returned Async. + /** You may call this method if 'prepare' returned Async. + * This method cannot access any ports. It should use only data that was prepared by 'prepare' method. + * + * This method should return instantly and fire an event when asynchronous job will be done. + * When the job is not done, method 'prepare' will return Wait and the user may block and wait for next event before checking again. + */ virtual void schedule(EventCounter & /*watch*/) { throw Exception("Method 'schedule' is not implemented for " + getName() + " processor"); @@ -321,6 +348,19 @@ public: auto & getInputs() { return inputs; } auto & getOutputs() { return outputs; } + + void dump() const + { + std::cerr << getName() << "\n"; + + std::cerr << "inputs:\n"; + for (const auto & port : inputs) + std::cerr << "\t" << port.hasData() << " " << port.isNeeded() << " " << port.isFinished() << "\n"; + + std::cerr << "outputs:\n"; + for (const auto & port : outputs) + std::cerr << "\t" << port.hasData() << " " << port.isNeeded() << "\n"; + } }; using ProcessorPtr = std::shared_ptr; @@ -334,13 +374,22 @@ class SequentialPipelineExecutor : IProcessor { private: std::list processors; + IProcessor * current_processor = nullptr; /// Look for first Ready or Async processor by depth-first search in needed input ports. /// NOTE: Pipeline must not have cycles. template void traverse(IProcessor & processor, Visit && visit, Finish && finish) { - Status status = processor.prepare(); + Status status; + do + { + status = processor.prepare(); + } while (status == Status::Again); + +// processor.dump(); +// std::cerr << "status: " << static_cast(status) << "\n\n"; + visit(processor, status); if (status == Status::Ready || status == Status::Async) @@ -350,6 +399,11 @@ private: for (auto & input : processor.getInputs()) if (input.isNeeded()) traverse(input.getOutputPort().getProcessor(), std::forward(visit), std::forward(finish)); + + if (status == Status::PortFull) + for (auto & output : processor.getOutputs()) + if (output.hasData()) + traverse(output.getInputPort().getProcessor(), std::forward(visit), std::forward(finish)); } public: @@ -362,10 +416,13 @@ public: Status prepare() override { + current_processor = nullptr; + bool has_someone_to_wait = false; - bool found = false; Status found_status = Status::Finished; +// std::cerr << "\n\n-----------------------------\n\n"; + for (auto & element : processors) { traverse(*element, @@ -374,17 +431,17 @@ public: if (status == Status::Wait) has_someone_to_wait = true; }, - [&] (IProcessor &, Status status) + [&] (IProcessor & processor, Status status) { - found = true; + current_processor = &processor; found_status = status; }); - if (found) + if (current_processor) break; } - if (found) + if (current_processor) return found_status; if (has_someone_to_wait) return Status::Wait; @@ -402,52 +459,19 @@ public: void work() override { - bool found = false; - for (auto & element : processors) - { - traverse(*element, - [] (IProcessor &, Status) {}, - [&found] (IProcessor & processor, Status status) - { - if (status == Status::Ready) - { - found = true; - //std::cerr << processor.getName() << " will work\n"; - processor.work(); - } - }); - - if (found) - break; - } - - if (!found) + if (!current_processor) throw Exception("Bad pipeline"); + +// std::cerr << current_processor->getName() << " will work\n"; + current_processor->work(); } void schedule(EventCounter & watch) override { - bool found = false; - for (auto & element : processors) - { - traverse(*element, - [] (IProcessor &, Status) {}, - [&found, &watch] (IProcessor & processor, Status status) - { - if (status == Status::Async) - { - found = true; - //std::cerr << processor.getName() << " will schedule\n"; - processor.schedule(watch); - } - }); - - if (found) - break; - } - - if (!found) + if (!current_processor) throw Exception("Bad pipeline"); + + current_processor->schedule(watch); } }; @@ -457,11 +481,12 @@ class ISource : public IProcessor protected: OutputPort & output; bool finished = false; + Block current_block; virtual Block generate() = 0; public: - ISource(Block && header) + ISource(Block header) : IProcessor({}, {std::move(header)}), output(outputs.front()) { } @@ -477,14 +502,16 @@ public: if (!output.isNeeded()) return Status::Unneeded; + if (current_block) + output.push(std::move(current_block)); + return Status::Ready; } void work() override { - if (Block block = generate()) - output.push(std::move(block)); - else + current_block = generate(); + if (!current_block) finished = true; } @@ -496,20 +523,27 @@ class ISink : public IProcessor { protected: InputPort & input; + Block current_block; - virtual void consume(Block && block) = 0; + virtual void consume(Block block) = 0; public: - ISink(Block && header) + ISink(Block header) : IProcessor({std::move(header)}, {}), input(inputs.front()) { } Status prepare() override { - if (input.hasData()) + if (current_block) return Status::Ready; + if (input.hasData()) + { + current_block = input.pull(); + return Status::Ready; + } + if (input.isFinished()) return Status::Finished; @@ -519,7 +553,10 @@ public: void work() override { - consume(input.pull()); +// std::cerr << "Working\n"; + consume(std::move(current_block)); + +// std::cerr << current_block.columns() << "\n"; } InputPort & getPort() { return input; } @@ -535,10 +572,13 @@ protected: InputPort & input; OutputPort & output; + Block current_block; + bool transformed = false; + virtual void transform(Block & block) = 0; public: - ITransform(Block && input_header, Block && output_header) + ITransform(Block input_header, Block output_header) : IProcessor({std::move(input_header)}, {std::move(output_header)}), input(inputs.front()), output(outputs.front()) { @@ -546,14 +586,25 @@ public: Status prepare() override { - if (output.hasData()) - return Status::PortFull; - if (!output.isNeeded()) return Status::Unneeded; + if (current_block) + { + if (!transformed) + return Status::Ready; + else if (output.hasData()) + return Status::PortFull; + else + output.push(std::move(current_block)); + } + if (input.hasData()) + { + current_block = input.pull(); + transformed = false; return Status::Ready; + } if (input.isFinished()) return Status::Finished; @@ -564,9 +615,8 @@ public: void work() override { - Block data = input.pull(); - transform(data); - output.push(std::move(data)); + transform(current_block); + transformed = true; } InputPort & getInputPort() { return input; } @@ -621,17 +671,14 @@ public: } } - if (all_inputs_finished) - return Status::Finished; - if (all_inputs_have_no_data) - return Status::NeedData; + { + if (all_inputs_finished) + return Status::Finished; + else + return Status::NeedData; + } - return Status::Ready; - } - - void work() override - { for (auto & input : inputs) { if (input.hasData()) @@ -647,6 +694,8 @@ public: break; } } + + return Status::Again; } }; @@ -665,11 +714,13 @@ private: size_t limit; size_t offset; - size_t pos = 0; /// how many rows were read, including the last read block + size_t rows_read = 0; /// including the last read block bool always_read_till_end; + Block current_block; + public: - LimitTransform(Block && header, size_t limit, size_t offset, bool always_read_till_end = false) + LimitTransform(Block header, size_t limit, size_t offset, bool always_read_till_end = false) : IProcessor({std::move(header)}, {std::move(header)}), input(inputs.front()), output(outputs.front()), limit(limit), offset(offset), always_read_till_end(always_read_till_end) @@ -680,7 +731,15 @@ public: Status prepare() override { - if (pos >= offset + limit) + if (current_block) + { + if (output.hasData()) + return Status::PortFull; + + output.push(std::move(current_block)); + } + + if (rows_read >= offset + limit) { output.setFinished(); if (!always_read_till_end) @@ -690,53 +749,62 @@ public: } } - if (output.hasData()) - return Status::PortFull; - if (!output.isNeeded()) return Status::Unneeded; input.setNeeded(); - return input.hasData() - ? Status::Ready - : Status::NeedData; + + if (!input.hasData()) + return Status::NeedData; + + current_block = input.pull(); + + /// Skip block (for 'always_read_till_end' case) + if (rows_read >= offset + limit) + { + current_block.clear(); + return Status::NeedData; + } + + size_t rows = current_block.rows(); + rows_read += rows; + + if (rows_read <= offset) + { + current_block.clear(); + return Status::NeedData; + } + + /// return the whole block + if (rows_read >= offset + rows && rows_read <= offset + limit) + { + if (output.hasData()) + return Status::PortFull; + + output.push(std::move(current_block)); + return Status::NeedData; + } + + return Status::Ready; } void work() override { - Block block = input.pull(); - - if (pos >= offset + limit) - return; - - size_t rows = block.rows(); - pos += rows; - - if (pos <= offset) - return; - - /// return the whole block - if (pos >= offset + rows && pos <= offset + limit) - { - output.push(std::move(block)); - return; - } + size_t rows = current_block.rows(); + size_t columns = current_block.columns(); /// return a piece of the block size_t start = std::max( static_cast(0), - static_cast(offset) - static_cast(pos) + static_cast(rows)); + static_cast(offset) - static_cast(rows_read) + static_cast(rows)); size_t length = std::min( static_cast(limit), std::min( - static_cast(pos) - static_cast(offset), - static_cast(limit) + static_cast(offset) - static_cast(pos) + static_cast(rows))); + static_cast(rows_read) - static_cast(offset), + static_cast(limit) + static_cast(offset) - static_cast(rows_read) + static_cast(rows))); - size_t columns = block.columns(); for (size_t i = 0; i < columns; ++i) - block.getByPosition(i).column = block.getByPosition(i).column->cut(start, length); - - output.push(std::move(block)); + current_block.getByPosition(i).column = current_block.getByPosition(i).column->cut(start, length); } InputPort & getInputPort() { return input; } diff --git a/dbms/src/Processors/tests/processors_test.cpp b/dbms/src/Processors/tests/processors_test.cpp index b9941ab1d63..477a4413db5 100644 --- a/dbms/src/Processors/tests/processors_test.cpp +++ b/dbms/src/Processors/tests/processors_test.cpp @@ -62,12 +62,8 @@ public: if (output.hasData()) return Status::PortFull; - return Status::Ready; - } - - void work() override - { output.push(std::move(current_block)); + return Status::Again; } void schedule(EventCounter & watch) override @@ -115,7 +111,7 @@ public: private: WriteBufferFromFileDescriptor out{STDOUT_FILENO}; - void consume(Block && block) override + void consume(Block block) override { size_t rows = block.rows(); size_t columns = block.columns(); @@ -139,21 +135,25 @@ private: int main(int, char **) try { - auto source1 = std::make_shared(0, 100000); + auto source0 = std::make_shared(); + auto header = source0->getPort().getHeader(); + auto limit0 = std::make_shared(Block(header), 10, 0); + + auto source1 = std::make_shared(100, 100000); auto source2 = std::make_shared(1000, 200000); - auto header = source1->getPort().getHeader(); - - auto resize = std::make_shared(InputPorts{Block(header), Block(header)}, OutputPorts{Block(header)}); + auto resize = std::make_shared(InputPorts{Block(header), Block(header), Block(header)}, OutputPorts{Block(header)}); auto limit = std::make_shared(Block(header), 100, 0); auto sink = std::make_shared(); - connect(source1->getPort(), resize->getInputs()[0]); - connect(source2->getPort(), resize->getInputs()[1]); + connect(source0->getPort(), limit0->getInputPort()); + connect(limit0->getOutputPort(), resize->getInputs()[0]); + connect(source1->getPort(), resize->getInputs()[1]); + connect(source2->getPort(), resize->getInputs()[2]); connect(resize->getOutputs()[0], limit->getInputPort()); connect(limit->getOutputPort(), sink->getPort()); - SequentialPipelineExecutor executor({source1, source2, resize, limit, sink}); + SequentialPipelineExecutor executor({sink}); EventCounter watch; while (true)