mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Remove timeSeriesGroupSum
This commit is contained in:
parent
fb3a69b298
commit
85d3e62275
@ -1,35 +0,0 @@
|
||||
#include "AggregateFunctionTimeSeriesGroupSum.h"
|
||||
#include "AggregateFunctionFactory.h"
|
||||
#include "FactoryHelpers.h"
|
||||
#include "Helpers.h"
|
||||
#include "registerAggregateFunctions.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
namespace
|
||||
{
|
||||
template <bool rate>
|
||||
AggregateFunctionPtr createAggregateFunctionTimeSeriesGroupSum(const std::string & name, const DataTypes & arguments, const Array & params)
|
||||
{
|
||||
assertNoParameters(name, params);
|
||||
|
||||
if (arguments.size() < 3)
|
||||
throw Exception("Not enough event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
return std::make_shared<AggregateFunctionTimeSeriesGroupSum<rate>>(arguments);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Registers both flavours of the function in the factory:
/// the plain interpolated sum and the sum of per-series rates.
void registerAggregateFunctionTimeSeriesGroupSum(AggregateFunctionFactory & factory)
{
    constexpr bool plain_sum = false;
    constexpr bool rate_sum = true;

    factory.registerFunction("timeSeriesGroupSum", createAggregateFunctionTimeSeriesGroupSum<plain_sum>);
    factory.registerFunction("timeSeriesGroupRateSum", createAggregateFunctionTimeSeriesGroupSum<rate_sum>);
}
|
||||
|
||||
}
|
@ -1,291 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <bitset>
|
||||
#include <map>
|
||||
#include <queue>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/ArenaAllocator.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <ext/range.h>
|
||||
#include "IAggregateFunction.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
}
|
||||
|
||||
template <bool rate>
|
||||
struct AggregateFunctionTimeSeriesGroupSumData
|
||||
{
|
||||
using DataPoint = std::pair<Int64, Float64>;
|
||||
struct Points
|
||||
{
|
||||
using Dps = std::queue<DataPoint>;
|
||||
Dps dps;
|
||||
void add(Int64 t, Float64 v)
|
||||
{
|
||||
dps.push(std::make_pair(t, v));
|
||||
if (dps.size() > 2)
|
||||
dps.pop();
|
||||
}
|
||||
Float64 getval(Int64 t)
|
||||
{
|
||||
Int64 t1, t2;
|
||||
Float64 v1, v2;
|
||||
if (rate)
|
||||
{
|
||||
if (dps.size() < 2)
|
||||
return 0;
|
||||
t1 = dps.back().first;
|
||||
t2 = dps.front().first;
|
||||
v1 = dps.back().second;
|
||||
v2 = dps.front().second;
|
||||
return (v1 - v2) / Float64(t1 - t2);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (dps.size() == 1 && t == dps.front().first)
|
||||
return dps.front().second;
|
||||
t1 = dps.back().first;
|
||||
t2 = dps.front().first;
|
||||
v1 = dps.back().second;
|
||||
v2 = dps.front().second;
|
||||
return v2 + ((v1 - v2) * Float64(t - t2)) / Float64(t1 - t2);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
typedef std::map<UInt64, Points> Series;
|
||||
typedef PODArrayWithStackMemory<DataPoint, 128> AggSeries;
|
||||
Series ss;
|
||||
AggSeries result;
|
||||
|
||||
void add(UInt64 uid, Int64 t, Float64 v)
|
||||
{ //suppose t is coming asc
|
||||
typename Series::iterator it_ss;
|
||||
if (ss.count(uid) == 0)
|
||||
{ //time series not exist, insert new one
|
||||
Points tmp;
|
||||
tmp.add(t, v);
|
||||
ss.emplace(uid, tmp);
|
||||
it_ss = ss.find(uid);
|
||||
}
|
||||
else
|
||||
{
|
||||
it_ss = ss.find(uid);
|
||||
it_ss->second.add(t, v);
|
||||
}
|
||||
if (result.size() > 0 && t < result.back().first)
|
||||
throw Exception{"timeSeriesGroupSum or timeSeriesGroupRateSum must order by timestamp asc.", ErrorCodes::LOGICAL_ERROR};
|
||||
if (result.size() > 0 && t == result.back().first)
|
||||
{
|
||||
//do not add new point
|
||||
if (rate)
|
||||
result.back().second += it_ss->second.getval(t);
|
||||
else
|
||||
result.back().second += v;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (rate)
|
||||
result.emplace_back(std::make_pair(t, it_ss->second.getval(t)));
|
||||
else
|
||||
result.emplace_back(std::make_pair(t, v));
|
||||
}
|
||||
ssize_t i = result.size() - 1;
|
||||
//reverse find out the index of timestamp that more than previous timestamp of t
|
||||
while (result[i].first > it_ss->second.dps.front().first && i >= 0)
|
||||
i--;
|
||||
|
||||
i++;
|
||||
while (i < ssize_t(result.size()) - 1)
|
||||
{
|
||||
result[i].second += it_ss->second.getval(result[i].first);
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
void merge(const AggregateFunctionTimeSeriesGroupSumData & other)
|
||||
{
|
||||
//if ts has overlap, then aggregate two series by interpolation;
|
||||
AggSeries tmp;
|
||||
tmp.reserve(other.result.size() + result.size());
|
||||
size_t i = 0, j = 0;
|
||||
Int64 t1, t2;
|
||||
Float64 v1, v2;
|
||||
while (i < result.size() && j < other.result.size())
|
||||
{
|
||||
if (result[i].first < other.result[j].first)
|
||||
{
|
||||
if (j == 0)
|
||||
{
|
||||
tmp.emplace_back(result[i]);
|
||||
}
|
||||
else
|
||||
{
|
||||
t1 = other.result[j].first;
|
||||
t2 = other.result[j - 1].first;
|
||||
v1 = other.result[j].second;
|
||||
v2 = other.result[j - 1].second;
|
||||
Float64 value = result[i].second + v2 + (v1 - v2) * (Float64(result[i].first - t2)) / Float64(t1 - t2);
|
||||
tmp.emplace_back(std::make_pair(result[i].first, value));
|
||||
}
|
||||
i++;
|
||||
}
|
||||
else if (result[i].first > other.result[j].first)
|
||||
{
|
||||
if (i == 0)
|
||||
{
|
||||
tmp.emplace_back(other.result[j]);
|
||||
}
|
||||
else
|
||||
{
|
||||
t1 = result[i].first;
|
||||
t2 = result[i - 1].first;
|
||||
v1 = result[i].second;
|
||||
v2 = result[i - 1].second;
|
||||
Float64 value = other.result[j].second + v2 + (v1 - v2) * (Float64(other.result[j].first - t2)) / Float64(t1 - t2);
|
||||
tmp.emplace_back(std::make_pair(other.result[j].first, value));
|
||||
}
|
||||
j++;
|
||||
}
|
||||
else
|
||||
{
|
||||
tmp.emplace_back(std::make_pair(result[i].first, result[i].second + other.result[j].second));
|
||||
i++;
|
||||
j++;
|
||||
}
|
||||
}
|
||||
while (i < result.size())
|
||||
{
|
||||
tmp.emplace_back(result[i]);
|
||||
i++;
|
||||
}
|
||||
while (j < other.result.size())
|
||||
{
|
||||
tmp.push_back(other.result[j]);
|
||||
j++;
|
||||
}
|
||||
swap(result, tmp);
|
||||
}
|
||||
|
||||
void serialize(WriteBuffer & buf) const
|
||||
{
|
||||
size_t size = result.size();
|
||||
writeVarUInt(size, buf);
|
||||
if (size > 0)
|
||||
{
|
||||
buf.write(reinterpret_cast<const char *>(result.data()), size * sizeof(result[0]));
|
||||
}
|
||||
}
|
||||
|
||||
void deserialize(ReadBuffer & buf)
|
||||
{
|
||||
size_t size = 0;
|
||||
readVarUInt(size, buf);
|
||||
result.resize(size);
|
||||
if (size > 0)
|
||||
{
|
||||
buf.read(reinterpret_cast<char *>(result.data()), size * sizeof(result[0]));
|
||||
}
|
||||
}
|
||||
};
|
||||
template <bool rate>
|
||||
class AggregateFunctionTimeSeriesGroupSum final
|
||||
: public IAggregateFunctionDataHelper<AggregateFunctionTimeSeriesGroupSumData<rate>, AggregateFunctionTimeSeriesGroupSum<rate>>
|
||||
{
|
||||
private:
|
||||
public:
|
||||
String getName() const override { return rate ? "timeSeriesGroupRateSum" : "timeSeriesGroupSum"; }
|
||||
|
||||
AggregateFunctionTimeSeriesGroupSum(const DataTypes & arguments)
|
||||
: IAggregateFunctionDataHelper<AggregateFunctionTimeSeriesGroupSumData<rate>, AggregateFunctionTimeSeriesGroupSum<rate>>(arguments, {})
|
||||
{
|
||||
if (!WhichDataType(arguments[0].get()).isUInt64())
|
||||
throw Exception{"Illegal type " + arguments[0].get()->getName() + " of argument 1 of aggregate function " + getName()
|
||||
+ ", must be UInt64",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
|
||||
|
||||
if (!WhichDataType(arguments[1].get()).isInt64())
|
||||
throw Exception{"Illegal type " + arguments[1].get()->getName() + " of argument 2 of aggregate function " + getName()
|
||||
+ ", must be Int64",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
|
||||
|
||||
if (!WhichDataType(arguments[2].get()).isFloat64())
|
||||
throw Exception{"Illegal type " + arguments[2].get()->getName() + " of argument 3 of aggregate function " + getName()
|
||||
+ ", must be Float64",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
|
||||
}
|
||||
|
||||
DataTypePtr getReturnType() const override
|
||||
{
|
||||
auto datatypes = std::vector<DataTypePtr>();
|
||||
datatypes.push_back(std::make_shared<DataTypeInt64>());
|
||||
datatypes.push_back(std::make_shared<DataTypeFloat64>());
|
||||
|
||||
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeTuple>(datatypes));
|
||||
}
|
||||
|
||||
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override
|
||||
{
|
||||
auto uid = assert_cast<const ColumnVector<UInt64> *>(columns[0])->getData()[row_num];
|
||||
auto ts = assert_cast<const ColumnVector<Int64> *>(columns[1])->getData()[row_num];
|
||||
auto val = assert_cast<const ColumnVector<Float64> *>(columns[2])->getData()[row_num];
|
||||
if (uid && ts && val)
|
||||
{
|
||||
this->data(place).add(uid, ts, val);
|
||||
}
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); }
|
||||
|
||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).serialize(buf); }
|
||||
|
||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).deserialize(buf); }
|
||||
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
|
||||
{
|
||||
const auto & value = this->data(place).result;
|
||||
size_t size = value.size();
|
||||
|
||||
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
|
||||
size_t old_size = offsets_to.back();
|
||||
|
||||
offsets_to.push_back(offsets_to.back() + size);
|
||||
|
||||
if (size)
|
||||
{
|
||||
typename ColumnInt64::Container & ts_to
|
||||
= assert_cast<ColumnInt64 &>(assert_cast<ColumnTuple &>(arr_to.getData()).getColumn(0)).getData();
|
||||
typename ColumnFloat64::Container & val_to
|
||||
= assert_cast<ColumnFloat64 &>(assert_cast<ColumnTuple &>(arr_to.getData()).getColumn(1)).getData();
|
||||
ts_to.reserve(old_size + size);
|
||||
val_to.reserve(old_size + size);
|
||||
size_t i = 0;
|
||||
while (i < this->data(place).result.size())
|
||||
{
|
||||
ts_to.push_back(this->data(place).result[i].first);
|
||||
val_to.push_back(this->data(place).result[i].second);
|
||||
i++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return true; }
|
||||
};
|
||||
}
|
@ -32,7 +32,6 @@ void registerAggregateFunctionsBitmap(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionsMaxIntersections(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionHistogram(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionRetention(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionTimeSeriesGroupSum(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionMLMethod(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionEntropy(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionSimpleLinearRegression(AggregateFunctionFactory &);
|
||||
@ -86,7 +85,6 @@ void registerAggregateFunctions()
|
||||
registerAggregateFunctionsMaxIntersections(factory);
|
||||
registerAggregateFunctionHistogram(factory);
|
||||
registerAggregateFunctionRetention(factory);
|
||||
registerAggregateFunctionTimeSeriesGroupSum(factory);
|
||||
registerAggregateFunctionMLMethod(factory);
|
||||
registerAggregateFunctionEntropy(factory);
|
||||
registerAggregateFunctionSimpleLinearRegression(factory);
|
||||
|
@ -45,7 +45,6 @@ SRCS(
|
||||
AggregateFunctionStatisticsSimple.cpp
|
||||
AggregateFunctionSum.cpp
|
||||
AggregateFunctionSumMap.cpp
|
||||
AggregateFunctionTimeSeriesGroupSum.cpp
|
||||
AggregateFunctionTopK.cpp
|
||||
AggregateFunctionUniq.cpp
|
||||
AggregateFunctionUniqCombined.cpp
|
||||
|
@ -344,8 +344,6 @@
|
||||
"TABLE"
|
||||
"TABLES"
|
||||
"TEMPORARY"
|
||||
"timeSeriesGroupRateSum"
|
||||
"timeSeriesGroupSum"
|
||||
"TIMESTAMP"
|
||||
"TIMESTAMP_ADD"
|
||||
"TIMESTAMPADD"
|
||||
|
@ -1,2 +0,0 @@
|
||||
[(2,0.2),(3,0.8999999999999999),(7,2.0999999999999996),(8,2.4),(12,3.5999999999999996),(17,5.1000000000000005),(18,5.4),(24,7.199999999999999),(25,2.5)]
|
||||
[(2,0),(3,0.09999999999999999),(7,0.3),(8,0.30000000000000004),(12,0.29999999999999993),(17,0.30000000000000004),(18,0.30000000000000004),(24,0.29999999999999993),(25,0.1)]
|
@ -1,10 +0,0 @@
|
||||
-- Functional test for timeSeriesGroupSum and timeSeriesGroupRateSum.
-- Two series are inserted: uid 1 with timestamps 2, 7, 12, 17, 25 and uid 2 with timestamps 3, 8, 12, 18, 24;
-- timestamp 12 is present in both.
drop table if exists tsgroupsum_test;
|
||||
|
||||
create table tsgroupsum_test (uid UInt64, ts Int64, value Float64) engine=Memory;
|
||||
insert into tsgroupsum_test values (1,2,0.2),(1,7,0.7),(1,12,1.2),(1,17,1.7),(1,25,2.5);
|
||||
insert into tsgroupsum_test values (2,3,0.6),(2,8,1.6),(2,12,2.4),(2,18,3.6),(2,24,4.8);
|
||||
|
||||
-- Rows are fed through a subquery sorted by ts ascending; the functions throw on a decreasing timestamp.
select timeSeriesGroupSum(uid, ts, value) from (select * from tsgroupsum_test order by ts asc);
|
||||
select timeSeriesGroupRateSum(uid, ts, value) from (select * from tsgroupsum_test order by ts asc);
|
||||
|
||||
drop table tsgroupsum_test;
|
@ -1,3 +0,0 @@
|
||||
[]
|
||||
1
|
||||
server is still alive
|
@ -1,23 +0,0 @@
|
||||
-- Regression test: timeSeriesGroupSum over 100 identical rows (same uid, same timestamp, value 0),
-- queried both through a Distributed table and with the -State combinator.
DROP TABLE IF EXISTS tsgs_local;
|
||||
DROP TABLE IF EXISTS tsgs;
|
||||
|
||||
CREATE TABLE tsgs_local ENGINE = MergeTree ORDER BY tuple() AS
|
||||
SELECT
|
||||
toUInt64(13820745146630357293) AS a,
|
||||
toInt64(1604422500000000000) AS b,
|
||||
toFloat64(0) AS c
|
||||
FROM numbers(100);
|
||||
|
||||
-- the issue (https://github.com/ClickHouse/ClickHouse/issues/16862) happens during serialization of the state
|
||||
-- so happens only when Distributed tables are used or with -State modifier.
|
||||
|
||||
CREATE TABLE tsgs AS tsgs_local ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), tsgs_local);
|
||||
|
||||
-- Path 1: aggregation through the Distributed table.
SELECT timeSeriesGroupSum(a, b, c) FROM tsgs;
|
||||
|
||||
-- Path 2: the -State combinator on the local table.
-- NOTE(review): `NOT ignore(*)` presumably forces the state column to be evaluated - confirm.
SELECT count() FROM ( SELECT timeSeriesGroupSumState(a, b, c) as x FROM tsgs_local) WHERE NOT ignore(*);
|
||||
|
||||
-- Sentinel row: reaching this statement shows the server survived the queries above.
SELECT 'server is still alive';
|
||||
|
||||
DROP TABLE tsgs_local;
|
||||
DROP TABLE tsgs;
|
Loading…
Reference in New Issue
Block a user