float frames and lag/lead_in_frame

This commit is contained in:
Alexander Kuzmenkov 2021-03-19 02:05:43 +03:00
parent bb460dd7f4
commit 6aa9039f7d
12 changed files with 412 additions and 85 deletions

View File

@ -264,7 +264,7 @@ for query_index in queries_to_run:
try: try:
prewarm_id = f'{query_prefix}.prewarm0' prewarm_id = f'{query_prefix}.prewarm0'
# Will also detect too long queries during warmup stage # Will also detect too long queries during warmup stage
res = c.execute(q, query_id = prewarm_id, settings = {'max_execution_time': 10}) res = c.execute(q, query_id = prewarm_id, settings = {'max_execution_time': args.max_query_seconds})
print(f'prewarm\t{query_index}\t{prewarm_id}\t{conn_index}\t{c.last_query.elapsed}') print(f'prewarm\t{query_index}\t{prewarm_id}\t{conn_index}\t{c.last_query.elapsed}')
except KeyboardInterrupt: except KeyboardInterrupt:
raise raise
@ -311,7 +311,8 @@ for query_index in queries_to_run:
for conn_index, c in enumerate(this_query_connections): for conn_index, c in enumerate(this_query_connections):
try: try:
res = c.execute(q, query_id = run_id) res = c.execute(q, query_id = run_id,
settings = {'max_execution_time': args.max_query_seconds})
except Exception as e: except Exception as e:
# Add query id to the exception to make debugging easier. # Add query id to the exception to make debugging easier.
e.args = (run_id, *e.args) e.args = (run_id, *e.args)

View File

@ -23,7 +23,9 @@ ClickHouse supports the standard grammar for defining windows and window functio
| `GROUPS` frame | not supported | | `GROUPS` frame | not supported |
| Calculating aggregate functions over a frame (`sum(value) over (order by time)`) | all aggregate functions are supported | | Calculating aggregate functions over a frame (`sum(value) over (order by time)`) | all aggregate functions are supported |
| `rank()`, `dense_rank()`, `row_number()` | supported | | `rank()`, `dense_rank()`, `row_number()` | supported |
| `lag/lead(value, offset)` | not supported, replace with `any(value) over (.... rows between <offset> preceding and <offset> preceding)`, or `following` for `lead`| | `lag/lead(value, offset)` | Not supported. Workarounds: |
| | 1) replace with `any(value) over (.... rows between <offset> preceding and <offset> preceding)`, or `following` for `lead`|
| | 2) use `lag_in_frame/lead_in_frame`, which are analogous, but respect the window frame. To get behavior identical to `lag/lead`, use `rows between unbounded preceding and unbounded following` |
## References ## References

View File

@ -946,3 +946,26 @@ void writeFieldText(const Field & x, WriteBuffer & buf);
String toString(const Field & x); String toString(const Field & x);
} }
/// Lets a DB::Field be passed directly as an argument to fmt-style formatting.
/// Only the plain "{}" placeholder is accepted; the value is rendered through
/// toString(const Field &).
template <>
struct fmt::formatter<DB::Field>
{
    /// Validates the format specification: anything other than an empty spec
    /// (i.e. the parser is already at '}' or at the end) is rejected.
    constexpr auto parse(format_parse_context & ctx)
    {
        auto it = ctx.begin();
        auto end = ctx.end();

        /// Only support {}.
        if (it != end && *it != '}')
            throw fmt::format_error("invalid format");

        return it;
    }

    /// Writes the textual form of the field to the output iterator of `ctx`.
    ///
    /// The method is const because formatting does not modify the formatter,
    /// and newer fmt releases invoke format() on a const formatter object.
    /// format_to is qualified with fmt:: so that argument-dependent lookup on
    /// the string returned by toString() cannot pick a different format_to
    /// (e.g. the standard library one) and make the call ambiguous.
    template <typename FormatContext>
    auto format(const DB::Field & x, FormatContext & ctx) const
    {
        return fmt::format_to(ctx.out(), "{}", toString(x));
    }
};

