Merge pull request #59390 from divanik/divanik/generate_series_function

Add step to the generating numbers table functions
This commit is contained in:
divanik 2024-03-21 16:27:25 +01:00 committed by GitHub
commit 9dbe7beef8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 473 additions and 123 deletions

View File

@ -0,0 +1,8 @@
---
slug: /en/sql-reference/table-functions/generateSeries
sidebar_position: 147
sidebar_label: generateSeries
---
### Alias To
[generate_series](generate_series.md)

View File

@ -0,0 +1,25 @@
---
slug: /en/sql-reference/table-functions/generate_series
sidebar_position: 146
sidebar_label: generate_series
---
# generate_series
`generate_series(START, STOP)` - Returns a table with a single `generate_series` column (UInt64) that contains the integers from START to STOP, inclusive.
`generate_series(START, STOP, STEP)` - Returns a table with a single `generate_series` column (UInt64) that contains the integers from START to STOP, inclusive, spaced STEP apart.
The following queries return tables with the same content but different column names:
``` sql
SELECT * FROM numbers(10, 5);
SELECT * FROM generate_series(10, 14);
```
And the following queries return tables with the same content but different column names (but the second option is more efficient):
``` sql
SELECT * FROM numbers(10, 11) WHERE number % 3 == (10 % 3);
SELECT * FROM generate_series(10, 20, 3);
```

View File

@ -8,6 +8,7 @@ sidebar_label: numbers
`numbers(N)` Returns a table with the single number column (UInt64) that contains integers from 0 to N-1. `numbers(N)` Returns a table with the single number column (UInt64) that contains integers from 0 to N-1.
`numbers(N, M)` - Returns a table with the single number column (UInt64) that contains integers from N to (N + M - 1). `numbers(N, M)` - Returns a table with the single number column (UInt64) that contains integers from N to (N + M - 1).
`numbers(N, M, S)` - Returns a table with the single number column (UInt64) that contains integers from N to (N + M - 1) with step S.
Similar to the `system.numbers` table, it can be used for testing and generating successive values, `numbers(N, M)` more efficient than `system.numbers`. Similar to the `system.numbers` table, it can be used for testing and generating successive values, `numbers(N, M)` more efficient than `system.numbers`.
@ -21,6 +22,15 @@ SELECT * FROM system.numbers WHERE number BETWEEN 0 AND 9;
SELECT * FROM system.numbers WHERE number IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9); SELECT * FROM system.numbers WHERE number IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
``` ```
And the following queries are equivalent:
``` sql
SELECT number * 2 FROM numbers(10);
SELECT (number - 10) * 2 FROM numbers(10, 10);
SELECT * FROM numbers(0, 20, 2);
```
Examples: Examples:
``` sql ``` sql

View File

@ -27,10 +27,39 @@ void iota(T * begin, size_t count, T first_value)
return iotaImpl(begin, count, first_value); return iotaImpl(begin, count, first_value);
} }
/// Generic implementation of the stepped fill: writes `count` values to `begin`, where element `i`
/// is `first_value + i * step`. The sum is computed with the `size_t` loop index and then cast back to `T`.
/// The wrapping macro presumably also emits the `iotaWithStepImplAVX2` / `iotaWithStepImplSSE42`
/// variants that `iotaWithStep` dispatches to — confirm against the macro definition.
MULTITARGET_FUNCTION_AVX2_SSE42(
MULTITARGET_FUNCTION_HEADER(template <iota_supported_types T> void NO_INLINE),
iotaWithStepImpl, MULTITARGET_FUNCTION_BODY((T * begin, size_t count, T first_value, T step) /// NOLINT
{
for (size_t i = 0; i < count; i++)
*(begin + i) = static_cast<T>(first_value + i * step);
})
)
/// Fills `count` elements starting at `begin` with first_value, first_value + step, first_value + 2 * step, ...
/// When multitarget code is enabled, the widest supported instruction set is tried first (AVX2, then SSE4.2);
/// otherwise, or if neither is supported at runtime, the generic implementation is used.
template <iota_supported_types T>
void iotaWithStep(T * begin, size_t count, T first_value, T step)
{
#if USE_MULTITARGET_CODE
    if (isArchSupported(TargetArch::AVX2))
    {
        iotaWithStepImplAVX2(begin, count, first_value, step);
        return;
    }

    if (isArchSupported(TargetArch::SSE42))
    {
        iotaWithStepImplSSE42(begin, count, first_value, step);
        return;
    }
#endif

    /// Portable fallback.
    iotaWithStepImpl(begin, count, first_value, step);
}
template void iota(UInt8 * begin, size_t count, UInt8 first_value); template void iota(UInt8 * begin, size_t count, UInt8 first_value);
template void iota(UInt32 * begin, size_t count, UInt32 first_value); template void iota(UInt32 * begin, size_t count, UInt32 first_value);
template void iota(UInt64 * begin, size_t count, UInt64 first_value); template void iota(UInt64 * begin, size_t count, UInt64 first_value);
#if defined(OS_DARWIN) #if defined(OS_DARWIN)
template void iota(size_t * begin, size_t count, size_t first_value); template void iota(size_t * begin, size_t count, size_t first_value);
#endif #endif
/// Explicit instantiations of iotaWithStep for the element types used by callers.
template void iotaWithStep(UInt8 * begin, size_t count, UInt8 first_value, UInt8 step);
template void iotaWithStep(UInt32 * begin, size_t count, UInt32 first_value, UInt32 step);
template void iotaWithStep(UInt64 * begin, size_t count, UInt64 first_value, UInt64 step);
/// NOTE(review): presumably size_t is a distinct type from UInt64 on Darwin, hence the extra instantiation — confirm.
#if defined(OS_DARWIN)
template void iotaWithStep(size_t * begin, size_t count, size_t first_value, size_t step);
#endif
} }

View File

