From 7f31b8bf9c9cdba81abaaaebe91a510fae6a251c Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 20 Aug 2019 13:28:20 +0300 Subject: [PATCH] Added NumbersMtBlockInputStream. --- .../Storages/System/StorageSystemNumbers.cpp | 71 ++++++++++++++++++- .../Storages/System/StorageSystemNumbers.h | 5 +- .../TableFunctions/TableFunctionNumbers.cpp | 2 +- 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/dbms/src/Storages/System/StorageSystemNumbers.cpp b/dbms/src/Storages/System/StorageSystemNumbers.cpp index c909a338453..b08e074af60 100644 --- a/dbms/src/Storages/System/StorageSystemNumbers.cpp +++ b/dbms/src/Storages/System/StorageSystemNumbers.cpp @@ -43,8 +43,63 @@ private: }; -StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional limit_, UInt64 offset_) - : name(name_), multithreaded(multithreaded_), limit(limit_), offset(offset_) +struct NumbersMtState +{ + std::atomic counter; + explicit NumbersMtState(UInt64 offset) : counter(offset) {} +}; + +using NumbersMtStatePtr = std::shared_ptr; + +class NumbersMtBlockInputStream : public IBlockInputStream +{ +public: + NumbersMtBlockInputStream(NumbersMtStatePtr state_, UInt64 block_size_, UInt64 max_counter_) + : state(std::move(state_)), counter(state->counter), block_size(block_size_), max_counter(max_counter_) {} + + String getName() const override { return "NumbersMt"; } + + Block getHeader() const override + { + return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared(), "number") }; + } + +protected: + Block readImpl() override + { + if (block_size == 0) + return {}; + + UInt64 curr = counter.fetch_add(block_size, std::memory_order_acquire); + + if (curr >= max_counter) + return {}; + + if (curr + block_size > max_counter) + block_size = max_counter - curr; + + auto column = ColumnUInt64::create(block_size); + ColumnUInt64::Container & vec = column->getData(); + + UInt64 * pos = vec.data(); + UInt64 * end = &vec[block_size]; + while (pos < end) + *pos++ = curr++; + + return { ColumnWithTypeAndName(std::move(column), std::make_shared(), "number") }; + } + +private: + NumbersMtStatePtr state; + std::atomic & counter; + + UInt64 block_size; + UInt64 max_counter; +}; + + +StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional limit_, UInt64 offset_, bool even_distribution_) + : name(name_), multithreaded(multithreaded_), even_distribution(even_distribution_), limit(limit_), offset(offset_) { setColumns(ColumnsDescription({{"number", std::make_shared()}})); } @@ -69,6 +124,18 @@ BlockInputStreams StorageSystemNumbers::read( num_streams = 1; BlockInputStreams res(num_streams); + + if (num_streams > 1 && !even_distribution && *limit) + { + auto state = std::make_shared(offset); + UInt64 max_counter = offset + *limit; + + for (size_t i = 0; i < num_streams; ++i) + res[i] = std::make_shared(state, max_block_size, max_counter); + + return res; + } + for (size_t i = 0; i < num_streams; ++i) { res[i] = std::make_shared(max_block_size, offset + i * max_block_size, num_streams * max_block_size); diff --git a/dbms/src/Storages/System/StorageSystemNumbers.h b/dbms/src/Storages/System/StorageSystemNumbers.h index 452ec5a9ef5..6ecc6785320 100644 --- a/dbms/src/Storages/System/StorageSystemNumbers.h +++ b/dbms/src/Storages/System/StorageSystemNumbers.h @@ -38,11 +38,14 @@ public: private: const std::string name; bool multithreaded; + bool bool even_distribution; std::optional limit; UInt64 offset; protected: - StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional limit_ = std::nullopt, UInt64 offset_ = 0); + /// If even_distribution is true, numbers are distributed evenly between streams. + /// Otherwise, streams concurrently increment atomic. + StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional limit_ = std::nullopt, UInt64 offset_ = 0, bool even_distribution_ = true); }; } diff --git a/dbms/src/TableFunctions/TableFunctionNumbers.cpp b/dbms/src/TableFunctions/TableFunctionNumbers.cpp index 771ff8d5019..14947a40812 100644 --- a/dbms/src/TableFunctions/TableFunctionNumbers.cpp +++ b/dbms/src/TableFunctions/TableFunctionNumbers.cpp @@ -30,7 +30,7 @@ StoragePtr TableFunctionNumbers::executeImpl(const ASTPtr & ast_f UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0; UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]); - auto res = StorageSystemNumbers::create(table_name, multithreaded, length, offset); + auto res = StorageSystemNumbers::create(table_name, multithreaded, length, offset, false); res->startup(); return res; }