View File

@ -1,5 +1,6 @@
#include <Interpreters/WindowDescription.h> #include <Interpreters/WindowDescription.h>
#include <Core/Field.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
@ -60,7 +61,7 @@ void WindowFrame::toString(WriteBuffer & buf) const
} }
else else
{ {
buf << abs(begin_offset); buf << applyVisitor(FieldVisitorToString(), begin_offset);
buf << " " buf << " "
<< (begin_preceding ? "PRECEDING" : "FOLLOWING"); << (begin_preceding ? "PRECEDING" : "FOLLOWING");
} }
@ -77,7 +78,7 @@ void WindowFrame::toString(WriteBuffer & buf) const
} }
else else
{ {
buf << abs(end_offset); buf << applyVisitor(FieldVisitorToString(), end_offset);
buf << " " buf << " "
<< (end_preceding ? "PRECEDING" : "FOLLOWING"); << (end_preceding ? "PRECEDING" : "FOLLOWING");
} }
@ -121,23 +122,37 @@ void WindowFrame::checkValid() const
if (end_type == BoundaryType::Offset if (end_type == BoundaryType::Offset
&& begin_type == BoundaryType::Offset) && begin_type == BoundaryType::Offset)
{ {
// Frame starting with following rows can't have preceding rows. // Frame start offset must be less than or equal to the frame end offset.
if (!(end_preceding && !begin_preceding)) bool begin_less_equal_end;
if (begin_preceding && end_preceding)
{ {
// Frame start offset must be less or equal that the frame end offset. begin_less_equal_end = begin_offset >= end_offset;
const bool begin_before_end
= begin_offset * (begin_preceding ? -1 : 1)
<= end_offset * (end_preceding ? -1 : 1);
if (!begin_before_end)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start offset {} {} does not precede the frame end offset {} {}",
begin_offset, begin_preceding ? "PRECEDING" : "FOLLOWING",
end_offset, end_preceding ? "PRECEDING" : "FOLLOWING");
}
return;
} }
else if (begin_preceding && !end_preceding)
{
begin_less_equal_end = true;
}
else if (!begin_preceding && end_preceding)
{
begin_less_equal_end = false;
}
else if (!begin_preceding && !end_preceding)
{
begin_less_equal_end = begin_offset <= end_offset;
}
else
{
assert(false);
}
if (!begin_less_equal_end)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start offset {} {} does not precede the frame end offset {} {}",
begin_offset, begin_preceding ? "PRECEDING" : "FOLLOWING",
end_offset, end_preceding ? "PRECEDING" : "FOLLOWING");
}
return;
} }
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,

View File

@ -44,14 +44,13 @@ struct WindowFrame
// Offset might be both preceding and following, controlled by begin_preceding, // Offset might be both preceding and following, controlled by begin_preceding,
// but the offset value must be positive. // but the offset value must be positive.
BoundaryType begin_type = BoundaryType::Unbounded; BoundaryType begin_type = BoundaryType::Unbounded;
// This should have been a Field but I'm getting some crazy linker errors. Field begin_offset = 0;
int64_t begin_offset = 0;
bool begin_preceding = true; bool begin_preceding = true;
// Here as well, Unbounded can only be UNBOUNDED FOLLOWING, and end_preceding // Here as well, Unbounded can only be UNBOUNDED FOLLOWING, and end_preceding
// must be false. // must be false.
BoundaryType end_type = BoundaryType::Current; BoundaryType end_type = BoundaryType::Current;
int64_t end_offset = 0; Field end_offset = 0;
bool end_preceding = false; bool end_preceding = false;

View File

