ClickHouse/src/Processors/Transforms/WindowTransform.h

313 lines
9.4 KiB
C++
Raw Normal View History

2020-12-10 22:16:58 +00:00
#pragma once
2021-01-30 01:16:44 +00:00
#include <Interpreters/WindowDescription.h>
#include <Processors/IProcessor.h>
2020-12-15 00:36:03 +00:00
#include <Common/AlignedBuffer.h>
2021-01-22 23:03:07 +00:00
#include <deque>
2020-12-10 22:16:58 +00:00
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
2020-12-15 00:36:03 +00:00
class Arena;
2021-01-27 00:08:15 +00:00
// Runtime data for computing one window function.
2020-12-15 00:36:03 +00:00
struct WindowFunctionWorkspace
{
WindowFunctionDescription window_function;
AlignedBuffer aggregate_function_state;
std::vector<size_t> argument_column_indices;
// Argument columns. Be careful, this is a per-block cache.
2020-12-15 00:36:03 +00:00
std::vector<const IColumn *> argument_columns;
uint64_t cached_block_number = std::numeric_limits<uint64_t>::max();
2021-01-22 23:03:07 +00:00
};
// One block held by the transform: the input columns together with the output
// columns that belong to the same block.
struct WindowTransformBlock
{
    Columns input_columns;
    MutableColumns output_columns;

    // The number of rows is taken from the first input column. Even in case of
    // `count() over ()` we should have a dummy input column, so there should
    // always be one to ask. Not sure how reliable this is...
    size_t numRows() const
    {
        const auto & first_input_column = input_columns[0];
        return first_input_column->size();
    }
};
// Position of a row: the number of the block and the index of the row inside
// that block. Positions are ordered lexicographically by (block, row).
struct RowNumber
{
    uint64_t block = 0;
    uint64_t row = 0;

    // Strict ordering: the block number decides, the row index breaks ties.
    bool operator < (const RowNumber & other) const
    {
        if (block != other.block)
            return block < other.block;

        return row < other.row;
    }

    // Two positions are equal only when both components match.
    bool operator == (const RowNumber & other) const
    {
        return row == other.row && block == other.block;
    }

