start offset for ROWS frame

This commit is contained in:
Alexander Kuzmenkov 2021-02-02 02:26:14 +03:00
parent 5e99b4461d
commit ef46c36317
5 changed files with 343 additions and 27 deletions

View File

@ -557,7 +557,15 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
}
else if (parser_literal.parse(pos, ast_literal, expected))
{
node->frame.begin_offset = ast_literal->as<ASTLiteral &>().value.safeGet<Int64>();
const Field & value = ast_literal->as<ASTLiteral &>().value;
if (!isInt64FieldType(value.getType()))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Only integer frame offsets are supported, '{}' is not supported.",
Field::Types::toString(value.getType()));
}
node->frame.begin_offset = value.get<Int64>();
node->frame.begin_type = WindowFrame::BoundaryType::Offset;
}
else
{
@ -603,7 +611,15 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
}
else if (parser_literal.parse(pos, ast_literal, expected))
{
node->frame.end_offset = ast_literal->as<ASTLiteral &>().value.safeGet<Int64>();
const Field & value = ast_literal->as<ASTLiteral &>().value;
if (!isInt64FieldType(value.getType()))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Only integer frame offsets are supported, '{}' is not supported.",
Field::Types::toString(value.getType()));
}
node->frame.end_offset = value.get<Int64>();
node->frame.end_type = WindowFrame::BoundaryType::Offset;
}
else
{

View File

@ -165,16 +165,214 @@ void WindowTransform::advancePartitionEnd()
assert(!partition_ended && partition_end == blocksEnd());
}
void WindowTransform::advanceFrameStart() const
auto WindowTransform::moveRowNumberNoCheck(const RowNumber & _x, int offset) const
{
// Frame start is always UNBOUNDED PRECEDING for now, so we don't have to
// move it. It is initialized when the new partition starts.
if (window_description.frame.begin_type
!= WindowFrame::BoundaryType::Unbounded)
RowNumber x = _x;
if (offset > 0)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Frame start type '{}' is not implemented",
WindowFrame::toString(window_description.frame.begin_type));
for (;;)
{
assertValid(x);
assert(offset >= 0);
const auto block_rows = blockRowsNumber(x);
x.row += offset;
if (x.row >= block_rows)
{
offset = x.row - block_rows;
x.row = 0;
x.block++;
if (x == blocksEnd())
{
break;
}
}
else
{
offset = 0;
break;
}
}
}
else if (offset < 0)
{
for (;;)
{
assertValid(x);
assert(offset <= 0);
if (x.row >= static_cast<uint64_t>(-offset))
{
x.row -= -offset;
offset = 0;
break;
}
if (x.block == first_block_number)
{
break;
}
// offset is negative
offset += (x.row + 1);
--x.block;
x.row = blockRowsNumber(x) - 1;
}
}
return std::tuple{x, offset};
}
auto WindowTransform::moveRowNumber(const RowNumber & _x, int offset) const
{
auto [x, o] = moveRowNumberNoCheck(_x, offset);
#ifndef NDEBUG
// Check that it was reversible.
auto [xx, oo] = moveRowNumberNoCheck(x, -(offset - o));
// fmt::print(stderr, "{} -> {}, result {}, {}, new offset {}, twice {}, {}\n",
// _x, offset, x, o, -(offset - o), xx, oo);
assert(xx == _x);
assert(oo == 0);
#endif
return std::tuple{x, o};
}
void WindowTransform::advanceFrameStartRowsOffset()
{
// Just recalculate it each time by walking blocks.
const auto [moved_row, offset_left] = moveRowNumber(current_row,
window_description.frame.begin_offset);
frame_start = moved_row;
assertValid(frame_start);
// fmt::print(stderr, "frame start {} partition start {}\n", frame_start,
// partition_start);
if (moved_row <= partition_start)
{
// Got to the beginning of partition and can't go further back.
frame_start = partition_start;
frame_started = true;
return;
}
assert(frame_start <= partition_end);
if (frame_start == partition_end && partition_ended)
{
// A FOLLOWING frame start ran into the end of partition.
frame_started = true;
}
assert(partition_start < frame_start);
frame_start = moved_row;
frame_started = offset_left == 0;
}
void WindowTransform::advanceFrameStartChoose()
{
switch (window_description.frame.begin_type)
{
case WindowFrame::BoundaryType::Unbounded:
// UNBOUNDED PRECEDING, just mark it valid. It is initialized when
// the new partition starts.
frame_started = true;
return;
case WindowFrame::BoundaryType::Offset:
switch (window_description.frame.type)
{
case WindowFrame::FrameType::Rows:
advanceFrameStartRowsOffset();
return;
default:
// Fallthrough to the "not implemented" error.
break;
}
break;
default:
// Fallthrough to the "not implemented" error.
break;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Frame start type '{}' for frame '{}' is not implemented",
WindowFrame::toString(window_description.frame.begin_type),
WindowFrame::toString(window_description.frame.type));
}
void WindowTransform::advanceFrameStart()
{
if (frame_started)
{
return;
}
const auto frame_start_before = frame_start;
advanceFrameStartChoose();
if (frame_start == frame_start_before)
{
return;
}
assert(frame_start_before < frame_start);
assert(partition_start <= frame_start);
assert(frame_start <= partition_end);
if (partition_ended && frame_start == partition_end)
{
// Check that if the start of frame (e.g. FOLLOWING) runs into the end
// of partition, it is marked as valid -- we can't advance it any
// further.
assert(frame_started);
}
// We're very dumb and have to reinitialize aggregate functions if the frame
// start changed. No point in doing it if we don't yet know where the frame
// starts.
if (!frame_started)
{
return;
}
// frame_end value might not be valid yet, but we know that it is greater or
// equal than frame_start. If it's less than the new frame_start, we have to
// skip rows between frame_end and frame_start, because they are not in the
// frame and must not contribute to the value of aggregate functions.
if (frame_end < frame_start)
{
frame_end = frame_start;
}
for (auto & ws : workspaces)
{
const auto & f = ws.window_function;
const auto * a = f.aggregate_function.get();
auto * buf = ws.aggregate_function_state.data();
a->destroy(buf);
a->create(buf);
for (auto row = frame_start; row < frame_end; advanceRowNumber(row))
{
if (row.block != ws.cached_block_number)
{
ws.argument_columns.clear();
for (const auto i : ws.argument_column_indices)
{
ws.argument_columns.push_back(inputAt(row)[i].get());
}
ws.cached_block_number = row.block;
}
a->add(buf, ws.argument_columns.data(), row.row, arena.get());
// fmt::print(stderr, "(1) add row {}\n", row.row);
}
}
}
@ -356,6 +554,7 @@ void WindowTransform::advanceFrameEnd()
auto * columns = ws.argument_columns.data();
for (auto row = frame_end_before.row; row < rows_end; ++row)
{
// fmt::print(stderr, "(2) add row {}\n", row);
a->add(buf, columns, row, arena.get());
}
}
@ -414,8 +613,8 @@ void WindowTransform::appendChunk(Chunk & chunk)
for (;;)
{
advancePartitionEnd();
// fmt::print(stderr, "partition [?, {}), {}\n",
// partition_end, partition_ended);
// fmt::print(stderr, "partition [{}, {}), {}\n",
// partition_start, partition_end, partition_ended);
// Either we ran out of data or we found the end of partition (maybe
// both, but this only happens at the total end of data).
@ -433,12 +632,21 @@ void WindowTransform::appendChunk(Chunk & chunk)
// Advance the frame start, updating the state of the aggregate
// functions.
advanceFrameStart();
if (!frame_started)
{
// Wait for more input data to find the start of frame.
assert(!input_is_finished);
assert(!partition_ended);
}
// Advance the frame end, updating the state of the aggregate
// functions.
advanceFrameEnd();
// fmt::print(stderr, "row {} frame [{}, {}) {}\n",
// current_row, frame_start, frame_end, frame_ended);
// fmt::print(stderr, "row {} frame [{}, {}) {}, {}\n",
// current_row, frame_start, frame_end,
// frame_started, frame_ended);
if (!frame_ended)
{
@ -448,8 +656,10 @@ void WindowTransform::appendChunk(Chunk & chunk)
return;
}
// The frame shouldn't be empty (probably?).
assert(frame_start < frame_end);
// The frame can be empty sometimes, e.g. the boundaries coincide
// or the start is after the partition end. But hopefully start is
// not after end.
assert(frame_start <= frame_end);
// Write out the aggregation results.
writeOutCurrentRow();
@ -458,6 +668,7 @@ void WindowTransform::appendChunk(Chunk & chunk)
advanceRowNumber(current_row);
first_not_ready_row = current_row;
frame_ended = false;
frame_started = false;
}
if (input_is_finished)
@ -478,15 +689,15 @@ void WindowTransform::appendChunk(Chunk & chunk)
}
// Start the next partition.
const auto new_partition_start = partition_end;
partition_start = partition_end;
advanceRowNumber(partition_end);
partition_ended = false;
// We have to reset the frame when the new partition starts. This is not a
// generally correct way to do so, but we don't really support moving frame
// for now.
frame_start = new_partition_start;
frame_end = new_partition_start;
assert(current_row == new_partition_start);
frame_start = partition_start;
frame_end = partition_start;
assert(current_row == partition_start);
// fmt::print(stderr, "reinitialize agg data at start of {}\n",
// new_partition_start);
@ -534,6 +745,15 @@ IProcessor::Status WindowTransform::prepare()
return Status::Finished;
}
if (output_data.exception)
{
// An exception occurred during processing.
output.pushData(std::move(output_data));
output.finish();
input.close();
return Status::Finished;
}
assert(first_not_ready_row.block >= first_block_number);
// The first_not_ready_row might be past-the-end if we have already
// calculated the window functions for all input rows. That's why the

