From f65d8d932516b0578e57ae8809298224045c8e06 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 15 Feb 2019 19:16:52 +0300 Subject: [PATCH] Added processor tests with chain and merge sort. --- dbms/src/Processors/ConcatProcessor.cpp | 2 +- dbms/src/Processors/tests/CMakeLists.txt | 5 + .../tests/processors_test_chain.cpp | 165 +++++++++ .../tests/processors_test_merge.cpp | 329 ++++++++++++++++++ 4 files changed, 500 insertions(+), 1 deletion(-) create mode 100644 dbms/src/Processors/tests/processors_test_chain.cpp create mode 100644 dbms/src/Processors/tests/processors_test_merge.cpp diff --git a/dbms/src/Processors/ConcatProcessor.cpp b/dbms/src/Processors/ConcatProcessor.cpp index 56006841971..37efeb36f3a 100644 --- a/dbms/src/Processors/ConcatProcessor.cpp +++ b/dbms/src/Processors/ConcatProcessor.cpp @@ -26,7 +26,7 @@ ConcatProcessor::Status ConcatProcessor::prepare() return Status::PortFull; } - if (output.hasData()) + if (!output.canPush()) return Status::PortFull; /// Check can input. diff --git a/dbms/src/Processors/tests/CMakeLists.txt b/dbms/src/Processors/tests/CMakeLists.txt index 7a66dd3d9de..0c03133259a 100644 --- a/dbms/src/Processors/tests/CMakeLists.txt +++ b/dbms/src/Processors/tests/CMakeLists.txt @@ -1,2 +1,7 @@ add_executable (processors_test processors_test.cpp) +add_executable (processors_test_chain processors_test_chain.cpp) +add_executable (processors_test_merge processors_test_merge.cpp) + target_link_libraries (processors_test dbms) +target_link_libraries (processors_test_chain dbms) +target_link_libraries (processors_test_merge dbms) diff --git a/dbms/src/Processors/tests/processors_test_chain.cpp b/dbms/src/Processors/tests/processors_test_chain.cpp new file mode 100644 index 00000000000..64101df7ec6 --- /dev/null +++ b/dbms/src/Processors/tests/processors_test_chain.cpp @@ -0,0 +1,165 @@ +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include + + +using namespace DB; + + +class NumbersSource : public ISource +{ +public: + String getName() const override { return "Numbers"; } + + NumbersSource(UInt64 start_number, unsigned sleep_useconds) + : ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared(), "number" }})), + current_number(start_number), sleep_useconds(sleep_useconds) + { + } + +private: + UInt64 current_number = 0; + unsigned sleep_useconds; + + Block generate() override + { + usleep(sleep_useconds); + + MutableColumns columns; + columns.emplace_back(ColumnUInt64::create(1, current_number)); + ++current_number; + return getPort().getHeader().cloneWithColumns(std::move(columns)); + } +}; + +class SleepyTransform : public ISimpleTransform +{ +public: + explicit SleepyTransform(unsigned sleep_useconds) + : ISimpleTransform( + Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared(), "number" }}), + Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared(), "number" }})) + , sleep_useconds(sleep_useconds) {} + + String getName() const override { return "SleepyTransform"; } + +protected: + void transform(Block &) override + { + usleep(sleep_useconds); + } + +private: + unsigned sleep_useconds; +}; + +class PrintSink : public ISink +{ +public: + String getName() const override { return "Print"; } + + explicit PrintSink(String prefix) + : ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared(), "number" }})), + prefix(std::move(prefix)) {} + +private: + String prefix; + WriteBufferFromFileDescriptor out{STDOUT_FILENO}; + FormatSettings settings; + + void consume(Block block) override + { + size_t rows = block.rows(); + size_t columns = block.columns(); + + for (size_t row_num = 0; row_num < rows; ++row_num) + { + writeString(prefix, out); + for (size_t column_num = 0; column_num < columns; ++column_num) + { + if (column_num != 0) + writeChar('\t', out); + getPort().getHeader().getByPosition(column_num).type->serializeText(*block.getByPosition(column_num).column, row_num, out, settings); + } + writeChar('\n', out); + } + + out.next(); + } +}; + +template +struct measure +{ + template + static typename TimeT::rep execution(F&& func, Args&&... args) + { + auto start = std::chrono::steady_clock::now(); + std::forward(func)(std::forward(args)...); + auto duration = std::chrono::duration_cast< TimeT> + (std::chrono::steady_clock::now() - start); + return duration.count(); + } +}; + +int main(int, char **) +try +{ + auto execute_chain = [](ThreadPool * pool) + { + std::cerr << "---------------------\n"; + + auto source = std::make_shared(0, 100000); + auto transform1 = std::make_shared(100000); + auto transform2 = std::make_shared(100000); + auto transform3 = std::make_shared(100000); + auto limit = std::make_shared(source->getPort().getHeader(), 20, 0); + auto sink = std::make_shared(""); + + connect(source->getPort(), transform1->getInputPort()); + connect(transform1->getOutputPort(), transform2->getInputPort()); + connect(transform2->getOutputPort(), transform3->getInputPort()); + connect(transform3->getOutputPort(), limit->getInputPort()); + connect(limit->getOutputPort(), sink->getPort()); + + std::vector processors = {source, transform1, transform2, transform3, limit, sink}; +// WriteBufferFromOStream out(std::cout); +// printPipeline(processors, out); + + PipelineExecutor executor(processors, pool); + executor.execute(); + }; + + + ThreadPool pool(4, 4, 10); + + auto time_single = measure<>::execution(execute_chain, nullptr); + auto time_mt = measure<>::execution(execute_chain, &pool); + + std::cout << "Single Thread time: " << time_single << " ms.\n"; + std::cout << "Multiple Threads time: " << time_mt << " ms.\n"; + + return 0; +} +catch (...) +{ + std::cerr << getCurrentExceptionMessage(true) << '\n'; + throw; +} diff --git a/dbms/src/Processors/tests/processors_test_merge.cpp b/dbms/src/Processors/tests/processors_test_merge.cpp new file mode 100644 index 00000000000..9364a1bf354 --- /dev/null +++ b/dbms/src/Processors/tests/processors_test_merge.cpp @@ -0,0 +1,329 @@ +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include + + +using namespace DB; + + +class MergingSortedProcessor : public IProcessor +{ +public: + MergingSortedProcessor(const Block & header, size_t num_inputs) + : IProcessor(InputPorts(num_inputs, header), OutputPorts{header}) + , blocks(num_inputs), positions(num_inputs, 0), finished(num_inputs, false) + { + } + + String getName() const override { return "MergingSortedProcessor"; } + + Status prepare() override + { + auto & output = outputs[0]; + + /// Check can output. + + if (output.isFinished()) + { + for (auto & in : inputs) + in.close(); + + return Status::Finished; + } + + if (!output.isNeeded()) + { + for (auto & in : inputs) + in.setNotNeeded(); + + return Status::PortFull; + } + + if (output.hasData()) + return Status::PortFull; + + /// Push if has data. + if (res) + { + output.push(std::move(res)); + return Status::PortFull; + } + + /// Check for inputs we need. + bool all_inputs_finished = true; + bool all_inputs_has_data = true; + for (size_t i = 0; i < inputs.size(); ++i) + { + if (!finished[i]) + { + if (!inputs[i].isFinished()) + { + all_inputs_finished = false; + bool needed = positions[i] >= blocks[i].rows(); + if (needed) + { + inputs[i].setNeeded(); + if (inputs[i].hasData()) + { + blocks[i] = inputs[i].pull(); + positions[i] = 0; + } + else + all_inputs_has_data = false; + } + else + inputs[i].setNotNeeded(); + } + else + finished[i] = true; + } + } + + if (all_inputs_finished) + { + output.finish(); + return Status::Finished; + } + + if (!all_inputs_has_data) + return Status::NeedData; + + return Status::Ready; + } + + void work() override + { + using Key = std::pair; + std::priority_queue, std::greater<>> queue; + for (size_t i = 0; i < blocks.size(); ++i) + { + if (finished[i]) + continue; + + if (positions[i] >= blocks[i].rows()) + return; + + queue.push({blocks[i].getByPosition(0).column->getUInt(positions[i]), i}); + } + + auto col = ColumnUInt64::create(); + + while (!queue.empty()) + { + size_t ps = queue.top().second; + queue.pop(); + + auto & cur_col = blocks[ps].getByPosition(0).column; + col->insertFrom(*cur_col, positions[ps]); + ++positions[ps]; + + if (positions[ps] == cur_col->size()) + break; + + queue.push({cur_col->getUInt(positions[ps]), ps}); + } + + res = getOutputPort().getHeader(); + res.getByPosition(0).column = std::move(col); + } + + OutputPort & getOutputPort() { return outputs[0]; } + +private: + Blocks blocks; + Block res; + std::vector positions; + std::vector finished; +}; + + +class NumbersSource : public ISource +{ +public: + String getName() const override { return "Numbers"; } + + NumbersSource(UInt64 start_number, UInt64 step, unsigned sleep_useconds) + : ISource(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared(), "number" }})), + current_number(start_number), step(step), sleep_useconds(sleep_useconds) + { + } + +private: + UInt64 current_number = 0; + UInt64 step; + unsigned sleep_useconds; + + Block generate() override + { + usleep(sleep_useconds); + + MutableColumns columns; + columns.emplace_back(ColumnUInt64::create(1, current_number)); + current_number += step; + return getPort().getHeader().cloneWithColumns(std::move(columns)); + } +}; + +class SleepyTransform : public ISimpleTransform +{ +public: + explicit SleepyTransform(unsigned sleep_useconds) + : ISimpleTransform( + Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared(), "number" }}), + Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared(), "number" }})) + , sleep_useconds(sleep_useconds) {} + + String getName() const override { return "SleepyTransform"; } + +protected: + void transform(Block &) override + { + usleep(sleep_useconds); + } + +private: + unsigned sleep_useconds; +}; + +class PrintSink : public ISink +{ +public: + String getName() const override { return "Print"; } + + explicit PrintSink(String prefix) + : ISink(Block({ColumnWithTypeAndName{ ColumnUInt64::create(), std::make_shared(), "number" }})), + prefix(std::move(prefix)) {} + +private: + String prefix; + WriteBufferFromFileDescriptor out{STDOUT_FILENO}; + FormatSettings settings; + + void consume(Block block) override + { + size_t rows = block.rows(); + size_t columns = block.columns(); + + for (size_t row_num = 0; row_num < rows; ++row_num) + { + writeString(prefix, out); + for (size_t column_num = 0; column_num < columns; ++column_num) + { + if (column_num != 0) + writeChar('\t', out); + getPort().getHeader().getByPosition(column_num).type->serializeText(*block.getByPosition(column_num).column, row_num, out, settings); + } + writeChar('\n', out); + } + + out.next(); + } +}; + +template +struct measure +{ + template + static typename TimeT::rep execution(F&& func, Args&&... args) + { + auto start = std::chrono::steady_clock::now(); + std::forward(func)(std::forward(args)...); + auto duration = std::chrono::duration_cast< TimeT> + (std::chrono::steady_clock::now() - start); + return duration.count(); + } +}; + +int main(int, char **) +try +{ + auto execute_chain = [](String msg, size_t start1, size_t start2, size_t start3, ThreadPool * pool) + { + std::cerr << msg << "\n"; + + auto source1 = std::make_shared(start1, 3, 100000); + auto source2 = std::make_shared(start2, 3, 100000); + auto source3 = std::make_shared(start3, 3, 100000); + + auto transform1 = std::make_shared(100000); + auto transform2 = std::make_shared(100000); + auto transform3 = std::make_shared(100000); + + auto limit1 = std::make_shared(source1->getPort().getHeader(), 20, 0); + auto limit2 = std::make_shared(source2->getPort().getHeader(), 20, 0); + auto limit3 = std::make_shared(source3->getPort().getHeader(), 20, 0); + + auto merge = std::make_shared(source1->getPort().getHeader(), 3); + auto limit_fin = std::make_shared(source1->getPort().getHeader(), 54, 0); + auto sink = std::make_shared(""); + + connect(source1->getPort(), transform1->getInputPort()); + connect(source2->getPort(), transform2->getInputPort()); + connect(source3->getPort(), transform3->getInputPort()); + + connect(transform1->getOutputPort(), limit1->getInputPort()); + connect(transform2->getOutputPort(), limit2->getInputPort()); + connect(transform3->getOutputPort(), limit3->getInputPort()); + + connect(limit1->getOutputPort(), merge->getInputs()[0]); + connect(limit2->getOutputPort(), merge->getInputs()[1]); + connect(limit3->getOutputPort(), merge->getInputs()[2]); + + connect(merge->getOutputPort(), limit_fin->getInputPort()); + connect(limit_fin->getOutputPort(), sink->getPort()); + + std::vector processors = {source1, source2, source3, + transform1, transform2, transform3, + limit1, limit2, limit3, + merge, limit_fin, sink}; +// WriteBufferFromOStream out(std::cout); +// printPipeline(processors, out); + + PipelineExecutor executor(processors, pool); + executor.execute(); + }; + + ThreadPool pool(4, 4, 10); + + auto even_time_single = measure<>::execution(execute_chain, "Even distribution single thread", 0, 1, 2, nullptr); + auto even_time_mt = measure<>::execution(execute_chain, "Even distribution multiple threads", 0, 1, 2, &pool); + + auto half_time_single = measure<>::execution(execute_chain, "Half distribution single thread", 0, 31, 62, nullptr); + auto half_time_mt = measure<>::execution(execute_chain, "Half distribution multiple threads", 0, 31, 62, &pool); + + auto ordered_time_single = measure<>::execution(execute_chain, "Ordered distribution single thread", 0, 61, 122, nullptr); + auto ordered_time_mt = measure<>::execution(execute_chain, "Ordered distribution multiple threads", 0, 61, 122, &pool); + + std::cout << "Single Thread [0:60:3] [1:60:3] [2:60:3] time: " << even_time_single << " ms.\n"; + std::cout << "Multiple Threads [0:60:3] [1:60:3] [2:60:3] time:" << even_time_mt << " ms.\n"; + + std::cout << "Single Thread [0:60:3] [31:90:3] [62:120:3] time: " << half_time_single << " ms.\n"; + std::cout << "Multiple Threads [0:60:3] [31:90:3] [62:120:3] time: " << half_time_mt << " ms.\n"; + + std::cout << "Single Thread [0:60:3] [61:120:3] [122:180:3] time: " << ordered_time_single << " ms.\n"; + std::cout << "Multiple Threads [0:60:3] [61:120:3] [122:180:3] time: " << ordered_time_mt << " ms.\n"; + + return 0; +} +catch (...) +{ + std::cerr << getCurrentExceptionMessage(true) << '\n'; + throw; +}