    // "Not after": for this total order it is the same as `<` or `==`.
    bool operator <= (const RowNumber & other) const
    {
        return !(other < *this);
    }
};
2020-12-18 17:13:28 +00:00
/*
* Computes several window functions that share the same window. The input must
2021-01-27 00:08:15 +00:00
* be sorted by PARTITION BY (in any order), then by ORDER BY.
2021-01-22 23:03:07 +00:00
* We need to track the following pointers:
2021-01-27 00:08:15 +00:00
* 1) boundaries of partition -- rows that compare equal w/PARTITION BY.
2021-01-28 13:42:18 +00:00
* 2) current row for which we will compute the window functions.
* 3) boundaries of the frame for this row.
2021-01-27 00:08:15 +00:00
* Both the peer group and the frame are inside the partition, but can have any
* position relative to each other.
2021-01-28 13:42:18 +00:00
* All pointers only move forward. For partition boundaries, this is ensured by
* the order of input data. This property also trivially holds for the ROWS and
* GROUPS frames. For the RANGE frame, the proof requires the additional fact
* that the ranges are specified in terms of (the single) ORDER BY column.
2020-12-18 17:13:28 +00:00
*/
2021-01-20 07:10:54 +00:00
class WindowTransform : public IProcessor /* public ISimpleTransform */
2020-12-10 22:16:58 +00:00
{
public:
WindowTransform(
2020-12-22 01:41:02 +00:00
const Block & input_header_,
const Block & output_header_,
const WindowDescription & window_description_,
const std::vector<WindowFunctionDescription> &
2021-01-20 17:23:15 +00:00
functions);
2020-12-15 00:36:03 +00:00
~WindowTransform() override;
2020-12-10 22:16:58 +00:00
String getName() const override
{
return "WindowTransform";
}
static Block transformHeader(Block header, const ExpressionActionsPtr & expression);
2021-01-20 07:10:54 +00:00
/*
2021-01-20 17:23:15 +00:00
* (former) Implementation of ISimpleTransform.
2021-01-20 07:10:54 +00:00
*/
2021-01-22 23:03:07 +00:00
void appendChunk(Chunk & chunk) /*override*/;
2021-01-20 07:10:54 +00:00
/*
* Implementation of IProcessor.
*/
Status prepare() override;
void work() override;
2020-12-10 22:16:58 +00:00
2021-01-22 23:03:07 +00:00
private:
void advancePartitionEnd();
2021-02-01 23:26:14 +00:00
void advanceFrameStart();
void advanceFrameStartRowsOffset();
2021-02-05 09:13:19 +00:00
void advanceFrameStartRangeOffsetDispatch();
template <typename T>
void advanceFrameStartRangeOffset();
2021-01-28 13:42:18 +00:00
void advanceFrameEndCurrentRow();
2021-01-30 01:16:44 +00:00
void advanceFrameEndUnbounded();
2021-02-04 07:41:09 +00:00
void advanceFrameEndRowsOffset();
void advanceFrameEnd();
2021-01-28 13:42:18 +00:00
bool arePeers(const RowNumber & x, const RowNumber & y) const;
void updateAggregationState();
2021-01-28 13:42:18 +00:00
void writeOutCurrentRow();
2021-01-22 23:03:07 +00:00
// Input columns of the block that contains row `x`. Block numbers are turned
// into positions in `blocks` by subtracting `first_block_number`; the asserts
// check that the block is currently held in `blocks`.
const Columns & inputAt(const RowNumber & x) const
{
    assert(x.block >= first_block_number);
    const auto block_index = x.block - first_block_number;
    assert(block_index < blocks.size());
    return blocks[block_index].input_columns;
}

// Mutable flavour of the accessor above: reuses the same lookup and only
// removes the constness of the result.
Columns & inputAt(const RowNumber & x)
{
    const auto & self = static_cast<const WindowTransform &>(*this);
    return const_cast<Columns &>(self.inputAt(x));
}
2021-01-26 17:59:38 +00:00
size_t blockRowsNumber(const RowNumber & x) const
{
return inputAt(x)[0]->size();
}
2021-01-22 23:03:07 +00:00
// Output columns of the block that contains row `x`. The block is located in
// `blocks` the same way as for the input columns: by its number relative to
// `first_block_number`.
MutableColumns & outputAt(const RowNumber & x)
{
    assert(x.block >= first_block_number);
    const auto block_index = x.block - first_block_number;
    assert(block_index < blocks.size());
    return blocks[block_index].output_columns;
}
// Moves `x` one row forward. `x` must point to an existing row of a block that
// is present in `blocks`. Stepping past the last row of a block yields row 0
// of the next block number, which may be the past-the-end position.
void advanceRowNumber(RowNumber & x) const
{
    assert(x.block >= first_block_number);
    assert(x.block - first_block_number < blocks.size());

    const auto rows_in_block = inputAt(x)[0]->size();
    assert(x.row < rows_in_block);

    ++x.row;
    if (x.row >= rows_in_block)
    {
        // Ran off the end of this block, continue from the start of the next.
        x.row = 0;
        ++x.block;
    }
}
// Moves `x` one row backward. Inside a block this just decrements the row
// index. From row 0 it goes to the last row of the previous block, which must
// be present in `blocks` and must not be empty.
void retreatRowNumber(RowNumber & x) const
{
    if (x.row > 0)
    {
        --x.row;
        return;
    }

#ifndef NDEBUG
    // Remember where we started, to verify the round trip below.
    const auto original_x = x;
#endif

    --x.block;
    assert(x.block >= first_block_number);
    assert(x.block < first_block_number + blocks.size());
    assert(blockRowsNumber(x) > 0);
    x.row = blockRowsNumber(x) - 1;

#ifndef NDEBUG
    // Sanity check: advancing the retreated position must bring us back to the
    // position we started from. The comparison has to be made against the
    // saved original -- comparing against the retreated `x` itself can never
    // succeed, because advancing always changes the position.
    auto advanced_x = x;
    advanceRowNumber(advanced_x);
    assert(advanced_x == original_x);
#endif
}
2021-02-01 23:26:14 +00:00
auto moveRowNumber(const RowNumber & _x, int offset) const;
auto moveRowNumberNoCheck(const RowNumber & _x, int offset) const;
// Debug check that `x` is a usable position: either a row that exists in its
// block, or exactly the past-the-end position (first block number after the
// ones held in `blocks`, row 0).
void assertValid(const RowNumber & x) const
{
    assert(x.block >= first_block_number);

    if (x.block != first_block_number + blocks.size())
    {
        // A regular position: the row must exist in the block.
        assert(x.row < blockRowsNumber(x));
        return;
    }

    // The past-the-end position has no rows, so only row 0 is allowed.
    assert(x.row == 0);
}
2021-01-22 23:03:07 +00:00
// Past-the-end position: row 0 of the block number that follows the last
// block currently held in `blocks`.
RowNumber blocksEnd() const
{
    RowNumber past_the_end;
    past_the_end.block = first_block_number + blocks.size();
    past_the_end.row = 0;
    return past_the_end;
}
2021-02-01 23:26:14 +00:00
// Position of row 0 of the first block currently held in `blocks`.
RowNumber blocksBegin() const
{
    RowNumber first_position;
    first_position.block = first_block_number;
    first_position.row = 0;
    return first_position;
}
2020-12-15 00:36:03 +00:00
public:
2021-01-20 07:10:54 +00:00
/*
2021-01-28 13:42:18 +00:00
* Data (formerly) inherited from ISimpleTransform, needed for the
* implementation of the IProcessor interface.
2021-01-20 07:10:54 +00:00
*/
InputPort & input;
OutputPort & output;
bool has_input = false;
2021-01-22 23:03:07 +00:00
bool input_is_finished = false;
2021-01-20 07:10:54 +00:00
Port::Data input_data;
bool has_output = false;
Port::Data output_data;
/*
* Data for window transform itself.
*/
2020-12-15 00:36:03 +00:00
Block input_header;
WindowDescription window_description;
2020-12-18 17:13:28 +00:00
2020-12-16 12:57:47 +00:00
// Indices of the PARTITION BY columns in block.
2020-12-15 00:36:03 +00:00
std::vector<size_t> partition_by_indices;
2021-01-22 23:03:07 +00:00
// Indices of the ORDER BY columns in block.
std::vector<size_t> order_by_indices;
2020-12-18 17:13:28 +00:00
2021-01-22 23:03:07 +00:00
// Per-window-function scratch spaces.
2020-12-15 00:36:03 +00:00
std::vector<WindowFunctionWorkspace> workspaces;
2021-01-22 23:03:07 +00:00
// FIXME Reset it when the partition changes. We only save the temporary
// states in it (probably?).
2020-12-15 00:36:03 +00:00
std::unique_ptr<Arena> arena;
2021-01-22 23:03:07 +00:00
// A sliding window of blocks we currently need. We add the input blocks as
// they arrive, and discard the blocks we don't need anymore. The blocks
// have an always-incrementing index. The index of the first block is in
// `first_block_number`.
std::deque<WindowTransformBlock> blocks;
uint64_t first_block_number = 0;
// The next block we are going to pass to the consumer.
uint64_t next_output_block_number = 0;
// The first row for which we still haven't calculated the window functions.
// Used to determine which resulting blocks we can pass to the consumer.
RowNumber first_not_ready_row;
2021-02-01 23:26:14 +00:00
// Boundaries of the current partition.
// partition_start doesn't point to a valid block, because we want to drop
2021-02-03 13:41:59 +00:00
// the blocks early to save memory. We still have to track it so that we can
2021-02-01 23:26:14 +00:00
// cut off a PRECEDING frame at the partition start.
// The `partition_end` is past-the-end, as usual. When
// partition_ended = false, the partition hasn't ended yet, and partition_end
// is the next row to check.
RowNumber partition_start;
2021-01-22 23:03:07 +00:00
RowNumber partition_end;
bool partition_ended = false;
// The row for which we are now computing the window functions.
2021-01-28 13:42:18 +00:00
RowNumber current_row;
// The start of current peer group, needed for CURRENT ROW frame start.
// For ROWS frame, always equal to the current row, and for RANGE and GROUPS
// frames may be earlier.
RowNumber peer_group_start;
2021-01-22 23:03:07 +00:00
2021-02-01 23:26:14 +00:00
// The frame is [frame_start, frame_end) if frame_ended && frame_started,
// and unknown otherwise. Note that when we move to the next row, both the
2021-01-27 00:08:15 +00:00
// frame_start and the frame_end may jump forward by an unknown amount of
// blocks, e.g. if we use a RANGE frame. This means that sometimes we know
// neither frame_end nor frame_start.
// We update the states of the window functions as we track the frame
// boundaries.
2021-01-22 23:03:07 +00:00
// After we have found the final boundaries of the frame, we can immediately
2021-01-28 13:42:18 +00:00
// output the result for the current row, w/o waiting for more data.
2021-01-22 23:03:07 +00:00
RowNumber frame_start;
RowNumber frame_end;
bool frame_ended = false;
2021-02-01 23:26:14 +00:00
bool frame_started = false;
// The previous frame boundaries that correspond to the current state of the
// aggregate function. We use them to determine how to update the aggregation
// state after we find the new frame.
RowNumber prev_frame_start;
RowNumber prev_frame_end;
2020-12-10 22:16:58 +00:00
};
}
2021-01-22 23:03:07 +00:00
/// See https://fmt.dev/latest/api.html#formatting-user-defined-types
// Formats a DB::RowNumber as "<block>:<row>". Only the plain `{}` replacement
// field is supported, any format specification is rejected.
template <>
struct fmt::formatter<DB::RowNumber>
{
    // Accepts only an empty format specification, i.e. the parse range must be
    // empty or start at the closing brace. Throws fmt::format_error otherwise.
    constexpr auto parse(fmt::format_parse_context & ctx)
    {
        auto it = ctx.begin();
        const auto end = ctx.end();

        /// Only support {}.
        if (it != end && *it != '}')
            throw fmt::format_error("invalid format");

        return it;
    }

    // Writes the block number and the row number separated by a colon.
    // The method is const because formatting does not change the formatter,
    // and newer fmt versions invoke it on a const object. The call to
    // format_to is qualified so that argument-dependent lookup cannot pick up
    // another overload (e.g. std::format_to) for standard iterator types.
    template <typename FormatContext>
    auto format(const DB::RowNumber & x, FormatContext & ctx) const
    {
        return fmt::format_to(ctx.out(), "{}:{}", x.block, x.row);
    }
};