View File

@ -53,6 +53,11 @@ struct RowNumber
{
return block == other.block && row == other.row;
}
bool operator <= (const RowNumber & other) const
{
return *this < other || *this == other;
}
};
/*
@ -101,7 +106,9 @@ public:
private:
void advancePartitionEnd();
void advanceFrameStart() const;
void advanceFrameStart();
void advanceFrameStartChoose();
void advanceFrameStartRowsOffset();
void advanceFrameEnd();
void advanceFrameEndCurrentRow();
void advanceFrameEndUnbounded();
@ -169,9 +176,28 @@ private:
#endif
}
auto moveRowNumber(const RowNumber & _x, int offset) const;
auto moveRowNumberNoCheck(const RowNumber & _x, int offset) const;
void assertValid(const RowNumber & x) const
{
assert(x.block >= first_block_number);
if (x.block == first_block_number + blocks.size())
{
assert(x.row == 0);
}
else
{
assert(x.row < blockRowsNumber(x));
}
}
RowNumber blocksEnd() const
{ return RowNumber{first_block_number + blocks.size(), 0}; }
RowNumber blocksBegin() const
{ return RowNumber{first_block_number, 0}; }
public:
/*
* Data (formerly) inherited from ISimpleTransform, needed for the
@ -217,18 +243,22 @@ public:
// Used to determine which resulting blocks we can pass to the consumer.
RowNumber first_not_ready_row;
// We don't keep the pointer to start of partition, because we don't really
// need it, and we want to be able to drop the starting blocks to save memory.
// The `partition_end` is past-the-end, as usual. When partition_ended = false,
// it still haven't ended, and partition_end is the next row to check.
// Boundaries of the current partition.
// partition_start doesn't point to a valid block, because we want to drop
// the blocks early to save memory. We still have track it so that we can
// cut off a PRECEDING frame at the partition start.
// The `partition_end` is past-the-end, as usual. When
// partition_ended = false, it still haven't ended, and partition_end is the
// next row to check.
RowNumber partition_start;
RowNumber partition_end;
bool partition_ended = false;
// This is the row for which we are computing the window functions now.
RowNumber current_row;
// The frame is [frame_start, frame_end) if frame_ended, and unknown
// otherwise. Note that when we move to the next row, both the
// The frame is [frame_start, frame_end) if frame_ended && frame_started,
// and unknown otherwise. Note that when we move to the next row, both the
// frame_start and the frame_end may jump forward by an unknown amount of
// blocks, e.g. if we use a RANGE frame. This means that sometimes we don't
// know neither frame_end nor frame_start.
@ -239,6 +269,7 @@ public:
RowNumber frame_start;
RowNumber frame_end;
bool frame_ended = false;
bool frame_started = false;
};
}

View File

@ -516,3 +516,43 @@ settings max_block_size = 2;
27 27 29 29
27 27 29 29
30 30 30 30
-- ROWS offset frame start
select number, p,
count(*) over (partition by p order by number
rows between 1 preceding and unbounded following),
count(*) over (partition by p order by number
rows between 1 following and unbounded following)
from (select number, intDiv(number, 5) p from numbers(31))
order by p, number
settings max_block_size = 2;
0 0 5 4
1 0 5 3
2 0 4 2
3 0 3 1
4 0 2 0
5 1 5 4
6 1 5 3
7 1 4 2
8 1 3 1
9 1 2 0
10 2 5 4
11 2 5 3
12 2 4 2
13 2 3 1
14 2 2 0
15 3 5 4
16 3 5 3
17 3 4 2
18 3 3 1
19 3 2 0
20 4 5 4
21 4 5 3
22 4 4 2
23 4 3 1
24 4 2 0
25 5 5 4
26 5 5 3
27 5 4 2
28 5 3 1
29 5 2 0
30 6 1 0

View File

@ -163,3 +163,12 @@ window
rows between unbounded preceding and unbounded following)
settings max_block_size = 2;
-- ROWS offset frame start
select number, p,
count(*) over (partition by p order by number
rows between 1 preceding and unbounded following),
count(*) over (partition by p order by number
rows between 1 following and unbounded following)
from (select number, intDiv(number, 5) p from numbers(31))
order by p, number
settings max_block_size = 2;