Fix generate_series

This commit is contained in:
János Benjamin Antal 2024-07-10 11:12:28 +00:00
parent 8b78cf1c61
commit 2442473b25
3 changed files with 41 additions and 11 deletions

View File

@ -36,12 +36,32 @@ inline void iotaWithStepOptimized(T * begin, size_t count, T first_value, T step
iotaWithStep(begin, count, first_value, step);
}
/// The range is defined as [start, end)
UInt64 itemCountInRange(UInt64 start, UInt64 end, UInt64 step)
{
const auto range_count = end - start;
if (step == 1)
return range_count;
return (range_count - 1) / step + 1;
}
class NumbersSource : public ISource
{
public:
NumbersSource(
UInt64 block_size_, UInt64 offset_, std::optional<UInt64> end_, UInt64 chunk_step_, const std::string & column_name, UInt64 step_)
: ISource(createHeader(column_name)), block_size(block_size_), next(offset_), chunk_step(chunk_step_), end(end_), step(step_)
UInt64 block_size_,
UInt64 offset_,
std::optional<UInt64> end_,
const std::string & column_name,
UInt64 step_in_chunk_,
UInt64 step_between_chunks_)
: ISource(createHeader(column_name))
, block_size(block_size_)
, next(offset_)
, end(end_)
, step_in_chunk(step_in_chunk_)
, step_between_chunks(step_between_chunks_)
{
}
String getName() const override { return "Numbers"; }
@ -59,7 +79,10 @@ protected:
{
if (end.value() <= next)
return {};
real_block_size = std::min(block_size, end.value() - next);
auto max_items_to_generate = itemCountInRange(next, *end, step_in_chunk);
real_block_size = std::min(block_size, max_items_to_generate);
}
auto column = ColumnUInt64::create(real_block_size);
ColumnUInt64::Container & vec = column->getData();
@ -69,9 +92,9 @@ protected:
UInt64 * current_end = &vec[real_block_size];
iotaWithStepOptimized(pos, static_cast<size_t>(current_end - pos), curr, step);
iotaWithStepOptimized(pos, static_cast<size_t>(current_end - pos), curr, step_in_chunk);
next += chunk_step;
next += step_between_chunks;
progress(column->size(), column->byteSize());
return {Columns{std::move(column)}, real_block_size};
@ -80,9 +103,9 @@ protected:
private:
UInt64 block_size;
UInt64 next;
UInt64 chunk_step;
std::optional<UInt64> end; /// not included
UInt64 step;
UInt64 step_in_chunk;
UInt64 step_between_chunks;
};
struct RangeWithStep
@ -552,19 +575,24 @@ Pipe ReadFromSystemNumbersStep::makePipe()
});
/// Fall back to NumbersSource
/// Range in a single block
const auto block_range = max_block_size * numbers_storage.step;
/// Step between chunks in a single source.
/// It is bigger than block_range in case of multiple threads, because we have to account for other sources as well.
const auto step_between_chunks = num_streams * block_range;
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<NumbersSource>(
max_block_size,
numbers_storage.offset + i * max_block_size * numbers_storage.step,
numbers_storage.offset + i * block_range,
end,
num_streams * max_block_size * numbers_storage.step,
numbers_storage.column_name,
numbers_storage.step);
numbers_storage.step,
step_between_chunks);
if (numbers_storage.limit && i == 0)
{
auto rows_appr = (*numbers_storage.limit - 1) / numbers_storage.step + 1;
auto rows_appr = itemCountInRange(numbers_storage.offset, *numbers_storage.limit, numbers_storage.step);
if (limit > 0 && limit < rows_appr)
rows_appr = query_info_limit;
source->addTotalRowsApprox(rows_appr);

View File

@ -5,6 +5,7 @@
501
50
17928
17928
0
10
13

View File

@ -5,6 +5,7 @@ SELECT count() FROM generate_series(7, 77, 10);
SELECT count() FROM generate_series(0, 1000, 2);
SELECT count() FROM generate_series(0, 999, 20);
SELECT sum(generate_series) FROM generate_series(4, 1008, 4) WHERE generate_series % 7 = 1;
SELECT sum(generate_series) FROM generate_series(4, 1008, 4) WHERE generate_series % 7 = 1 SETTINGS max_block_size = 71;
SELECT * FROM generate_series(5, 4);
SELECT * FROM generate_series(0, 0);