@ -70,7 +70,8 @@ void ASTWindowDefinition::formatImpl(const FormatSettings & settings,
} }
else else
{ {
settings.ostr << abs(frame.begin_offset); settings.ostr << applyVisitor(FieldVisitorToString(),
frame.begin_offset);
settings.ostr << " " settings.ostr << " "
<< (!frame.begin_preceding ? "FOLLOWING" : "PRECEDING"); << (!frame.begin_preceding ? "FOLLOWING" : "PRECEDING");
} }
@ -85,7 +86,8 @@ void ASTWindowDefinition::formatImpl(const FormatSettings & settings,
} }
else else
{ {
settings.ostr << abs(frame.end_offset); settings.ostr << applyVisitor(FieldVisitorToString(),
frame.end_offset);
settings.ostr << " " settings.ostr << " "
<< (!frame.end_preceding ? "FOLLOWING" : "PRECEDING"); << (!frame.end_preceding ? "FOLLOWING" : "PRECEDING");
} }

View File

@ -581,30 +581,20 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
else if (parser_literal.parse(pos, ast_literal, expected)) else if (parser_literal.parse(pos, ast_literal, expected))
{ {
const Field & value = ast_literal->as<ASTLiteral &>().value; const Field & value = ast_literal->as<ASTLiteral &>().value;
if (!isInt64FieldType(value.getType())) if ((node->frame.type == WindowFrame::FrameType::Rows
|| node->frame.type == WindowFrame::FrameType::Groups)
&& !(value.getType() == Field::Types::UInt64
|| (value.getType() == Field::Types::Int64
&& value.get<Int64>() >= 0)))
{ {
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Only integer frame offsets are supported, '{}' is not supported.", "Frame offset for '{}' frame must be a nonnegative integer, '{}' of type '{}' given.",
WindowFrame::toString(node->frame.type),
applyVisitor(FieldVisitorToString(), value),
Field::Types::toString(value.getType())); Field::Types::toString(value.getType()));
} }
node->frame.begin_offset = value.get<Int64>(); node->frame.begin_offset = value;
node->frame.begin_type = WindowFrame::BoundaryType::Offset; node->frame.begin_type = WindowFrame::BoundaryType::Offset;
// We can easily get a UINT64_MAX here, which doesn't even fit into
// int64_t. Not sure what checks we are going to need here after we
// support floats and dates.
if (node->frame.begin_offset > INT_MAX || node->frame.begin_offset < INT_MIN)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame offset must be between {} and {}, but {} is given",
INT_MAX, INT_MIN, node->frame.begin_offset);
}
if (node->frame.begin_offset < 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start offset must be greater than zero, {} given",
node->frame.begin_offset);
}
} }
else else
{ {
@ -652,28 +642,20 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
else if (parser_literal.parse(pos, ast_literal, expected)) else if (parser_literal.parse(pos, ast_literal, expected))
{ {
const Field & value = ast_literal->as<ASTLiteral &>().value; const Field & value = ast_literal->as<ASTLiteral &>().value;
if (!isInt64FieldType(value.getType())) if ((node->frame.type == WindowFrame::FrameType::Rows
|| node->frame.type == WindowFrame::FrameType::Groups)
&& !(value.getType() == Field::Types::UInt64
|| (value.getType() == Field::Types::Int64
&& value.get<Int64>() >= 0)))
{ {
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Only integer frame offsets are supported, '{}' is not supported.", "Frame offset for '{}' frame must be a nonnegative integer, '{}' of type '{}' given.",
WindowFrame::toString(node->frame.type),
applyVisitor(FieldVisitorToString(), value),
Field::Types::toString(value.getType())); Field::Types::toString(value.getType()));
} }
node->frame.end_offset = value.get<Int64>(); node->frame.end_offset = value;
node->frame.end_type = WindowFrame::BoundaryType::Offset; node->frame.end_type = WindowFrame::BoundaryType::Offset;
if (node->frame.end_offset > INT_MAX || node->frame.end_offset < INT_MIN)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame offset must be between {} and {}, but {} is given",
INT_MAX, INT_MIN, node->frame.end_offset);
}
if (node->frame.end_offset < 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame end offset must be greater than zero, {} given",
node->frame.end_offset);
}
} }
else else
{ {

View File

@ -3,6 +3,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h> #include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Common/Arena.h> #include <Common/Arena.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <Interpreters/convertFieldToType.h> #include <Interpreters/convertFieldToType.h>
@ -27,7 +28,8 @@ public:
virtual ~IWindowFunction() = default; virtual ~IWindowFunction() = default;
// Must insert the result for current_row. // Must insert the result for current_row.
virtual void windowInsertResultInto(IColumn & to, const WindowTransform * transform) = 0; virtual void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) = 0;
}; };
// Compares ORDER BY column values at given rows to find the boundaries of frame: // Compares ORDER BY column values at given rows to find the boundaries of frame:
@ -37,7 +39,7 @@ template <typename ColumnType>
static int compareValuesWithOffset(const IColumn * _compared_column, static int compareValuesWithOffset(const IColumn * _compared_column,
size_t compared_row, const IColumn * _reference_column, size_t compared_row, const IColumn * _reference_column,
size_t reference_row, size_t reference_row,
uint64_t _offset, const Field & _offset,
bool offset_is_preceding) bool offset_is_preceding)
{ {
// Casting the columns to the known type here makes it faster, probably // Casting the columns to the known type here makes it faster, probably
@ -46,7 +48,7 @@ static int compareValuesWithOffset(const IColumn * _compared_column,
_compared_column); _compared_column);
const auto * reference_column = assert_cast<const ColumnType *>( const auto * reference_column = assert_cast<const ColumnType *>(
_reference_column); _reference_column);
const auto offset = static_cast<typename ColumnType::ValueType>(_offset); const auto offset = _offset.get<typename ColumnType::ValueType>();
const auto compared_value_data = compared_column->getDataAt(compared_row); const auto compared_value_data = compared_column->getDataAt(compared_row);
assert(compared_value_data.size == sizeof(typename ColumnType::ValueType)); assert(compared_value_data.size == sizeof(typename ColumnType::ValueType));
@ -101,6 +103,54 @@ static int compareValuesWithOffset(const IColumn * _compared_column,
} }
} }
// A specialization of compareValuesWithOffset for floats.
template <typename ColumnType>
static int compareValuesWithOffsetFloat(const IColumn * _compared_column,
size_t compared_row, const IColumn * _reference_column,
size_t reference_row,
const Field & _offset,
bool offset_is_preceding)
{
// Casting the columns to the known type here makes it faster, probably
// because the getData call can be devirtualized.
const auto * compared_column = assert_cast<const ColumnType *>(
_compared_column);
const auto * reference_column = assert_cast<const ColumnType *>(
_reference_column);
// The underlying field type is Float64 for Float32 as well. get<Float32>()
// would be a reinterpret_cast and yield an incorrect result.
const auto offset = _offset.get<Float64>();
const auto compared_value_data = compared_column->getDataAt(compared_row);
assert(compared_value_data.size == sizeof(typename ColumnType::ValueType));
auto compared_value = unalignedLoad<typename ColumnType::ValueType>(
compared_value_data.data);
const auto reference_value_data = reference_column->getDataAt(reference_row);
assert(reference_value_data.size == sizeof(typename ColumnType::ValueType));
auto reference_value = unalignedLoad<typename ColumnType::ValueType>(
reference_value_data.data);
// Floats overflow to Inf and the comparison will work normally, so we don't
// have to do anything.
if (offset_is_preceding)
{
reference_value -= offset;
}
else
{
reference_value += offset;
}
const auto result = compared_value < reference_value ? -1
: compared_value == reference_value ? 0 : 1;
// fmt::print(stderr, "compared {}, offset {}, reference {}, result {}\n",
// compared_value, offset, reference_value, result);
return result;
}
// Helper macros to dispatch on type of the ORDER BY column // Helper macros to dispatch on type of the ORDER BY column
#define APPLY_FOR_ONE_TYPE(FUNCTION, TYPE) \ #define APPLY_FOR_ONE_TYPE(FUNCTION, TYPE) \
else if (typeid_cast<const TYPE *>(column)) \ else if (typeid_cast<const TYPE *>(column)) \
@ -114,14 +164,20 @@ if (false) /* NOLINT */ \
{ \ { \
/* Do nothing, a starter condition. */ \ /* Do nothing, a starter condition. */ \
} \ } \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int8>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt8>) \ APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt8>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int16>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt16>) \ APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt16>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int32>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt32>) \ APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt32>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int64>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt64>) \ APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt64>) \
\
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int8>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int16>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int32>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int64>) \
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int128>) \
\
APPLY_FOR_ONE_TYPE(FUNCTION##Float, ColumnVector<Float32>) \
APPLY_FOR_ONE_TYPE(FUNCTION##Float, ColumnVector<Float64>) \
\
else \ else \
{ \ { \
throw Exception(ErrorCodes::NOT_IMPLEMENTED, \ throw Exception(ErrorCodes::NOT_IMPLEMENTED, \
@ -193,9 +249,28 @@ WindowTransform::WindowTransform(const Block & input_header_,
== WindowFrame::BoundaryType::Offset)) == WindowFrame::BoundaryType::Offset))
{ {
assert(order_by_indices.size() == 1); assert(order_by_indices.size() == 1);
const IColumn * column = input_header.getByPosition( const auto & entry = input_header.getByPosition(order_by_indices[0]);
order_by_indices[0]).column.get(); const IColumn * column = entry.column.get();
APPLY_FOR_TYPES(compareValuesWithOffset) APPLY_FOR_TYPES(compareValuesWithOffset)
// Check that the offset type matches the window type.
// Convert the offsets to the ORDER BY column type. We can't just check
// that it matches, because e.g. the int literals are always (U)Int64,
// but the column might be Int8 and so on.
if (window_description.frame.begin_type
== WindowFrame::BoundaryType::Offset)
{
window_description.frame.begin_offset = convertFieldToTypeOrThrow(
window_description.frame.begin_offset,
*entry.type);
}
if (window_description.frame.end_type
== WindowFrame::BoundaryType::Offset)
{
window_description.frame.end_offset = convertFieldToTypeOrThrow(
window_description.frame.end_offset,
*entry.type);
}
} }
} }
@ -391,7 +466,7 @@ void WindowTransform::advanceFrameStartRowsOffset()
{ {
// Just recalculate it each time by walking blocks. // Just recalculate it each time by walking blocks.
const auto [moved_row, offset_left] = moveRowNumber(current_row, const auto [moved_row, offset_left] = moveRowNumber(current_row,
window_description.frame.begin_offset window_description.frame.begin_offset.get<UInt64>()
* (window_description.frame.begin_preceding ? -1 : 1)); * (window_description.frame.begin_preceding ? -1 : 1));
frame_start = moved_row; frame_start = moved_row;
@ -638,7 +713,7 @@ void WindowTransform::advanceFrameEndRowsOffset()
// Walk the specified offset from the current row. The "+1" is needed // Walk the specified offset from the current row. The "+1" is needed
// because the frame_end is a past-the-end pointer. // because the frame_end is a past-the-end pointer.
const auto [moved_row, offset_left] = moveRowNumber(current_row, const auto [moved_row, offset_left] = moveRowNumber(current_row,
window_description.frame.end_offset window_description.frame.end_offset.get<UInt64>()
* (window_description.frame.end_preceding ? -1 : 1) * (window_description.frame.end_preceding ? -1 : 1)
+ 1); + 1);
@ -852,14 +927,14 @@ void WindowTransform::writeOutCurrentRow()
for (size_t wi = 0; wi < workspaces.size(); ++wi) for (size_t wi = 0; wi < workspaces.size(); ++wi)
{ {
auto & ws = workspaces[wi]; auto & ws = workspaces[wi];
IColumn * result_column = block.output_columns[wi].get();
if (ws.window_function_impl) if (ws.window_function_impl)
{ {
ws.window_function_impl->windowInsertResultInto(*result_column, this); ws.window_function_impl->windowInsertResultInto(this, wi);
} }
else else
{ {
IColumn * result_column = block.output_columns[wi].get();
const auto * a = ws.aggregate_function.get(); const auto * a = ws.aggregate_function.get();
auto * buf = ws.aggregate_function_state.data(); auto * buf = ws.aggregate_function_state.data();
// FIXME does it also allocate the result on the arena? // FIXME does it also allocate the result on the arena?
@ -1275,8 +1350,11 @@ struct WindowFunctionRank final : public WindowFunction
DataTypePtr getReturnType() const override DataTypePtr getReturnType() const override
{ return std::make_shared<DataTypeUInt64>(); } { return std::make_shared<DataTypeUInt64>(); }
void windowInsertResultInto(IColumn & to, const WindowTransform * transform) override void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
{ {
IColumn & to = *transform->blockAt(transform->current_row)
.output_columns[function_index];
assert_cast<ColumnUInt64 &>(to).getData().push_back( assert_cast<ColumnUInt64 &>(to).getData().push_back(
transform->peer_group_start_row_number); transform->peer_group_start_row_number);
} }
@ -1292,8 +1370,11 @@ struct WindowFunctionDenseRank final : public WindowFunction
DataTypePtr getReturnType() const override DataTypePtr getReturnType() const override
{ return std::make_shared<DataTypeUInt64>(); } { return std::make_shared<DataTypeUInt64>(); }
void windowInsertResultInto(IColumn & to, const WindowTransform * transform) override void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
{ {
IColumn & to = *transform->blockAt(transform->current_row)
.output_columns[function_index];
assert_cast<ColumnUInt64 &>(to).getData().push_back( assert_cast<ColumnUInt64 &>(to).getData().push_back(
transform->peer_group_number); transform->peer_group_number);
} }
@ -1309,13 +1390,122 @@ struct WindowFunctionRowNumber final : public WindowFunction
DataTypePtr getReturnType() const override DataTypePtr getReturnType() const override
{ return std::make_shared<DataTypeUInt64>(); } { return std::make_shared<DataTypeUInt64>(); }
void windowInsertResultInto(IColumn & to, const WindowTransform * transform) override void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
{ {
IColumn & to = *transform->blockAt(transform->current_row)
.output_columns[function_index];
assert_cast<ColumnUInt64 &>(to).getData().push_back( assert_cast<ColumnUInt64 &>(to).getData().push_back(
transform->current_row_number); transform->current_row_number);
} }
}; };
template <bool is_lead>
struct WindowFunctionLagLeadInFrame final : public WindowFunction
{
WindowFunctionLagLeadInFrame(const std::string & name_,
const DataTypes & argument_types_, const Array & parameters_)
: WindowFunction(name_, argument_types_, parameters_)
{
if (!parameters.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function {} cannot be parameterized", name_);
}
if (argument_types.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function {} takes at least one argument", name_);
}
if (argument_types.size() == 1)
{
return;
}
if (!isInt64FieldType(argument_types[1]->getDefault().getType()))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Offset must be an integer, '{}' given",
argument_types[1]->getName());
}
if (argument_types.size() == 2)
{
return;
}
if (!getLeastSupertype({argument_types[0], argument_types[2]}))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"The default value type '{}' is not convertible to the argument type '{}'",
argument_types[2]->getName(),
argument_types[0]->getName());
}
if (argument_types.size() > 3)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function '{}' accepts at most 3 arguments, {} given",
name, argument_types.size());
}
}
DataTypePtr getReturnType() const override
{ return argument_types[0]; }
void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override
{
auto & current_block = transform->blockAt(transform->current_row);
IColumn & to = *current_block.output_columns[function_index];
auto & workspace = transform->workspaces[function_index];
int offset = 1;
if (argument_types.size() > 1)
{
offset = (*current_block.input_columns[
workspace.argument_column_indices[1]])[
transform->current_row.row].get<Int64>();
if (offset < 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"The offset for function {} must be nonnegative, {} given",
getName(), offset);
}
}
const auto [target_row, offset_left] = transform->moveRowNumber(
transform->current_row, offset * (is_lead ? 1 : -1));
if (offset_left != 0
|| target_row < transform->frame_start
|| transform->frame_end <= target_row)
{
// Offset is outside the frame.
if (argument_types.size() > 2)
{
// Column with default values is specified.
to.insertFrom(*current_block.input_columns[
workspace.argument_column_indices[2]],
transform->current_row.row);
}
else
{
to.insertDefault();
}
}
else
{
// Offset is inside the frame.
to.insertFrom(*transform->blockAt(target_row).input_columns[
workspace.argument_column_indices[0]],
target_row.row);
}
}
};
void registerWindowFunctions(AggregateFunctionFactory & factory) void registerWindowFunctions(AggregateFunctionFactory & factory)
{ {
// Why didn't I implement lag/lead yet? Because they are a mess. I imagine // Why didn't I implement lag/lead yet? Because they are a mess. I imagine
@ -1327,9 +1517,10 @@ void registerWindowFunctions(AggregateFunctionFactory & factory)
// the whole partition like Postgres does, because using a linear amount // the whole partition like Postgres does, because using a linear amount
// of additional memory is not an option when we have a lot of data. We must // of additional memory is not an option when we have a lot of data. We must
// be able to process at least the lag/lead in streaming fashion. // be able to process at least the lag/lead in streaming fashion.
// Our best bet is probably rewriting, say `lag(value, offset)` to // A partial solution for constant offsets is rewriting, say `lag(value, offset)`
// `any(value) over (rows between offset preceding and offset preceding)`, // to `any(value) over (rows between offset preceding and offset preceding)`.
// at the query planning stage. // We also implement non-standard functions `lag/lead_in_frame`, that are
// analogous to `lag/lead`, but respect the frame.
// Functions like cume_dist() do require materializing the entire // Functions like cume_dist() do require materializing the entire
// partition, but it's probably also simpler to implement them by rewriting // partition, but it's probably also simpler to implement them by rewriting
// to a (rows between unbounded preceding and unbounded following) frame, // to a (rows between unbounded preceding and unbounded following) frame,
@ -1355,6 +1546,20 @@ void registerWindowFunctions(AggregateFunctionFactory & factory)
return std::make_shared<WindowFunctionRowNumber>(name, argument_types, return std::make_shared<WindowFunctionRowNumber>(name, argument_types,
parameters); parameters);
}); });
factory.registerFunction("lag_in_frame", [](const std::string & name,
const DataTypes & argument_types, const Array & parameters)
{
return std::make_shared<WindowFunctionLagLeadInFrame<false>>(
name, argument_types, parameters);
});
factory.registerFunction("lead_in_frame", [](const std::string & name,
const DataTypes & argument_types, const Array & parameters)
{
return std::make_shared<WindowFunctionLagLeadInFrame<true>>(
name, argument_types, parameters);
});
} }
} }