@ -31,4 +31,14 @@ extern template void iota(UInt64 * begin, size_t count, UInt64 first_value);
#if defined(OS_DARWIN) #if defined(OS_DARWIN)
extern template void iota(size_t * begin, size_t count, size_t first_value); extern template void iota(size_t * begin, size_t count, size_t first_value);
#endif #endif
/// Stepped variant of iota: fills `count` elements starting at `begin` with
/// first_value, first_value + step, first_value + 2 * step, ...
template <iota_supported_types T>
void iotaWithStep(T * begin, size_t count, T first_value, T step);
/// The instantiations below are declared extern, so including this header does not instantiate the template.
extern template void iotaWithStep(UInt8 * begin, size_t count, UInt8 first_value, UInt8 step);
extern template void iotaWithStep(UInt32 * begin, size_t count, UInt32 first_value, UInt32 step);
extern template void iotaWithStep(UInt64 * begin, size_t count, UInt64 first_value, UInt64 step);
#if defined(OS_DARWIN)
extern template void iotaWithStep(size_t * begin, size_t count, size_t first_value, size_t step);
#endif
} }

View File

@ -9,8 +9,11 @@
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/KeyCondition.h> #include <Storages/MergeTree/KeyCondition.h>
#include <Storages/System/StorageSystemNumbers.h> #include <Storages/System/StorageSystemNumbers.h>
#include <fmt/format.h>
#include <Common/iota.h> #include <Common/iota.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include "Core/Types.h"
#include "base/types.h"
namespace DB namespace DB
{ {
@ -23,19 +26,34 @@ extern const int TOO_MANY_ROWS;
namespace namespace
{ {
/// Fills `count` elements starting at `begin` with an arithmetic progression that starts at `first_value`.
/// A unit step is routed to plain `iota`; any other step goes through `iotaWithStep`.
template <iota_supported_types T>
inline void iotaWithStepOptimized(T * begin, size_t count, T first_value, T step)
{
    if (step != 1)
    {
        iotaWithStep(begin, count, first_value, step);
        return;
    }

    iota(begin, count, first_value);
}
class NumbersSource : public ISource class NumbersSource : public ISource
{ {
public: public:
NumbersSource(UInt64 block_size_, UInt64 offset_, std::optional<UInt64> limit_, UInt64 step_) NumbersSource(UInt64 block_size_, UInt64 offset_, std::optional<UInt64> limit_, UInt64 chunk_step_, const std::string & column_name, UInt64 step_)
: ISource(createHeader()), block_size(block_size_), next(offset_), step(step_) : ISource(createHeader(column_name))
, block_size(block_size_)
, next(offset_)
, chunk_step(chunk_step_)
, step(step_)
{ {
if (limit_.has_value()) if (limit_.has_value())
end = limit_.value() + offset_; end = limit_.value() + offset_;
} }
String getName() const override { return "Numbers"; } String getName() const override { return "Numbers"; }
static Block createHeader() { return {ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number")}; } static Block createHeader(const std::string & column_name)
{
return {ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name)};
}
protected: protected:
Chunk generate() override Chunk generate() override
@ -52,10 +70,12 @@ protected:
UInt64 curr = next; /// The local variable for some reason works faster (>20%) than member of class. UInt64 curr = next; /// The local variable for some reason works faster (>20%) than member of class.
UInt64 * pos = vec.data(); /// This also accelerates the code. UInt64 * pos = vec.data(); /// This also accelerates the code.
UInt64 * end_ = &vec[real_block_size];
iota(pos, static_cast<size_t>(end_ - pos), curr);
next += step; UInt64 * current_end = &vec[real_block_size];
iotaWithStepOptimized(pos, static_cast<size_t>(current_end - pos), curr, step);
next += chunk_step;
progress(column->size(), column->byteSize()); progress(column->size(), column->byteSize());
@ -65,35 +85,49 @@ protected:
private: private:
UInt64 block_size; UInt64 block_size;
UInt64 next; UInt64 next;
UInt64 chunk_step;
std::optional<UInt64> end; /// not included std::optional<UInt64> end; /// not included
UInt64 step; UInt64 step;
}; };
struct RangeWithStep
UInt128 sizeOfRange(const Range & r)
{ {
UInt64 left;
UInt64 step;
UInt128 size; UInt128 size;
if (r.right.isPositiveInfinity())
return static_cast<UInt128>(std::numeric_limits<uint64_t>::max()) - r.left.get<UInt64>() + r.left_included;
size = static_cast<UInt128>(r.right.get<UInt64>()) - r.left.get<UInt64>() + 1;
if (!r.left_included)
size--;
if (!r.right_included)
size--;
assert(size >= 0);
return size;
}; };
auto sizeOfRanges(const Ranges & rs) using RangesWithStep = std::vector<RangeWithStep>;
/// Narrows range `r` to the values that are congruent to `remainder` modulo `step` and returns them as
/// RangeWithStep{first value, step, number of values}. Returns std::nullopt when no such value lies in `r`
/// or when computing the first value would overflow UInt64.
/// NOTE(review): assumes step != 0, remainder < step and that both bounds of `r` hold finite UInt64 values — confirm against callers.
std::optional<RangeWithStep> steppedRangeFromRange(const Range & r, UInt64 step, UInt64 remainder)
{
/// An exclusive right bound of 0 admits no values; rejecting it here also keeps
/// `right - (1 - right_included)` below from underflowing.
if ((r.right.get<UInt64>() == 0) && (!r.right_included))
return std::nullopt;
/// Largest multiple of `step` that does not exceed the left bound.
UInt64 begin = (r.left.get<UInt64>() / step) * step;
/// Adding the remainder would overflow UInt64.
if (begin > std::numeric_limits<UInt64>::max() - remainder)
return std::nullopt;
begin += remainder;
/// Advance `begin` until it reaches the first admissible value (left if included, left + 1 otherwise).
/// The first condition guards the subtraction `left - left_included` against underflow.
while ((r.left_included <= r.left.get<UInt64>()) && (begin <= r.left.get<UInt64>() - r.left_included))
{
/// The next progression member does not fit into UInt64.
if (std::numeric_limits<UInt64>::max() - step < begin)
return std::nullopt;
begin += step;
}
/// `begin` lies past the last admissible value (right if included, right - 1 otherwise).
/// The first condition guards the subtraction `begin - right_included` against underflow.
if ((begin >= r.right_included) && (begin - r.right_included >= r.right.get<UInt64>()))
return std::nullopt;
/// Last value of `r`, expressed as an inclusive bound.
UInt64 right_edge_included = r.right.get<UInt64>() - (1 - r.right_included);
/// Number of progression members in [begin, right_edge_included].
return std::optional{RangeWithStep{begin, step, static_cast<UInt128>(right_edge_included - begin) / step + 1}};
}
auto sizeOfRanges(const RangesWithStep & rs)
{ {
UInt128 total_size{}; UInt128 total_size{};
for (const Range & r : rs) for (const RangeWithStep & r : rs)
{ {
/// total_size will never overflow /// total_size will never overflow
total_size += sizeOfRange(r); total_size += r.size;
} }
return total_size; return total_size;
}; };
@ -119,8 +153,17 @@ public:
using RangesStatePtr = std::shared_ptr<RangesState>; using RangesStatePtr = std::shared_ptr<RangesState>;
NumbersRangedSource(const Ranges & ranges_, RangesStatePtr & ranges_state_, UInt64 base_block_size_) NumbersRangedSource(
: ISource(NumbersSource::createHeader()), ranges(ranges_), ranges_state(ranges_state_), base_block_size(base_block_size_) const RangesWithStep & ranges_,
RangesStatePtr & ranges_state_,
UInt64 base_block_size_,
UInt64 step_,
const std::string & column_name)
: ISource(NumbersSource::createHeader(column_name))
, ranges(ranges_)
, ranges_state(ranges_state_)
, base_block_size(base_block_size_)
, step(step_)
{ {
} }
@ -133,6 +176,7 @@ protected:
{ {
std::lock_guard lock(ranges_state->mutex); std::lock_guard lock(ranges_state->mutex);
UInt64 need = base_block_size_; UInt64 need = base_block_size_;
UInt64 size = 0; /// how many item found. UInt64 size = 0; /// how many item found.
@ -144,7 +188,7 @@ protected:
while (need != 0) while (need != 0)
{ {
UInt128 can_provide = end.offset_in_ranges == ranges.size() ? static_cast<UInt128>(0) UInt128 can_provide = end.offset_in_ranges == ranges.size() ? static_cast<UInt128>(0)
: sizeOfRange(ranges[end.offset_in_ranges]) - end.offset_in_range; : ranges[end.offset_in_ranges].size - end.offset_in_range;
if (can_provide == 0) if (can_provide == 0)
break; break;
@ -171,6 +215,7 @@ protected:
} }
ranges_state->pos = end; ranges_state->pos = end;
return size; return size;
} }
@ -179,10 +224,6 @@ protected:
if (ranges.empty()) if (ranges.empty())
return {}; return {};
auto first_value = [](const Range & r) { return r.left.get<UInt64>() + (r.left_included ? 0 : 1); };
auto last_value = [](const Range & r) { return r.right.get<UInt64>() - (r.right_included ? 0 : 1); };
/// Find the data range. /// Find the data range.
/// If data left is small, shrink block size. /// If data left is small, shrink block size.
RangesPos start, end; RangesPos start, end;
@ -207,41 +248,43 @@ protected:
UInt128 can_provide = cursor.offset_in_ranges == end.offset_in_ranges UInt128 can_provide = cursor.offset_in_ranges == end.offset_in_ranges
? end.offset_in_range - cursor.offset_in_range ? end.offset_in_range - cursor.offset_in_range
: static_cast<UInt128>(last_value(range)) - first_value(range) + 1 - cursor.offset_in_range; : range.size - cursor.offset_in_range;
/// set value to block /// set value to block
auto set_value = [&pos](UInt128 & start_value, UInt128 & end_value) auto set_value = [&pos, this](UInt128 & start_value, UInt128 & end_value)
{ {
if (end_value > std::numeric_limits<UInt64>::max()) if (end_value > std::numeric_limits<UInt64>::max())
{ {
while (start_value < end_value) while (start_value < end_value)
*(pos++) = start_value++; {
*(pos++) = start_value;
start_value += this->step;
}
} }
else else
{ {
auto start_value_64 = static_cast<UInt64>(start_value); auto start_value_64 = static_cast<UInt64>(start_value);
auto end_value_64 = static_cast<UInt64>(end_value); auto end_value_64 = static_cast<UInt64>(end_value);
auto size = end_value_64 - start_value_64; auto size = (end_value_64 - start_value_64) / this->step;
iota(pos, static_cast<size_t>(size), start_value_64); iotaWithStepOptimized(pos, static_cast<size_t>(size), start_value_64, step);
pos += size; pos += size;
} }
}; };
if (can_provide > need) if (can_provide > need)
{ {
UInt64 start_value = first_value(range) + cursor.offset_in_range; UInt64 start_value = range.left + cursor.offset_in_range * step;
/// end_value will never overflow /// end_value will never overflow
iota(pos, static_cast<size_t>(need), start_value); iotaWithStepOptimized(pos, static_cast<size_t>(need), start_value, step);
pos += need; pos += need;
provided += need; provided += need;
cursor.offset_in_range += need; cursor.offset_in_range += need;
} }
else if (can_provide == need) else if (can_provide == need)
{ {
/// to avoid UInt64 overflow /// to avoid UInt64 overflow
UInt128 start_value = static_cast<UInt128>(first_value(range)) + cursor.offset_in_range; UInt128 start_value = static_cast<UInt128>(range.left) + cursor.offset_in_range * step;
UInt128 end_value = start_value + need; UInt128 end_value = start_value + need * step;
set_value(start_value, end_value); set_value(start_value, end_value);
provided += need; provided += need;
@ -251,8 +294,8 @@ protected:
else else
{ {
/// to avoid UInt64 overflow /// to avoid UInt64 overflow
UInt128 start_value = static_cast<UInt128>(first_value(range)) + cursor.offset_in_range; UInt128 start_value = static_cast<UInt128>(range.left) + cursor.offset_in_range * step;
UInt128 end_value = start_value + can_provide; UInt128 end_value = start_value + can_provide * step;
set_value(start_value, end_value); set_value(start_value, end_value);
provided += static_cast<UInt64>(can_provide); provided += static_cast<UInt64>(can_provide);
@ -269,13 +312,15 @@ protected:
private: private:
/// The ranges is shared between all streams. /// The ranges is shared between all streams.
Ranges ranges; RangesWithStep ranges;
/// Ranges state shared between all streams, actually is the start of the ranges. /// Ranges state shared between all streams, actually is the start of the ranges.
RangesStatePtr ranges_state; RangesStatePtr ranges_state;
/// Base block size, will shrink when data left is not enough. /// Base block size, will shrink when data left is not enough.
UInt64 base_block_size; UInt64 base_block_size;
UInt64 step;
}; };
} }
@ -296,12 +341,12 @@ bool shouldPushdownLimit(SelectQueryInfo & query_info, UInt64 limit_length)
/// Shrink ranges to size. /// Shrink ranges to size.
/// For example: ranges: [1, 5], [8, 100]; size: 7, we will get [1, 5], [8, 9] /// For example: ranges: [1, 5], [8, 100]; size: 7, we will get [1, 5], [8, 9]
void shrinkRanges(Ranges & ranges, size_t size) void shrinkRanges(RangesWithStep & ranges, size_t size)
{ {
size_t last_range_idx = 0; size_t last_range_idx = 0;
for (size_t i = 0; i < ranges.size(); i++) for (size_t i = 0; i < ranges.size(); i++)
{ {
auto range_size = sizeOfRange(ranges[i]); auto range_size = ranges[i].size;
if (range_size < size) if (range_size < size)
{ {
size -= static_cast<UInt64>(range_size); size -= static_cast<UInt64>(range_size);
@ -315,9 +360,7 @@ void shrinkRanges(Ranges & ranges, size_t size)
else else
{ {
auto & range = ranges[i]; auto & range = ranges[i];
UInt64 right = range.left.get<UInt64>() + static_cast<UInt64>(size); range.size = static_cast<UInt128>(size);
range.right = Field(right);
range.right_included = !range.left_included;
last_range_idx = i; last_range_idx = i;
break; break;
} }
@ -387,11 +430,19 @@ Pipe ReadFromSystemNumbersStep::makePipe()
if (!numbers_storage.multithreaded) if (!numbers_storage.multithreaded)
num_streams = 1; num_streams = 1;
Pipe pipe;
Ranges ranges;
if (numbers_storage.limit.has_value() && (numbers_storage.limit.value() == 0))
{
pipe.addSource(std::make_shared<NullSource>(NumbersSource::createHeader(numbers_storage.column_name)));
return pipe;
}
chassert(numbers_storage.step != UInt64{0});
/// Build rpn of query filters /// Build rpn of query filters
KeyCondition condition(filter_actions_dag, context, column_names, key_expression); KeyCondition condition(filter_actions_dag, context, column_names, key_expression);
Pipe pipe;
Ranges ranges;
if (condition.extractPlainRanges(ranges)) if (condition.extractPlainRanges(ranges))
{ {
@ -403,7 +454,8 @@ Pipe ReadFromSystemNumbersStep::makePipe()
{ {
if (std::numeric_limits<UInt64>::max() - numbers_storage.offset >= *(numbers_storage.limit)) if (std::numeric_limits<UInt64>::max() - numbers_storage.offset >= *(numbers_storage.limit))
{ {
table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(numbers_storage.offset + *(numbers_storage.limit)), false); table_range.emplace(
FieldRef(numbers_storage.offset), true, FieldRef(numbers_storage.offset + *(numbers_storage.limit)), false);
} }
/// UInt64 overflow, for example: SELECT number FROM numbers(18446744073709551614, 5) /// UInt64 overflow, for example: SELECT number FROM numbers(18446744073709551614, 5)
else else
@ -419,13 +471,20 @@ Pipe ReadFromSystemNumbersStep::makePipe()
table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(std::numeric_limits<UInt64>::max()), true); table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(std::numeric_limits<UInt64>::max()), true);
} }
Ranges intersected_ranges; RangesWithStep intersected_ranges;
for (auto & r : ranges) for (auto & r : ranges)
{ {
auto intersected_range = table_range->intersectWith(r); auto intersected_range = table_range->intersectWith(r);
if (intersected_range) if (intersected_range.has_value())
intersected_ranges.push_back(*intersected_range); {
auto range_with_step
= steppedRangeFromRange(intersected_range.value(), numbers_storage.step, numbers_storage.offset % numbers_storage.step);
if (range_with_step.has_value())
intersected_ranges.push_back(*range_with_step);
}
} }
/// intersection with overflowed_table_range goes back. /// intersection with overflowed_table_range goes back.
if (overflowed_table_range.has_value()) if (overflowed_table_range.has_value())
{ {
@ -433,66 +492,78 @@ Pipe ReadFromSystemNumbersStep::makePipe()
{ {
auto intersected_range = overflowed_table_range->intersectWith(r); auto intersected_range = overflowed_table_range->intersectWith(r);
if (intersected_range) if (intersected_range)
intersected_ranges.push_back(*overflowed_table_range); {
auto range_with_step = steppedRangeFromRange(
intersected_range.value(),
numbers_storage.step,
static_cast<UInt64>(
(static_cast<UInt128>(numbers_storage.offset) + std::numeric_limits<UInt64>::max() + 1)
% numbers_storage.step));
if (range_with_step)
intersected_ranges.push_back(*range_with_step);
}
} }
} }
/// ranges is blank, return a source who has no data /// ranges is blank, return a source who has no data
if (intersected_ranges.empty()) if (intersected_ranges.empty())
{ {
pipe.addSource(std::make_shared<NullSource>(NumbersSource::createHeader())); pipe.addSource(std::make_shared<NullSource>(NumbersSource::createHeader(numbers_storage.column_name)));
return pipe; return pipe;
} }
const auto & limit_length = limit_length_and_offset.first; const auto & limit_length = limit_length_and_offset.first;
const auto & limit_offset = limit_length_and_offset.second; const auto & limit_offset = limit_length_and_offset.second;
/// If intersected ranges is limited or we can pushdown limit. UInt128 total_size = sizeOfRanges(intersected_ranges);
if (!intersected_ranges.rbegin()->right.isPositiveInfinity() || should_pushdown_limit) UInt128 query_limit = limit_length + limit_offset;
/// limit total_size by query_limit
if (should_pushdown_limit && query_limit < total_size)
{ {
UInt128 total_size = sizeOfRanges(intersected_ranges); total_size = query_limit;
UInt128 query_limit = limit_length + limit_offset; /// We should shrink intersected_ranges for case:
/// intersected_ranges: [1, 4], [7, 100]; query_limit: 2
/// limit total_size by query_limit shrinkRanges(intersected_ranges, total_size);
if (should_pushdown_limit && query_limit < total_size)
{
total_size = query_limit;
/// We should shrink intersected_ranges for case:
/// intersected_ranges: [1, 4], [7, 100]; query_limit: 2
shrinkRanges(intersected_ranges, total_size);
}
checkLimits(size_t(total_size));
if (total_size / max_block_size < num_streams)
num_streams = static_cast<size_t>(total_size / max_block_size);
if (num_streams == 0)
num_streams = 1;
/// Ranges state, all streams will share the state.
auto ranges_state = std::make_shared<NumbersRangedSource::RangesState>();
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<NumbersRangedSource>(intersected_ranges, ranges_state, max_block_size);
if (i == 0)
source->addTotalRowsApprox(total_size);
pipe.addSource(std::move(source));
}
return pipe;
} }
checkLimits(size_t(total_size));
if (total_size / max_block_size < num_streams)
num_streams = static_cast<size_t>(total_size / max_block_size);
if (num_streams == 0)
num_streams = 1;
/// Ranges state, all streams will share the state.
auto ranges_state = std::make_shared<NumbersRangedSource::RangesState>();
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<NumbersRangedSource>(
intersected_ranges, ranges_state, max_block_size, numbers_storage.step, numbers_storage.column_name);
if (i == 0)
source->addTotalRowsApprox(total_size);
pipe.addSource(std::move(source));
}
return pipe;
} }
/// Fall back to NumbersSource /// Fall back to NumbersSource
for (size_t i = 0; i < num_streams; ++i) for (size_t i = 0; i < num_streams; ++i)
{ {
auto source auto source = std::make_shared<NumbersSource>(
= std::make_shared<NumbersSource>(max_block_size, numbers_storage.offset + i * max_block_size, numbers_storage.limit, num_streams * max_block_size); max_block_size,
numbers_storage.offset + i * max_block_size * numbers_storage.step,
numbers_storage.limit,
num_streams * max_block_size * numbers_storage.step,
numbers_storage.column_name,
numbers_storage.step);
if (numbers_storage.limit && i == 0) if (numbers_storage.limit && i == 0)
{ {
auto rows_appr = *(numbers_storage.limit); auto rows_appr = (*numbers_storage.limit - 1) / numbers_storage.step + 1;
if (limit > 0 && limit < rows_appr) if (limit > 0 && limit < rows_appr)
rows_appr = limit; rows_appr = limit;
source->addTotalRowsApprox(rows_appr); source->addTotalRowsApprox(rows_appr);
@ -504,7 +575,7 @@ Pipe ReadFromSystemNumbersStep::makePipe()
if (numbers_storage.limit) if (numbers_storage.limit)
{ {
size_t i = 0; size_t i = 0;
auto storage_limit = *(numbers_storage.limit); auto storage_limit = (*numbers_storage.limit - 1) / numbers_storage.step + 1;
/// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly. /// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly.
pipe.addSimpleTransform( pipe.addSimpleTransform(
[&](const Block & header) [&](const Block & header)

View File

@ -15,13 +15,19 @@
namespace DB namespace DB
{ {
StorageSystemNumbers::StorageSystemNumbers(const StorageID & table_id, bool multithreaded_, std::optional<UInt64> limit_, UInt64 offset_) StorageSystemNumbers::StorageSystemNumbers(
: IStorage(table_id), multithreaded(multithreaded_), limit(limit_), offset(offset_) const StorageID & table_id,
bool multithreaded_,
const std::string & column_name_,
std::optional<UInt64> limit_,
UInt64 offset_,
UInt64 step_)
: IStorage(table_id), multithreaded(multithreaded_), limit(limit_), offset(offset_), column_name(column_name_), step(step_)
{ {
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
/// This column doesn't have a comment, because otherwise it will be added to all the tables which were created via /// This column doesn't have a comment, because otherwise it will be added to all the tables which were created via
/// CREATE TABLE test as numbers(5) /// CREATE TABLE test as numbers(5)
storage_metadata.setColumns(ColumnsDescription({{"number", std::make_shared<DataTypeUInt64>()}})); storage_metadata.setColumns(ColumnsDescription({{column_name_, std::make_shared<DataTypeUInt64>()}}));
setInMemoryMetadata(storage_metadata); setInMemoryMetadata(storage_metadata);
} }
@ -38,5 +44,4 @@ void StorageSystemNumbers::read(
query_plan.addStep(std::make_unique<ReadFromSystemNumbersStep>( query_plan.addStep(std::make_unique<ReadFromSystemNumbersStep>(
column_names, query_info, storage_snapshot, context, shared_from_this(), max_block_size, num_streams)); column_names, query_info, storage_snapshot, context, shared_from_this(), max_block_size, num_streams));
} }
} }

View File

@ -10,7 +10,6 @@ namespace DB
class Context; class Context;
/** Implements a table engine for the system table "numbers". /** Implements a table engine for the system table "numbers".
* The table contains the only column number UInt64. * The table contains the only column number UInt64.
* From this table, you can read all natural numbers, starting from 0 (to 2^64 - 1, and then again). * From this table, you can read all natural numbers, starting from 0 (to 2^64 - 1, and then again).
@ -38,11 +37,18 @@ class Context;
* (and result could be out of order). If both multithreaded and limit are specified, * (and result could be out of order). If both multithreaded and limit are specified,
* the table could give you not exactly 1..limit range, but some arbitrary 'limit' numbers. * the table could give you not exactly 1..limit range, but some arbitrary 'limit' numbers.
*/ */
class StorageSystemNumbers final : public IStorage class StorageSystemNumbers final : public IStorage
{ {
public: public:
/// Otherwise, streams concurrently increment atomic. /// Otherwise, streams concurrently increment atomic.
StorageSystemNumbers(const StorageID & table_id, bool multithreaded_, std::optional<UInt64> limit_ = std::nullopt, UInt64 offset_ = 0); StorageSystemNumbers(
const StorageID & table_id,
bool multithreaded_,
const std::string & column_name,
std::optional<UInt64> limit_ = std::nullopt,
UInt64 offset_ = 0,
UInt64 step_ = 1);
std::string getName() const override { return "SystemNumbers"; } std::string getName() const override { return "SystemNumbers"; }
@ -58,7 +64,6 @@ public:
bool hasEvenlyDistributedRead() const override { return true; } bool hasEvenlyDistributedRead() const override { return true; }
bool isSystemStorage() const override { return true; } bool isSystemStorage() const override { return true; }
bool supportsTransactions() const override { return true; } bool supportsTransactions() const override { return true; }
private: private:
@ -67,6 +72,8 @@ private:
bool multithreaded; bool multithreaded;
std::optional<UInt64> limit; std::optional<UInt64> limit;
UInt64 offset; UInt64 offset;
std::string column_name;
UInt64 step;
}; };
} }

View File

@ -120,8 +120,10 @@ namespace DB
void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, bool has_zookeeper) void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, bool has_zookeeper)
{ {
attachNoDescription<StorageSystemOne>(context, system_database, "one", "This table contains a single row with a single dummy UInt8 column containing the value 0. Used when the table is not specified explicitly, for example in queries like `SELECT 1`."); attachNoDescription<StorageSystemOne>(context, system_database, "one", "This table contains a single row with a single dummy UInt8 column containing the value 0. Used when the table is not specified explicitly, for example in queries like `SELECT 1`.");
attachNoDescription<StorageSystemNumbers>(context, system_database, "numbers", "Generates all natural numbers, starting from 0 (to 2^64 - 1, and then again) in sorted order.", false); attachNoDescription<StorageSystemNumbers>(context, system_database, "numbers", "Generates all natural numbers, starting from 0 (to 2^64 - 1, and then again) in sorted order.", false, "number");
attachNoDescription<StorageSystemNumbers>(context, system_database, "numbers_mt", "Multithreaded version of `system.numbers`. Numbers order is not guaranteed.", true); attachNoDescription<StorageSystemNumbers>(context, system_database, "numbers_mt", "Multithreaded version of `system.numbers`. Numbers order is not guaranteed.", true, "number");
attachNoDescription<StorageSystemNumbers>(context, system_database, "generate_series", "Generates arithmetic progression of natural numbers in sorted order in a given segment with a given step", false, "generate_series");
attachNoDescription<StorageSystemNumbers>(context, system_database, "generateSeries", "Generates arithmetic progression of natural numbers in sorted order in a given segment with a given step", false, "generate_series");
attachNoDescription<StorageSystemZeros>(context, system_database, "zeros", "Produces unlimited number of non-materialized zeros.", false); attachNoDescription<StorageSystemZeros>(context, system_database, "zeros", "Produces unlimited number of non-materialized zeros.", false);
attachNoDescription<StorageSystemZeros>(context, system_database, "zeros_mt", "Multithreaded version of system.zeros.", true); attachNoDescription<StorageSystemZeros>(context, system_database, "zeros_mt", "Multithreaded version of system.zeros.", true);
attach<StorageSystemDatabases>(context, system_database, "databases", "Lists all databases of the current server."); attach<StorageSystemDatabases>(context, system_database, "databases", "Lists all databases of the current server.");

View File

@ -0,0 +1,118 @@
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/FieldVisitorToString.h>
#include <Common/typeid_cast.h>
#include "registerTableFunctions.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int INVALID_SETTING_VALUE;
}
namespace
{
/// Names of the table function; the `alias_num` template parameter below indexes into this array.
constexpr std::array<const char *, 2> names = {"generate_series", "generateSeries"};
/// Table function `generate_series(start, stop[, step])`, parameterized by which entry of `names` it reports as its name.
/// Its storage type name is "SystemNumbers" and its structure is static.
template <size_t alias_num>
class TableFunctionGenerateSeries : public ITableFunction
{
public:
static_assert(alias_num < names.size());
static constexpr auto name = names[alias_num];
std::string getName() const override { return name; }
bool hasStaticStructure() const override { return true; }
private:
/// Evaluates the arguments of `ast_function` and creates the StorageSystemNumbers that produces the series.
StoragePtr executeImpl(
const ASTPtr & ast_function,
ContextPtr context,
const std::string & table_name,
ColumnsDescription cached_columns,
bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "SystemNumbers"; }
/// Turns one argument AST into a UInt64 value (definition is outside this view — see its implementation for the accepted types).
UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const;
/// Returns the fixed single-column structure of the function's result.
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
};
/// Returns the structure of the produced table: a single column named "generate_series" of type UInt64.
/// Both parameters are ignored, and the column name is the same for every alias of the function.
template <size_t alias_num>
ColumnsDescription TableFunctionGenerateSeries<alias_num>::getActualTableStructure(ContextPtr /*context*/, bool /*is_insert_query*/) const
{
/// NOTE: https://bugs.llvm.org/show_bug.cgi?id=47418
/// NOTE(review): the explicit nested braces below are presumably the workaround for the bug linked above - confirm before simplifying.
return ColumnsDescription{{{"generate_series", std::make_shared<DataTypeUInt64>()}}};
}
/// Creates the storage for a `generate_series(start, stop[, step])` call.
///
/// Arguments (each evaluated as a constant UInt64 by evaluateArgument):
///   start - first value of the series;
///   stop  - inclusive upper bound of the series;
///   step  - optional distance between consecutive values, 1 when omitted.
///
/// Returns a started StorageSystemNumbers whose single column is named "generate_series".
/// If start > stop the storage is created empty (length 0) instead of raising an error.
///
/// Throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH if `ast_function` is not a function call or it does not have
/// exactly two or three arguments, and INVALID_SETTING_VALUE if step is zero.
template <size_t alias_num>
StoragePtr TableFunctionGenerateSeries<alias_num>::executeImpl(
    const ASTPtr & ast_function,
    ContextPtr context,
    const std::string & table_name,
    ColumnsDescription /*cached_columns*/,
    bool /*is_insert_query*/) const
{
    const auto * function = ast_function->as<ASTFunction>();
    if (!function)
        throw Exception(
            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'start, stop' or 'start, stop, step'.", getName());

    /// Working copy of the argument nodes: evaluateArgument takes them by non-const reference.
    auto arguments = function->arguments->children;

    if (arguments.size() != 2 && arguments.size() != 3)
        throw Exception(
            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'start, stop' or 'start, stop, step'.", getName());

    UInt64 start = evaluateArgument(context, arguments[0]);
    UInt64 stop = evaluateArgument(context, arguments[1]);
    UInt64 step = (arguments.size() == 3) ? evaluateArgument(context, arguments[2]) : UInt64{1};

    if (step == UInt64{0})
        throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Table function '{}' requires step to be a positive number", getName());

    /// An inverted range is not an error: it produces a storage with length 0, offset 0 and step 1.
    const bool is_empty_range = start > stop;

    /// The length is the width of the inclusive range [start, stop], not the number of produced rows.
    /// NOTE(review): (stop - start) + 1 wraps around to 0 when start == 0 and stop is the maximum UInt64,
    /// which then looks like an empty range - confirm whether that corner case needs dedicated handling.
    const UInt64 length = is_empty_range ? UInt64{0} : (stop - start) + 1;
    const UInt64 offset = is_empty_range ? UInt64{0} : start;
    const UInt64 storage_step = is_empty_range ? UInt64{1} : step;

    auto res = std::make_shared<StorageSystemNumbers>(
        StorageID(getDatabaseName(), table_name), false, std::string{"generate_series"}, length, offset, storage_step);
    res->startup();
    return res;
}
/// Evaluates `argument` as a constant expression and converts the result to UInt64.
///
/// Throws ILLEGAL_TYPE_OF_ARGUMENT if the expression is not of a native numeric type
/// or if its value cannot be represented as UInt64.
template <size_t alias_num>
UInt64 TableFunctionGenerateSeries<alias_num>::evaluateArgument(ContextPtr context, ASTPtr & argument) const
{
    const auto & [value, value_type] = evaluateConstantExpression(argument, context);

    const bool is_number = isNativeNumber(value_type);
    if (!is_number)
        throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} expression, must be numeric type", value_type->getName());

    /// A Null result of the conversion is treated as "does not fit into UInt64".
    Field as_uint64 = convertFieldToType(value, DataTypeUInt64());
    if (!as_uint64.isNull())
        return as_uint64.safeGet<UInt64>();

    throw Exception(
        ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
        "The value {} is not representable as UInt64",
        applyVisitor(FieldVisitorToString(), value));
}
}
/// Registers the table function in `factory` once per entry of `names`, so that it is available
/// both as generate_series and as generateSeries. Both registrations use empty documentation
/// and allow_readonly = true.
/// NOTE(review): allow_readonly presumably permits calling the function in readonly mode - confirm against TableFunctionFactory.
void registerTableFunctionGenerateSeries(TableFunctionFactory & factory)
{
/// names[0]: "generate_series"
factory.registerFunction<TableFunctionGenerateSeries<0>>({.documentation = {}, .allow_readonly = true});
/// names[1]: "generateSeries"
factory.registerFunction<TableFunctionGenerateSeries<1>>({.documentation = {}, .allow_readonly = true});
}
}

View File

@ -1,13 +1,15 @@
#include <optional>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <TableFunctions/ITableFunction.h> #include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <Parsers/ASTFunction.h>
#include <Common/typeid_cast.h>
#include <Common/FieldVisitorToString.h> #include <Common/FieldVisitorToString.h>
#include <Storages/System/StorageSystemNumbers.h> #include <Common/typeid_cast.h>
#include <Interpreters/evaluateConstantExpression.h> #include "base/types.h"
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypesNumber.h>
#include "registerTableFunctions.h" #include "registerTableFunctions.h"
@ -16,8 +18,8 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_TYPE_OF_ARGUMENT;
} }
namespace namespace
@ -34,8 +36,14 @@ public:
static constexpr auto name = multithreaded ? "numbers_mt" : "numbers"; static constexpr auto name = multithreaded ? "numbers_mt" : "numbers";
std::string getName() const override { return name; } std::string getName() const override { return name; }
bool hasStaticStructure() const override { return true; } bool hasStaticStructure() const override { return true; }
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; StoragePtr executeImpl(
const ASTPtr & ast_function,
ContextPtr context,
const std::string & table_name,
ColumnsDescription cached_columns,
bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "SystemNumbers"; } const char * getStorageTypeName() const override { return "SystemNumbers"; }
UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const; UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const;
@ -51,19 +59,27 @@ ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(
} }
template <bool multithreaded> template <bool multithreaded>
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(
const ASTPtr & ast_function,
ContextPtr context,
const std::string & table_name,
ColumnsDescription /*cached_columns*/,
bool /*is_insert_query*/) const
{ {
if (const auto * function = ast_function->as<ASTFunction>()) if (const auto * function = ast_function->as<ASTFunction>())
{ {
auto arguments = function->arguments->children; auto arguments = function->arguments->children;
if (arguments.size() != 1 && arguments.size() != 2) if ((arguments.empty()) || (arguments.size() >= 4))
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'length' or 'offset, length'.", getName()); throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' requires 'length' or 'offset, length'.", getName());
UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0; UInt64 offset = arguments.size() >= 2 ? evaluateArgument(context, arguments[0]) : 0;
UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]); UInt64 length = arguments.size() >= 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]);
UInt64 step = arguments.size() == 3 ? evaluateArgument(context, arguments[2]) : 1;
auto res = std::make_shared<StorageSystemNumbers>(StorageID(getDatabaseName(), table_name), multithreaded, length, offset); auto res = std::make_shared<StorageSystemNumbers>(
StorageID(getDatabaseName(), table_name), multithreaded, std::string{"number"}, length, offset, step);
res->startup(); res->startup();
return res; return res;
} }
@ -80,8 +96,10 @@ UInt64 TableFunctionNumbers<multithreaded>::evaluateArgument(ContextPtr context,
Field converted = convertFieldToType(field, DataTypeUInt64()); Field converted = convertFieldToType(field, DataTypeUInt64());
if (converted.isNull()) if (converted.isNull())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The value {} is not representable as UInt64", throw Exception(
applyVisitor(FieldVisitorToString(), field)); ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"The value {} is not representable as UInt64",
applyVisitor(FieldVisitorToString(), field));
return converted.safeGet<UInt64>(); return converted.safeGet<UInt64>();
} }

View File

@ -11,6 +11,7 @@ void registerTableFunctions()
registerTableFunctionMerge(factory); registerTableFunctionMerge(factory);
registerTableFunctionRemote(factory); registerTableFunctionRemote(factory);
registerTableFunctionNumbers(factory); registerTableFunctionNumbers(factory);
registerTableFunctionGenerateSeries(factory);
registerTableFunctionNull(factory); registerTableFunctionNull(factory);
registerTableFunctionZeros(factory); registerTableFunctionZeros(factory);
registerTableFunctionExecutable(factory); registerTableFunctionExecutable(factory);

View File

@ -8,6 +8,7 @@ class TableFunctionFactory;
void registerTableFunctionMerge(TableFunctionFactory & factory); void registerTableFunctionMerge(TableFunctionFactory & factory);
void registerTableFunctionRemote(TableFunctionFactory & factory); void registerTableFunctionRemote(TableFunctionFactory & factory);
void registerTableFunctionNumbers(TableFunctionFactory & factory); void registerTableFunctionNumbers(TableFunctionFactory & factory);
void registerTableFunctionGenerateSeries(TableFunctionFactory & factory);
void registerTableFunctionNull(TableFunctionFactory & factory); void registerTableFunctionNull(TableFunctionFactory & factory);
void registerTableFunctionZeros(TableFunctionFactory & factory); void registerTableFunctionZeros(TableFunctionFactory & factory);
void registerTableFunctionExecutable(TableFunctionFactory & factory); void registerTableFunctionExecutable(TableFunctionFactory & factory);

View File

@ -4,6 +4,8 @@ dictionary
executable executable
file file
generateRandom generateRandom
generateSeries
generate_series
input input
jdbc jdbc
merge merge

View File

@ -0,0 +1,28 @@
0
1
4
8
501
50
17928
0
10
13
16
19
7
17
27
37
47
57
67
77
17
22
27
32
37
42
47
52

View File

@ -0,0 +1,14 @@
-- Row counts: inverted range (start > stop), single-element range, then ranges with an explicit step.
SELECT count() FROM generate_series(5, 4);
SELECT count() FROM generate_series(0, 0);
SELECT count() FROM generate_series(10, 20, 3);
SELECT count() FROM generate_series(7, 77, 10);
SELECT count() FROM generate_series(0, 1000, 2);
SELECT count() FROM generate_series(0, 999, 20);
-- Aggregation over a stepped series combined with a filter on the generated column.
SELECT sum(generate_series) FROM generate_series(4, 1008, 4) WHERE generate_series % 7 = 1;
-- Actual values: the same inverted and single-element ranges, then stepped ranges where stop is and is not reachable from start.
SELECT * FROM generate_series(5, 4);
SELECT * FROM generate_series(0, 0);
SELECT * FROM generate_series(10, 20, 3);
SELECT * FROM generate_series(7, 77, 10);
-- Stepped series with a lower-bound filter on the generated column.
SELECT * FROM generate_series(7, 52, 5) WHERE generate_series >= 13;

View File

@ -1,4 +1,4 @@
SYSTEM FLUSH LOGS; SYSTEM FLUSH LOGS;
SELECT 'Column ' || name || ' from table ' || concat(database, '.', table) || ' should have a comment' SELECT 'Column ' || name || ' from table ' || concat(database, '.', table) || ' should have a comment'
FROM system.columns FROM system.columns
WHERE (database = 'system') AND (comment = '') AND (table NOT ILIKE '%_log_%') AND (table NOT IN ('numbers', 'numbers_mt', 'one')) AND (default_kind != 'ALIAS'); WHERE (database = 'system') AND (comment = '') AND (table NOT ILIKE '%_log_%') AND (table NOT IN ('numbers', 'numbers_mt', 'one', 'generate_series', 'generateSeries')) AND (default_kind != 'ALIAS');

View File

@ -1584,6 +1584,7 @@ gccMurmurHash
gcem gcem
generateRandom generateRandom
generateRandomStructure generateRandomStructure
generateSeries
generateULID generateULID
generateUUIDv generateUUIDv
geoDistance geoDistance