mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
add deltaSumTimestamp AggregateFunction, docs&test
This commit is contained in:
parent
521fea62ef
commit
e4f1cf530d
@ -9,11 +9,15 @@ Syntax: `deltaSum(value)`
|
||||
Adds the differences between consecutive rows. If the difference is negative, it is ignored.
|
||||
`value` must be some integer or floating point type.
|
||||
|
||||
Note that the underlying data must be sorted in order for this function to work properly.
|
||||
If you would like to use this function in a materialized view, you most likely want to use the
|
||||
[deltaSumTimestamp](deltasumtimestamp.md) function instead.
|
||||
|
||||
Example:
|
||||
|
||||
```sql
|
||||
select deltaSum(arrayJoin([1, 2, 3])); -- => 2
|
||||
select deltaSum(arrayJoin([1, 2, 3, 0, 3, 4, 2, 3])); -- => 7
|
||||
select deltaSum(arrayJoin([2.25, 3, 4.5])); -- => 2.25
|
||||
select deltaSum(arrayJoin([2.25, 3, 4.5])); -- => 2.25
|
||||
```
|
||||
|
||||
|
@ -0,0 +1,25 @@
|
||||
---
|
||||
toc_priority: 141
|
||||
---
|
||||
|
||||
# deltaSumTimestamp {#agg_functions-deltasumtimestamp}
|
||||
|
||||
Syntax: `deltaSumTimestamp(value, timestamp)`
|
||||
|
||||
Adds the differences between consecutive rows. If the difference is negative, it is ignored.
|
||||
Uses `timestamp` to order values.
|
||||
`value` must be some integer or floating point type or a Date or DateTime.
|
||||
`timestamp` must be some integer or floating point type or a Date or DateTime.
|
||||
|
||||
This function is primarily intended for materialized views that are ordered by some time-bucket-aligned
|
||||
timestamp, for example a `toStartOfMinute` bucket. Because the rows in such a materialized view
|
||||
will all have the same timestamp, it is impossible for them to be merged in the "right" order. This
|
||||
function keeps track of the `timestamp` of the values it's seen, so it's possible to order the states
|
||||
correctly during merging.
|
||||
|
||||
Example:
|
||||
|
||||
```sql
|
||||
select deltaSumTimestamp(value, timestamp) from (select number as timestamp, [0, 4, 8, 3, 0, 0, 0, 1, 3, 5][number] as value from numbers(1, 10)); -- => 13
|
||||
```
|
||||
|
@ -0,0 +1,51 @@
|
||||
#include <AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h>
|
||||
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
#include <AggregateFunctions/FactoryHelpers.h>
|
||||
#include <AggregateFunctions/Helpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
/// Factory callback that builds a `deltaSumTimestamp(value, timestamp)` aggregate function.
///
/// @param name       Name the function was invoked under; used only in error messages.
/// @param arguments  Argument data types; exactly two are required (value, timestamp).
/// @param params     Aggregate function parameters; none are accepted.
/// @return           A non-null aggregate function specialised on the two argument types.
/// @throws Exception NUMBER_OF_ARGUMENTS_DOESNT_MATCH if the argument count is not 2.
/// @throws Exception ILLEGAL_TYPE_OF_ARGUMENT if either argument is not an integer, float,
///                   Date or DateTime, or if no specialisation exists for the type pair.
AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp(
    const String & name,
    const DataTypes & arguments,
    const Array & params)
{
    assertNoParameters(name, params);

    if (arguments.size() != 2)
        throw Exception("Incorrect number of arguments for aggregate function " + name,
            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    if (!isInteger(arguments[0]) && !isFloat(arguments[0]) && !isDateOrDateTime(arguments[0]))
        throw Exception("Illegal type " + arguments[0]->getName() + " of argument for aggregate function " +
            name + ", must be Int, Float, Date, DateTime", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

    if (!isInteger(arguments[1]) && !isFloat(arguments[1]) && !isDateOrDateTime(arguments[1]))
        throw Exception("Illegal type " + arguments[1]->getName() + " of argument for aggregate function " +
            name + ", must be Int, Float, Date, DateTime", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

    AggregateFunctionPtr res(createWithTwoNumericOrDateTypes<AggregationFunctionDeltaSumTimestamp>(
        *arguments[0], *arguments[1], arguments, params));

    /// The dispatcher returns nullptr when it has no specialisation for a type. The predicates
    /// above and the dispatcher's type list are maintained separately, so never hand a null
    /// function back to the caller: report the unsupported combination instead.
    if (!res)
        throw Exception("Illegal types " + arguments[0]->getName() + " and " + arguments[1]->getName() +
            " of arguments for aggregate function " + name + ", must be Int, Float, Date, DateTime",
            ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

    return res;
}
|
||||
}
|
||||
|
||||
/// Registers the aggregate function under the name "deltaSumTimestamp" in the given factory.
/// The function is registered with both `returns_default_when_only_null` and
/// `is_order_dependent` set to true.
void registerAggregateFunctionDeltaSumTimestamp(AggregateFunctionFactory & factory)
{
    AggregateFunctionProperties function_properties = {
        .returns_default_when_only_null = true,
        .is_order_dependent = true,
    };

    factory.registerFunction(
        "deltaSumTimestamp",
        { createAggregateFunctionDeltaSumTimestamp, function_properties });
}
|
||||
|
||||
}
|
156
src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h
Normal file
156
src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h
Normal file
@ -0,0 +1,156 @@
|
||||
#pragma once
|
||||
|
||||
#include <type_traits>
|
||||
#include <experimental/type_traits>
|
||||
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <DataTypes/DataTypesDecimal.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Per-group state of the deltaSumTimestamp aggregate function.
/// All members start out zeroed / false; `seen` tells whether the other fields carry real data.
template <typename ValueType, typename TimestampType>
struct AggregationFunctionDeltaSumTimestampData
{
    /// Accumulated sum of the positive differences.
    ValueType sum{};
    /// First value that was added to this state.
    ValueType first{};
    /// Most recent value that was added to this state.
    ValueType last{};
    /// Timestamp that accompanied `first`.
    TimestampType first_ts{};
    /// Timestamp that accompanied `last`.
    TimestampType last_ts{};
    /// Becomes true once at least one row has been added or merged in.
    bool seen{false};
};
|
||||
|
||||
template <typename ValueType, typename TimestampType>
|
||||
class AggregationFunctionDeltaSumTimestamp final
|
||||
: public IAggregateFunctionDataHelper<
|
||||
AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
|
||||
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
|
||||
>
|
||||
{
|
||||
public:
|
||||
AggregationFunctionDeltaSumTimestamp(const DataTypes & arguments, const Array & params)
|
||||
: IAggregateFunctionDataHelper<
|
||||
AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
|
||||
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
|
||||
>{arguments, params}
|
||||
{}
|
||||
|
||||
AggregationFunctionDeltaSumTimestamp()
|
||||
: IAggregateFunctionDataHelper<
|
||||
AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
|
||||
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
|
||||
>{}
|
||||
{}
|
||||
|
||||
String getName() const override { return "deltaSumTimestamp"; }
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeNumber<ValueType>>(); }
|
||||
|
||||
void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
auto value = assert_cast<const ColumnVector<ValueType> &>(*columns[0]).getData()[row_num];
|
||||
auto ts = assert_cast<const ColumnVector<TimestampType> &>(*columns[1]).getData()[row_num];
|
||||
|
||||
if ((this->data(place).last < value) && this->data(place).seen)
|
||||
{
|
||||
this->data(place).sum += (value - this->data(place).last);
|
||||
}
|
||||
|
||||
this->data(place).last = value;
|
||||
this->data(place).last_ts = ts;
|
||||
|
||||
if (!this->data(place).seen)
|
||||
{
|
||||
this->data(place).first = value;
|
||||
this->data(place).seen = true;
|
||||
this->data(place).first_ts = ts;
|
||||
}
|
||||
}
|
||||
|
||||
// before returns true if lhs is before rhs or false if it is not or can't be determined
|
||||
bool ALWAYS_INLINE before (
|
||||
const AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType> * lhs,
|
||||
const AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType> * rhs
|
||||
) const {
|
||||
if (lhs->last_ts < rhs->first_ts) {
|
||||
return true;
|
||||
}
|
||||
if (lhs->last_ts == rhs->first_ts && lhs->last_ts < rhs->last_ts) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void NO_SANITIZE_UNDEFINED ALWAYS_INLINE merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
|
||||
{
|
||||
auto place_data = &this->data(place);
|
||||
auto rhs_data = &this->data(rhs);
|
||||
|
||||
if (!place_data->seen && rhs_data->seen)
|
||||
{
|
||||
place_data->sum = rhs_data->sum;
|
||||
place_data->seen = true;
|
||||
place_data->first = rhs_data->first;
|
||||
place_data->first_ts = rhs_data->first_ts;
|
||||
place_data->last = rhs_data->last;
|
||||
place_data->last_ts = rhs_data->last_ts;
|
||||
}
|
||||
else if (place_data->seen && !rhs_data->seen)
|
||||
{
|
||||
// Do nothing
|
||||
}
|
||||
else if (before(place_data, rhs_data))
|
||||
{
|
||||
// This state came before the rhs state
|
||||
|
||||
place_data->sum += rhs_data->sum + (rhs_data->first - place_data->last);
|
||||
place_data->last = rhs_data->last;
|
||||
place_data->last_ts = rhs_data->last_ts;
|
||||
}
|
||||
else if (before(rhs_data, place_data))
|
||||
{
|
||||
// This state came after the rhs state
|
||||
|
||||
place_data->sum += rhs_data->sum + (place_data->first - rhs_data->last);
|
||||
place_data->first = rhs_data->first;
|
||||
place_data->first_ts = rhs_data->first_ts;
|
||||
}
|
||||
|
||||
// If none of those conditions matched, it means both states we are merging have the same
|
||||
// timestamps. This doesn't make sense to merge.
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override
|
||||
{
|
||||
writeIntBinary(this->data(place).sum, buf);
|
||||
writeIntBinary(this->data(place).first, buf);
|
||||
writeIntBinary(this->data(place).first_ts, buf);
|
||||
writeIntBinary(this->data(place).last, buf);
|
||||
writeIntBinary(this->data(place).last_ts, buf);
|
||||
writePODBinary<bool>(this->data(place).seen, buf);
|
||||
}
|
||||
|
||||
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, Arena *) const override
|
||||
{
|
||||
readIntBinary(this->data(place).sum, buf);
|
||||
readIntBinary(this->data(place).first, buf);
|
||||
readIntBinary(this->data(place).first_ts, buf);
|
||||
readIntBinary(this->data(place).last, buf);
|
||||
readIntBinary(this->data(place).last_ts, buf);
|
||||
readPODBinary<bool>(this->data(place).seen, buf);
|
||||
}
|
||||
|
||||
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
|
||||
{
|
||||
assert_cast<ColumnVector<ValueType> &>(to).getData().push_back(this->data(place).sum);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
@ -190,6 +190,46 @@ static IAggregateFunction * createWithTwoNumericTypes(const IDataType & first_ty
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
|
||||
static IAggregateFunction * createWithTwoNumericOrDateTypesSecond(const IDataType & second_type, TArgs && ... args)
|
||||
{
|
||||
WhichDataType which(second_type);
|
||||
#define DISPATCH(TYPE) \
|
||||
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<FirstType, TYPE>(std::forward<TArgs>(args)...);
|
||||
FOR_NUMERIC_TYPES(DISPATCH)
|
||||
#undef DISPATCH
|
||||
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<FirstType, Int8>(std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<FirstType, Int16>(std::forward<TArgs>(args)...);
|
||||
|
||||
/// expects that DataTypeDate based on UInt16, DataTypeDateTime based on UInt32 and UUID based on UInt128
|
||||
if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate<FirstType, UInt16>(std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate<FirstType, UInt32>(std::forward<TArgs>(args)...);
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
template <template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
|
||||
static IAggregateFunction * createWithTwoNumericOrDateTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
|
||||
{
|
||||
WhichDataType which(first_type);
|
||||
#define DISPATCH(TYPE) \
|
||||
if (which.idx == TypeIndex::TYPE) \
|
||||
return createWithTwoNumericOrDateTypesSecond<TYPE, AggregateFunctionTemplate>(second_type, std::forward<TArgs>(args)...);
|
||||
FOR_NUMERIC_TYPES(DISPATCH)
|
||||
#undef DISPATCH
|
||||
if (which.idx == TypeIndex::Enum8)
|
||||
return createWithTwoNumericOrDateTypesSecond<Int8, AggregateFunctionTemplate>(second_type, std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::Enum16)
|
||||
return createWithTwoNumericOrDateTypesSecond<Int16, AggregateFunctionTemplate>(second_type, std::forward<TArgs>(args)...);
|
||||
|
||||
/// expects that DataTypeDate based on UInt16, DataTypeDateTime based on UInt32 and UUID based on UInt128
|
||||
if (which.idx == TypeIndex::Date)
|
||||
return createWithTwoNumericOrDateTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::DateTime)
|
||||
return createWithTwoNumericOrDateTypesSecond<UInt32, AggregateFunctionTemplate>(second_type, std::forward<TArgs>(args)...);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
template <template <typename> class AggregateFunctionTemplate, typename... TArgs>
|
||||
static IAggregateFunction * createWithStringType(const IDataType & argument_type, TArgs && ... args)
|
||||
{
|
||||
|
@ -12,6 +12,7 @@ void registerAggregateFunctionAvg(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionAvgWeighted(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionCount(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionDeltaSum(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionDeltaSumTimestamp(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionGroupArray(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionGroupUniqArray(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionGroupArrayInsertAt(AggregateFunctionFactory &);
|
||||
@ -70,6 +71,7 @@ void registerAggregateFunctions()
|
||||
registerAggregateFunctionAvgWeighted(factory);
|
||||
registerAggregateFunctionCount(factory);
|
||||
registerAggregateFunctionDeltaSum(factory);
|
||||
registerAggregateFunctionDeltaSumTimestamp(factory);
|
||||
registerAggregateFunctionGroupArray(factory);
|
||||
registerAggregateFunctionGroupUniqArray(factory);
|
||||
registerAggregateFunctionGroupArrayInsertAt(factory);
|
||||
|
@ -20,6 +20,7 @@ SRCS(
|
||||
AggregateFunctionCombinatorFactory.cpp
|
||||
AggregateFunctionCount.cpp
|
||||
AggregateFunctionDeltaSum.cpp
|
||||
AggregateFunctionDeltaSumTimestamp.cpp
|
||||
AggregateFunctionDistinct.cpp
|
||||
AggregateFunctionEntropy.cpp
|
||||
AggregateFunctionFactory.cpp
|
||||
|
@ -0,0 +1,5 @@
|
||||
10
|
||||
10
|
||||
8
|
||||
8
|
||||
13
|
5
tests/queries/0_stateless/01762_deltasumtimestamp.sql
Normal file
5
tests/queries/0_stateless/01762_deltasumtimestamp.sql
Normal file
@ -0,0 +1,5 @@
|
||||
select deltaSumTimestampMerge(state) from (select deltaSumTimestampState(value, timestamp) as state from (select toDate(number) as timestamp, [4, 5, 5, 5][number-4] as value from numbers(5, 4)) UNION ALL select deltaSumTimestampState(value, timestamp) as state from (select toDate(number) as timestamp, [0, 4, 8, 3][number] as value from numbers(1, 4)));
|
||||
select deltaSumTimestampMerge(state) from (select deltaSumTimestampState(value, timestamp) as state from (select number as timestamp, [0, 4, 8, 3][number] as value from numbers(1, 4)) UNION ALL select deltaSumTimestampState(value, timestamp) as state from (select number as timestamp, [4, 5, 5, 5][number-4] as value from numbers(5, 4)));
|
||||
select deltaSumTimestamp(value, timestamp) from (select toDateTime(number) as timestamp, [0, 4, 8, 3][number] as value from numbers(1, 4));
|
||||
select deltaSumTimestamp(value, timestamp) from (select toDateTime(number) as timestamp, [0, 4.5, 8, 3][number] as value from numbers(1, 4));
|
||||
select deltaSumTimestamp(value, timestamp) from (select number as timestamp, [0, 4, 8, 3, 0, 0, 0, 1, 3, 5][number] as value from numbers(1, 10));
|
Loading…
Reference in New Issue
Block a user