This commit is contained in:
Alexander Kuzmenkov 2021-02-12 13:37:27 +03:00
parent 1275be58bf
commit 824475b224
3 changed files with 37 additions and 91 deletions

View File

@ -24,7 +24,7 @@ namespace ErrorCodes
class IWindowFunction
{
public:
virtual ~IWindowFunction() {}
virtual ~IWindowFunction() = default;
// Must insert the result for current_row.
virtual void windowInsertResultInto(IColumn & to, const WindowTransform * transform) = 0;
@ -1316,74 +1316,25 @@ struct WindowFunctionRowNumber final : public WindowFunction
}
};
struct WindowFunctionLagLead final : public WindowFunction
{
bool is_lag = false;
// Always positive.
uint64_t offset_rows = 1;
Field default_value;
WindowFunctionLagLead(const std::string & name_,
const DataTypes & argument_types_, const Array & parameters_,
bool is_lag_)
: WindowFunction(name_, argument_types_, parameters_)
, is_lag(is_lag_)
{
// offset and default are in parameters
if (argument_types.size() != 1)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"The window function {} must have exactly one argument -- the value column. The offset and the default value must be specified as parameters, i.e. `{}(offset, default)(column)`",
getName(), getName());
}
if (parameters.size() > 2)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"The window function {} accepts at most two parameters, {} given",
getName(), parameters.size());
}
if (parameters.size() >= 1)
{
if (!isInt64FieldType(parameters[0].getType())
|| parameters[0].get<Int64>() < 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"The first parameter of the window function {} must be a nonnegative integer specifying the number of offset rows. Got '{}' instead",
getName(), toString(parameters[0]));
}
offset_rows = parameters[0].get<UInt64>();
}
if (parameters.size() >= 2)
{
default_value = convertFieldToTypeOrThrow(parameters[1],
*argument_types[0]);
}
}
DataTypePtr getReturnType() const override { return argument_types[0]; }
void windowInsertResultInto(IColumn &, const WindowTransform *) override
{
// These functions are a mess... they ignore the frame, so we need to
// either materialize the whole partition (not practical if it's big),
// or track a separate frame for these functions, which would make the
// window transform completely impenetrable to human mind. Our best bet
// is probably rewriting, say, `lag(value, offset)` to
// `any(value) over (rows between offset preceding and offset preceding)`,
// at the query planning stage. We can keep this class as a stub for
// parsing, anyway.
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"The window function {} is not implemented",
getName());
}
};
void registerWindowFunctions(AggregateFunctionFactory & factory)
{
// Why didn't I implement lag/lead yet? Because they are a mess. I imagine
// they are from the older generation of window functions, when the concept
// of frame was not yet invented, so they ignore the frame and use the
// partition instead. This means we have to track a separate frame for
// these functions, which would make the window transform completely
// impenetrable to human mind. We can't just get away with materializing
// the whole partition like Postgres does, because using a linear amount
// of additional memory is not an option when we have a lot of data. We must
// be able to process at least the lag/lead in streaming fashion.
// Our best bet is probably rewriting, say `lag(value, offset)` to
// `any(value) over (rows between offset preceding and offset preceding)`,
// at the query planning stage.
// Functions like cume_dist() do require materializing the entire
// partition, but it's probably also simpler to implement them by rewriting
// to a (rows between unbounded preceding and unbounded following) frame,
// instead of adding separate logic for them.
factory.registerFunction("rank", [](const std::string & name,
const DataTypes & argument_types, const Array & parameters)
{
@ -1404,20 +1355,6 @@ void registerWindowFunctions(AggregateFunctionFactory & factory)
return std::make_shared<WindowFunctionRowNumber>(name, argument_types,
parameters);
});
factory.registerFunction("lag", [](const std::string & name,
const DataTypes & argument_types, const Array & parameters)
{
return std::make_shared<WindowFunctionLagLead>(name, argument_types,
parameters, true /* is_lag */);
});
factory.registerFunction("lead", [](const std::string & name,
const DataTypes & argument_types, const Array & parameters)
{
return std::make_shared<WindowFunctionLagLead>(name, argument_types,
parameters, false /* is_lag */);
});
}
}

View File

@ -962,12 +962,19 @@ settings max_block_size = 2;
26 5 2 5 4 3 4
29 5 2 5 4 3 5
30 6 0 1 1 1 1
-- very bad functions, not implemented yet
-- our replacement for lag/lead
select
lag(1, 5)(number) over (),
lead(2)(number) over (),
lag(number) over ()
from numbers(2); -- { serverError 48 }
anyOrNull(number)
over (order by number rows between 1 preceding and 1 preceding),
anyOrNull(number)
over (order by number rows between 1 following and 1 following)
from numbers(5);
\N 1
0 2
1 3
2 4
3 \N
-- case-insensitive SQL-standard synonyms for any and anyLast
select
number,
fIrSt_VaLue(number) over w,

View File

@ -328,13 +328,15 @@ window w as (partition by p order by o)
order by p, o, number
settings max_block_size = 2;
-- very bad functions, not implemented yet
-- our replacement for lag/lead
select
lag(1, 5)(number) over (),
lead(2)(number) over (),
lag(number) over ()
from numbers(2); -- { serverError 48 }
anyOrNull(number)
over (order by number rows between 1 preceding and 1 preceding),
anyOrNull(number)
over (order by number rows between 1 following and 1 following)
from numbers(5);
-- case-insensitive SQL-standard synonyms for any and anyLast
select
number,
fIrSt_VaLue(number) over w,