Add aggregate function for exp smoothing

This commit is contained in:
Alexey Milovidov 2021-09-12 08:26:07 +03:00
parent a4153e5629
commit 0745631bf7
4 changed files with 265 additions and 2 deletions

View File

@ -0,0 +1,100 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <Common/ExponentiallySmoothedCounter.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
/** See the comments in ExponentiallySmoothedCounter.h
* Caveats:
* - if aggregated data is unordered, the calculation is non-deterministic;
* - so, multithreaded and distributed aggregation is also non-deterministic;
* - it highly depends on what value will be first and last;
* Nevertheless it is useful in window functions and aggregate functions over ordered data.
*/
class AggregateFunctionExponentialMovingAverage final
: public IAggregateFunctionDataHelper<ExponentiallySmoothedCounter, AggregateFunctionExponentialMovingAverage>
{
private:
Float64 half_decay;
public:
AggregateFunctionExponentialMovingAverage(const DataTypes & argument_types_, const Array & params)
: IAggregateFunctionDataHelper<ExponentiallySmoothedCounter, AggregateFunctionExponentialMovingAverage>(argument_types_, params)
{
if (params.size() != 1)
throw Exception{"Aggregate function " + getName() + " requires exactly one parameter: half decay time.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH};
half_decay = applyVisitor(FieldVisitorConvertToNumber<Float64>(), params[0]);
}
String getName() const override { return "exponentialMovingAverage"; }
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeNumber<Float64>>();
}
bool allocatesMemoryInArena() const override { return false; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & value = columns[0]->getFloat64(row_num);
const auto & time = columns[1]->getFloat64(row_num);
this->data(place).add(value, time, half_decay);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs), half_decay);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override
{
writeBinary(this->data(place).value, buf);
writeBinary(this->data(place).update_time, buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, Arena *) const override
{
readBinary(this->data(place).value, buf);
readBinary(this->data(place).update_time, buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
auto & column = assert_cast<ColumnVector<Float64> &>(to);
column.getData().push_back(this->data(place).value);
}
};
void registerAggregateFunctionExponentialMovingAverage(AggregateFunctionFactory & factory)
{
factory.registerFunction("exponentialMovingAverage",
[](const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *) -> AggregateFunctionPtr
{
assertBinary(name, argument_types);
for (const auto & type : argument_types)
if (!isNumber(*type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Both arguments for aggregate function {} must have numeric type, got {}", name, type->getName());
return std::make_shared<AggregateFunctionExponentialMovingAverage>(argument_types, params);
});
}
}

View File

@ -50,6 +50,7 @@ void registerAggregateFunctionWelchTTest(AggregateFunctionFactory &);
void registerAggregateFunctionStudentTTest(AggregateFunctionFactory &);
void registerAggregateFunctionSingleValueOrNull(AggregateFunctionFactory &);
void registerAggregateFunctionSequenceNextNode(AggregateFunctionFactory &);
void registerAggregateFunctionExponentialMovingAverage(AggregateFunctionFactory &);
class AggregateFunctionCombinatorFactory;
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
@ -115,10 +116,10 @@ void registerAggregateFunctions()
registerAggregateFunctionWelchTTest(factory);
registerAggregateFunctionStudentTTest(factory);
registerAggregateFunctionSingleValueOrNull(factory);
registerAggregateFunctionIntervalLengthSum(factory);
registerAggregateFunctionExponentialMovingAverage(factory);
registerWindowFunctions(factory);
registerAggregateFunctionIntervalLengthSum(factory);
}
{

View File

@ -0,0 +1,130 @@
1 0 1
0 1 0.5
0 2 0.25
0 3 0.125
0 4 0.0625
0 5 0.03125
0 6 0.015625
0 7 0.0078125
0 8 0.00390625
0 9 0.001953125
1 0 1
0 1 0.933
0 2 0.871
0 3 0.812
0 4 0.758
0 5 0.707
0 6 0.66
0 7 0.616
0 8 0.574
0 9 0.536
0 0 0
1 1 0.5
2 2 1.25
3 3 2.125
4 4 3.0625
5 5 4.03125
6 6 5.015625
7 7 6.0078125
8 8 7.00390625
9 9 8.001953125
1 0 1 ██████████████████████████████████████████████████
0 1 0.933 ██████████████████████████████████████████████▋
0 2 0.871 ███████████████████████████████████████████▌
0 3 0.812 ████████████████████████████████████████▌
0 4 0.758 █████████████████████████████████████▊
0 5 0.707 ███████████████████████████████████▎
0 6 0.66 ████████████████████████████████▊
0 7 0.616 ██████████████████████████████▋
0 8 0.574 ████████████████████████████▋
0 9 0.536 ██████████████████████████▋
0 10 0.5 █████████████████████████
0 11 0.467 ███████████████████████▎
0 12 0.435 █████████████████████▋
0 13 0.406 ████████████████████▎
0 14 0.379 ██████████████████▊
0 15 0.354 █████████████████▋
0 16 0.33 ████████████████▍
0 17 0.308 ███████████████▍
0 18 0.287 ██████████████▎
0 19 0.268 █████████████▍
0 20 0.25 ████████████▌
0 21 0.233 ███████████▋
0 22 0.218 ██████████▊
0 23 0.203 ██████████▏
0 24 0.189 █████████▍
1 25 0.244 ████████████▏
1 26 0.294 ██████████████▋
1 27 0.342 █████████████████
1 28 0.386 ███████████████████▎
1 29 0.427 █████████████████████▎
1 30 0.465 ███████████████████████▎
1 31 0.501 █████████████████████████
1 32 0.534 ██████████████████████████▋
1 33 0.566 ████████████████████████████▎
1 34 0.595 █████████████████████████████▋
1 35 0.622 ███████████████████████████████
1 36 0.647 ████████████████████████████████▎
1 37 0.671 █████████████████████████████████▌
1 38 0.693 ██████████████████████████████████▋
1 39 0.713 ███████████████████████████████████▋
1 40 0.733 ████████████████████████████████████▋
1 41 0.751 █████████████████████████████████████▌
1 42 0.767 ██████████████████████████████████████▎
1 43 0.783 ███████████████████████████████████████▏
1 44 0.797 ███████████████████████████████████████▋
1 45 0.811 ████████████████████████████████████████▌
1 46 0.824 █████████████████████████████████████████▏
1 47 0.835 █████████████████████████████████████████▋
1 48 0.846 ██████████████████████████████████████████▎
1 49 0.857 ██████████████████████████████████████████▋
1 0 1 ██████████████████████████████████████████████████
0 1 0.5 █████████████████████████
0 2 0.25 ████████████▌
0 3 0.125 ██████▎
0 4 0.062 ███
1 5 0.531 ██████████████████████████▌
0 6 0.266 █████████████▎
0 7 0.133 ██████▋
0 8 0.066 ███▎
0 9 0.033 █▋
1 10 0.517 █████████████████████████▋
0 11 0.258 ████████████▊
0 12 0.129 ██████▍
0 13 0.065 ███▏
0 14 0.032 █▌
1 15 0.516 █████████████████████████▋
0 16 0.258 ████████████▊
0 17 0.129 ██████▍
0 18 0.065 ███▏
0 19 0.032 █▌
1 20 0.516 █████████████████████████▋
0 21 0.258 ████████████▊
0 22 0.129 ██████▍
0 23 0.065 ███▏
0 24 0.032 █▌
1 25 0.516 █████████████████████████▋
0 26 0.258 ████████████▊
0 27 0.129 ██████▍
0 28 0.065 ███▏
0 29 0.032 █▌
1 30 0.516 █████████████████████████▋
0 31 0.258 ████████████▊
0 32 0.129 ██████▍
0 33 0.065 ███▏
0 34 0.032 █▌
1 35 0.516 █████████████████████████▋
0 36 0.258 ████████████▊
0 37 0.129 ██████▍
0 38 0.065 ███▏
0 39 0.032 █▌
1 40 0.516 █████████████████████████▋
0 41 0.258 ████████████▊
0 42 0.129 ██████▍
0 43 0.065 ███▏
0 44 0.032 █▌
1 45 0.516 █████████████████████████▋
0 46 0.258 ████████████▊
0 47 0.129 ██████▍
0 48 0.065 ███▏
0 49 0.032 █▌

View File

@ -0,0 +1,32 @@
SELECT number = 0 AS value, number AS time, exponentialMovingAverage(1)(value, time) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS exp_smooth FROM numbers(10);
SELECT value, time, round(exp_smooth, 3) FROM (SELECT number = 0 AS value, number AS time, exponentialMovingAverage(10)(value, time) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS exp_smooth FROM numbers(10));
SELECT number AS value, number AS time, exponentialMovingAverage(1)(value, time) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS exp_smooth FROM numbers(10);
SELECT
value,
time,
round(exp_smooth, 3),
bar(exp_smooth, 0, 1, 50) AS bar
FROM
(
SELECT
(number = 0) OR (number >= 25) AS value,
number AS time,
exponentialMovingAverage(10)(value, time) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS exp_smooth
FROM numbers(50)
);
SELECT
value,
time,
round(exp_smooth, 3),
bar(exp_smooth, 0, 1, 50) AS bar
FROM
(
SELECT
(number % 5) = 0 AS value,
number AS time,
exponentialMovingAverage(1)(value, time) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS exp_smooth
FROM numbers(50)
);