diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index 813c63109fb..c31e52d9823 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -55,6 +55,7 @@ add_headers_and_sources(dbms src/Storages)
 add_headers_and_sources(dbms src/Storages/Distributed)
 add_headers_and_sources(dbms src/Storages/MergeTree)
 add_headers_and_sources(dbms src/Client)
+add_headers_and_sources(dbms src/Processors)
 add_headers_only(dbms src/Server)
 
 list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
diff --git a/dbms/src/CMakeLists.txt b/dbms/src/CMakeLists.txt
index d52fb965f9e..9f3c13643f8 100644
--- a/dbms/src/CMakeLists.txt
+++ b/dbms/src/CMakeLists.txt
@@ -14,3 +14,4 @@ add_subdirectory (Server)
 add_subdirectory (Client)
 add_subdirectory (TableFunctions)
 add_subdirectory (Analyzers)
+add_subdirectory (Processors)
diff --git a/dbms/src/Processors/CMakeLists.txt b/dbms/src/Processors/CMakeLists.txt
new file mode 100644
index 00000000000..99ba159eaf4
--- /dev/null
+++ b/dbms/src/Processors/CMakeLists.txt
@@ -0,0 +1,4 @@
+if (ENABLE_TESTS)
+    add_subdirectory (tests)
+endif ()
+
diff --git a/dbms/src/Processors/Processor.h b/dbms/src/Processors/Processor.h
new file mode 100644
index 00000000000..0050eb05994
--- /dev/null
+++ b/dbms/src/Processors/Processor.h
@@ -0,0 +1,564 @@
+#pragma once
+
+#include <memory>
+#include <list>
+#include <mutex>
+#include <condition_variable>
+#include <algorithm>
+#include <utility>
+#include <Core/Block.h>
+#include <Common/Exception.h>
+
+
+/** Processor is an element of the query execution pipeline.
+  * It has zero or more input ports and zero or more output ports.
+  *
+  * Blocks of data are transferred over ports.
+  * Each port has a fixed structure: names and types of columns and values of constants.
+  *
+  * Processors may pull data from input ports, do some processing and push data to output ports.
+  * A processor may indicate that it requires input data to proceed and may set priorities of its input ports.
+  *
+  * Synchronous work must only use CPU - don't do any sleep, IO wait or network wait.
+  *
+  * A processor may want to do work asynchronously (example: fetch data from a remote server)
+  *  - in this case it will initiate a background job and allow subscribing to it.
+  *
+  * A processor may throw an exception to indicate a runtime error.
+  *
+  * Different ports may have different structures. For example, ports may correspond to different datasets.
+  *
+  * TODO Ports may carry algebraic properties about streams of data.
+  * For example, that data comes ordered by a specific key; or grouped by a specific key; or has unique values of a specific key.
+  *
+  * Examples:
+  *
+  * Source. Has no input ports and a single output port. Generates data itself and pushes it to its output port.
+  *
+  * Sink. Has a single input port and no output ports. Consumes data that was passed to its input port.
+  *
+  * Empty source. Immediately says that the data on its output port is finished.
+  *
+  * Null sink. Consumes data and does nothing.
+  *
+  * Simple transformation. Has a single input and a single output port. Pulls data, transforms it and pushes it to the output port.
+  * Example: expression calculator.
+  *
+  * Squashing or filtering transformation. Pulls data, possibly accumulates it, and sometimes pushes it to the output port.
+  * Examples: DISTINCT, WHERE, squashing of blocks for INSERT SELECT.
+  *
+  * Accumulating transformation. Pulls and accumulates all data from the input until it is exhausted, then pushes the data to the output port.
+  * Examples: ORDER BY, GROUP BY.
+  *
+  * Limiting transformation. Pulls data from the input and passes it to the output.
+  * When there has been enough data, it says that it doesn't need any more data on its input and that the data on its output port is finished.
+  *
+  * Resize. Has an arbitrary number of inputs and an arbitrary number of outputs. Pulls data from whatever input is ready and pushes it to a randomly chosen output.
+  * Examples:
+  * Union - merges data from a number of inputs into one output in arbitrary order.
+  * Split - reads data from one input and passes it to an arbitrary output.
+  *
+  * Concat. Has many inputs but only one output. Pulls all data from the first input, then all data from the second input, etc. and pushes it to the output.
+  *
+  * Ordered merge. Has many inputs but only one output. Pulls data from the selected input in a specific order, merges it and pushes it to the output.
+  */
+
+namespace DB
+{
+
+class InputPort;
+class OutputPort;
+
+class Port
+{
+protected:
+    friend void connect(OutputPort &, InputPort &);
+
+    /// Shared state of two connected ports.
+    struct State
+    {
+        Block data;
+        bool needed = false;
+        bool finished = false;
+    };
+
+    Block header;
+    std::shared_ptr<State> state;
+
+public:
+    Port(const Block & header)
+        : header(header) {}
+
+    Block getHeader() const { return header; }
+    bool isConnected() const { return state != nullptr; }
+
+    void assumeConnected() const
+    {
+        if (!isConnected())
+            throw Exception("Port is not connected");
+    }
+
+    bool hasData() const
+    {
+        assumeConnected();
+        return state->data;
+    }
+
+    bool isNeeded() const
+    {
+        assumeConnected();
+        return state->needed;
+    }
+};
+
+
+class InputPort : public Port
+{
+public:
+    using Port::Port;
+
+    Block pull()
+    {
+        if (!hasData())
+            throw Exception("Port has no data");
+
+        return std::move(state->data);
+    }
+
+    bool isFinished() const
+    {
+        assumeConnected();
+        return state->finished;
+    }
+
+    void setNeeded()
+    {
+        assumeConnected();
+        state->needed = true;
+    }
+
+    void setNotNeeded()
+    {
+        assumeConnected();
+        state->needed = false;
+    }
+};
+
+
+class OutputPort : public Port
+{
+public:
+    using Port::Port;
+
+    void push(Block && block)
+    {
+        if (hasData())
+            throw Exception("Port already has data");
+
+        state->data = std::move(block);
+    }
+
+    void setFinished()
+    {
+        assumeConnected();
+        state->finished = true;
+    }
+};
+
+
+inline void connect(OutputPort & output, InputPort & input)
+{
+    input.state = output.state = std::make_shared<Port::State>();
+}
+
+
+/** Allows subscribing to multiple events and waiting for them one by one in arbitrary order.
+  */
+class EventCounter
+{
+private:
+    size_t events_happened = 0;
+    size_t events_waited = 0;
+
+    mutable std::mutex mutex;
+    std::condition_variable condvar;
+
+public:
+    void notify()
+    {
+        {
+            std::lock_guard<std::mutex> lock(mutex);
+            ++events_happened;
+        }
+        condvar.notify_all();
+    }
+
+    void wait()
+    {
+        std::unique_lock<std::mutex> lock(mutex);
+        condvar.wait(lock, [&]{ return events_happened > events_waited; });
+        ++events_waited;
+    }
+
+    template <typename Duration>
+    bool waitFor(Duration && duration)
+    {
+        std::unique_lock<std::mutex> lock(mutex);
+        if (condvar.wait_for(lock, std::forward<Duration>(duration), [&]{ return events_happened > events_waited; }))
+        {
+            ++events_waited;
+            return true;
+        }
+        return false;
+    }
+};
+
+
+class IProcessor
+{
+protected:
+    std::list<InputPort> inputs;
+    std::list<OutputPort> outputs;
+
+public:
+    IProcessor() {}
+
+    IProcessor(std::list<InputPort> && inputs, std::list<OutputPort> && outputs)
+        : inputs(std::move(inputs)), outputs(std::move(outputs)) {}
+
+    virtual String getName() const = 0;
+
+    enum class Status
+    {
+        /// Processor needs some data at its inputs to proceed.
+        /// You need to run another processor to generate the required input and then proceed.
+        NeedData,
+
+        /// Processor cannot proceed because its output port is full.
+        /// You need to transfer the data from the output port to the input port of another processor.
+        PortFull,
+
+        /// All work is done, nothing more to do.
+        Finished,
+
+        /// You may call the 'work' method and the processor will do some work synchronously.
+        Ready,
+
+        /// You may call the 'schedule' method and the processor will initiate some background work.
+        Async,
+
+        /// Processor is doing some work in the background and you have to wait.
+        Wait
+    };
+
+    virtual Status getStatus() = 0;
+
+    /// You may call this method if 'getStatus' returned Ready.
+    virtual void work()
+    {
+        throw Exception("Method 'work' is not implemented for " + getName() + " processor");
+    }
+
+    /// You may call this method if 'getStatus' returned Async.
+    virtual void schedule(EventCounter & /*watch*/)
+    {
+        throw Exception("Method 'schedule' is not implemented for " + getName() + " processor");
+    }
+
+    virtual ~IProcessor() {}
+
+    /// Someone needs data on at least one of its output ports, or it has no outputs at all.
+    bool isNeeded() const
+    {
+        if (outputs.empty())
+            return true;
+
+        for (const auto & output : outputs)
+            if (output.isNeeded())
+                return true;
+
+        return false;
+    }
+};
+
+using ProcessorPtr = std::shared_ptr<IProcessor>;
+
+
+/** Wraps a pipeline in a single processor.
+  * This processor has no inputs and outputs and just executes the pipeline,
+  * performing all synchronous work in the current thread.
+  */
+class SequentialPipelineExecutor : IProcessor
+{
+private:
+    std::list<ProcessorPtr> processors;
+
+public:
+    SequentialPipelineExecutor(const std::list<ProcessorPtr> & processors) : processors(processors) {}
+
+    String getName() const override { return "SequentialPipelineExecutor"; }
+
+    Status getStatus() override
+    {
+        for (auto & element : processors)
+            if (element->getStatus() == Status::Async)
+                return Status::Async;
+
+        for (auto & element : processors)
+            if (element->getStatus() == Status::Ready && element->isNeeded())
+                return Status::Ready;
+
+        for (auto & element : processors)
+            if (element->getStatus() == Status::Wait)
+                return Status::Wait;
+
+        for (auto & element : processors)
+        {
+            if (element->getStatus() == Status::NeedData)
+                throw Exception("Pipeline stuck: " + element->getName() + " processor needs input data but no one is going to generate it");
+            if (element->getStatus() == Status::PortFull)
+                throw Exception("Pipeline stuck: " + element->getName() + " processor has data in its output port but no one is going to consume it");
+        }
+
+        /// TODO Check that all processors are finished.
+        return Status::Finished;
+    }
+
+    void work() override
+    {
+        /// Execute one ready and needed processor.
+        for (auto it = processors.begin(); it != processors.end(); ++it)
+        {
+            auto & element = *it;
+
+            //std::cerr << element->getName() << " status is " << static_cast<int>(element->getStatus()) << "\n";
+
+            if (element->getStatus() == Status::Ready && element->isNeeded())
+            {
+                //std::cerr << element->getName() << " will work\n";
+
+                element->work();
+                processors.splice(processors.end(), processors, it);
+                return;
+            }
+        }
+
+        throw Exception("Bad pipeline");
+    }
+
+    void schedule(EventCounter & watch) override
+    {
+        /// Schedule all needed asynchronous jobs.
+        bool scheduled = false;
+
+        for (auto it = processors.begin(); it != processors.end(); ++it)
+        {
+            auto & element = *it;
+
+            if (element->getStatus() == Status::Async && element->isNeeded())
+            {
+                element->schedule(watch);
+                scheduled = true;
+            }
+        }
+
+        /// If nothing was scheduled, the pipeline cannot make progress.
+        if (!scheduled)
+            throw Exception("Bad pipeline");
+    }
+};
+
+
+class ISource : public IProcessor
+{
+protected:
+    OutputPort & output;
+    bool finished = false;
+
+    virtual Block generate() = 0;
+
+public:
+    ISource(Block && header)
+        : IProcessor({}, {std::move(header)}), output(outputs.front())
+    {
+    }
+
+    Status getStatus() override
+    {
+        if (finished)
+            return Status::Finished;
+
+        if (output.hasData())
+            return Status::PortFull;
+
+        return Status::Ready;
+    }
+
+    void work() override
+    {
+        if (Block block = generate())
+            output.push(std::move(block));
+        else
+            finished = true;
+    }
+
+    OutputPort & getPort() { return output; }
+};
+
+
+class ISink : public IProcessor
+{
+protected:
+    InputPort & input;
+
+    virtual void consume(Block && block) = 0;
+
+public:
+    ISink(Block && header)
+        : IProcessor({std::move(header)}, {}), input(inputs.front())
+    {
+    }
+
+    Status getStatus() override
+    {
+        if (input.hasData())
+            return Status::Ready;
+
+        if (input.isFinished())
+            return Status::Finished;
+
+        input.setNeeded();
+        return Status::NeedData;
+    }
+
+    void work() override
+    {
+        consume(input.pull());
+    }
+
+    InputPort & getPort() { return input; }
+};
+
+
+/** Has one input and one output.
+  * Simply pulls a block from the input, transforms it, and pushes it to the output.
+  */
+class ITransform : public IProcessor
+{
+protected:
+    InputPort & input;
+    OutputPort & output;
+
+    virtual void transform(Block & block) = 0;
+
+public:
+    ITransform(Block && input_header, Block && output_header)
+        : IProcessor({std::move(input_header)}, {std::move(output_header)}),
+        input(inputs.front()), output(outputs.front())
+    {
+    }
+
+    Status getStatus() override
+    {
+        if (output.hasData())
+            return Status::PortFull;
+
+        if (input.hasData())
+            return Status::Ready;
+
+        if (input.isFinished())
+            return Status::Finished;
+
+        input.setNeeded();
+        return Status::NeedData;
+    }
+
+    void work() override
+    {
+        Block data = input.pull();
+        transform(data);
+        output.push(std::move(data));
+    }
+
+    InputPort & getInputPort() { return input; }
+    OutputPort & getOutputPort() { return output; }
+};
+
+
+class LimitTransform : public IProcessor
+{
+private:
+    InputPort & input;
+    OutputPort & output;
+
+    size_t limit;
+    size_t offset;
+    size_t pos = 0;    /// how many rows were read in total, including the rows of the last read block
+    bool always_read_till_end;
+
+public:
+    /// The header is copied into both the input and the output port (moving it twice would leave the output port with an empty header).
+    LimitTransform(Block && header, size_t limit, size_t offset, bool always_read_till_end = false)
+        : IProcessor({header}, {header}),
+        input(inputs.front()), output(outputs.front()),
+        limit(limit), offset(offset), always_read_till_end(always_read_till_end)
+    {
+    }
+
+    String getName() const override { return "Limit"; }
+
+    Status getStatus() override
+    {
+        if (pos >= offset + limit)
+        {
+            output.setFinished();
+            if (!always_read_till_end)
+            {
+                input.setNotNeeded();
+                return Status::Finished;
+            }
+        }
+
+        if (output.hasData())
+            return Status::PortFull;
+
+        input.setNeeded();
+        return input.hasData()
+            ? Status::Ready
+            : Status::NeedData;
+    }
+
+    void work() override
+    {
+        Block block = input.pull();
+
+        if (pos >= offset + limit)
+            return;
+
+        size_t rows = block.rows();
+        pos += rows;
+
+        if (pos <= offset)
+            return;
+
+        /// return the whole block
+        if (pos >= offset + rows && pos <= offset + limit)
+        {
+            output.push(std::move(block));
+            return;
+        }
+
+        /// return a piece of the block
+        size_t start = std::max(
+            static_cast<Int64>(0),
+            static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));
+
+        size_t length = std::min(
+            static_cast<Int64>(limit), std::min(
+            static_cast<Int64>(pos) - static_cast<Int64>(offset),
+            static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));
+
+        size_t columns = block.columns();
+        for (size_t i = 0; i < columns; ++i)
+            block.getByPosition(i).column = block.getByPosition(i).column->cut(start, length);
+
+        output.push(std::move(block));
+    }
+
+    InputPort & getInputPort() { return input; }
+    OutputPort & getOutputPort() { return output; }
+};
+
+}
diff --git a/dbms/src/Processors/tests/CMakeLists.txt b/dbms/src/Processors/tests/CMakeLists.txt
new file mode 100644
index 00000000000..7a66dd3d9de
--- /dev/null
+++ b/dbms/src/Processors/tests/CMakeLists.txt
@@ -0,0 +1,2 @@
+add_executable (processors_test processors_test.cpp)
+target_link_libraries (processors_test dbms)
diff --git a/dbms/src/Processors/tests/processors_test.cpp b/dbms/src/Processors/tests/processors_test.cpp
new file mode 100644
index 00000000000..1f9b5b26592
--- /dev/null
+++ b/dbms/src/Processors/tests/processors_test.cpp
@@ -0,0 +1,99 @@
+#include <iostream>
+#include <Columns/ColumnsNumber.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <IO/WriteBufferFromFileDescriptor.h>
+#include <IO/WriteHelpers.h>
+#include <Processors/Processor.h>
+
+
+using namespace DB;
+
+
+class NumbersSource : public ISource
+{
+public:
+    String getName() const override { return "Numbers"; }
+
+    NumbersSource()
+        : ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}))
+    {
+    }
+
+private:
+    UInt64 current_number = 0;
+
+    Block generate() override
+    {
+        MutableColumns columns;
+        columns.emplace_back(ColumnUInt64::create(1, current_number));
+        ++current_number;
+        return getPort().getHeader().cloneWithColumns(std::move(columns));
+    }
+};
+
+
+class PrintSink : public ISink
+{
+public:
+    String getName() const override { return "Print"; }
+
+    PrintSink()
+        : ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number" }}))
+    {
+    }
+
+private:
+    WriteBufferFromFileDescriptor out{STDOUT_FILENO};
+
+    void consume(Block && block) override
+    {
+        size_t rows = block.rows();
+        size_t columns = block.columns();
+
+        for (size_t row_num = 0; row_num < rows; ++row_num)
+        {
+            for (size_t column_num = 0; column_num < columns; ++column_num)
+            {
+                if (column_num != 0)
+                    writeChar('\t', out);
+                getPort().getHeader().getByPosition(column_num).type->serializeText(*block.getByPosition(column_num).column, row_num, out);
+            }
+            writeChar('\n', out);
+        }
+
+        out.next();
+    }
+};
+
+
+int main(int, char **)
+try
+{
+    auto source = std::make_shared<NumbersSource>();
+    auto sink = std::make_shared<PrintSink>();
+    auto limit = std::make_shared<LimitTransform>(source->getPort().getHeader(), 100, 0);
+
+    connect(source->getPort(), limit->getInputPort());
+    connect(limit->getOutputPort(), sink->getPort());
+
+    SequentialPipelineExecutor executor({source, limit, sink});
+
+    while (true)
+    {
+        IProcessor::Status status = executor.getStatus();
+
+        if (status == IProcessor::Status::Finished)
+            break;
+        else if (status == IProcessor::Status::Ready)
+            executor.work();
+        else
+            throw Exception("Bad status");
+    }
+
+    return 0;
+}
+catch (...)
+{
+    std::cerr << getCurrentExceptionMessage(true) << '\n';
+    throw;
+}
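
A minimal sketch (not part of the patch above) of how a custom transformation could be plugged into this pipeline, assuming the same single-column "number" header used in processors_test.cpp; the class name DoubleNumbersTransform and the doubling logic are purely illustrative:

class DoubleNumbersTransform : public ITransform
{
public:
    String getName() const override { return "DoubleNumbers"; }

    /// Both ports use the same header: the block structure is unchanged, only the values are.
    DoubleNumbersTransform(Block && header)
        : ITransform(Block(header), Block(header)) {}

private:
    void transform(Block & block) override
    {
        /// Rebuild the first column with each value multiplied by two.
        const auto & src = *block.getByPosition(0).column;
        auto dst = ColumnUInt64::create();
        for (size_t i = 0; i < src.size(); ++i)
            dst->insertValue(src.getUInt(i) * 2);
        block.getByPosition(0).column = std::move(dst);
    }
};

Wiring it between the source and the limit in the test would then look like:

    auto doubler = std::make_shared<DoubleNumbersTransform>(source->getPort().getHeader());
    connect(source->getPort(), doubler->getInputPort());
    connect(doubler->getOutputPort(), limit->getInputPort());
    connect(limit->getOutputPort(), sink->getPort());
    SequentialPipelineExecutor executor({source, doubler, limit, sink});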