2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/Exception.h>
|
|
|
|
#include <Columns/ColumnsNumber.h>
|
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
2019-01-23 14:48:50 +00:00
|
|
|
#include <DataStreams/IBlockInputStream.h>
|
2017-06-10 09:04:31 +00:00
|
|
|
#include <DataStreams/LimitBlockInputStream.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/System/StorageSystemNumbers.h>
|
2010-03-04 19:20:28 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2019-01-23 14:48:50 +00:00
|
|
|
class NumbersBlockInputStream : public IBlockInputStream
|
2010-03-04 19:20:28 +00:00
|
|
|
{
|
2014-08-22 17:26:43 +00:00
|
|
|
public:
|
2017-04-01 07:20:54 +00:00
|
|
|
NumbersBlockInputStream(size_t block_size_, size_t offset_, size_t step_)
|
|
|
|
: block_size(block_size_), next(offset_), step(step_) {}
|
2014-08-22 17:26:43 +00:00
|
|
|
|
2018-01-06 18:10:44 +00:00
|
|
|
String getName() const override { return "Numbers"; }
|
|
|
|
|
2018-02-18 03:23:48 +00:00
|
|
|
Block getHeader() const override
|
2018-01-06 18:10:44 +00:00
|
|
|
{
|
2018-01-09 00:19:58 +00:00
|
|
|
return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number") };
|
2018-01-06 18:10:44 +00:00
|
|
|
}
|
2014-08-22 17:26:43 +00:00
|
|
|
|
|
|
|
protected:
|
2018-01-06 18:10:44 +00:00
|
|
|
Block readImpl() override
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2017-12-14 01:43:19 +00:00
|
|
|
auto column = ColumnUInt64::create(block_size);
|
2017-12-15 21:32:25 +00:00
|
|
|
ColumnUInt64::Container & vec = column->getData();
|
2014-08-22 17:26:43 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t curr = next; /// The local variable for some reason works faster (>20%) than member of class.
|
2018-09-02 03:00:04 +00:00
|
|
|
UInt64 * pos = vec.data(); /// This also accelerates the code.
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt64 * end = &vec[block_size];
|
|
|
|
while (pos < end)
|
|
|
|
*pos++ = curr++;
|
2014-08-22 17:26:43 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
next += step;
|
2017-12-16 00:49:03 +00:00
|
|
|
return { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeUInt64>(), "number") };
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2014-08-22 17:26:43 +00:00
|
|
|
private:
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t block_size;
|
|
|
|
UInt64 next;
|
|
|
|
UInt64 step;
|
2014-08-22 17:26:43 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
2019-02-05 17:05:33 +00:00
|
|
|
/// Stores the construction parameters and declares the table's only column:
/// "number" of type UInt64.
StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<size_t> limit_, size_t offset_)
    : name(name_), multithreaded(multithreaded_), limit(limit_), offset(offset_)
{
    auto number_type = std::make_shared<DataTypeUInt64>();
    setColumns(ColumnsDescription({{"number", number_type}}));
}
|
2010-03-04 19:20:28 +00:00
|
|
|
|
|
|
|
|
2012-01-09 19:20:48 +00:00
|
|
|
/// Builds the input streams that generate the "number" column.
///
/// column_names   - requested columns; validated with check().
/// max_block_size - rows per generated block; reduced to the limit when the limit is smaller.
/// num_streams    - requested parallelism; forced to 1 unless multiple streams are used.
///
/// Stream i starts at `offset + i * max_block_size` and advances by
/// `num_streams * max_block_size` after every block. When a limit is set, each stream is
/// wrapped in a LimitBlockInputStream that receives its share of the limit.
BlockInputStreams StorageSystemNumbers::read(
    const Names & column_names,
    const SelectQueryInfo &,
    const Context & /*context*/,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    unsigned num_streams)
{
    check(column_names);

    /// The decision is made per call in a local variable. Writing it back to the `multithreaded`
    /// member would make one call with a large max_block_size change the behaviour of all later
    /// reads from the same storage object.
    bool use_multiple_streams = multithreaded;

    if (limit && *limit < max_block_size)
    {
        /// Everything fits into a single block, so one stream with a shrunk block is enough.
        max_block_size = *limit;
        use_multiple_streams = false;
    }

    if (!use_multiple_streams)
        num_streams = 1;

    BlockInputStreams res(num_streams);
    for (size_t i = 0; i < num_streams; ++i)
    {
        res[i] = std::make_shared<NumbersBlockInputStream>(max_block_size, offset + i * max_block_size, num_streams * max_block_size);

        if (limit)  /// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly.
            res[i] = std::make_shared<LimitBlockInputStream>(res[i], *limit * (i + 1) / num_streams - *limit * i / num_streams, 0);
    }

    return res;
}
|
|
|
|
|
|
|
|
}
|