add AggregateFunction TSgroup{Rate}Sum (#4542)

This commit is contained in:
Yangkuan Liu 2019-05-16 00:16:25 +08:00 committed by proller
parent 1ea9e3019d
commit 0760a3436f
6 changed files with 405 additions and 1 deletions

View File

@ -0,0 +1,31 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionTSGroupSum.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
namespace DB
{
namespace
{
template<bool rate>
AggregateFunctionPtr createAggregateFunctionTSgroupSum(const std::string & name, const DataTypes & arguments, const Array & params)
{
assertNoParameters(name, params);
if (arguments.size() < 3)
throw Exception("Not enough event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<AggregateFunctionTSgroupSum<rate>>(arguments);
}
}
void registerAggregateFunctionTSgroupSum(AggregateFunctionFactory & factory)
{
factory.registerFunction("TSgroupSum", createAggregateFunctionTSgroupSum<false>, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("TSgroupRateSum", createAggregateFunctionTSgroupSum<true>, AggregateFunctionFactory::CaseInsensitive);
}
}

View File

@ -0,0 +1,306 @@
#pragma once
#include <iostream>
#include <sstream>
#include <map>
#include <queue>
#include <utility>
#include <unordered_set>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/ArenaAllocator.h>
#include <ext/range.h>
#include <bitset>
#include <AggregateFunctions/IAggregateFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
}
template <bool rate>
struct AggregateFunctionTSgroupSumData
{
using DataPoint = std::pair<Int64, Float64>;
struct Points
{
using Dps = std::queue<DataPoint>;
Dps dps;
void add(Int64 t, Float64 v)
{
dps.push(std::make_pair(t, v));
if (dps.size() > 2)
dps.pop();
}
Float64 getval(Int64 t)
{
Int64 t1, t2;
Float64 v1, v2;
if (rate)
{
if (dps.size() < 2)
return 0;
t1 = dps.back().first;
t2 = dps.front().first;
v1 = dps.back().second;
v2 = dps.front().second;
return (v1-v2)/Float64(t1-t2);
}
else
{
if (dps.size() == 1 && t == dps.front().first)
return dps.front().second;
t1 = dps.back().first;
t2 = dps.front().first;
v1 = dps.back().second;
v2 = dps.front().second;
return v2 + ((v1-v2)*Float64(t-t2))/Float64(t1-t2);
}
}
};
static constexpr size_t bytes_on_stack = 128;
typedef std::map<UInt64, Points> Series;
typedef PODArray<DataPoint, bytes_on_stack, AllocatorWithStackMemory<Allocator<false>, bytes_on_stack>> AggSeries;
Series ss;
AggSeries result;
void add(UInt64 uid, Int64 t, Float64 v)
{//suppose t is coming asc
typename Series::iterator it_ss;
if (ss.count(uid) == 0)
{//time series not exist, insert new one
Points tmp;
tmp.add(t, v);
ss.emplace(uid, tmp);
it_ss = ss.find(uid);
}
else
{
it_ss = ss.find(uid);
it_ss->second.add(t, v);
}
if (result.size() > 0 && t < result.back().first)
throw Exception{"TSgroupSum or TSgroupRateSum must order by timestamp asc!!!", ErrorCodes::LOGICAL_ERROR};
if (result.size() > 0 && t == result.back().first)
{
//do not add new point
if (rate)
result.back().second += it_ss->second.getval(t);
else
result.back().second += v;
}
else
{
if (rate)
result.emplace_back(std::make_pair(t, it_ss->second.getval(t)));
else
result.emplace_back(std::make_pair(t,v));
}
size_t i = result.size() - 1;
//reverse find out the index of timestamp that more than previous timestamp of t
while (result[i].first > it_ss->second.dps.front().first && i >= 0)
i--;
i++;
while (i < result.size()-1)
{
result[i].second += it_ss->second.getval(result[i].first);
i++;
}
}
void merge(const AggregateFunctionTSgroupSumData & other)
{
//if ts has overlap, then aggregate two series by interpolation;
AggSeries tmp;
tmp.reserve(other.result.size() + result.size());
size_t i=0, j=0;
Int64 t1, t2;
Float64 v1, v2;
while (i < result.size() && j < other.result.size())
{
if (result[i].first < other.result[j].first)
{
if (j==0)
{
tmp.emplace_back(result[i]);
}
else
{
t1 = other.result[j].first;
t2 = other.result[j-1].first;
v1 = other.result[j].second;
v2 = other.result[j-1].second;
Float64 value = result[i].second + v2 + (v1 - v2) * (Float64(result[i].first - t2)) / Float64(t1 - t2);
tmp.emplace_back(std::make_pair(result[i].first, value));
}
i++;
}
else if (result[i].first > other.result[j].first)
{
if (i==0)
{
tmp.emplace_back(other.result[j]);
}
else
{
t1 = result[i].first;
t2 = result[i-1].first;
v1 = result[i].second;
v2 = result[i-1].second;
Float64 value = other.result[j].second + v2 + (v1-v2)*(Float64(other.result[j].first-t2))/Float64(t1-t2);
tmp.emplace_back(std::make_pair(other.result[j].first, value));
}
j++;
}
else
{
tmp.emplace_back(std::make_pair(result[i].first, result[i].second + other.result[j].second));
i++;
j++;
}
}
while (i < result.size())
{
tmp.emplace_back(result[i]);
i++;
}
while (j < other.result.size())
{
tmp.push_back(other.result[j]);
j++;
}
swap(result, tmp);
}
void serialize(WriteBuffer & buf) const
{
size_t size = result.size();
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(result.data()), sizeof(result[0]));
}
void deserialize(ReadBuffer & buf)
{
size_t size = 0;
readVarUInt(size, buf);
result.resize(size);
buf.read(reinterpret_cast<char *>(result.data()), size * sizeof(result[0]));
}
};
template<bool rate>
class AggregateFunctionTSgroupSum final
: public IAggregateFunctionDataHelper<AggregateFunctionTSgroupSumData<rate>, AggregateFunctionTSgroupSum<rate>>
{
private:
public:
String getName() const override
{
return rate?"TSgroupRateSum":"TSgroupSum";
}
AggregateFunctionTSgroupSum(const DataTypes & arguments)
: IAggregateFunctionDataHelper<AggregateFunctionTSgroupSumData<rate>, AggregateFunctionTSgroupSum<rate>>(arguments, {})
{
if (!WhichDataType(arguments[0].get()).isUInt64())
throw Exception{"Illegal type " + arguments[0].get()->getName() + " of argument 1 of aggregate function "
+ getName() + ", must be UInt64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
if (!WhichDataType(arguments[1].get()).isInt64())
throw Exception{"Illegal type " + arguments[1].get()->getName() + " of argument 2 of aggregate function "
+ getName() + ", must be Int64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
if (!WhichDataType(arguments[2].get()).isFloat64())
throw Exception{"Illegal type " + arguments[2].get()->getName() + " of argument 3 of aggregate function "
+ getName() + ", must be Float64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
DataTypePtr getReturnType() const override
{
auto datatypes = std::vector<DataTypePtr>();
datatypes.push_back(std::make_shared<DataTypeInt64>());
datatypes.push_back(std::make_shared<DataTypeFloat64>());
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeTuple>(datatypes));
}
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override
{
auto uid = static_cast<const ColumnVector<UInt64> *>(columns[0])->getData()[row_num];
auto ts = static_cast<const ColumnVector<Int64> *>(columns[1])->getData()[row_num];
auto val = static_cast<const ColumnVector<Float64> *>(columns[2])->getData()[row_num];
if (uid && ts && val)
{
this->data(place).add(uid, ts, val);
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).deserialize(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
const auto & value = this->data(place).result;
size_t size = value.size();
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
size_t old_size = offsets_to.back();
offsets_to.push_back(offsets_to.back() + size);
if (size)
{
typename ColumnInt64::Container & ts_to = static_cast<ColumnInt64 &>(static_cast<ColumnTuple &>(arr_to.getData()).getColumn(0)).getData();
typename ColumnFloat64::Container & val_to = static_cast<ColumnFloat64 &>(static_cast<ColumnTuple &>(arr_to.getData()).getColumn(1)).getData();
ts_to.reserve(old_size + size);
val_to.reserve(old_size + size);
size_t i = 0;
while (i < this->data(place).result.size())
{
ts_to.push_back(this->data(place).result[i].first);
val_to.push_back(this->data(place).result[i].second);
i++;
}
}
}
bool allocatesMemoryInArena() const override
{
return true;
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}

View File

@ -41,7 +41,7 @@ void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory
void registerAggregateFunctionHistogram(AggregateFunctionFactory & factory);
void registerAggregateFunctionRetention(AggregateFunctionFactory & factory);
void registerAggregateFunctionTSgroupSum(AggregateFunctionFactory & factory);
void registerAggregateFunctions()
{
{
@ -70,6 +70,7 @@ void registerAggregateFunctions()
registerAggregateFunctionsMaxIntersections(factory);
registerAggregateFunctionHistogram(factory);
registerAggregateFunctionRetention(factory);
registerAggregateFunctionTSgroupSum(factory);
registerAggregateFunctionMLMethod(factory);
registerAggregateFunctionEntropy(factory);
registerAggregateFunctionLeastSqr(factory);

View File

@ -0,0 +1,2 @@
[(2,0.2),(3,0.8999999999999999),(7,2.0999999999999996),(8,2.4),(12,3.5999999999999996),(17,5.1000000000000005),(18,5.4),(24,7.199999999999999),(25,2.5)]
[(2,0),(3,0.09999999999999999),(7,0.3),(8,0.30000000000000004),(12,0.29999999999999993),(17,0.30000000000000004),(18,0.30000000000000004),(24,0.29999999999999993),(25,0.1)]

View File

@ -0,0 +1,10 @@
drop table if exists tsgroupsum_test;
create table tsgroupsum_test (uid UInt64, ts Int64, value Float64) engine=Memory;
insert into tsgroupsum_test values (1,2,0.2),(1,7,0.7),(1,12,1.2),(1,17,1.7),(1,25,2.5);
insert into tsgroupsum_test values (2,3,0.6),(2,8,1.6),(2,12,2.4),(2,18,3.6),(2,24,4.8);
select TSgroupSum(uid, ts, value) from (select * from tsgroupsum_test order by ts asc);
select TSgroupRateSum(uid, ts, value) from (select * from tsgroupsum_test order by ts asc);
drop table tsgroupsum_test;

View File

@ -301,6 +301,60 @@ GROUP BY timeslot
└─────────────────────┴──────────────────────────────────────────────┘
```
## TSgroupSum(uid, timestamp, value) {#agg_function-tsgroupsum}
TSgroupSum can aggregate different time series that sample timestamp not alignment.
It will use linear interpolation between two sample timestamp and then sum time-series together.
`uid` is the time series unique id, UInt64.
`timestamp` is Int64 type in order to support millisecond or microsecond.
`value` is the metric.
Before use this function, timestamp should be in ascend order
Example:
```
┌─uid─┬─timestamp─┬─value─┐
│ 1 │ 2 │ 0.2 │
│ 1 │ 7 │ 0.7 │
│ 1 │ 12 │ 1.2 │
│ 1 │ 17 │ 1.7 │
│ 1 │ 25 │ 2.5 │
│ 2 │ 3 │ 0.6 │
│ 2 │ 8 │ 1.6 │
│ 2 │ 12 │ 2.4 │
│ 2 │ 18 │ 3.6 │
│ 2 │ 24 │ 4.8 │
└─────┴───────────┴───────┘
```
```
CREATE TABLE time_series(
uid UInt64,
timestamp Int64,
value Float64
) ENGINE = Memory;
INSERT INTO time_series VALUES
(1,2,0.2),(1,7,0.7),(1,12,1.2),(1,17,1.7),(1,25,2.5),
(2,3,0.6),(2,8,1.6),(2,12,2.4),(2,18,3.6),(2,24,4.8);
SELECT TSgroupSum(uid, timestamp, value)
FROM (
SELECT * FROM time_series order by timestamp ASC
);
```
And the result will be:
```
[(2,0.2),(3,0.9),(7,2.1),(8,2.4),(12,3.6),(17,5.1),(18,5.4),(24,7.2),(25,2.5)]
```
## TSgroupRateSum(uid, ts, val) {#agg_function-tsgroupratesum}
Similarly TSgroupRateSum, TSgroupRateSum will Calculate the rate of time-series and then sum rates together.
Also, timestamp should be in ascend order before use this function.
Use this function, the result above case will be:
```
[(2,0),(3,0.1),(7,0.3),(8,0.3),(12,0.3),(17,0.3),(18,0.3),(24,0.3),(25,0.1)]
```
## avg(x) {#agg_function-avg}
Calculates the average.