movingSum: uses template for groupArrayMovingSum and groupArrayMovingAvg

This commit is contained in:
unknown 2019-06-18 10:18:33 -04:00
parent 414bb21238
commit 238c0e3b9b
9 changed files with 262 additions and 205 deletions

View File

@ -0,0 +1,100 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionGroupArrayMoving.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
namespace
{
template <typename T, typename Tlimit_num_elems>
struct MovingSum
{
using DataType = MovingSumData<T>;
using Function = MovingImpl<T, Tlimit_num_elems, DataType>;
};
template <typename T, typename Tlimit_num_elems>
struct MovingAvg
{
using DataType = MovingAvgData<T>;
using Function = MovingImpl<T, Tlimit_num_elems, DataType>;
};
template <typename T, typename Tlimit_num_elems> using MovingSumTemplate = typename MovingSum<T, Tlimit_num_elems>::Function;
template <typename T, typename Tlimit_num_elems> using MovingAvgTemplate = typename MovingAvg<T, Tlimit_num_elems>::Function;
template <template <typename, typename> class Function, typename has_limit, typename ... TArgs>
inline AggregateFunctionPtr createAggregateFunctionMovingImpl(const std::string & name, const DataTypePtr & argument_type, TArgs ... args)
{
AggregateFunctionPtr res;
if (isDecimal(argument_type))
res.reset(createWithDecimalType<Function, has_limit>(*argument_type, argument_type, std::forward<TArgs>(args)...));
else
res.reset(createWithNumericType<Function, has_limit>(*argument_type, argument_type, std::forward<TArgs>(args)...));
if (!res)
throw Exception("Illegal type " + argument_type->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
template <template <typename, typename> class Function>
AggregateFunctionPtr createAggregateFunctionMoving(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertUnary(name, argument_types);
bool limit_size = false;
UInt64 max_elems = std::numeric_limits<UInt64>::max();
if (parameters.empty())
{
// cumulative sum without parameter
}
else if (parameters.size() == 1)
{
auto type = parameters[0].getType();
if (type != Field::Types::Int64 && type != Field::Types::UInt64)
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
if ((type == Field::Types::Int64 && parameters[0].get<Int64>() < 0) ||
(type == Field::Types::UInt64 && parameters[0].get<UInt64>() == 0))
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
limit_size = true;
max_elems = parameters[0].get<UInt64>();
}
else
throw Exception("Incorrect number of parameters for aggregate function " + name + ", should be 0 or 1",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!limit_size)
return createAggregateFunctionMovingImpl<Function, std::false_type>(name, argument_types[0]);
else
return createAggregateFunctionMovingImpl<Function, std::true_type>(name, argument_types[0], max_elems);
}
}
void registerAggregateFunctionMoving(AggregateFunctionFactory & factory)
{
factory.registerFunction("groupArrayMovingSum", createAggregateFunctionMoving<MovingSumTemplate>);
factory.registerFunction("groupArrayMovingAvg", createAggregateFunctionMoving<MovingAvgTemplate>);
}
}

View File

@ -16,7 +16,7 @@
#include <type_traits>
#define AGGREGATE_FUNCTION_MOVING_SUM_MAX_ARRAY_SIZE 0xFFFFFF
#define AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE 0xFFFFFF
namespace DB
@ -39,12 +39,57 @@ struct MovingSumData
Array value;
Array window;
T sum = 0;
void add(T val, Arena * arena)
{
sum += val;
value.push_back(sum, arena);
}
T get(size_t idx, UInt64 win_size) const
{
if (idx < win_size)
return value[idx];
else
return value[idx] - value[idx - win_size];
}
};
template <typename T>
struct MovingAvgData
{
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using Array = PODArray<T, 32, Allocator>;
Array value;
Array window;
T sum = 0;
void add(T val, Arena * arena)
{
sum += val;
value.push_back(sum, arena);
}
T get(size_t idx, UInt64 win_size) const
{
if (idx < win_size)
return value[idx] / win_size;
else
return (value[idx] - value[idx - win_size]) / win_size;
}
};
template <typename T, typename Tlimit_num_elems>
class MovingSumImpl final
: public IAggregateFunctionDataHelper<MovingSumData<T>, MovingSumImpl<T, Tlimit_num_elems>>
template <typename T, typename Tlimit_num_elems, typename Data>
class MovingImpl final
: public IAggregateFunctionDataHelper<Data, MovingImpl<T, Tlimit_num_elems, Data>>
{
static constexpr bool limit_num_elems = Tlimit_num_elems::value;
DataTypePtr & data_type;
@ -54,11 +99,11 @@ public:
using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
using ColVecResult = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>; // probably for overflow function in the future
explicit MovingSumImpl(const DataTypePtr & data_type_, UInt64 win_size_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<MovingSumData<T>, MovingSumImpl<T, Tlimit_num_elems>>({data_type_}, {})
explicit MovingImpl(const DataTypePtr & data_type_, UInt64 win_size_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<Data, MovingImpl<T, Tlimit_num_elems, Data>>({data_type_}, {})
, data_type(this->argument_types[0]), win_size(win_size_) {}
String getName() const override { return "movingSum"; }
String getName() const override { return "movingXXX"; }
DataTypePtr getReturnType() const override
{
@ -67,11 +112,9 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
auto & sum = this->data(place).sum;
auto val = static_cast<const ColVecType &>(*columns[0]).getData()[row_num];
sum += static_cast<const ColVecType &>(*columns[0]).getData()[row_num];
this->data(place).value.push_back(sum, arena);
this->data(place).add(val, arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
@ -105,7 +148,7 @@ public:
size_t size = 0;
readVarUInt(size, buf);
if (unlikely(size > AGGREGATE_FUNCTION_MOVING_SUM_MAX_ARRAY_SIZE))
if (unlikely(size > AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE))
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
auto & value = this->data(place).value;
@ -118,8 +161,8 @@ public:
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
const auto & value = this->data(place).value;
size_t size = value.size();
const auto & data = this->data(place);
size_t size = data.value.size();
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
@ -130,20 +173,15 @@ public:
{
typename ColVecResult::Container & data_to = static_cast<ColVecResult &>(arr_to.getData()).getData();
if (!limit_num_elems)
for (size_t i = 0; i < size; ++i)
{
data_to.insert(this->data(place).value.begin(), this->data(place).value.end());
}
else
{
size_t i = 0;
for (; i < std::min(static_cast<size_t>(win_size), size); ++i)
if (!limit_num_elems)
{
data_to.push_back(value[i]);
data_to.push_back(data.get(i, size));
}
for (; i < size; ++i)
else
{
data_to.push_back(value[i] - value[i - win_size]);
data_to.push_back(data.get(i, win_size));
}
}
}
@ -157,6 +195,6 @@ public:
const char * getHeaderFilePath() const override { return __FILE__; }
};
#undef AGGREGATE_FUNCTION_MOVING_SUM_MAX_ARRAY_SIZE
#undef AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE
}

View File

@ -1,82 +0,0 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionMovingSum.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
namespace
{
template <typename has_limit, typename ... TArgs>
inline AggregateFunctionPtr createAggregateFunctionMovingSumImpl(const std::string & name, const DataTypePtr & argument_type, TArgs ... args)
{
AggregateFunctionPtr res;
if (isDecimal(argument_type))
res.reset(createWithDecimalType<MovingSumImpl, has_limit>(*argument_type, argument_type, std::forward<TArgs>(args)...));
else
res.reset(createWithNumericType<MovingSumImpl, has_limit>(*argument_type, argument_type, std::forward<TArgs>(args)...));
if (!res)
throw Exception("Illegal type " + argument_type->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
static AggregateFunctionPtr createAggregateFunctionMovingSum(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertUnary(name, argument_types);
bool limit_size = false;
UInt64 max_elems = std::numeric_limits<UInt64>::max();
if (parameters.empty())
{
// cumulative sum without parameter
}
else if (parameters.size() == 1)
{
auto type = parameters[0].getType();
if (type != Field::Types::Int64 && type != Field::Types::UInt64)
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
if ((type == Field::Types::Int64 && parameters[0].get<Int64>() < 0) ||
(type == Field::Types::UInt64 && parameters[0].get<UInt64>() == 0))
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
limit_size = true;
max_elems = parameters[0].get<UInt64>();
}
else
throw Exception("Incorrect number of parameters for aggregate function " + name + ", should be 0 or 1",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!limit_size)
return createAggregateFunctionMovingSumImpl<std::false_type>(name, argument_types[0]);
else
return createAggregateFunctionMovingSumImpl<std::true_type>(name, argument_types[0], max_elems);
}
}
void registerAggregateFunctionMovingSum(AggregateFunctionFactory & factory)
{
factory.registerFunction("movingSum", createAggregateFunctionMovingSum);
}
}

View File

@ -31,7 +31,7 @@ void registerAggregateFunctionsMaxIntersections(AggregateFunctionFactory &);
void registerAggregateFunctionMLMethod(AggregateFunctionFactory &);
void registerAggregateFunctionEntropy(AggregateFunctionFactory &);
void registerAggregateFunctionSimpleLinearRegression(AggregateFunctionFactory &);
void registerAggregateFunctionMovingSum(AggregateFunctionFactory &);
void registerAggregateFunctionMoving(AggregateFunctionFactory &);
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorArray(AggregateFunctionCombinatorFactory &);
@ -75,7 +75,7 @@ void registerAggregateFunctions()
registerAggregateFunctionMLMethod(factory);
registerAggregateFunctionEntropy(factory);
registerAggregateFunctionSimpleLinearRegression(factory);
registerAggregateFunctionMovingSum(factory);
registerAggregateFunctionMoving(factory);
}
{

View File

@ -0,0 +1,42 @@
<test>
<name>Moving Sum</name>
<type>loop</type>
<stop_conditions>
<all_of>
<total_time_ms>30000</total_time_ms>
</all_of>
<any_of>
<average_speed_not_changing_for_ms>6000</average_speed_not_changing_for_ms>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<total_time />
</main_metric>
<create_query>CREATE TABLE moving_sum_1m(k UInt64, v UInt64) ENGINE = MergeTree ORDER BY k</create_query>
<create_query>CREATE TABLE moving_sum_10m(k UInt64, v UInt64) ENGINE = MergeTree ORDER BY k</create_query>
<fill_query>INSERT INTO moving_sum_1m SELECT number%100, rand() from numbers(1000000)</fill_query>
<fill_query>INSERT INTO moving_sum_10m SELECT number%100, rand() from numbers(10000000)</fill_query>
<query tag='MovingSumSize10_1M'>SELECT k,groupArrayMovingSum(10)(v) FROM moving_sum_1m GROUP BY k</query>
<query tag='MovingSumSize10WithKey_1M'>SELECT k,groupArrayMovingSum(10)(v) FROM moving_sum_1m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize1000_1M'>SELECT k,groupArrayMovingSum(1000)(v) FROM moving_sum_1m GROUP BY k</query>
<query tag='MovingSumSize1000WithKey_1M'>SELECT k,groupArrayMovingSum(1000)(v) FROM moving_sum_1m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize10000_1M'>SELECT k,groupArrayMovingSum(10000)(v) FROM moving_sum_1m GROUP BY k</query>
<query tag='MovingSumSize10000WithKey_1M'>SELECT k,groupArrayMovingSum(10000)(v) FROM moving_sum_1m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize10_10M'>SELECT k,groupArrayMovingSum(10)(v) FROM moving_sum_10m GROUP BY k</query>
<query tag='MovingSumSize10WithKey_10M'>SELECT k,groupArrayMovingSum(10)(v) FROM moving_sum_10m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize1000_10M'>SELECT k,groupArrayMovingSum(1000)(v) FROM moving_sum_10m GROUP BY k</query>
<query tag='MovingSumSize1000WithKey_10M'>SELECT k,groupArrayMovingSum(1000)(v) FROM moving_sum_10m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize10000_10M'>SELECT k,groupArrayMovingSum(10000)(v) FROM moving_sum_10m GROUP BY k</query>
<query tag='MovingSumSize10000WithKey_10M'>SELECT k,groupArrayMovingSum(10000)(v) FROM moving_sum_10m WHERE k in (49, 50, 51) GROUP BY k</query>
<drop_query>DROP TABLE IF EXISTS moving_sum_10m</drop_query>
<drop_query>DROP TABLE IF EXISTS moving_sum_1m</drop_query>
</test>

View File

@ -1,52 +0,0 @@
<test>
<name>Moving Sum</name>
<type>loop</type>
<stop_conditions>
<all_of>
<total_time_ms>30000</total_time_ms>
</all_of>
<any_of>
<average_speed_not_changing_for_ms>6000</average_speed_not_changing_for_ms>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<total_time />
</main_metric>
<create_query>CREATE TABLE moving_sum_1m(k UInt64, v UInt64) ENGINE = MergeTree ORDER BY k</create_query>
<create_query>CREATE TABLE moving_sum_10m(k UInt64, v UInt64) ENGINE = MergeTree ORDER BY k</create_query>
<create_query>CREATE TABLE moving_sum_100m(k UInt64, v UInt64) ENGINE = MergeTree ORDER BY k</create_query>
<fill_query>INSERT INTO moving_sum_1m SELECT number%100, rand() from numbers(1000000)</fill_query>
<fill_query>INSERT INTO moving_sum_10m SELECT number%100, rand() from numbers(10000000)</fill_query>
<fill_query>INSERT INTO moving_sum_100m SELECT number%100, rand() from numbers(100000000)</fill_query>
<query tag='MovingSumSize10_1M'>SELECT k, movingSum(10)(v) FROM moving_sum_1m GROUP BY k</query>
<query tag='MovingSumSize10WithKey_1M'>SELECT k, movingSum(10)(v) FROM moving_sum_1m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize1000_1M'>SELECT k, movingSum(1000)(v) FROM moving_sum_1m GROUP BY k</query>
<query tag='MovingSumSize1000WithKey_1M'>SELECT k, movingSum(1000)(v) FROM moving_sum_1m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize10000_1M'>SELECT k, movingSum(10000)(v) FROM moving_sum_1m GROUP BY k</query>
<query tag='MovingSumSize10000WithKey_1M'>SELECT k, movingSum(10000)(v) FROM moving_sum_1m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize10_10M'>SELECT k, movingSum(10)(v) FROM moving_sum_10m GROUP BY k</query>
<query tag='MovingSumSize10WithKey_10M'>SELECT k, movingSum(10)(v) FROM moving_sum_10m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize1000_10M'>SELECT k, movingSum(1000)(v) FROM moving_sum_10m GROUP BY k</query>
<query tag='MovingSumSize1000WithKey_10M'>SELECT k, movingSum(1000)(v) FROM moving_sum_10m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize10000_10M'>SELECT k, movingSum(10000)(v) FROM moving_sum_10m GROUP BY k</query>
<query tag='MovingSumSize10000WithKey_10M'>SELECT k, movingSum(10000)(v) FROM moving_sum_10m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize10_100M'>SELECT k, movingSum(10)(v) FROM moving_sum_100m GROUP BY k</query>
<query tag='MovingSumSize10WithKey_100M'>SELECT k, movingSum(10)(v) FROM moving_sum_100m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize1000_100M'>SELECT k, movingSum(1000)(v) FROM moving_sum_100m GROUP BY k</query>
<query tag='MovingSumSize1000WithKey_100M'>SELECT k, movingSum(1000)(v) FROM moving_sum_100m WHERE k in (49, 50, 51) GROUP BY k</query>
<query tag='MovingSumSize10000_100M'>SELECT k, movingSum(10000)(v) FROM moving_sum_100m GROUP BY k</query>
<query tag='MovingSumSize10000WithKey_100M'>SELECT k, movingSum(10000)(v) FROM moving_sum_100m WHERE k in (49, 50, 51) GROUP BY k</query>
<drop_query>DROP TABLE IF EXISTS moving_sum_100m</drop_query>
<drop_query>DROP TABLE IF EXISTS moving_sum_10m</drop_query>
<drop_query>DROP TABLE IF EXISTS moving_sum_1m</drop_query>
</test>

View File

@ -14,19 +14,27 @@ b 2001-02-03 01:00:01 6
b 2001-02-03 01:00:02 7
b 2001-02-03 01:00:03 8
b 2001-02-03 01:00:04 9
k movingSum(v)
k groupArrayMovingSum(v)
String Array(UInt64)
a [0,1,3,6,10]
b [0,1,3,6,10,15,21,28,36,45]
k movingSum(3)(v)
k groupArrayMovingSum(3)(v)
String Array(UInt64)
a [0,1,3,6,9]
b [0,1,3,6,9,12,15,18,21,24]
k movingSum(v)
k groupArrayMovingAvg(v)
String Array(UInt64)
a [0,0,0,1,2]
b [0,0,0,0,1,1,2,2,3,4]
k groupArrayMovingAvg(3)(v)
String Array(UInt64)
a [0,0,1,2,3]
b [0,0,1,2,3,4,5,6,7,8]
k groupArrayMovingSum(v)
String Array(UInt64)
a [0,1,3,6,10]
b [0,1,3,6,10,15,21,28,36,45]
k movingSum(v)
k groupArrayMovingSum(v)
String Array(UInt64)
a [0,1,3,6,10]
b [0,1,3,6,10,15,21,28,36,45]

View File

@ -0,0 +1,43 @@
DROP TABLE IF EXISTS moving_sum_num;
DROP TABLE IF EXISTS moving_sum_dec;
CREATE TABLE moving_sum_num (
k String,
dt DateTime,
v UInt64
)
ENGINE = MergeTree ORDER BY (k, dt);
INSERT INTO moving_sum_num
SELECT 'b' k, toDateTime('2001-02-03 00:00:00')+number as dt, number as v
FROM system.numbers
LIMIT 5
UNION ALL
SELECT 'a' k, toDateTime('2001-02-03 00:00:00')+number as dt, number as v
FROM system.numbers
LIMIT 5;
INSERT INTO moving_sum_num
SELECT 'b' k, toDateTime('2001-02-03 01:00:00')+number as dt, 5+number as v
FROM system.numbers
LIMIT 5;
SELECT * FROM moving_sum_num ORDER BY k,dt FORMAT TabSeparatedWithNames;
SELECT k, groupArrayMovingSum(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
SELECT k, groupArrayMovingSum(3)(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
SELECT k, groupArrayMovingAvg(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
SELECT k, groupArrayMovingAvg(3)(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
CREATE TABLE moving_sum_dec ENGINE = Memory AS
SELECT k, dt, toDecimal64(v, 2) as v
FROM moving_sum_num;
SELECT k, groupArrayMovingSum(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
SELECT k, groupArrayMovingSum(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
DROP TABLE moving_sum_dec;
DROP TABLE moving_sum_num;

View File

@ -1,40 +0,0 @@
DROP TABLE IF EXISTS moving_sum_num;
DROP TABLE IF EXISTS moving_sum_dec;
CREATE TABLE moving_sum_num (
k String,
dt DateTime,
v UInt64
)
ENGINE = MergeTree ORDER BY (k, dt);
INSERT INTO moving_sum_num
SELECT 'b' k, toDateTime('2001-02-03 00:00:00')+number as dt, number as v
FROM system.numbers
LIMIT 5
UNION ALL
SELECT 'a' k, toDateTime('2001-02-03 00:00:00')+number as dt, number as v
FROM system.numbers
LIMIT 5;
INSERT INTO moving_sum_num
SELECT 'b' k, toDateTime('2001-02-03 01:00:00')+number as dt, 5+number as v
FROM system.numbers
LIMIT 5;
SELECT * FROM moving_sum_num ORDER BY k,dt FORMAT TabSeparatedWithNames;
SELECT k, movingSum(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
SELECT k, movingSum(3)(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
CREATE TABLE moving_sum_dec ENGINE = Memory AS
SELECT k, dt, toDecimal64(v, 2) as v
FROM moving_sum_num;
SELECT k, movingSum(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
SELECT k, movingSum(v) FROM (SELECT * FROM moving_sum_num ORDER BY k, dt) GROUP BY k ORDER BY k FORMAT TabSeparatedWithNamesAndTypes;
DROP TABLE moving_sum_dec;
DROP TABLE moving_sum_num;