View File

@ -110,7 +110,9 @@ public:
Status prepare() override; Status prepare() override;
void work() override; void work() override;
private: /*
* Implementation details.
*/
void advancePartitionEnd(); void advancePartitionEnd();
bool arePeers(const RowNumber & x, const RowNumber & y) const; bool arePeers(const RowNumber & x, const RowNumber & y) const;
@ -321,10 +323,7 @@ public:
int (* compare_values_with_offset) ( int (* compare_values_with_offset) (
const IColumn * compared_column, size_t compared_row, const IColumn * compared_column, size_t compared_row,
const IColumn * reference_column, size_t reference_row, const IColumn * reference_column, size_t reference_row,
// We can make it a Field later if we need the Decimals. Now we only const Field & offset,
// have ints and datetime, and the underlying Field type for them is
// uint64_t anyway.
uint64_t offset,
bool offset_is_preceding); bool offset_is_preceding);
}; };

View File

@ -110,4 +110,46 @@
format Null format Null
</query> </query>
<!-- Our variant of lead. -->
<query>
select lead_in_frame(number) over w
from
(select number, intDiv(number, 1111) p, mod(number, 111) o
from numbers(10000000)) t
window w as (partition by p order by o
rows between unbounded preceding and unbounded following)
format Null
</query>
<!-- A faster replacement for lead with constant offset. -->
<query>
select any(number) over w
from
(select number, intDiv(number, 1111) p, mod(number, 111) o
from numbers(10000000)) t
window w as (partition by p order by o
rows between 1 following and 1 following)
format Null
</query>
<query>
select lead_in_frame(number, number) over w
from
(select number, intDiv(number, 1111) p, mod(number, 111) o
from numbers(10000000)) t
window w as (partition by p order by o
rows between unbounded preceding and unbounded following)
format Null
</query>
<query>
select lead_in_frame(number, number, number) over w
from
(select number, intDiv(number, 1111) p, mod(number, 111) o
from numbers(10000000)) t
window w as (partition by p order by o
rows between unbounded preceding and unbounded following)
format Null
</query>
</test> </test>

