Added NumbersMtBlockInputStream.

This commit is contained in:
Nikolai Kochetov 2019-08-20 13:28:20 +03:00
parent 1cebcd3e73
commit 7f31b8bf9c
3 changed files with 74 additions and 4 deletions

View File

@ -43,8 +43,63 @@ private:
};
StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_, UInt64 offset_)
: name(name_), multithreaded(multithreaded_), limit(limit_), offset(offset_)
struct NumbersMtState
{
std::atomic<UInt64> counter;
explicit NumbersMtState(UInt64 offset) : counter(offset) {}
};
using NumbersMtStatePtr = std::shared_ptr<NumbersMtState>;
class NumbersMtBlockInputStream : public IBlockInputStream
{
public:
NumbersMtBlockInputStream(NumbersMtStatePtr state_, UInt64 block_size_, UInt64 max_counter_)
: state(std::move(state_)), counter(state->counter), block_size(block_size_), max_counter(max_counter_) {}
String getName() const override { return "NumbersMt"; }
Block getHeader() const override
{
return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number") };
}
protected:
Block readImpl() override
{
if (block_size == 0)
return {};
UInt64 curr = counter.fetch_add(block_size, std::memory_order_acquire);
if (curr >= max_counter)
return {};
if (curr + block_size > max_counter)
block_size = max_counter - curr;
auto column = ColumnUInt64::create(block_size);
ColumnUInt64::Container & vec = column->getData();
UInt64 * pos = vec.data();
UInt64 * end = &vec[block_size];
while (pos < end)
*pos++ = curr++;
return { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeUInt64>(), "number") };
}
private:
NumbersMtStatePtr state;
std::atomic<UInt64> & counter;
UInt64 block_size;
UInt64 max_counter;
};
StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_, UInt64 offset_, bool even_distribution_)
: name(name_), multithreaded(multithreaded_), even_distribution(even_distribution_), limit(limit_), offset(offset_)
{
setColumns(ColumnsDescription({{"number", std::make_shared<DataTypeUInt64>()}}));
}
@ -69,6 +124,18 @@ BlockInputStreams StorageSystemNumbers::read(
num_streams = 1;
BlockInputStreams res(num_streams);
if (num_streams > 1 && !even_distribution && *limit)
{
auto state = std::make_shared<NumbersMtState>(offset);
UInt64 max_counter = offset + *limit;
for (size_t i = 0; i < num_streams; ++i)
res[i] = std::make_shared<NumbersMtBlockInputStream>(state, max_block_size, max_counter);
return res;
}
for (size_t i = 0; i < num_streams; ++i)
{
res[i] = std::make_shared<NumbersBlockInputStream>(max_block_size, offset + i * max_block_size, num_streams * max_block_size);

View File

@ -38,11 +38,14 @@ public:
private:
const std::string name;
bool multithreaded;
bool bool even_distribution;
std::optional<UInt64> limit;
UInt64 offset;
protected:
StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_ = std::nullopt, UInt64 offset_ = 0);
/// If even_distribution is true, numbers are distributed evenly between streams.
/// Otherwise, streams concurrently increment atomic.
StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_ = std::nullopt, UInt64 offset_ = 0, bool even_distribution_ = true);
};
}

View File

@ -30,7 +30,7 @@ StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_f
UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0;
UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]);
auto res = StorageSystemNumbers::create(table_name, multithreaded, length, offset);
auto res = StorageSystemNumbers::create(table_name, multithreaded, length, offset, false);
res->startup();
return res;
}