mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #5595 from inv2004/moving-sum
movingSum/Avg window functions for numeric and decimals
This commit is contained in:
commit
b43e75200e
@ -0,0 +1,100 @@
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
#include <AggregateFunctions/AggregateFunctionGroupArrayMoving.h>
|
||||
#include <AggregateFunctions/Helpers.h>
|
||||
#include <AggregateFunctions/FactoryHelpers.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
template <typename T, typename Tlimit_num_elems>
|
||||
struct MovingSum
|
||||
{
|
||||
using DataType = MovingSumData<T>;
|
||||
using Function = MovingImpl<T, Tlimit_num_elems, DataType>;
|
||||
};
|
||||
|
||||
template <typename T, typename Tlimit_num_elems>
|
||||
struct MovingAvg
|
||||
{
|
||||
using DataType = MovingAvgData<T>;
|
||||
using Function = MovingImpl<T, Tlimit_num_elems, DataType>;
|
||||
};
|
||||
|
||||
template <typename T, typename Tlimit_num_elems> using MovingSumTemplate = typename MovingSum<T, Tlimit_num_elems>::Function;
|
||||
template <typename T, typename Tlimit_num_elems> using MovingAvgTemplate = typename MovingAvg<T, Tlimit_num_elems>::Function;
|
||||
|
||||
template <template <typename, typename> class Function, typename has_limit, typename ... TArgs>
|
||||
inline AggregateFunctionPtr createAggregateFunctionMovingImpl(const std::string & name, const DataTypePtr & argument_type, TArgs ... args)
|
||||
{
|
||||
AggregateFunctionPtr res;
|
||||
|
||||
if (isDecimal(argument_type))
|
||||
res.reset(createWithDecimalType<Function, has_limit>(*argument_type, argument_type, std::forward<TArgs>(args)...));
|
||||
else
|
||||
res.reset(createWithNumericType<Function, has_limit>(*argument_type, argument_type, std::forward<TArgs>(args)...));
|
||||
|
||||
if (!res)
|
||||
throw Exception("Illegal type " + argument_type->getName() + " of argument for aggregate function " + name,
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
template <template <typename, typename> class Function>
|
||||
AggregateFunctionPtr createAggregateFunctionMoving(const std::string & name, const DataTypes & argument_types, const Array & parameters)
|
||||
{
|
||||
assertUnary(name, argument_types);
|
||||
|
||||
bool limit_size = false;
|
||||
|
||||
UInt64 max_elems = std::numeric_limits<UInt64>::max();
|
||||
|
||||
if (parameters.empty())
|
||||
{
|
||||
// cumulative sum without parameter
|
||||
}
|
||||
else if (parameters.size() == 1)
|
||||
{
|
||||
auto type = parameters[0].getType();
|
||||
if (type != Field::Types::Int64 && type != Field::Types::UInt64)
|
||||
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
if ((type == Field::Types::Int64 && parameters[0].get<Int64>() < 0) ||
|
||||
(type == Field::Types::UInt64 && parameters[0].get<UInt64>() == 0))
|
||||
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
limit_size = true;
|
||||
max_elems = parameters[0].get<UInt64>();
|
||||
}
|
||||
else
|
||||
throw Exception("Incorrect number of parameters for aggregate function " + name + ", should be 0 or 1",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
if (!limit_size)
|
||||
return createAggregateFunctionMovingImpl<Function, std::false_type>(name, argument_types[0]);
|
||||
else
|
||||
return createAggregateFunctionMovingImpl<Function, std::true_type>(name, argument_types[0], max_elems);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
void registerAggregateFunctionMoving(AggregateFunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction("groupArrayMovingSum", createAggregateFunctionMoving<MovingSumTemplate>);
|
||||
factory.registerFunction("groupArrayMovingAvg", createAggregateFunctionMoving<MovingAvgTemplate>);
|
||||
}
|
||||
|
||||
}
|
200
dbms/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h
Normal file
200
dbms/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h
Normal file
@ -0,0 +1,200 @@
|
||||
#pragma once
|
||||
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypesDecimal.h>
|
||||
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
|
||||
#include <Common/ArenaAllocator.h>
|
||||
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
#define AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE 0xFFFFFF
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int TOO_LARGE_ARRAY_SIZE;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
template <typename T>
|
||||
struct MovingSumData
|
||||
{
|
||||
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
|
||||
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
|
||||
using Array = PODArray<T, 32, Allocator>;
|
||||
|
||||
Array value;
|
||||
Array window;
|
||||
T sum = 0;
|
||||
|
||||
void add(T val, Arena * arena)
|
||||
{
|
||||
sum += val;
|
||||
|
||||
value.push_back(sum, arena);
|
||||
}
|
||||
|
||||
T get(size_t idx, UInt64 win_size) const
|
||||
{
|
||||
if (idx < win_size)
|
||||
return value[idx];
|
||||
else
|
||||
return value[idx] - value[idx - win_size];
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct MovingAvgData
|
||||
{
|
||||
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
|
||||
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
|
||||
using Array = PODArray<T, 32, Allocator>;
|
||||
|
||||
Array value;
|
||||
Array window;
|
||||
T sum = 0;
|
||||
|
||||
void add(T val, Arena * arena)
|
||||
{
|
||||
sum += val;
|
||||
|
||||
value.push_back(sum, arena);
|
||||
}
|
||||
|
||||
T get(size_t idx, UInt64 win_size) const
|
||||
{
|
||||
if (idx < win_size)
|
||||
return value[idx] / win_size;
|
||||
else
|
||||
return (value[idx] - value[idx - win_size]) / win_size;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
||||
template <typename T, typename Tlimit_num_elems, typename Data>
|
||||
class MovingImpl final
|
||||
: public IAggregateFunctionDataHelper<Data, MovingImpl<T, Tlimit_num_elems, Data>>
|
||||
{
|
||||
static constexpr bool limit_num_elems = Tlimit_num_elems::value;
|
||||
DataTypePtr & data_type;
|
||||
UInt64 win_size;
|
||||
|
||||
public:
|
||||
using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
|
||||
using ColVecResult = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>; // probably for overflow function in the future
|
||||
|
||||
explicit MovingImpl(const DataTypePtr & data_type_, UInt64 win_size_ = std::numeric_limits<UInt64>::max())
|
||||
: IAggregateFunctionDataHelper<Data, MovingImpl<T, Tlimit_num_elems, Data>>({data_type_}, {})
|
||||
, data_type(this->argument_types[0]), win_size(win_size_) {}
|
||||
|
||||
String getName() const override { return "movingXXX"; }
|
||||
|
||||
DataTypePtr getReturnType() const override
|
||||
{
|
||||
return std::make_shared<DataTypeArray>(data_type);
|
||||
}
|
||||
|
||||
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
|
||||
{
|
||||
auto val = static_cast<const ColVecType &>(*columns[0]).getData()[row_num];
|
||||
|
||||
this->data(place).add(val, arena);
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
||||
{
|
||||
auto & cur_elems = this->data(place);
|
||||
auto & rhs_elems = this->data(rhs);
|
||||
|
||||
size_t cur_size = cur_elems.value.size();
|
||||
|
||||
if (rhs_elems.value.size())
|
||||
cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.end(), arena);
|
||||
|
||||
for (size_t i = cur_size; i < cur_elems.value.size(); ++i)
|
||||
{
|
||||
cur_elems.value[i] += cur_elems.sum;
|
||||
}
|
||||
|
||||
cur_elems.sum += rhs_elems.sum;
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
||||
{
|
||||
const auto & value = this->data(place).value;
|
||||
size_t size = value.size();
|
||||
writeVarUInt(size, buf);
|
||||
buf.write(reinterpret_cast<const char *>(value.data()), size * sizeof(value[0]));
|
||||
}
|
||||
|
||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
|
||||
{
|
||||
size_t size = 0;
|
||||
readVarUInt(size, buf);
|
||||
|
||||
if (unlikely(size > AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE))
|
||||
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
|
||||
|
||||
auto & value = this->data(place).value;
|
||||
|
||||
value.resize(size, arena);
|
||||
buf.read(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
|
||||
|
||||
this->data(place).sum = value.back();
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
const auto & data = this->data(place);
|
||||
size_t size = data.value.size();
|
||||
|
||||
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
|
||||
|
||||
offsets_to.push_back(offsets_to.back() + size);
|
||||
|
||||
if (size)
|
||||
{
|
||||
typename ColVecResult::Container & data_to = static_cast<ColVecResult &>(arr_to.getData()).getData();
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
if (!limit_num_elems)
|
||||
{
|
||||
data_to.push_back(data.get(i, size));
|
||||
}
|
||||
else
|
||||
{
|
||||
data_to.push_back(data.get(i, win_size));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
const char * getHeaderFilePath() const override { return __FILE__; }
|
||||
};
|
||||
|
||||
#undef AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE
|
||||
|
||||
}
|
@ -108,6 +108,15 @@ static IAggregateFunction * createWithDecimalType(const IDataType & argument_typ
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
template <template <typename, typename> class AggregateFunctionTemplate, typename Data, typename... TArgs>
|
||||
static IAggregateFunction * createWithDecimalType(const IDataType & argument_type, TArgs && ... args)
|
||||
{
|
||||
WhichDataType which(argument_type);
|
||||
if (which.idx == TypeIndex::Decimal32) return new AggregateFunctionTemplate<Decimal32, Data>(std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::Decimal64) return new AggregateFunctionTemplate<Decimal64, Data>(std::forward<TArgs>(args)...);
|
||||
if (which.idx == TypeIndex::Decimal128) return new AggregateFunctionTemplate<Decimal128, Data>(std::forward<TArgs>(args)...);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/** For template with two arguments.
|
||||
*/
|
||||
|
@ -31,6 +31,7 @@ void registerAggregateFunctionsMaxIntersections(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionMLMethod(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionEntropy(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionSimpleLinearRegression(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionMoving(AggregateFunctionFactory &);
|
||||
|
||||
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
|
||||
void registerAggregateFunctionCombinatorArray(AggregateFunctionCombinatorFactory &);
|
||||
@ -74,6 +75,7 @@ void registerAggregateFunctions()
|
||||
registerAggregateFunctionMLMethod(factory);
|
||||
registerAggregateFunctionEntropy(factory);
|
||||
registerAggregateFunctionSimpleLinearRegression(factory);
|
||||
registerAggregateFunctionMoving(factory);
|
||||
}
|
||||
|
||||
{
|
||||
|
42
dbms/tests/performance/group_array_moving_sum.xml
Normal file
42
dbms/tests/performance/group_array_moving_sum.xml
Normal file
@ -0,0 +1,42 @@
|
||||
<test>
|
||||
<name>Moving Sum</name>
|
||||
|
||||
<type>loop</type>
|
||||
|
||||
<stop_conditions>
|
||||
<all_of>
|
||||
<total_time_ms>30000</total_time_ms>
|
||||
</all_of>
|
||||
<any_of>
|
||||
<average_speed_not_changing_for_ms>6000</average_speed_not_changing_for_ms>
|
||||
<total_time_ms>60000</total_time_ms>
|
||||
</any_of>
|
||||
</stop_conditions>
|
||||
|
||||
<main_metric>
|
||||
<total_time />
|
||||
</main_metric>
|
||||
|
||||
<create_query>CREATE TABLE moving_sum_1m(k UInt64, v UInt64) ENGINE = MergeTree ORDER BY k</create_query>
|
||||
<create_query>CREATE TABLE moving_sum_10m(k UInt64, v UInt64) ENGINE = MergeTree ORDER BY k</create_query>
|
||||
|
||||
<fill_query>INSERT INTO moving_sum_1m SELECT number%100, rand() from numbers(1000000)</fill_query>
|
||||
<fill_query>INSERT INTO moving_sum_10m SELECT number%100, rand() from numbers(10000000)</fill_query>
|
||||
|
||||
<query tag='MovingSumSize10_1M'>SELECT k,groupArrayMovingSum(10)(v) FROM moving_sum_1m GROUP BY k</query>
|
||||
<query tag='MovingSumSize10WithKey_1M'>SELECT k,groupArrayMovingSum(10)(v) FROM moving_sum_1m WHERE k in (49, 50, 51) GROUP BY k</query>
|
||||
<query tag='MovingSumSize1000_1M'>SELECT k,groupArrayMovingSum(1000)(v) FROM moving_sum_1m GROUP BY k</query>
|
||||
<query tag='MovingSumSize1000WithKey_1M'>SELECT k,groupArrayMovingSum(1000)(v) FROM moving_sum_1m WHERE k in (49, 50, 51) GROUP BY k</query>
|
||||
<query tag='MovingSumSize10000_1M'>SELECT k,groupArrayMovingSum(10000)(v) FROM moving_sum_1m GROUP BY k</query>
|
||||
<query tag='MovingSumSize10000WithKey_1M'>SELECT k,groupArrayMovingSum(10000)(v) FROM moving_sum_1m WHERE k in (49, 50, 51) GROUP BY k</query>
|
||||
|
||||
<query tag='MovingSumSize10_10M'>SELECT k,groupArrayMovingSum(10)(v) FROM moving_sum_10m GROUP BY k</query>
|
||||
<query tag='MovingSumSize10WithKey_10M'>SELECT k,groupArrayMovingSum(10)(v) FROM moving_sum_10m WHERE k in (49, 50, 51) GROUP BY k</query>
|
||||
<query tag='MovingSumSize1000_10M'>SELECT k,groupArrayMovingSum(1000)(v) FROM moving_sum_10m GROUP BY k</query>
|
||||
<query tag='MovingSumSize1000WithKey_10M'>SELECT k,groupArrayMovingSum(1000)(v) FROM moving_sum_10m WHERE k in (49, 50, 51) GROUP BY k</query>
|
||||
<query tag='MovingSumSize10000_10M'>SELECT k,groupArrayMovingSum(10000)(v) FROM moving_sum_10m GROUP BY k</query>
|
||||
<query tag='MovingSumSize10000WithKey_10M'>SELECT k,groupArrayMovingSum(10000)(v) FROM moving_sum_10m WHERE k in (49, 50, 51) GROUP BY k</query>
|
||||
|
||||
<drop_query>DROP TABLE IF EXISTS moving_sum_10m</drop_query>
|
||||
<drop_query>DROP TABLE IF EXISTS moving_sum_1m</drop_query>
|
||||
</test>
|
@ -0,0 +1,40 @@
|
||||
k dt v
|
||||
a 2001-02-03 00:00:00 0
|
||||
a 2001-02-03 00:00:01 1
|
||||
a 2001-02-03 00:00:02 2
|
||||
a 2001-02-03 00:00:03 3
|
||||
a 2001-02-03 00:00:04 4
|
||||
b 2001-02-03 00:00:00 0
|
||||
b 2001-02-03 00:00:01 1
|
||||
b 2001-02-03 00:00:02 2
|
||||
b 2001-02-03 00:00:03 3
|
||||
b 2001-02-03 00:00:04 4
|
||||
b 2001-02-03 01:00:00 5
|
||||
b 2001-02-03 01:00:01 6
|
||||
b 2001-02-03 01:00:02 7
|
||||
b 2001-02-03 01:00:03 8
|
||||
b 2001-02-03 01:00:04 9
|
||||
k groupArrayMovingSum(v)
|
||||
String Array(UInt64)
|
||||
a [0,1,3,6,10]
|
||||
b [0,1,3,6,10,15,21,28,36,45]
|
||||
k groupArrayMovingSum(3)(v)
|
||||
String Array(UInt64)
|
||||
a [0,1,3,6,9]
|
||||
b [0,1,3,6,9,12,15,18,21,24]
|
||||
k groupArrayMovingAvg(v)
|
||||
String Array(UInt64)
|
||||
a [0,0,0,1,2]
|
||||
b [0,0,0,0,1,1,2,2,3,4]
|
||||
k groupArrayMovingAvg(3)(v)
|
||||
String Array(UInt64)
|
||||
a [0,0,1,2,3]
|
||||
b [0,0,1,2,3,4,5,6,7,8]
|
||||
k groupArrayMovingSum(v)
|
||||
String Array(UInt64)
|
||||
a [0,1,3,6,10]
|
||||
b [0,1,3,6,10,15,21,28,36,45]
|
||||
k groupArrayMovingSum(v)
|
||||
String Array(UInt64)
|
||||
a [0,1,3,6,10]
|
||||
b [0,1,3,6,10,15,21,28,36,45]
|
43
dbms/tests/queries/0_stateless/00953_moving_functions.sql
Normal file
43
dbms/tests/queries/0_stateless/00953_moving_functions.sql
Normal file
@ -0,0 +1,43 @@
|
||||
DROP TABLE IF EXISTS moving_sum_num;
|
||||
DROP TABLE IF EXISTS moving_sum_dec;
|
||||
|
||||
CREATE TABLE moving_sum_num (
|
||||
k String,
|
||||
dt DateTime,
|
||||
v UInt64
|
||||
)
|
||||
ENGINE = MergeTree ORDER BY (k, dt);
|
||||
|
||||
INSERT INTO moving_sum_num
|
||||
SELECT 'b' k, toDateTime('2001-02-03 00:00:00')+number as dt, number as v
|
||||
FROM system.numbers
|
||||
LIMIT 5
|
||||
UNION ALL
|
||||
SELECT 'a' k, toDateTime('2001-02-03 00:00:00')+number as dt, number as v
|
||||
FROM system.numbers
|
||||
LIMIT 5;
|
||||
|
||||
INSERT INTO moving_sum_num
|
||||
SELECT 'b' k, toDateTime('2001-02-03 01:00:00')+number as dt, 5+number as v
|
||||
FROM system.numbers
|
||||
LIMIT 5;
|
||||
|
||||
SELECT * FROM moving_sum_num ORDER BY k,dt FORMAT TabSeparatedWithNames;
|
||||
|
||||
SELECT k, groupArrayMovingSum(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
|
||||
SELECT k, groupArrayMovingSum(3)(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
|
||||
|
||||
SELECT k, groupArrayMovingAvg(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
|
||||
SELECT k, groupArrayMovingAvg(3)(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
|
||||
|
||||
CREATE TABLE moving_sum_dec ENGINE = Memory AS
|
||||
SELECT k, dt, toDecimal64(v, 2) as v
|
||||
FROM moving_sum_num;
|
||||
|
||||
SELECT k, groupArrayMovingSum(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
|
||||
SELECT k, groupArrayMovingSum(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
|
||||
|
||||
DROP TABLE moving_sum_dec;
|
||||
DROP TABLE moving_sum_num;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user