fix the calculation for moving frame start

This commit is contained in:
Alexander Kuzmenkov 2021-02-03 08:53:21 +03:00
parent 8bd026271a
commit 7e945bab03
4 changed files with 130 additions and 79 deletions

View File

@ -34,7 +34,15 @@ public:
std::optional<std::string> file;
std::optional<UInt64> line;
};
static constexpr size_t capacity = 32;
static constexpr size_t capacity =
#ifndef NDEBUG
/* The stacks are normally larger in debug version due to less inlining. */
64
#else
32
#endif
;
using FramePointers = std::array<void *, capacity>;
using Frames = std::array<Frame, capacity>;

View File

@ -348,49 +348,6 @@ void WindowTransform::advanceFrameStart()
// further.
assert(frame_started);
}
// We're very dumb and have to reinitialize aggregate functions if the frame
// start changed. No point in doing it if we don't yet know where the frame
// starts.
if (!frame_started)
{
return;
}
// frame_end value might not be valid yet, but we know that it is greater or
// equal than frame_start. If it's less than the new frame_start, we have to
// skip rows between frame_end and frame_start, because they are not in the
// frame and must not contribute to the value of aggregate functions.
if (frame_end < frame_start)
{
frame_end = frame_start;
}
for (auto & ws : workspaces)
{
const auto & f = ws.window_function;
const auto * a = f.aggregate_function.get();
auto * buf = ws.aggregate_function_state.data();
a->destroy(buf);
a->create(buf);
for (auto row = frame_start; row < frame_end; advanceRowNumber(row))
{
if (row.block != ws.cached_block_number)
{
ws.argument_columns.clear();
for (const auto i : ws.argument_column_indices)
{
ws.argument_columns.push_back(inputAt(row)[i].get());
}
ws.cached_block_number = row.block;
}
a->add(buf, ws.argument_columns.data(), row.row, arena.get());
// fmt::print(stderr, "(1) add row {}\n", row.row);
}
}
}
bool WindowTransform::arePeers(const RowNumber & x, const RowNumber & y) const
@ -516,7 +473,6 @@ void WindowTransform::advanceFrameEnd()
switch (window_description.frame.end_type)
{
case WindowFrame::BoundaryType::Current:
// The only frame end we have for now is CURRENT ROW.
advanceFrameEndCurrentRow();
break;
case WindowFrame::BoundaryType::Unbounded:
@ -536,45 +492,81 @@ void WindowTransform::advanceFrameEnd()
{
return;
}
}
// Add the rows over which we advanced the frame to the aggregate function
// states. We could have advanced over at most the entire last block.
uint64_t rows_end = frame_end.row;
if (frame_end.row == 0)
// Update the aggregation states after the frame has changed.
void WindowTransform::updateAggregationState()
{
// fmt::print(stderr, "update agg states [{}, {}) -> [{}, {})\n",
// prev_frame_start, prev_frame_end, frame_start, frame_end);
// Assert that the frame boundaries are known, have proper order wrt each
// other, and have not gone back wrt the previous frame.
assert(frame_started);
assert(frame_ended);
assert(frame_start <= frame_end);
assert(prev_frame_start <= prev_frame_end);
assert(prev_frame_start <= frame_start);
assert(prev_frame_end <= frame_end);
// We might have to reset aggregation state and/or add some rows to it.
// Figure out what to do.
bool reset_aggregation = false;
RowNumber rows_to_add_start;
RowNumber rows_to_add_end;
if (frame_start == prev_frame_start)
{
assert(frame_end == blocksEnd());
rows_end = blockRowsNumber(frame_end_before);
// The frame start didn't change, add the tail rows.
reset_aggregation = false;
rows_to_add_start = prev_frame_end;
rows_to_add_end = frame_end;
}
else
{
assert(frame_end_before.block == frame_end.block);
// The frame start changed, reset the state and aggregate over the
// entire frame. This can be made per-function after we learn to
// subtract rows from some types of aggregation states, but for now we
// always have to reset when the frame start changes.
reset_aggregation = true;
rows_to_add_start = frame_start;
rows_to_add_end = frame_end;
}
// Equality would mean "no data to process", for which we checked above.
assert(frame_end_before.row < rows_end);
for (auto & ws : workspaces)
{
if (frame_end_before.block != ws.cached_block_number)
{
const auto & block
= blocks[frame_end_before.block - first_block_number];
ws.argument_columns.clear();
for (const auto i : ws.argument_column_indices)
{
ws.argument_columns.push_back(block.input_columns[i].get());
}
ws.cached_block_number = frame_end_before.block;
}
const auto * a = ws.window_function.aggregate_function.get();
auto * buf = ws.aggregate_function_state.data();
auto * columns = ws.argument_columns.data();
for (auto row = frame_end_before.row; row < rows_end; ++row)
if (reset_aggregation)
{
// fmt::print(stderr, "(2) reset aggregation\n");
a->destroy(buf);
a->create(buf);
}
for (auto row = rows_to_add_start; row < rows_to_add_end;
advanceRowNumber(row))
{
if (row.block != ws.cached_block_number)
{
const auto & block
= blocks[row.block - first_block_number];
ws.argument_columns.clear();
for (const auto i : ws.argument_column_indices)
{
ws.argument_columns.push_back(block.input_columns[i].get());
}
ws.cached_block_number = row.block;
}
// fmt::print(stderr, "(2) add row {}\n", row);
a->add(buf, columns, row, arena.get());
auto * columns = ws.argument_columns.data();
a->add(buf, columns, row.row, arena.get());
}
}
prev_frame_start = frame_start;
prev_frame_end = frame_end;
}
void WindowTransform::writeOutCurrentRow()
@ -646,8 +638,11 @@ void WindowTransform::appendChunk(Chunk & chunk)
// which is precisely the definition of `partition_end`.
while (current_row < partition_end)
{
// Advance the frame start, updating the state of the aggregate
// functions.
// fmt::print(stderr, "(1) row {} frame [{}, {}) {}, {}\n",
// current_row, frame_start, frame_end,
// frame_started, frame_ended);
// Advance the frame start.
advanceFrameStart();
if (!frame_started)
@ -655,15 +650,19 @@ void WindowTransform::appendChunk(Chunk & chunk)
// Wait for more input data to find the start of frame.
assert(!input_is_finished);
assert(!partition_ended);
return;
}
// Advance the frame end, updating the state of the aggregate
// functions.
advanceFrameEnd();
// frame_end must be greater or equal than frame_start, so if the
// frame_start is already past the current frame_end, we can start
// from it to save us some work.
if (frame_end < frame_start)
{
frame_end = frame_start;
}
// fmt::print(stderr, "row {} frame [{}, {}) {}, {}\n",
// current_row, frame_start, frame_end,
// frame_started, frame_ended);
// Advance the frame end.
advanceFrameEnd();
if (!frame_ended)
{
@ -673,6 +672,10 @@ void WindowTransform::appendChunk(Chunk & chunk)
return;
}
// fmt::print(stderr, "(2) row {} frame [{}, {}) {}, {}\n",
// current_row, frame_start, frame_end,
// frame_started, frame_ended);
// The frame can be empty sometimes, e.g. the boundaries coincide
// or the start is after the partition end. But hopefully start is
// not after end.
@ -680,6 +683,13 @@ void WindowTransform::appendChunk(Chunk & chunk)
assert(frame_ended);
assert(frame_start <= frame_end);
// Now that we know the new frame boundaries, update the aggregation
// states. Theoretically we could do this simultaneously with moving
// the frame boundaries, but it would require some care not to
// perform unnecessary work while we are still looking for the frame
// start, so do it the simple way for now.
updateAggregationState();
// Write out the aggregation results.
writeOutCurrentRow();
@ -716,6 +726,8 @@ void WindowTransform::appendChunk(Chunk & chunk)
// for now.
frame_start = partition_start;
frame_end = partition_start;
prev_frame_start = partition_start;
prev_frame_end = partition_start;
assert(current_row == partition_start);
// fmt::print(stderr, "reinitialize agg data at start of {}\n",