View File

@ -974,6 +974,32 @@ from numbers(5);
1 3 1 3
2 4 2 4
3 \N 3 \N
-- variants of lag/lead that respect the frame
select number, p, pp,
lag_in_frame(number, number - pp, number * 11) over w as lag,
lead_in_frame(number, number - pp, number * 11) over w as lead
from (select number, intDiv(number, 5) p, p * 5 pp from numbers(16))
window w as (partition by p order by number
rows between unbounded preceding and unbounded following)
order by number
settings max_block_size = 3;
;
0 0 0 0 0
1 0 0 0 2
2 0 0 0 4
3 0 0 0 33
4 0 0 0 44
5 1 5 5 5
6 1 5 5 7
7 1 5 5 9
8 1 5 5 88
9 1 5 5 99
10 2 10 10 10
11 2 10 10 12
12 2 10 10 14
13 2 10 10 143
14 2 10 10 154
15 3 15 15 15
-- case-insensitive SQL-standard synonyms for any and anyLast -- case-insensitive SQL-standard synonyms for any and anyLast
select select
number, number,
@ -993,3 +1019,16 @@ order by number
7 6 8 7 6 8
8 7 9 8 7 9
9 8 9 9 8 9
-- floating point RANGE frame
select
count(*) over (order by (toFloat32(number) as f32) range 5. preceding),
count(*) over (order by (toFloat64(number) as f64) range 5. preceding)
from numbers(7)
;
1 1
2 2
3 3
4 4
5 5
6 6
6 6

View File

@ -336,6 +336,17 @@ select
over (order by number rows between 1 following and 1 following) over (order by number rows between 1 following and 1 following)
from numbers(5); from numbers(5);
-- variants of lag/lead that respect the frame
select number, p, pp,
lag_in_frame(number, number - pp, number * 11) over w as lag,
lead_in_frame(number, number - pp, number * 11) over w as lead
from (select number, intDiv(number, 5) p, p * 5 pp from numbers(16))
window w as (partition by p order by number
rows between unbounded preceding and unbounded following)
order by number
settings max_block_size = 3;
;
-- case-insensitive SQL-standard synonyms for any and anyLast -- case-insensitive SQL-standard synonyms for any and anyLast
select select
number, number,
@ -345,3 +356,10 @@ from numbers(10)
window w as (order by number range between 1 preceding and 1 following) window w as (order by number range between 1 preceding and 1 following)
order by number order by number
; ;
-- floating point RANGE frame
select
count(*) over (order by (toFloat32(number) as f32) range 5. preceding),
count(*) over (order by (toFloat64(number) as f64) range 5. preceding)
from numbers(7)
;