Add deltaSum aggregate function, docs & test

This commit is contained in:
Russ Frank 2021-02-03 22:49:28 -05:00
parent 9d8033d8d6
commit 3913f39211
7 changed files with 169 additions and 0 deletions

View File

@ -0,0 +1,19 @@
---
toc_priority: 141
---
# deltaSum {#agg_functions-deltasum}
Syntax: `deltaSum(value)`
Adds the differences between consecutive rows. If the difference is negative, it is ignored.
`value` must be some integer, floating point or decimal type.
Example:
```sql
select deltaSum(arrayJoin([1, 2, 3])); -- => 3
select deltaSum(arrayJoin([1, 2, 3, 0, 3, 4, 2, 3])); -- => 8
```

View File

@ -0,0 +1,42 @@
#include <AggregateFunctions/AggregateFunctionDeltaSum.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
AggregateFunctionPtr createAggregateFunctionDeltaSum(
const String & name,
const DataTypes & arguments,
const Array & params)
{
assertNoParameters(name, params);
if (arguments.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return AggregateFunctionPtr(createWithNumericType<AggregationFunctionDeltaSum>(*arguments[0], arguments, params));
}
}
void registerAggregateFunctionDeltaSum(AggregateFunctionFactory & factory)
{
AggregateFunctionProperties properties = { .returns_default_when_only_null = true, .is_order_dependent = true };
factory.registerFunction("deltaSum", { createAggregateFunctionDeltaSum, properties });
}
}

View File

@ -0,0 +1,95 @@
#pragma once
#include <experimental/type_traits>
#include <type_traits>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnVector.h>
#include <AggregateFunctions/IAggregateFunction.h>
namespace DB
{
template <typename T>
struct AggregationFunctionDeltaSumData
{
T sum{};
T last{};
T first{};
};
template <typename T>
class AggregationFunctionDeltaSum final : public IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumData<T>, AggregationFunctionDeltaSum<T>>
{
public:
AggregationFunctionDeltaSum(const DataTypes & arguments, const Array & params)
: IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumData<T>, AggregationFunctionDeltaSum<T>> {arguments, params}
{
// empty constructor
}
String getName() const override
{
return "deltaSum";
}
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeNumber<T>>();
}
void ALWAYS_INLINE add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const T & value = (*columns[0])[row_num].get<T>();
if (this->data(place).last < value) {
this->data(place).sum += (value - this->data(place).last);
}
this->data(place).last = value;
if (this->data(place).first == 0) {
this->data(place).first = value;
}
}
void ALWAYS_INLINE merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
if (this->data(place).last < this->data(rhs).first) {
this->data(place).sum += this->data(rhs).sum + (this->data(rhs).first - this->data(place).last);
} else {
this->data(place).sum += this->data(rhs).sum;
}
this->data(place).last = this->data(rhs).last;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
writeIntBinary(this->data(place).sum, buf);
writeIntBinary(this->data(place).first, buf);
writeIntBinary(this->data(place).last, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
readIntBinary(this->data(place).sum, buf);
readIntBinary(this->data(place).first, buf);
readIntBinary(this->data(place).last, buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).sum);
}
};
}

View File

@ -11,6 +11,7 @@ class AggregateFunctionFactory;
void registerAggregateFunctionAvg(AggregateFunctionFactory &);
void registerAggregateFunctionAvgWeighted(AggregateFunctionFactory &);
void registerAggregateFunctionCount(AggregateFunctionFactory &);
void registerAggregateFunctionDeltaSum(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArray(AggregateFunctionFactory &);
void registerAggregateFunctionGroupUniqArray(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArrayInsertAt(AggregateFunctionFactory &);
@ -66,6 +67,7 @@ void registerAggregateFunctions()
registerAggregateFunctionAvg(factory);
registerAggregateFunctionAvgWeighted(factory);
registerAggregateFunctionCount(factory);
registerAggregateFunctionDeltaSum(factory);
registerAggregateFunctionGroupArray(factory);
registerAggregateFunctionGroupUniqArray(factory);
registerAggregateFunctionGroupArrayInsertAt(factory);

View File

@ -19,6 +19,7 @@ SRCS(
AggregateFunctionCategoricalInformationValue.cpp
AggregateFunctionCombinatorFactory.cpp
AggregateFunctionCount.cpp
AggregateFunctionDeltaSum.cpp
AggregateFunctionDistinct.cpp
AggregateFunctionEntropy.cpp
AggregateFunctionFactory.cpp

View File

@ -0,0 +1,5 @@
3
7
8
8
8

View File

@ -0,0 +1,5 @@
select deltaSum(arrayJoin([1, 2, 3]));
select deltaSum(arrayJoin([1, 2, 3, 0, 3, 4]));
select deltaSum(arrayJoin([1, 2, 3, 0, 3, 4, 2, 3]));
select deltaSum(arrayJoin([1, 2, 3, 0, 3, 3, 3, 3, 3, 4, 2, 3]));
select deltaSum(arrayJoin([1, 2, 3, 0, 0, 0, 0, 3, 3, 3, 3, 3, 4, 2, 3]));