View File

@ -113,6 +113,7 @@ private:
void advanceFrameEndCurrentRow();
void advanceFrameEndUnbounded();
bool arePeers(const RowNumber & x, const RowNumber & y) const;
void updateAggregationState();
void writeOutCurrentRow();
Columns & inputAt(const RowNumber & x)
@ -254,7 +255,7 @@ public:
RowNumber partition_end;
bool partition_ended = false;
// This is the row for which we are computing the window functions now.
// The row for which we are now computing the window functions.
RowNumber current_row;
// The frame is [frame_start, frame_end) if frame_ended && frame_started,
@ -270,6 +271,12 @@ public:
RowNumber frame_end;
bool frame_ended = false;
bool frame_started = false;
// The previous frame boundaries that correspond to the current state of the
// aggregate function. We use them to determine how to update the aggregation
// state after we find the new frame.
RowNumber prev_frame_start;
RowNumber prev_frame_end;
};
}

View File

@ -25,7 +25,31 @@
select *
from (
select CounterID, UserID, count(*) user_hits,
count() over (partition by CounterID order by user_hits desc)
count()
over (partition by CounterID order by user_hits desc
rows unbounded preceding)
user_rank
from hits_100m_single
where CounterID < 10000
group by CounterID, UserID
)
where user_rank <= 10
format Null
]]></query>
<!--
The RANGE version should give (almost) the same result, because counts
for the top ranking users are probably different, so the ranks won't be
influenced by grouping. But it is going to be slower than ROWS because
of the additional work of finding the group boundaries.
-->
<query><![CDATA[
select *
from (
select CounterID, UserID, count(*) user_hits,
count()
over (partition by CounterID order by user_hits desc
range unbounded preceding)
user_rank
from hits_100m_single
where CounterID < 10000