Better code

This commit is contained in:
Alexey Milovidov 2019-08-20 22:53:27 +03:00
parent db55780d21
commit c1de51dc5b

View File

@ -5,9 +5,13 @@
#include <DataStreams/LimitBlockInputStream.h>
#include <Storages/System/StorageSystemNumbers.h>
namespace DB
{
namespace
{
class NumbersBlockInputStream : public IBlockInputStream
{
public:
@ -43,19 +47,19 @@ private:
};
struct NumbersMtState
struct NumbersMultiThreadedState
{
std::atomic<UInt64> counter;
explicit NumbersMtState(UInt64 offset) : counter(offset) {}
explicit NumbersMultiThreadedState(UInt64 offset) : counter(offset) {}
};
using NumbersMtStatePtr = std::shared_ptr<NumbersMtState>;
using NumbersMultiThreadedStatePtr = std::shared_ptr<NumbersMultiThreadedState>;
class NumbersMultiThreadedBlockInputStream : public IBlockInputStream
{
public:
NumbersMultiThreadedBlockInputStream(NumbersMtStatePtr state_, UInt64 block_size_, UInt64 max_counter_)
: state(std::move(state_)), counter(state->counter), block_size(block_size_), max_counter(max_counter_) {}
NumbersMultiThreadedBlockInputStream(NumbersMultiThreadedStatePtr state_, UInt64 block_size_, UInt64 max_counter_)
: state(std::move(state_)), block_size(block_size_), max_counter(max_counter_) {}
String getName() const override { return "NumbersMt"; }
@ -70,7 +74,7 @@ protected:
if (block_size == 0)
return {};
UInt64 curr = counter.fetch_add(block_size, std::memory_order_acquire);
UInt64 curr = state->counter.fetch_add(block_size, std::memory_order_acquire);
if (curr >= max_counter)
return {};
@ -90,13 +94,14 @@ protected:
}
private:
NumbersMtStatePtr state;
std::atomic<UInt64> & counter;
NumbersMultiThreadedStatePtr state;
UInt64 block_size;
UInt64 max_counter;
};
}
StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_, UInt64 offset_, bool even_distribution_)
: name(name_), multithreaded(multithreaded_), even_distribution(even_distribution_), limit(limit_), offset(offset_)
@ -127,7 +132,7 @@ BlockInputStreams StorageSystemNumbers::read(
if (num_streams > 1 && !even_distribution && *limit)
{
auto state = std::make_shared<NumbersMtState>(offset);
auto state = std::make_shared<NumbersMultiThreadedState>(offset);
UInt64 max_counter = offset + *limit;
for (size_t i = 0; i < num_streams; ++i)