mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Add feature with the right author name
This commit is contained in:
parent
3f0cfbd8c0
commit
623b425745
@ -28,32 +28,37 @@ namespace
|
|||||||
class NumbersSource : public ISource
|
class NumbersSource : public ISource
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
NumbersSource(UInt64 block_size_, UInt64 offset_, UInt64 step_, const std::string& column_name, UInt64 inner_step_)
|
NumbersSource(UInt64 block_size_, UInt64 offset_, UInt64 step_, const std::string & column_name, UInt64 inner_step_)
|
||||||
: ISource(createHeader(column_name)), block_size(block_size_), next(offset_), step(step_), inner_step(inner_step_), inner_remainder(offset_ % inner_step_)
|
: ISource(createHeader(column_name))
|
||||||
|
, block_size(block_size_)
|
||||||
|
, next(offset_)
|
||||||
|
, step(step_)
|
||||||
|
, inner_step(inner_step_)
|
||||||
|
, inner_remainder(offset_ % inner_step_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
String getName() const override { return "Numbers"; }
|
String getName() const override { return "Numbers"; }
|
||||||
|
|
||||||
|
static Block createHeader(const std::string & column_name)
|
||||||
static Block createHeader(const std::string& column_name) { return {ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name)}; }
|
{
|
||||||
|
return {ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name)};
|
||||||
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
Chunk generate() override
|
Chunk generate() override
|
||||||
{
|
{
|
||||||
|
|
||||||
UInt64 curr = next; /// The local variable for some reason works faster (>20%) than member of class.
|
UInt64 curr = next; /// The local variable for some reason works faster (>20%) than member of class.
|
||||||
UInt64 first_element = (curr / inner_step) * inner_step + inner_remainder;
|
UInt64 first_element = (curr / inner_step) * inner_step + inner_remainder;
|
||||||
if (first_element < curr) {
|
if (first_element < curr)
|
||||||
first_element += inner_step;
|
first_element += inner_step;
|
||||||
}
|
|
||||||
UInt64 filtered_block_size = 0;
|
UInt64 filtered_block_size = 0;
|
||||||
if (first_element - curr >= block_size) {
|
if (first_element - curr >= block_size)
|
||||||
|
{
|
||||||
auto column = ColumnUInt64::create(0);
|
auto column = ColumnUInt64::create(0);
|
||||||
return {Columns{std::move(column)}, filtered_block_size};
|
return {Columns{std::move(column)}, filtered_block_size};
|
||||||
}
|
}
|
||||||
if (first_element - curr < block_size) {
|
if (first_element - curr < block_size)
|
||||||
filtered_block_size = (block_size - (first_element - curr) - 1) / inner_step + 1;
|
filtered_block_size = (block_size - (first_element - curr) - 1) / inner_step + 1;
|
||||||
}
|
|
||||||
|
|
||||||
auto column = ColumnUInt64::create(filtered_block_size);
|
auto column = ColumnUInt64::create(filtered_block_size);
|
||||||
ColumnUInt64::Container & vec = column->getData();
|
ColumnUInt64::Container & vec = column->getData();
|
||||||
@ -76,32 +81,37 @@ private:
|
|||||||
UInt64 inner_remainder;
|
UInt64 inner_remainder;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct RangeWithStep {
|
struct RangeWithStep
|
||||||
|
{
|
||||||
Range range;
|
Range range;
|
||||||
UInt64 step;
|
UInt64 step;
|
||||||
};
|
};
|
||||||
|
|
||||||
using RangesWithStep = std::vector<RangeWithStep>;
|
using RangesWithStep = std::vector<RangeWithStep>;
|
||||||
|
|
||||||
std::optional<RangeWithStep> stepped_range_from_range(const Range& r, UInt64 step, UInt64 remainder) {
|
std::optional<RangeWithStep> stepped_range_from_range(const Range & r, UInt64 step, UInt64 remainder)
|
||||||
UInt64 begin = (r.left.get<UInt64>() / step) * step;
|
{
|
||||||
if (begin > std::numeric_limits<UInt64>::max() - remainder) {
|
if ((r.right.get<UInt64>() == 0) && (!r.right_included))
|
||||||
|
return std::nullopt;
|
||||||
|
UInt64 begin = (r.left.get<UInt64>() / step) * step;
|
||||||
|
if (begin > std::numeric_limits<UInt64>::max() - remainder)
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
|
||||||
begin += remainder;
|
begin += remainder;
|
||||||
while (begin <= r.left.get<UInt128>() - r.left_included) {
|
|
||||||
if (std::numeric_limits<UInt64>::max() - step < begin) {
|
// LOG_DEBUG(&Poco::Logger::get("stepped_range_from_range"), "Begin: {}", begin);
|
||||||
|
// LOG_DEBUG(&Poco::Logger::get("stepped_range_from_range"), "Begin: {}", begin);
|
||||||
|
while ((r.left_included <= r.left.get<UInt64>()) && (begin <= r.left.get<UInt64>() - r.left_included))
|
||||||
|
{
|
||||||
|
if (std::numeric_limits<UInt64>::max() - step < begin)
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
|
||||||
begin += step;
|
begin += step;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("stepped_range_from_range"), "Begin: {}", begin);
|
// LOG_DEBUG(&Poco::Logger::get("stepped_range_from_range"), "Begin: {}", begin);
|
||||||
UInt128 right_edge = (r.right.get<UInt128>() + r.right_included);
|
if ((begin >= r.right_included) && (begin - r.right_included >= r.right.get<UInt64>()))
|
||||||
if (begin >= right_edge) {
|
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
UInt64 right_edge_included = r.right.get<UInt64>() - (1 - r.right_included);
|
||||||
return std::optional{RangeWithStep{Range(begin, true, static_cast<UInt64>(right_edge - 1), true), step}};
|
return std::optional{RangeWithStep{Range(begin, true, right_edge_included, true), step}};
|
||||||
}
|
}
|
||||||
|
|
||||||
[[maybe_unused]] UInt128 sizeOfRange(const RangeWithStep & r)
|
[[maybe_unused]] UInt128 sizeOfRange(const RangeWithStep & r)
|
||||||
@ -144,8 +154,17 @@ public:
|
|||||||
|
|
||||||
using RangesStatePtr = std::shared_ptr<RangesState>;
|
using RangesStatePtr = std::shared_ptr<RangesState>;
|
||||||
|
|
||||||
[[maybe_unused]] NumbersRangedSource(const RangesWithStep & ranges_, RangesStatePtr & ranges_state_, UInt64 base_block_size_, const std::string& column_name)
|
[[maybe_unused]] NumbersRangedSource(
|
||||||
: ISource(NumbersSource::createHeader(column_name)), ranges(ranges_), ranges_state(ranges_state_), base_block_size(base_block_size_)
|
const RangesWithStep & ranges_,
|
||||||
|
RangesStatePtr & ranges_state_,
|
||||||
|
UInt64 base_block_size_,
|
||||||
|
UInt64 step_,
|
||||||
|
const std::string & column_name)
|
||||||
|
: ISource(NumbersSource::createHeader(column_name))
|
||||||
|
, ranges(ranges_)
|
||||||
|
, ranges_state(ranges_state_)
|
||||||
|
, base_block_size(base_block_size_)
|
||||||
|
, step(step_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -158,6 +177,7 @@ protected:
|
|||||||
{
|
{
|
||||||
std::lock_guard lock(ranges_state->mutex);
|
std::lock_guard lock(ranges_state->mutex);
|
||||||
|
|
||||||
|
|
||||||
UInt64 need = base_block_size_;
|
UInt64 need = base_block_size_;
|
||||||
UInt64 size = 0; /// how many item found.
|
UInt64 size = 0; /// how many item found.
|
||||||
|
|
||||||
@ -196,6 +216,10 @@ protected:
|
|||||||
}
|
}
|
||||||
|
|
||||||
ranges_state->pos = end;
|
ranges_state->pos = end;
|
||||||
|
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Range borders"), "Begin: {} {}", start.offset_in_ranges, static_cast<size_t>(start.offset_in_range));
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Range borders"), "End: {} {}", end.offset_in_ranges, static_cast<size_t>(end.offset_in_range));
|
||||||
|
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -234,12 +258,19 @@ protected:
|
|||||||
? end.offset_in_range - cursor.offset_in_range
|
? end.offset_in_range - cursor.offset_in_range
|
||||||
: static_cast<UInt128>(last_value(range) - first_value(range)) / range.step + 1 - cursor.offset_in_range;
|
: static_cast<UInt128>(last_value(range) - first_value(range)) / range.step + 1 - cursor.offset_in_range;
|
||||||
|
|
||||||
|
LOG_DEBUG(
|
||||||
|
&Poco::Logger::get("Generate"),
|
||||||
|
"Can Provide: {}, Block size: {}",
|
||||||
|
static_cast<UInt64>(can_provide),
|
||||||
|
static_cast<UInt64>(block_size));
|
||||||
|
|
||||||
/// set value to block
|
/// set value to block
|
||||||
auto set_value = [&pos, this](UInt128 & start_value, UInt128 & end_value)
|
auto set_value = [&pos, this](UInt128 & start_value, UInt128 & end_value)
|
||||||
{
|
{
|
||||||
if (end_value > std::numeric_limits<UInt64>::max())
|
if (end_value > std::numeric_limits<UInt64>::max())
|
||||||
{
|
{
|
||||||
while (start_value < end_value) {
|
while (start_value < end_value)
|
||||||
|
{
|
||||||
*(pos++) = start_value;
|
*(pos++) = start_value;
|
||||||
start_value += this->step;
|
start_value += this->step;
|
||||||
}
|
}
|
||||||
@ -248,7 +279,9 @@ protected:
|
|||||||
{
|
{
|
||||||
auto start_value_64 = static_cast<UInt64>(start_value);
|
auto start_value_64 = static_cast<UInt64>(start_value);
|
||||||
auto end_value_64 = static_cast<UInt64>(end_value);
|
auto end_value_64 = static_cast<UInt64>(end_value);
|
||||||
auto size = end_value_64 - start_value_64;
|
auto size = (end_value_64 - start_value_64) / this->step;
|
||||||
|
LOG_DEBUG(
|
||||||
|
&Poco::Logger::get("Iota"), "Size: {}, Step: {}, Start: {}", static_cast<size_t>(size), this->step, start_value_64);
|
||||||
iota_with_step(pos, static_cast<size_t>(size), start_value_64, step);
|
iota_with_step(pos, static_cast<size_t>(size), start_value_64, step);
|
||||||
pos += size;
|
pos += size;
|
||||||
}
|
}
|
||||||
@ -374,7 +407,7 @@ ReadFromSystemNumbersStep::ReadFromSystemNumbersStep(
|
|||||||
, key_expression{KeyDescription::parse(column_names[0], storage_snapshot->metadata->columns, context).expression}
|
, key_expression{KeyDescription::parse(column_names[0], storage_snapshot->metadata->columns, context).expression}
|
||||||
, max_block_size{max_block_size_}
|
, max_block_size{max_block_size_}
|
||||||
, num_streams{num_streams_}
|
, num_streams{num_streams_}
|
||||||
, limit_length_and_offset(InterpreterSelectQuery::getLimitLengthAndOffset(query_info.query->as<ASTSelectQuery&>(), context))
|
, limit_length_and_offset(InterpreterSelectQuery::getLimitLengthAndOffset(query_info.query->as<ASTSelectQuery &>(), context))
|
||||||
, should_pushdown_limit(shouldPushdownLimit(query_info, limit_length_and_offset.first))
|
, should_pushdown_limit(shouldPushdownLimit(query_info, limit_length_and_offset.first))
|
||||||
, limit(query_info.limit)
|
, limit(query_info.limit)
|
||||||
, storage_limits(query_info.storage_limits)
|
, storage_limits(query_info.storage_limits)
|
||||||
@ -410,14 +443,28 @@ Pipe ReadFromSystemNumbersStep::makePipe()
|
|||||||
{
|
{
|
||||||
auto & numbers_storage = storage->as<StorageSystemNumbers &>();
|
auto & numbers_storage = storage->as<StorageSystemNumbers &>();
|
||||||
|
|
||||||
|
LOG_DEBUG(
|
||||||
|
&Poco::Logger::get("Parameters"),
|
||||||
|
"Parameters: Limit: {}, Offset: {} Step: {}",
|
||||||
|
numbers_storage.limit.value(),
|
||||||
|
numbers_storage.offset,
|
||||||
|
numbers_storage.step);
|
||||||
|
|
||||||
if (!numbers_storage.multithreaded)
|
if (!numbers_storage.multithreaded)
|
||||||
num_streams = 1;
|
num_streams = 1;
|
||||||
|
|
||||||
|
Pipe pipe;
|
||||||
|
Ranges ranges;
|
||||||
|
|
||||||
|
if (numbers_storage.limit.has_value() && (numbers_storage.limit.value() == 0))
|
||||||
|
{
|
||||||
|
pipe.addSource(std::make_shared<NullSource>(NumbersSource::createHeader(numbers_storage.column_name)));
|
||||||
|
return pipe;
|
||||||
|
}
|
||||||
|
|
||||||
/// Build rpn of query filters
|
/// Build rpn of query filters
|
||||||
KeyCondition condition(buildFilterDAG(), context, column_names, key_expression);
|
KeyCondition condition(buildFilterDAG(), context, column_names, key_expression);
|
||||||
|
|
||||||
Pipe pipe;
|
|
||||||
Ranges ranges;
|
|
||||||
|
|
||||||
if (condition.extractPlainRanges(ranges))
|
if (condition.extractPlainRanges(ranges))
|
||||||
{
|
{
|
||||||
@ -430,14 +477,15 @@ Pipe ReadFromSystemNumbersStep::makePipe()
|
|||||||
{
|
{
|
||||||
if (std::numeric_limits<UInt64>::max() - numbers_storage.offset >= *(numbers_storage.limit))
|
if (std::numeric_limits<UInt64>::max() - numbers_storage.offset >= *(numbers_storage.limit))
|
||||||
{
|
{
|
||||||
table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(numbers_storage.offset + *(numbers_storage.limit)), false);
|
table_range.emplace(
|
||||||
|
FieldRef(numbers_storage.offset), true, FieldRef(numbers_storage.offset + *(numbers_storage.limit)), false);
|
||||||
}
|
}
|
||||||
/// UInt64 overflow, for example: SELECT number FROM numbers(18446744073709551614, 5)
|
/// UInt64 overflow, for example: SELECT number FROM numbers(18446744073709551614, 5)
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
table_range.emplace(FieldRef(numbers_storage.offset), true, std::numeric_limits<UInt64>::max(), true);
|
table_range.emplace(FieldRef(numbers_storage.offset), true, std::numeric_limits<UInt64>::max(), true);
|
||||||
auto overflow_end = UInt128(numbers_storage.offset) + UInt128(*numbers_storage.limit);
|
auto overflow_end = UInt128(numbers_storage.offset) + UInt128(*numbers_storage.limit);
|
||||||
overflowed_table_range.emplace(
|
overflowed_table_range.emplace(
|
||||||
FieldRef(UInt64(0)), true, FieldRef(UInt64(overflow_end - std::numeric_limits<UInt64>::max() - 1)), false);
|
FieldRef(UInt64(0)), true, FieldRef(UInt64(overflow_end - std::numeric_limits<UInt64>::max() - 1)), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -451,34 +499,59 @@ Pipe ReadFromSystemNumbersStep::makePipe()
|
|||||||
for (auto & r : ranges)
|
for (auto & r : ranges)
|
||||||
{
|
{
|
||||||
auto intersected_range = table_range->intersectWith(r);
|
auto intersected_range = table_range->intersectWith(r);
|
||||||
if (intersected_range.has_value()) {
|
if (intersected_range.has_value())
|
||||||
auto range_with_step = stepped_range_from_range(intersected_range.value(), numbers_storage.step, numbers_storage.offset % numbers_storage.step);
|
{
|
||||||
if (range_with_step.has_value()) {
|
LOG_DEBUG(
|
||||||
|
&Poco::Logger::get("Ranges"),
|
||||||
|
"Ranges: {} {} {} {}",
|
||||||
|
intersected_range->left.get<UInt64>(),
|
||||||
|
intersected_range->right.get<UInt64>(),
|
||||||
|
intersected_range->left_included,
|
||||||
|
intersected_range->right_included);
|
||||||
|
auto range_with_step = stepped_range_from_range(
|
||||||
|
intersected_range.value(), numbers_storage.step, numbers_storage.offset % numbers_storage.step);
|
||||||
|
if (range_with_step.has_value())
|
||||||
|
{
|
||||||
|
LOG_DEBUG(
|
||||||
|
&Poco::Logger::get("Ranges With Step"),
|
||||||
|
"Ranges: {} {} {} {} {}",
|
||||||
|
range_with_step->range.left.get<UInt64>(),
|
||||||
|
range_with_step->range.right.get<UInt64>(),
|
||||||
|
range_with_step->range.left_included,
|
||||||
|
range_with_step->range.right_included,
|
||||||
|
range_with_step->step);
|
||||||
intersected_ranges.push_back(*range_with_step);
|
intersected_ranges.push_back(*range_with_step);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
for (const auto& range : intersected_ranges) {
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Ranges"), "Left: {}; Right {}, LI: {}, RI: {}, Step: {}", range.range.left.get<UInt64>(), range.range.right.get<UInt64>(), range.range.left_included, range.range.right_included, range.step);
|
|
||||||
// std::cout <<
|
|
||||||
}
|
|
||||||
/// intersection with overflowed_table_range goes back.
|
/// intersection with overflowed_table_range goes back.
|
||||||
if (overflowed_table_range.has_value())
|
if (overflowed_table_range.has_value())
|
||||||
{
|
{
|
||||||
for (auto & r : ranges)
|
for (auto & r : ranges)
|
||||||
{
|
{
|
||||||
auto intersected_range = overflowed_table_range->intersectWith(r);
|
auto intersected_range = overflowed_table_range->intersectWith(r);
|
||||||
if (intersected_range) {
|
if (intersected_range)
|
||||||
auto range_with_step = stepped_range_from_range(intersected_range.value(), numbers_storage.step, static_cast<UInt64>((static_cast<UInt128>(numbers_storage.offset) + std::numeric_limits<UInt64>::max() + 1) % numbers_storage.step));
|
{
|
||||||
if (range_with_step) {
|
auto range_with_step = stepped_range_from_range(
|
||||||
|
intersected_range.value(),
|
||||||
|
numbers_storage.step,
|
||||||
|
static_cast<UInt64>(
|
||||||
|
(static_cast<UInt128>(numbers_storage.offset) + std::numeric_limits<UInt64>::max() + 1)
|
||||||
|
% numbers_storage.step));
|
||||||
|
if (range_with_step)
|
||||||
intersected_ranges.push_back(*range_with_step);
|
intersected_ranges.push_back(*range_with_step);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for (const auto& range : intersected_ranges)
|
||||||
|
// {
|
||||||
|
// LOG_DEBUG(&Poco::Logger::get("Ranges with step"), "Left: {}; Right {}, LI: {}, RI: {}, Step: {}", range.range.left.get<UInt64>(), range.range.right.get<UInt64>(), range.range.left_included, range.range.right_included, range.step);
|
||||||
|
// // std::cout <<
|
||||||
|
// }
|
||||||
|
|
||||||
/// ranges is blank, return a source who has no data
|
/// ranges is blank, return a source who has no data
|
||||||
if (intersected_ranges.empty())
|
if (intersected_ranges.empty())
|
||||||
{
|
{
|
||||||
@ -492,6 +565,7 @@ Pipe ReadFromSystemNumbersStep::makePipe()
|
|||||||
if (!intersected_ranges.rbegin()->range.right.isPositiveInfinity() || should_pushdown_limit)
|
if (!intersected_ranges.rbegin()->range.right.isPositiveInfinity() || should_pushdown_limit)
|
||||||
{
|
{
|
||||||
UInt128 total_size = sizeOfRanges(intersected_ranges);
|
UInt128 total_size = sizeOfRanges(intersected_ranges);
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Total_Size"), "Total Size: {}", static_cast<UInt64>(total_size));
|
||||||
UInt128 query_limit = limit_length + limit_offset;
|
UInt128 query_limit = limit_length + limit_offset;
|
||||||
|
|
||||||
/// limit total_size by query_limit
|
/// limit total_size by query_limit
|
||||||
@ -515,7 +589,8 @@ Pipe ReadFromSystemNumbersStep::makePipe()
|
|||||||
auto ranges_state = std::make_shared<NumbersRangedSource::RangesState>();
|
auto ranges_state = std::make_shared<NumbersRangedSource::RangesState>();
|
||||||
for (size_t i = 0; i < num_streams; ++i)
|
for (size_t i = 0; i < num_streams; ++i)
|
||||||
{
|
{
|
||||||
auto source = std::make_shared<NumbersRangedSource>(intersected_ranges, ranges_state, max_block_size, numbers_storage.column_name);
|
auto source = std::make_shared<NumbersRangedSource>(
|
||||||
|
intersected_ranges, ranges_state, max_block_size, numbers_storage.step, numbers_storage.column_name);
|
||||||
|
|
||||||
if (i == 0)
|
if (i == 0)
|
||||||
source->addTotalRowsApprox(total_size);
|
source->addTotalRowsApprox(total_size);
|
||||||
@ -529,12 +604,16 @@ Pipe ReadFromSystemNumbersStep::makePipe()
|
|||||||
/// Fall back to NumbersSource
|
/// Fall back to NumbersSource
|
||||||
for (size_t i = 0; i < num_streams; ++i)
|
for (size_t i = 0; i < num_streams; ++i)
|
||||||
{
|
{
|
||||||
auto source
|
auto source = std::make_shared<NumbersSource>(
|
||||||
= std::make_shared<NumbersSource>(max_block_size, numbers_storage.offset + i * max_block_size, num_streams * max_block_size, numbers_storage.column_name, numbers_storage.step);
|
max_block_size,
|
||||||
|
numbers_storage.offset + i * max_block_size,
|
||||||
|
num_streams * max_block_size,
|
||||||
|
numbers_storage.column_name,
|
||||||
|
numbers_storage.step);
|
||||||
|
|
||||||
if (numbers_storage.limit && i == 0)
|
if (numbers_storage.limit && i == 0)
|
||||||
{
|
{
|
||||||
auto rows_appr = *(numbers_storage.limit);
|
auto rows_appr = (*numbers_storage.limit - 1) / numbers_storage.step + 1;
|
||||||
if (limit > 0 && limit < rows_appr)
|
if (limit > 0 && limit < rows_appr)
|
||||||
rows_appr = limit;
|
rows_appr = limit;
|
||||||
source->addTotalRowsApprox(rows_appr);
|
source->addTotalRowsApprox(rows_appr);
|
||||||
@ -546,7 +625,7 @@ Pipe ReadFromSystemNumbersStep::makePipe()
|
|||||||
if (numbers_storage.limit)
|
if (numbers_storage.limit)
|
||||||
{
|
{
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
auto storage_limit = *(numbers_storage.limit);
|
auto storage_limit = (*numbers_storage.limit - 1) / numbers_storage.step + 1;
|
||||||
/// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly.
|
/// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly.
|
||||||
pipe.addSimpleTransform(
|
pipe.addSimpleTransform(
|
||||||
[&](const Block & header)
|
[&](const Block & header)
|
||||||
|
@ -229,4 +229,4 @@ struct SelectQueryInfo
|
|||||||
|
|
||||||
bool isFinal() const;
|
bool isFinal() const;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,13 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
StorageSystemNumbers::StorageSystemNumbers(const StorageID & table_id, bool multithreaded_, const std::string& column_name_, std::optional<UInt64> limit_, UInt64 offset_, UInt64 step_)
|
StorageSystemNumbers::StorageSystemNumbers(
|
||||||
|
const StorageID & table_id,
|
||||||
|
bool multithreaded_,
|
||||||
|
const std::string & column_name_,
|
||||||
|
std::optional<UInt64> limit_,
|
||||||
|
UInt64 offset_,
|
||||||
|
UInt64 step_)
|
||||||
: IStorage(table_id), multithreaded(multithreaded_), limit(limit_), offset(offset_), column_name(column_name_), step(step_)
|
: IStorage(table_id), multithreaded(multithreaded_), limit(limit_), offset(offset_), column_name(column_name_), step(step_)
|
||||||
{
|
{
|
||||||
StorageInMemoryMetadata storage_metadata;
|
StorageInMemoryMetadata storage_metadata;
|
||||||
|
@ -10,11 +10,17 @@ namespace DB
|
|||||||
|
|
||||||
class Context;
|
class Context;
|
||||||
|
|
||||||
class StorageSystemNumbers final : public IStorage
|
class StorageSystemNumbers final : public IStorage
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
/// Otherwise, streams concurrently increment atomic.
|
/// Otherwise, streams concurrently increment atomic.
|
||||||
StorageSystemNumbers(const StorageID & table_id, bool multithreaded_, const std::string& column_name, std::optional<UInt64> limit_ = std::nullopt, UInt64 offset_ = 0, UInt64 step_ = 1);
|
StorageSystemNumbers(
|
||||||
|
const StorageID & table_id,
|
||||||
|
bool multithreaded_,
|
||||||
|
const std::string & column_name,
|
||||||
|
std::optional<UInt64> limit_ = std::nullopt,
|
||||||
|
UInt64 offset_ = 0,
|
||||||
|
UInt64 step_ = 1);
|
||||||
|
|
||||||
std::string getName() const override { return "SystemNumbers"; }
|
std::string getName() const override { return "SystemNumbers"; }
|
||||||
|
|
||||||
@ -30,7 +36,6 @@ public:
|
|||||||
|
|
||||||
bool hasEvenlyDistributedRead() const override { return true; }
|
bool hasEvenlyDistributedRead() const override { return true; }
|
||||||
bool isSystemStorage() const override { return true; }
|
bool isSystemStorage() const override { return true; }
|
||||||
|
|
||||||
bool supportsTransactions() const override { return true; }
|
bool supportsTransactions() const override { return true; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -38,11 +43,9 @@ private:
|
|||||||
|
|
||||||
bool multithreaded;
|
bool multithreaded;
|
||||||
std::optional<UInt64> limit;
|
std::optional<UInt64> limit;
|
||||||
UInt64 offset;
|
UInt64 offset;`
|
||||||
std::string column_name;
|
std::string column_name;
|
||||||
|
|
||||||
UInt64 step;
|
UInt64 step;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -120,7 +120,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
|
|||||||
attach<StorageSystemOne>(context, system_database, "one", "This table contains a single row with a single dummy UInt8 column containing the value 0. Used when the table is not specified explicitly, for example in queries like `SELECT 1`.");
|
attach<StorageSystemOne>(context, system_database, "one", "This table contains a single row with a single dummy UInt8 column containing the value 0. Used when the table is not specified explicitly, for example in queries like `SELECT 1`.");
|
||||||
attach<StorageSystemNumbers>(context, system_database, "numbers", "Generates all natural numbers, starting from 0 (to 2^64 - 1, and then again) in sorted order.", false, "number");
|
attach<StorageSystemNumbers>(context, system_database, "numbers", "Generates all natural numbers, starting from 0 (to 2^64 - 1, and then again) in sorted order.", false, "number");
|
||||||
attach<StorageSystemNumbers>(context, system_database, "numbers_mt", "Multithreaded version of `system.numbers`. Numbers order is not guaranteed.", true, "number");
|
attach<StorageSystemNumbers>(context, system_database, "numbers_mt", "Multithreaded version of `system.numbers`. Numbers order is not guaranteed.", true, "number");
|
||||||
// attach<StorageSystemNumbers>(context, system_database, "generate_series", "Multithreaded version of `system.numbers`. Numbers order is not guaranteed.", false, "generate_series");
|
attach<StorageSystemNumbers>(context, system_database, "generate_series", "Generates arithmetic progression of natural numbers in sorted order in a given segment with a given step", false, "generate_series");
|
||||||
attach<StorageSystemZeros>(context, system_database, "zeros", "Produces unlimited number of non-materialized zeros.", false);
|
attach<StorageSystemZeros>(context, system_database, "zeros", "Produces unlimited number of non-materialized zeros.", false);
|
||||||
attach<StorageSystemZeros>(context, system_database, "zeros_mt", "Multithreaded version of system.zeros.", true);
|
attach<StorageSystemZeros>(context, system_database, "zeros_mt", "Multithreaded version of system.zeros.", true);
|
||||||
attach<StorageSystemDatabases>(context, system_database, "databases", "Lists all databases of the current server.");
|
attach<StorageSystemDatabases>(context, system_database, "databases", "Lists all databases of the current server.");
|
||||||
|
@ -29,7 +29,7 @@ if (TARGET ch_contrib::azure_sdk)
|
|||||||
target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::azure_sdk)
|
target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::azure_sdk)
|
||||||
endif ()
|
endif ()
|
||||||
|
|
||||||
if (TARGET ch_co`trib::simdjson)
|
if (TARGET ch_contrib::simdjson)
|
||||||
target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::simdjson)
|
target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::simdjson)
|
||||||
endif ()
|
endif ()
|
||||||
|
|
||||||
|
@ -5,7 +5,7 @@
|
|||||||
#include <Access/Common/AccessFlags.h>
|
#include <Access/Common/AccessFlags.h>
|
||||||
#include <Common/ProfileEvents.h>
|
#include <Common/ProfileEvents.h>
|
||||||
#include <TableFunctions/TableFunctionFactory.h>
|
#include <TableFunctions/TableFunctionFactory.h>
|
||||||
#include <Common/logger_useful.h>
|
|
||||||
|
|
||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
{
|
{
|
||||||
|
@ -1,13 +1,13 @@
|
|||||||
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
|
#include <Interpreters/Context.h>
|
||||||
|
#include <Interpreters/convertFieldToType.h>
|
||||||
|
#include <Interpreters/evaluateConstantExpression.h>
|
||||||
|
#include <Parsers/ASTFunction.h>
|
||||||
|
#include <Storages/System/StorageSystemNumbers.h>
|
||||||
#include <TableFunctions/ITableFunction.h>
|
#include <TableFunctions/ITableFunction.h>
|
||||||
#include <TableFunctions/TableFunctionFactory.h>
|
#include <TableFunctions/TableFunctionFactory.h>
|
||||||
#include <Parsers/ASTFunction.h>
|
|
||||||
#include <Common/typeid_cast.h>
|
|
||||||
#include <Common/FieldVisitorToString.h>
|
#include <Common/FieldVisitorToString.h>
|
||||||
#include <Storages/System/StorageSystemNumbers.h>
|
#include <Common/typeid_cast.h>
|
||||||
#include <Interpreters/evaluateConstantExpression.h>
|
|
||||||
#include <Interpreters/convertFieldToType.h>
|
|
||||||
#include <Interpreters/Context.h>
|
|
||||||
#include <DataTypes/DataTypesNumber.h>
|
|
||||||
#include "registerTableFunctions.h"
|
#include "registerTableFunctions.h"
|
||||||
|
|
||||||
|
|
||||||
@ -18,6 +18,7 @@ namespace ErrorCodes
|
|||||||
{
|
{
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||||
|
extern const int INVALID_SETTING_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
@ -33,8 +34,14 @@ public:
|
|||||||
static constexpr auto name = "generate_series";
|
static constexpr auto name = "generate_series";
|
||||||
std::string getName() const override { return name; }
|
std::string getName() const override { return name; }
|
||||||
bool hasStaticStructure() const override { return true; }
|
bool hasStaticStructure() const override { return true; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
StoragePtr executeImpl(
|
||||||
|
const ASTPtr & ast_function,
|
||||||
|
ContextPtr context,
|
||||||
|
const std::string & table_name,
|
||||||
|
ColumnsDescription cached_columns,
|
||||||
|
bool is_insert_query) const override;
|
||||||
const char * getStorageTypeName() const override { return "SystemNumbers"; }
|
const char * getStorageTypeName() const override { return "SystemNumbers"; }
|
||||||
|
|
||||||
UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const;
|
UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const;
|
||||||
@ -48,25 +55,31 @@ ColumnsDescription TableFunctionGenerateSeries::getActualTableStructure(ContextP
|
|||||||
return ColumnsDescription{{{"generate_series", std::make_shared<DataTypeUInt64>()}}};
|
return ColumnsDescription{{{"generate_series", std::make_shared<DataTypeUInt64>()}}};
|
||||||
}
|
}
|
||||||
|
|
||||||
StoragePtr TableFunctionGenerateSeries::executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
|
StoragePtr TableFunctionGenerateSeries::executeImpl(
|
||||||
|
const ASTPtr & ast_function,
|
||||||
|
ContextPtr context,
|
||||||
|
const std::string & table_name,
|
||||||
|
ColumnsDescription /*cached_columns*/,
|
||||||
|
bool /*is_insert_query*/) const
|
||||||
{
|
{
|
||||||
if (const auto * function = ast_function->as<ASTFunction>())
|
if (const auto * function = ast_function->as<ASTFunction>())
|
||||||
{
|
{
|
||||||
auto arguments = function->arguments->children;
|
auto arguments = function->arguments->children;
|
||||||
|
|
||||||
if (arguments.size() != 2 && arguments.size() != 3)
|
if (arguments.size() != 2 && arguments.size() != 3)
|
||||||
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'length' or 'offset, length'.", getName());
|
throw Exception(
|
||||||
|
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'length' or 'offset, length'.", getName());
|
||||||
|
|
||||||
UInt64 start = evaluateArgument(context, arguments[0]);
|
UInt64 start = evaluateArgument(context, arguments[0]);
|
||||||
UInt64 stop = evaluateArgument(context, arguments[1]);
|
UInt64 stop = evaluateArgument(context, arguments[1]);
|
||||||
UInt64 interval = (arguments.size() == 3) ? evaluateArgument(context, arguments[2]) : UInt64{1};
|
UInt64 step = (arguments.size() == 3) ? evaluateArgument(context, arguments[2]) : UInt64{1};
|
||||||
if (start > stop) {
|
if (step == UInt64{0})
|
||||||
auto res = std::make_shared<StorageSystemNumbers>(StorageID(getDatabaseName(), table_name), false, std::string{"generate_series"}, 0);
|
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Table function '{}' requires step to be a positive number", getName());
|
||||||
res->startup();
|
auto res = (start > stop)
|
||||||
return res;
|
? std::make_shared<StorageSystemNumbers>(
|
||||||
}
|
StorageID(getDatabaseName(), table_name), false, std::string{"generate_series"}, 0, 0, 0)
|
||||||
|
: std::make_shared<StorageSystemNumbers>(
|
||||||
auto res = std::make_shared<StorageSystemNumbers>(StorageID(getDatabaseName(), table_name), false, std::string{"generate_series"}, (stop - start) / interval + 1, start, interval);
|
StorageID(getDatabaseName(), table_name), false, std::string{"generate_series"}, (stop - start) + 1, start, step);
|
||||||
res->startup();
|
res->startup();
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
@ -82,8 +95,10 @@ UInt64 TableFunctionGenerateSeries::evaluateArgument(ContextPtr context, ASTPtr
|
|||||||
|
|
||||||
Field converted = convertFieldToType(field, DataTypeUInt64());
|
Field converted = convertFieldToType(field, DataTypeUInt64());
|
||||||
if (converted.isNull())
|
if (converted.isNull())
|
||||||
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The value {} is not representable as UInt64",
|
throw Exception(
|
||||||
applyVisitor(FieldVisitorToString(), field));
|
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||||
|
"The value {} is not representable as UInt64",
|
||||||
|
applyVisitor(FieldVisitorToString(), field));
|
||||||
|
|
||||||
return converted.safeGet<UInt64>();
|
return converted.safeGet<UInt64>();
|
||||||
}
|
}
|
@ -1,13 +1,13 @@
|
|||||||
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
|
#include <Interpreters/Context.h>
|
||||||
|
#include <Interpreters/convertFieldToType.h>
|
||||||
|
#include <Interpreters/evaluateConstantExpression.h>
|
||||||
|
#include <Parsers/ASTFunction.h>
|
||||||
|
#include <Storages/System/StorageSystemNumbers.h>
|
||||||
#include <TableFunctions/ITableFunction.h>
|
#include <TableFunctions/ITableFunction.h>
|
||||||
#include <TableFunctions/TableFunctionFactory.h>
|
#include <TableFunctions/TableFunctionFactory.h>
|
||||||
#include <Parsers/ASTFunction.h>
|
|
||||||
#include <Common/typeid_cast.h>
|
|
||||||
#include <Common/FieldVisitorToString.h>
|
#include <Common/FieldVisitorToString.h>
|
||||||
#include <Storages/System/StorageSystemNumbers.h>
|
#include <Common/typeid_cast.h>
|
||||||
#include <Interpreters/evaluateConstantExpression.h>
|
|
||||||
#include <Interpreters/convertFieldToType.h>
|
|
||||||
#include <Interpreters/Context.h>
|
|
||||||
#include <DataTypes/DataTypesNumber.h>
|
|
||||||
#include "registerTableFunctions.h"
|
#include "registerTableFunctions.h"
|
||||||
|
|
||||||
|
|
||||||
@ -16,8 +16,8 @@ namespace DB
|
|||||||
|
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
@ -34,8 +34,14 @@ public:
|
|||||||
static constexpr auto name = multithreaded ? "numbers_mt" : "numbers";
|
static constexpr auto name = multithreaded ? "numbers_mt" : "numbers";
|
||||||
std::string getName() const override { return name; }
|
std::string getName() const override { return name; }
|
||||||
bool hasStaticStructure() const override { return true; }
|
bool hasStaticStructure() const override { return true; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
StoragePtr executeImpl(
|
||||||
|
const ASTPtr & ast_function,
|
||||||
|
ContextPtr context,
|
||||||
|
const std::string & table_name,
|
||||||
|
ColumnsDescription cached_columns,
|
||||||
|
bool is_insert_query) const override;
|
||||||
const char * getStorageTypeName() const override { return "SystemNumbers"; }
|
const char * getStorageTypeName() const override { return "SystemNumbers"; }
|
||||||
|
|
||||||
UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const;
|
UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const;
|
||||||
@ -51,19 +57,26 @@ ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <bool multithreaded>
|
template <bool multithreaded>
|
||||||
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
|
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(
|
||||||
|
const ASTPtr & ast_function,
|
||||||
|
ContextPtr context,
|
||||||
|
const std::string & table_name,
|
||||||
|
ColumnsDescription /*cached_columns*/,
|
||||||
|
bool /*is_insert_query*/) const
|
||||||
{
|
{
|
||||||
if (const auto * function = ast_function->as<ASTFunction>())
|
if (const auto * function = ast_function->as<ASTFunction>())
|
||||||
{
|
{
|
||||||
auto arguments = function->arguments->children;
|
auto arguments = function->arguments->children;
|
||||||
|
|
||||||
if (arguments.size() != 1 && arguments.size() != 2)
|
if (arguments.size() != 1 && arguments.size() != 2)
|
||||||
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'length' or 'offset, length'.", getName());
|
throw Exception(
|
||||||
|
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'length' or 'offset, length'.", getName());
|
||||||
|
|
||||||
UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0;
|
UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0;
|
||||||
UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]);
|
UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]);
|
||||||
|
|
||||||
auto res = std::make_shared<StorageSystemNumbers>(StorageID(getDatabaseName(), table_name), multithreaded, std::string{"number"}, length, offset);
|
auto res = std::make_shared<StorageSystemNumbers>(
|
||||||
|
StorageID(getDatabaseName(), table_name), multithreaded, std::string{"number"}, length, offset);
|
||||||
res->startup();
|
res->startup();
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
@ -80,8 +93,10 @@ UInt64 TableFunctionNumbers<multithreaded>::evaluateArgument(ContextPtr context,
|
|||||||
|
|
||||||
Field converted = convertFieldToType(field, DataTypeUInt64());
|
Field converted = convertFieldToType(field, DataTypeUInt64());
|
||||||
if (converted.isNull())
|
if (converted.isNull())
|
||||||
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The value {} is not representable as UInt64",
|
throw Exception(
|
||||||
applyVisitor(FieldVisitorToString(), field));
|
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||||
|
"The value {} is not representable as UInt64",
|
||||||
|
applyVisitor(FieldVisitorToString(), field));
|
||||||
|
|
||||||
return converted.safeGet<UInt64>();
|
return converted.safeGet<UInt64>();
|
||||||
}
|
}
|
||||||
|
28
tests/queries/0_stateless/02970_generate_series.reference
Normal file
28
tests/queries/0_stateless/02970_generate_series.reference
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
0
|
||||||
|
1
|
||||||
|
4
|
||||||
|
8
|
||||||
|
500000001
|
||||||
|
50000000
|
||||||
|
100000001
|
||||||
|
0
|
||||||
|
10
|
||||||
|
13
|
||||||
|
16
|
||||||
|
19
|
||||||
|
7
|
||||||
|
17
|
||||||
|
27
|
||||||
|
37
|
||||||
|
47
|
||||||
|
57
|
||||||
|
67
|
||||||
|
77
|
||||||
|
17
|
||||||
|
22
|
||||||
|
27
|
||||||
|
32
|
||||||
|
37
|
||||||
|
42
|
||||||
|
47
|
||||||
|
52
|
14
tests/queries/0_stateless/02970_generate_series.sql
Normal file
14
tests/queries/0_stateless/02970_generate_series.sql
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
SELECT count() FROM generate_series(5, 4);
|
||||||
|
SELECT count() FROM generate_series(0, 0);
|
||||||
|
SELECT count() FROM generate_series(10, 20, 3);
|
||||||
|
SELECT count() FROM generate_series(7, 77, 10);
|
||||||
|
SELECT count() FROM generate_series(0, 1000000000, 2);
|
||||||
|
SELECT count() FROM generate_series(0, 999999999, 20);
|
||||||
|
SELECT count() FROM generate_series(0, 1000000000, 2) WHERE generate_series % 5 == 0;
|
||||||
|
|
||||||
|
SELECT * FROM generate_series(5, 4);
|
||||||
|
SELECT * FROM generate_series(0, 0);
|
||||||
|
SELECT * FROM generate_series(10, 20, 3);
|
||||||
|
SELECT * FROM generate_series(7, 77, 10);
|
||||||
|
SELECT * FROM generate_series(7, 52, 5) WHERE generate_series >= 13;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user