diff --git a/dbms/src/Storages/System/StorageSystemNumbers.cpp b/dbms/src/Storages/System/StorageSystemNumbers.cpp index 78e25e0c375..2afe2a7c018 100644 --- a/dbms/src/Storages/System/StorageSystemNumbers.cpp +++ b/dbms/src/Storages/System/StorageSystemNumbers.cpp @@ -5,9 +5,13 @@ #include #include + namespace DB { +namespace +{ + class NumbersBlockInputStream : public IBlockInputStream { public: @@ -43,19 +47,19 @@ private: }; -struct NumbersMtState +struct NumbersMultiThreadedState { std::atomic counter; - explicit NumbersMtState(UInt64 offset) : counter(offset) {} + explicit NumbersMultiThreadedState(UInt64 offset) : counter(offset) {} }; -using NumbersMtStatePtr = std::shared_ptr; +using NumbersMultiThreadedStatePtr = std::shared_ptr; class NumbersMultiThreadedBlockInputStream : public IBlockInputStream { public: - NumbersMultiThreadedBlockInputStream(NumbersMtStatePtr state_, UInt64 block_size_, UInt64 max_counter_) - : state(std::move(state_)), counter(state->counter), block_size(block_size_), max_counter(max_counter_) {} + NumbersMultiThreadedBlockInputStream(NumbersMultiThreadedStatePtr state_, UInt64 block_size_, UInt64 max_counter_) + : state(std::move(state_)), block_size(block_size_), max_counter(max_counter_) {} String getName() const override { return "NumbersMt"; } @@ -70,7 +74,7 @@ protected: if (block_size == 0) return {}; - UInt64 curr = counter.fetch_add(block_size, std::memory_order_acquire); + UInt64 curr = state->counter.fetch_add(block_size, std::memory_order_acquire); if (curr >= max_counter) return {}; @@ -90,13 +94,14 @@ protected: } private: - NumbersMtStatePtr state; - std::atomic & counter; + NumbersMultiThreadedStatePtr state; UInt64 block_size; UInt64 max_counter; }; +} + StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional limit_, UInt64 offset_, bool even_distribution_) : name(name_), multithreaded(multithreaded_), even_distribution(even_distribution_), limit(limit_), offset(offset_) @@ -127,7 +132,7 @@ BlockInputStreams StorageSystemNumbers::read( if (num_streams > 1 && !even_distribution && *limit) { - auto state = std::make_shared(offset); + auto state = std::make_shared(offset); UInt64 max_counter = offset + *limit; for (size_t i = 0; i < num_streams; ++i)