diff --git a/src/AggregateFunctions/AggregateFunctionExponentialMovingAverage.cpp b/src/AggregateFunctions/AggregateFunctionExponentialMovingAverage.cpp new file mode 100644 index 00000000000..115349d470b --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionExponentialMovingAverage.cpp @@ -0,0 +1,100 @@ +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + + +/** See the comments in ExponentiallySmoothedCounter.h + * Caveats: + * - if aggregated data is unordered, the calculation is non-deterministic; + * - so, multithreaded and distributed aggregation is also non-deterministic; + * - it highly depends on what value will be first and last; + * Nevertheless it is useful in window functions and aggregate functions over ordered data. + */ +class AggregateFunctionExponentialMovingAverage final + : public IAggregateFunctionDataHelper +{ +private: + Float64 half_decay; + +public: + AggregateFunctionExponentialMovingAverage(const DataTypes & argument_types_, const Array & params) + : IAggregateFunctionDataHelper(argument_types_, params) + { + if (params.size() != 1) + throw Exception{"Aggregate function " + getName() + " requires exactly one parameter: half decay time.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH}; + + half_decay = applyVisitor(FieldVisitorConvertToNumber(), params[0]); + } + + String getName() const override { return "exponentialMovingAverage"; } + + DataTypePtr getReturnType() const override + { + return std::make_shared>(); + } + + bool allocatesMemoryInArena() const override { return false; } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override + { + const auto & value = columns[0]->getFloat64(row_num); + const auto & time = columns[1]->getFloat64(row_num); + this->data(place).add(value, time, half_decay); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override + { + this->data(place).merge(this->data(rhs), half_decay); + } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override + { + writeBinary(this->data(place).value, buf); + writeBinary(this->data(place).update_time, buf); + } + + void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, Arena *) const override + { + readBinary(this->data(place).value, buf); + readBinary(this->data(place).update_time, buf); + } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + auto & column = assert_cast &>(to); + column.getData().push_back(this->data(place).value); + } +}; + + +void registerAggregateFunctionExponentialMovingAverage(AggregateFunctionFactory & factory) +{ + factory.registerFunction("exponentialMovingAverage", + [](const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *) -> AggregateFunctionPtr + { + assertBinary(name, argument_types); + for (const auto & type : argument_types) + if (!isNumber(*type)) + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Both arguments for aggregate function {} must have numeric type, got {}", name, type->getName()); + return std::make_shared(argument_types, params); + }); +} + +} diff --git a/src/AggregateFunctions/registerAggregateFunctions.cpp b/src/AggregateFunctions/registerAggregateFunctions.cpp index 70248d4cfde..3d640d0a34a 100644 --- a/src/AggregateFunctions/registerAggregateFunctions.cpp +++ b/src/AggregateFunctions/registerAggregateFunctions.cpp @@ -50,6 +50,7 @@ void registerAggregateFunctionWelchTTest(AggregateFunctionFactory &); void registerAggregateFunctionStudentTTest(AggregateFunctionFactory &); void registerAggregateFunctionSingleValueOrNull(AggregateFunctionFactory &); void registerAggregateFunctionSequenceNextNode(AggregateFunctionFactory &); +void registerAggregateFunctionExponentialMovingAverage(AggregateFunctionFactory &); class AggregateFunctionCombinatorFactory; void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &); @@ -115,10 +116,10 @@ void registerAggregateFunctions() registerAggregateFunctionWelchTTest(factory); registerAggregateFunctionStudentTTest(factory); registerAggregateFunctionSingleValueOrNull(factory); + registerAggregateFunctionIntervalLengthSum(factory); + registerAggregateFunctionExponentialMovingAverage(factory); registerWindowFunctions(factory); - - registerAggregateFunctionIntervalLengthSum(factory); } { diff --git a/tests/queries/0_stateless/2020_exponential_smoothing.reference b/tests/queries/0_stateless/2020_exponential_smoothing.reference new file mode 100644 index 00000000000..ce328c7eff1 --- /dev/null +++ b/tests/queries/0_stateless/2020_exponential_smoothing.reference @@ -0,0 +1,130 @@ +1 0 1 +0 1 0.5 +0 2 0.25 +0 3 0.125 +0 4 0.0625 +0 5 0.03125 +0 6 0.015625 +0 7 0.0078125 +0 8 0.00390625 +0 9 0.001953125 +1 0 1 +0 1 0.933 +0 2 0.871 +0 3 0.812 +0 4 0.758 +0 5 0.707 +0 6 0.66 +0 7 0.616 +0 8 0.574 +0 9 0.536 +0 0 0 +1 1 0.5 +2 2 1.25 +3 3 2.125 +4 4 3.0625 +5 5 4.03125 +6 6 5.015625 +7 7 6.0078125 +8 8 7.00390625 +9 9 8.001953125 +1 0 1 ██████████████████████████████████████████████████ +0 1 0.933 ██████████████████████████████████████████████▋ +0 2 0.871 ███████████████████████████████████████████▌ +0 3 0.812 ████████████████████████████████████████▌ +0 4 0.758 █████████████████████████████████████▊ +0 5 0.707 ███████████████████████████████████▎ +0 6 0.66 ████████████████████████████████▊ +0 7 0.616 ██████████████████████████████▋ +0 8 0.574 ████████████████████████████▋ +0 9 0.536 ██████████████████████████▋ +0 10 0.5 █████████████████████████ +0 11 0.467 ███████████████████████▎ +0 12 0.435 █████████████████████▋ +0 13 0.406 ████████████████████▎ +0 14 0.379 ██████████████████▊ +0 15 0.354 █████████████████▋ +0 16 0.33 ████████████████▍ +0 17 0.308 ███████████████▍ +0 18 0.287 ██████████████▎ +0 19 0.268 █████████████▍ +0 20 0.25 ████████████▌ +0 21 0.233 ███████████▋ +0 22 0.218 ██████████▊ +0 23 0.203 ██████████▏ +0 24 0.189 █████████▍ +1 25 0.244 ████████████▏ +1 26 0.294 ██████████████▋ +1 27 0.342 █████████████████ +1 28 0.386 ███████████████████▎ +1 29 0.427 █████████████████████▎ +1 30 0.465 ███████████████████████▎ +1 31 0.501 █████████████████████████ +1 32 0.534 ██████████████████████████▋ +1 33 0.566 ████████████████████████████▎ +1 34 0.595 █████████████████████████████▋ +1 35 0.622 ███████████████████████████████ +1 36 0.647 ████████████████████████████████▎ +1 37 0.671 █████████████████████████████████▌ +1 38 0.693 ██████████████████████████████████▋ +1 39 0.713 ███████████████████████████████████▋ +1 40 0.733 ████████████████████████████████████▋ +1 41 0.751 █████████████████████████████████████▌ +1 42 0.767 ██████████████████████████████████████▎ +1 43 0.783 ███████████████████████████████████████▏ +1 44 0.797 ███████████████████████████████████████▋ +1 45 0.811 ████████████████████████████████████████▌ +1 46 0.824 █████████████████████████████████████████▏ +1 47 0.835 █████████████████████████████████████████▋ +1 48 0.846 ██████████████████████████████████████████▎ +1 49 0.857 ██████████████████████████████████████████▋ +1 0 1 ██████████████████████████████████████████████████ +0 1 0.5 █████████████████████████ +0 2 0.25 ████████████▌ +0 3 0.125 ██████▎ +0 4 0.062 ███ +1 5 0.531 ██████████████████████████▌ +0 6 0.266 █████████████▎ +0 7 0.133 ██████▋ +0 8 0.066 ███▎ +0 9 0.033 █▋ +1 10 0.517 █████████████████████████▋ +0 11 0.258 ████████████▊ +0 12 0.129 ██████▍ +0 13 0.065 ███▏ +0 14 0.032 █▌ +1 15 0.516 █████████████████████████▋ +0 16 0.258 ████████████▊ +0 17 0.129 ██████▍ +0 18 0.065 ███▏ +0 19 0.032 █▌ +1 20 0.516 █████████████████████████▋ +0 21 0.258 ████████████▊ +0 22 0.129 ██████▍ +0 23 0.065 ███▏ +0 24 0.032 █▌ +1 25 0.516 █████████████████████████▋ +0 26 0.258 ████████████▊ +0 27 0.129 ██████▍ +0 28 0.065 ███▏ +0 29 0.032 █▌ +1 30 0.516 █████████████████████████▋ +0 31 0.258 ████████████▊ +0 32 0.129 ██████▍ +0 33 0.065 ███▏ +0 34 0.032 █▌ +1 35 0.516 █████████████████████████▋ +0 36 0.258 ████████████▊ +0 37 0.129 ██████▍ +0 38 0.065 ███▏ +0 39 0.032 █▌ +1 40 0.516 █████████████████████████▋ +0 41 0.258 ████████████▊ +0 42 0.129 ██████▍ +0 43 0.065 ███▏ +0 44 0.032 █▌ +1 45 0.516 █████████████████████████▋ +0 46 0.258 ████████████▊ +0 47 0.129 ██████▍ +0 48 0.065 ███▏ +0 49 0.032 █▌ diff --git a/tests/queries/0_stateless/2020_exponential_smoothing.sql b/tests/queries/0_stateless/2020_exponential_smoothing.sql new file mode 100644 index 00000000000..a210225453a --- /dev/null +++ b/tests/queries/0_stateless/2020_exponential_smoothing.sql @@ -0,0 +1,32 @@ +SELECT number = 0 AS value, number AS time, exponentialMovingAverage(1)(value, time) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS exp_smooth FROM numbers(10); +SELECT value, time, round(exp_smooth, 3) FROM (SELECT number = 0 AS value, number AS time, exponentialMovingAverage(10)(value, time) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS exp_smooth FROM numbers(10)); + +SELECT number AS value, number AS time, exponentialMovingAverage(1)(value, time) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS exp_smooth FROM numbers(10); + +SELECT + value, + time, + round(exp_smooth, 3), + bar(exp_smooth, 0, 1, 50) AS bar +FROM +( + SELECT + (number = 0) OR (number >= 25) AS value, + number AS time, + exponentialMovingAverage(10)(value, time) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS exp_smooth + FROM numbers(50) +); + +SELECT + value, + time, + round(exp_smooth, 3), + bar(exp_smooth, 0, 1, 50) AS bar +FROM +( + SELECT + (number % 5) = 0 AS value, + number AS time, + exponentialMovingAverage(1)(value, time) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS exp_smooth + FROM numbers(50) +);