From e4f1cf530d290df0bc539ff8c9aeea5d2ab0fef5 Mon Sep 17 00:00:00 2001 From: Russ Frank Date: Wed, 17 Mar 2021 13:42:53 -0400 Subject: [PATCH] add deltaSumTimestamp AggregateFunction, docs&test --- .../aggregate-functions/reference/deltasum.md | 6 +- .../reference/deltasumtimestamp.md | 25 +++ .../AggregateFunctionDeltaSumTimestamp.cpp | 51 ++++++ .../AggregateFunctionDeltaSumTimestamp.h | 156 ++++++++++++++++++ src/AggregateFunctions/Helpers.h | 40 +++++ .../registerAggregateFunctions.cpp | 2 + src/AggregateFunctions/ya.make | 1 + .../01762_deltasumtimestamp.reference | 5 + .../0_stateless/01762_deltasumtimestamp.sql | 5 + 9 files changed, 290 insertions(+), 1 deletion(-) create mode 100644 docs/en/sql-reference/aggregate-functions/reference/deltasumtimestamp.md create mode 100644 src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.cpp create mode 100644 src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h create mode 100644 tests/queries/0_stateless/01762_deltasumtimestamp.reference create mode 100644 tests/queries/0_stateless/01762_deltasumtimestamp.sql diff --git a/docs/en/sql-reference/aggregate-functions/reference/deltasum.md b/docs/en/sql-reference/aggregate-functions/reference/deltasum.md index bb6f802ccaf..d8d20216a39 100644 --- a/docs/en/sql-reference/aggregate-functions/reference/deltasum.md +++ b/docs/en/sql-reference/aggregate-functions/reference/deltasum.md @@ -9,11 +9,15 @@ Syntax: `deltaSum(value)` Adds the differences between consecutive rows. If the difference is negative, it is ignored. `value` must be some integer or floating point type. +Note that the underlying data must be sorted in order for this function to work properly. +If you would like to use this function in a materialized view, you most likely want to use the +[deltaSumTimestamp](deltasumtimestamp.md) method instead. + Example: ```sql select deltaSum(arrayJoin([1, 2, 3])); -- => 2 select deltaSum(arrayJoin([1, 2, 3, 0, 3, 4, 2, 3])); -- => 7 -select deltaSum(arrayJoin([2.25, 3, 4.5])); -- => 2.25 +select deltaSum(arrayJoin([2.25, 3, 4.5])); -- => 2.25 ``` diff --git a/docs/en/sql-reference/aggregate-functions/reference/deltasumtimestamp.md b/docs/en/sql-reference/aggregate-functions/reference/deltasumtimestamp.md new file mode 100644 index 00000000000..0492fc18d97 --- /dev/null +++ b/docs/en/sql-reference/aggregate-functions/reference/deltasumtimestamp.md @@ -0,0 +1,25 @@ +--- +toc_priority: 141 +--- + +# deltaSumTimestamp {#agg_functions-deltasum} + +Syntax: `deltaSumTimestamp(value, timestamp)` + +Adds the differences between consecutive rows. If the difference is negative, it is ignored. +Uses `timestamp` to order values. +`value` must be some integer or floating point type or a Date or DateTime. +`timestamp` must be some integer or floating point type or a Date or DateTime. + +This function works better in materialized views that are ordered by some time bucket aligned +timestamp, for example a `toStartOfMinute` bucket. Because the rows in such a materialized view +will all have the same timestamp, it is impossible for them to be merged in the "right" order. This +function keeps track of the `timestamp` of the values it's seen, so it's possible to order the states +correctly during merging. + +Example: + +```sql +select deltaSumTimestamp(value, timestamp) from (select number as timestamp, [0, 4, 8, 3, 0, 0, 0, 1, 3, 5][number] as value from numbers(1, 10)); -- => 13 +``` + diff --git a/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.cpp b/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.cpp new file mode 100644 index 00000000000..170c70415bd --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.cpp @@ -0,0 +1,51 @@ +#include + +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + +namespace +{ + +AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp( + const String & name, + const DataTypes & arguments, + const Array & params) +{ + assertNoParameters(name, params); + + if (arguments.size() != 2) + throw Exception("Incorrect number of arguments for aggregate function " + name, + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + if (!isInteger(arguments[0]) && !isFloat(arguments[0]) && !isDateOrDateTime(arguments[0])) + throw Exception("Illegal type " + arguments[0]->getName() + " of argument for aggregate function " + + name + ", must be Int, Float, Date, DateTime", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + if (!isInteger(arguments[1]) && !isFloat(arguments[1]) && !isDateOrDateTime(arguments[1])) + throw Exception("Illegal type " + arguments[1]->getName() + " of argument for aggregate function " + + name + ", must be Int, Float, Date, DateTime", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return AggregateFunctionPtr(createWithTwoNumericOrDateTypes( + *arguments[0], *arguments[1], arguments, params)); +} +} + +void registerAggregateFunctionDeltaSumTimestamp(AggregateFunctionFactory & factory) +{ + AggregateFunctionProperties properties = { .returns_default_when_only_null = true, .is_order_dependent = true }; + + factory.registerFunction("deltaSumTimestamp", { createAggregateFunctionDeltaSumTimestamp, properties }); +} + +} diff --git a/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h b/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h new file mode 100644 index 00000000000..e59a74d8b19 --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h @@ -0,0 +1,156 @@ +#pragma once + +#include +#include + +#include +#include + +#include +#include +#include + +#include + + +namespace DB +{ + +template +struct AggregationFunctionDeltaSumTimestampData +{ + ValueType sum = 0; + ValueType first = 0; + ValueType last = 0; + TimestampType first_ts = 0; + TimestampType last_ts = 0; + bool seen = false; +}; + +template +class AggregationFunctionDeltaSumTimestamp final + : public IAggregateFunctionDataHelper< + AggregationFunctionDeltaSumTimestampData, + AggregationFunctionDeltaSumTimestamp + > +{ +public: + AggregationFunctionDeltaSumTimestamp(const DataTypes & arguments, const Array & params) + : IAggregateFunctionDataHelper< + AggregationFunctionDeltaSumTimestampData, + AggregationFunctionDeltaSumTimestamp + >{arguments, params} + {} + + AggregationFunctionDeltaSumTimestamp() + : IAggregateFunctionDataHelper< + AggregationFunctionDeltaSumTimestampData, + AggregationFunctionDeltaSumTimestamp + >{} + {} + + String getName() const override { return "deltaSumTimestamp"; } + + DataTypePtr getReturnType() const override { return std::make_shared>(); } + + void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override + { + auto value = assert_cast &>(*columns[0]).getData()[row_num]; + auto ts = assert_cast &>(*columns[1]).getData()[row_num]; + + if ((this->data(place).last < value) && this->data(place).seen) + { + this->data(place).sum += (value - this->data(place).last); + } + + this->data(place).last = value; + this->data(place).last_ts = ts; + + if (!this->data(place).seen) + { + this->data(place).first = value; + this->data(place).seen = true; + this->data(place).first_ts = ts; + } + } + + // before returns true if lhs is before rhs or false if it is not or can't be determined + bool ALWAYS_INLINE before ( + const AggregationFunctionDeltaSumTimestampData * lhs, + const AggregationFunctionDeltaSumTimestampData * rhs + ) const { + if (lhs->last_ts < rhs->first_ts) { + return true; + } + if (lhs->last_ts == rhs->first_ts && lhs->last_ts < rhs->last_ts) { + return true; + } + return false; + } + + void NO_SANITIZE_UNDEFINED ALWAYS_INLINE merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override + { + auto place_data = &this->data(place); + auto rhs_data = &this->data(rhs); + + if (!place_data->seen && rhs_data->seen) + { + place_data->sum = rhs_data->sum; + place_data->seen = true; + place_data->first = rhs_data->first; + place_data->first_ts = rhs_data->first_ts; + place_data->last = rhs_data->last; + place_data->last_ts = rhs_data->last_ts; + } + else if (place_data->seen && !rhs_data->seen) + { + // Do nothing + } + else if (before(place_data, rhs_data)) + { + // This state came before the rhs state + + place_data->sum += rhs_data->sum + (rhs_data->first - place_data->last); + place_data->last = rhs_data->last; + place_data->last_ts = rhs_data->last_ts; + } + else if (before(rhs_data, place_data)) + { + // This state came after the rhs state + + place_data->sum += rhs_data->sum + (place_data->first - rhs_data->last); + place_data->first = rhs_data->first; + place_data->first_ts = rhs_data->first_ts; + } + + // If none of those conditions matched, it means both states we are merging have the same + // timestamps. This doesn't make sense to merge. + } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override + { + writeIntBinary(this->data(place).sum, buf); + writeIntBinary(this->data(place).first, buf); + writeIntBinary(this->data(place).first_ts, buf); + writeIntBinary(this->data(place).last, buf); + writeIntBinary(this->data(place).last_ts, buf); + writePODBinary(this->data(place).seen, buf); + } + + void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, Arena *) const override + { + readIntBinary(this->data(place).sum, buf); + readIntBinary(this->data(place).first, buf); + readIntBinary(this->data(place).first_ts, buf); + readIntBinary(this->data(place).last, buf); + readIntBinary(this->data(place).last_ts, buf); + readPODBinary(this->data(place).seen, buf); + } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + assert_cast &>(to).getData().push_back(this->data(place).sum); + } +}; + +} diff --git a/src/AggregateFunctions/Helpers.h b/src/AggregateFunctions/Helpers.h index 2b21b745a0e..838da44bba6 100644 --- a/src/AggregateFunctions/Helpers.h +++ b/src/AggregateFunctions/Helpers.h @@ -190,6 +190,46 @@ static IAggregateFunction * createWithTwoNumericTypes(const IDataType & first_ty return nullptr; } +template class AggregateFunctionTemplate, typename... TArgs> +static IAggregateFunction * createWithTwoNumericOrDateTypesSecond(const IDataType & second_type, TArgs && ... args) +{ + WhichDataType which(second_type); +#define DISPATCH(TYPE) \ + if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate(std::forward(args)...); + FOR_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate(std::forward(args)...); + if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate(std::forward(args)...); + + /// expects that DataTypeDate based on UInt16, DataTypeDateTime based on UInt32 and UUID based on UInt128 + if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate(std::forward(args)...); + if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate(std::forward(args)...); + + return nullptr; +} + +template