diff --git a/dbms/src/AggregateFunctions/AggregateFunctionTSGroupSum.cpp b/dbms/src/AggregateFunctions/AggregateFunctionTSGroupSum.cpp index 0bb586b5151..765e12b86e5 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionTSGroupSum.cpp +++ b/dbms/src/AggregateFunctions/AggregateFunctionTSGroupSum.cpp @@ -1,31 +1,30 @@ -#include -#include -#include -#include +#include "AggregateFunctionTSGroupSum.h" +#include "AggregateFunctionFactory.h" +#include "FactoryHelpers.h" +#include "Helpers.h" namespace DB { - - namespace +namespace +{ + template + AggregateFunctionPtr createAggregateFunctionTSgroupSum(const std::string & name, const DataTypes & arguments, const Array & params) { - template - AggregateFunctionPtr createAggregateFunctionTSgroupSum(const std::string & name, const DataTypes & arguments, const Array & params) - { - assertNoParameters(name, params); + assertNoParameters(name, params); - if (arguments.size() < 3) - throw Exception("Not enough event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + if (arguments.size() < 3) + throw Exception("Not enough event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - return std::make_shared>(arguments); - } - - } - - void registerAggregateFunctionTSgroupSum(AggregateFunctionFactory & factory) - { - factory.registerFunction("TSgroupSum", createAggregateFunctionTSgroupSum, AggregateFunctionFactory::CaseInsensitive); - factory.registerFunction("TSgroupRateSum", createAggregateFunctionTSgroupSum, AggregateFunctionFactory::CaseInsensitive); + return std::make_shared>(arguments); } } + +void registerAggregateFunctionTSgroupSum(AggregateFunctionFactory & factory) +{ + factory.registerFunction("TSgroupSum", createAggregateFunctionTSgroupSum, AggregateFunctionFactory::CaseInsensitive); + factory.registerFunction("TSgroupRateSum", createAggregateFunctionTSgroupSum, AggregateFunctionFactory::CaseInsensitive); +} + +} diff --git a/dbms/src/AggregateFunctions/AggregateFunctionTSGroupSum.h b/dbms/src/AggregateFunctions/AggregateFunctionTSGroupSum.h index 1eddeaf18d0..f82e00da8ef 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionTSGroupSum.h +++ b/dbms/src/AggregateFunctions/AggregateFunctionTSGroupSum.h @@ -1,306 +1,287 @@ #pragma once +#include #include -#include #include #include -#include +#include #include -#include +#include #include #include -#include -#include +#include #include +#include +#include #include #include #include #include -#include - -#include +#include "IAggregateFunction.h" namespace DB { - namespace ErrorCodes +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION; +} +template +struct AggregateFunctionTSgroupSumData +{ + using DataPoint = std::pair; + struct Points { - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; - extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION; - } - template - struct AggregateFunctionTSgroupSumData - { - using DataPoint = std::pair; - struct Points + using Dps = std::queue; + Dps dps; + void add(Int64 t, Float64 v) { - using Dps = std::queue; - Dps dps; - void add(Int64 t, Float64 v) - { - dps.push(std::make_pair(t, v)); - if (dps.size() > 2) - dps.pop(); - } - Float64 getval(Int64 t) - { - Int64 t1, t2; - Float64 v1, v2; - if (rate) - { - if (dps.size() < 2) - return 0; - t1 = dps.back().first; - t2 = dps.front().first; - v1 = dps.back().second; - v2 = dps.front().second; - return (v1-v2)/Float64(t1-t2); - } - else - { - if (dps.size() == 1 && t == dps.front().first) - return dps.front().second; - t1 = dps.back().first; - t2 = dps.front().first; - v1 = dps.back().second; - v2 = dps.front().second; - return v2 + ((v1-v2)*Float64(t-t2))/Float64(t1-t2); - } - } - }; - - static constexpr size_t bytes_on_stack = 128; - typedef std::map Series; - typedef PODArray, bytes_on_stack>> AggSeries; - Series ss; - AggSeries result; - - void add(UInt64 uid, Int64 t, Float64 v) - {//suppose t is coming asc - typename Series::iterator it_ss; - if (ss.count(uid) == 0) - {//time series not exist, insert new one - Points tmp; - tmp.add(t, v); - ss.emplace(uid, tmp); - it_ss = ss.find(uid); - } - else - { - it_ss = ss.find(uid); - it_ss->second.add(t, v); - } - if (result.size() > 0 && t < result.back().first) - throw Exception{"TSgroupSum or TSgroupRateSum must order by timestamp asc!!!", ErrorCodes::LOGICAL_ERROR}; - if (result.size() > 0 && t == result.back().first) - { - //do not add new point - if (rate) - result.back().second += it_ss->second.getval(t); - else - result.back().second += v; - } - else - { - - if (rate) - result.emplace_back(std::make_pair(t, it_ss->second.getval(t))); - else - result.emplace_back(std::make_pair(t,v)); - } - size_t i = result.size() - 1; - //reverse find out the index of timestamp that more than previous timestamp of t - while (result[i].first > it_ss->second.dps.front().first && i >= 0) - i--; - - i++; - while (i < result.size()-1) - { - result[i].second += it_ss->second.getval(result[i].first); - i++; - } + dps.push(std::make_pair(t, v)); + if (dps.size() > 2) + dps.pop(); } - - void merge(const AggregateFunctionTSgroupSumData & other) + Float64 getval(Int64 t) { - //if ts has overlap, then aggregate two series by interpolation; - AggSeries tmp; - tmp.reserve(other.result.size() + result.size()); - size_t i=0, j=0; Int64 t1, t2; Float64 v1, v2; - while (i < result.size() && j < other.result.size()) + if (rate) { - if (result[i].first < other.result[j].first) + if (dps.size() < 2) + return 0; + t1 = dps.back().first; + t2 = dps.front().first; + v1 = dps.back().second; + v2 = dps.front().second; + return (v1 - v2) / Float64(t1 - t2); + } + else + { + if (dps.size() == 1 && t == dps.front().first) + return dps.front().second; + t1 = dps.back().first; + t2 = dps.front().first; + v1 = dps.back().second; + v2 = dps.front().second; + return v2 + ((v1 - v2) * Float64(t - t2)) / Float64(t1 - t2); + } + } + }; + + static constexpr size_t bytes_on_stack = 128; + typedef std::map Series; + typedef PODArray, bytes_on_stack>> AggSeries; + Series ss; + AggSeries result; + + void add(UInt64 uid, Int64 t, Float64 v) + { //suppose t is coming asc + typename Series::iterator it_ss; + if (ss.count(uid) == 0) + { //time series not exist, insert new one + Points tmp; + tmp.add(t, v); + ss.emplace(uid, tmp); + it_ss = ss.find(uid); + } + else + { + it_ss = ss.find(uid); + it_ss->second.add(t, v); + } + if (result.size() > 0 && t < result.back().first) + throw Exception{"TSgroupSum or TSgroupRateSum must order by timestamp asc!!!", ErrorCodes::LOGICAL_ERROR}; + if (result.size() > 0 && t == result.back().first) + { + //do not add new point + if (rate) + result.back().second += it_ss->second.getval(t); + else + result.back().second += v; + } + else + { + if (rate) + result.emplace_back(std::make_pair(t, it_ss->second.getval(t))); + else + result.emplace_back(std::make_pair(t, v)); + } + size_t i = result.size() - 1; + //reverse find out the index of timestamp that more than previous timestamp of t + while (result[i].first > it_ss->second.dps.front().first && i >= 0) + i--; + + i++; + while (i < result.size() - 1) + { + result[i].second += it_ss->second.getval(result[i].first); + i++; + } + } + + void merge(const AggregateFunctionTSgroupSumData & other) + { + //if ts has overlap, then aggregate two series by interpolation; + AggSeries tmp; + tmp.reserve(other.result.size() + result.size()); + size_t i = 0, j = 0; + Int64 t1, t2; + Float64 v1, v2; + while (i < result.size() && j < other.result.size()) + { + if (result[i].first < other.result[j].first) + { + if (j == 0) { - if (j==0) - { - tmp.emplace_back(result[i]); - } - else - { - t1 = other.result[j].first; - t2 = other.result[j-1].first; - v1 = other.result[j].second; - v2 = other.result[j-1].second; - Float64 value = result[i].second + v2 + (v1 - v2) * (Float64(result[i].first - t2)) / Float64(t1 - t2); - tmp.emplace_back(std::make_pair(result[i].first, value)); - } - i++; - } - else if (result[i].first > other.result[j].first) - { - if (i==0) - { - tmp.emplace_back(other.result[j]); - } - else - { - t1 = result[i].first; - t2 = result[i-1].first; - v1 = result[i].second; - v2 = result[i-1].second; - Float64 value = other.result[j].second + v2 + (v1-v2)*(Float64(other.result[j].first-t2))/Float64(t1-t2); - tmp.emplace_back(std::make_pair(other.result[j].first, value)); - } - j++; + tmp.emplace_back(result[i]); } else { - tmp.emplace_back(std::make_pair(result[i].first, result[i].second + other.result[j].second)); - i++; - j++; + t1 = other.result[j].first; + t2 = other.result[j - 1].first; + v1 = other.result[j].second; + v2 = other.result[j - 1].second; + Float64 value = result[i].second + v2 + (v1 - v2) * (Float64(result[i].first - t2)) / Float64(t1 - t2); + tmp.emplace_back(std::make_pair(result[i].first, value)); } - } - while (i < result.size()) - { - tmp.emplace_back(result[i]); i++; } - while (j < other.result.size()) + else if (result[i].first > other.result[j].first) { - tmp.push_back(other.result[j]); + if (i == 0) + { + tmp.emplace_back(other.result[j]); + } + else + { + t1 = result[i].first; + t2 = result[i - 1].first; + v1 = result[i].second; + v2 = result[i - 1].second; + Float64 value = other.result[j].second + v2 + (v1 - v2) * (Float64(other.result[j].first - t2)) / Float64(t1 - t2); + tmp.emplace_back(std::make_pair(other.result[j].first, value)); + } + j++; + } + else + { + tmp.emplace_back(std::make_pair(result[i].first, result[i].second + other.result[j].second)); + i++; j++; } - swap(result, tmp); } - - void serialize(WriteBuffer & buf) const + while (i < result.size()) { - size_t size = result.size(); - writeVarUInt(size, buf); - buf.write(reinterpret_cast(result.data()), sizeof(result[0])); + tmp.emplace_back(result[i]); + i++; } - - void deserialize(ReadBuffer & buf) + while (j < other.result.size()) { - size_t size = 0; - readVarUInt(size, buf); - result.resize(size); - buf.read(reinterpret_cast(result.data()), size * sizeof(result[0])); + tmp.push_back(other.result[j]); + j++; } - }; - template - class AggregateFunctionTSgroupSum final - : public IAggregateFunctionDataHelper, AggregateFunctionTSgroupSum> + swap(result, tmp); + } + + void serialize(WriteBuffer & buf) const { - private: + size_t size = result.size(); + writeVarUInt(size, buf); + buf.write(reinterpret_cast(result.data()), sizeof(result[0])); + } - public: - String getName() const override + void deserialize(ReadBuffer & buf) + { + size_t size = 0; + readVarUInt(size, buf); + result.resize(size); + buf.read(reinterpret_cast(result.data()), size * sizeof(result[0])); + } +}; +template +class AggregateFunctionTSgroupSum final + : public IAggregateFunctionDataHelper, AggregateFunctionTSgroupSum> +{ +private: +public: + String getName() const override { return rate ? "TSgroupRateSum" : "TSgroupSum"; } + + AggregateFunctionTSgroupSum(const DataTypes & arguments) + : IAggregateFunctionDataHelper, AggregateFunctionTSgroupSum>(arguments, {}) + { + if (!WhichDataType(arguments[0].get()).isUInt64()) + throw Exception{"Illegal type " + arguments[0].get()->getName() + " of argument 1 of aggregate function " + getName() + + ", must be UInt64", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; + + if (!WhichDataType(arguments[1].get()).isInt64()) + throw Exception{"Illegal type " + arguments[1].get()->getName() + " of argument 2 of aggregate function " + getName() + + ", must be Int64", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; + + if (!WhichDataType(arguments[2].get()).isFloat64()) + throw Exception{"Illegal type " + arguments[2].get()->getName() + " of argument 3 of aggregate function " + getName() + + ", must be Float64", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; + } + + DataTypePtr getReturnType() const override + { + auto datatypes = std::vector(); + datatypes.push_back(std::make_shared()); + datatypes.push_back(std::make_shared()); + + return std::make_shared(std::make_shared(datatypes)); + } + + void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override + { + auto uid = static_cast *>(columns[0])->getData()[row_num]; + auto ts = static_cast *>(columns[1])->getData()[row_num]; + auto val = static_cast *>(columns[2])->getData()[row_num]; + if (uid && ts && val) { - return rate?"TSgroupRateSum":"TSgroupSum"; + this->data(place).add(uid, ts, val); } + } - AggregateFunctionTSgroupSum(const DataTypes & arguments) - : IAggregateFunctionDataHelper, AggregateFunctionTSgroupSum>(arguments, {}) + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).serialize(buf); } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).deserialize(buf); } + + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + { + const auto & value = this->data(place).result; + size_t size = value.size(); + + ColumnArray & arr_to = static_cast(to); + ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); + size_t old_size = offsets_to.back(); + + offsets_to.push_back(offsets_to.back() + size); + + if (size) { - if (!WhichDataType(arguments[0].get()).isUInt64()) - throw Exception{"Illegal type " + arguments[0].get()->getName() + " of argument 1 of aggregate function " - + getName() + ", must be UInt64", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; - - if (!WhichDataType(arguments[1].get()).isInt64()) - throw Exception{"Illegal type " + arguments[1].get()->getName() + " of argument 2 of aggregate function " - + getName() + ", must be Int64", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; - - if (!WhichDataType(arguments[2].get()).isFloat64()) - throw Exception{"Illegal type " + arguments[2].get()->getName() + " of argument 3 of aggregate function " - + getName() + ", must be Float64", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; - } - - DataTypePtr getReturnType() const override - { - auto datatypes = std::vector(); - datatypes.push_back(std::make_shared()); - datatypes.push_back(std::make_shared()); - - return std::make_shared(std::make_shared(datatypes)); - } - - void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override - { - auto uid = static_cast *>(columns[0])->getData()[row_num]; - auto ts = static_cast *>(columns[1])->getData()[row_num]; - auto val = static_cast *>(columns[2])->getData()[row_num]; - if (uid && ts && val) + typename ColumnInt64::Container & ts_to + = static_cast(static_cast(arr_to.getData()).getColumn(0)).getData(); + typename ColumnFloat64::Container & val_to + = static_cast(static_cast(arr_to.getData()).getColumn(1)).getData(); + ts_to.reserve(old_size + size); + val_to.reserve(old_size + size); + size_t i = 0; + while (i < this->data(place).result.size()) { - this->data(place).add(uid, ts, val); + ts_to.push_back(this->data(place).result[i].first); + val_to.push_back(this->data(place).result[i].second); + i++; } } + } - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override - { - this->data(place).merge(this->data(rhs)); - } + bool allocatesMemoryInArena() const override { return true; } - void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override - { - this->data(place).serialize(buf); - } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override - { - this->data(place).deserialize(buf); - } - - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override - { - const auto & value = this->data(place).result; - size_t size = value.size(); - - ColumnArray & arr_to = static_cast(to); - ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); - size_t old_size = offsets_to.back(); - - offsets_to.push_back(offsets_to.back() + size); - - if (size) - { - typename ColumnInt64::Container & ts_to = static_cast(static_cast(arr_to.getData()).getColumn(0)).getData(); - typename ColumnFloat64::Container & val_to = static_cast(static_cast(arr_to.getData()).getColumn(1)).getData(); - ts_to.reserve(old_size + size); - val_to.reserve(old_size + size); - size_t i = 0; - while (i < this->data(place).result.size()) - { - ts_to.push_back(this->data(place).result[i].first); - val_to.push_back(this->data(place).result[i].second); - i++; - } - } - } - - bool allocatesMemoryInArena() const override - { - return true; - } - - const char * getHeaderFilePath() const override - { - return __FILE__; - } - }; + const char * getHeaderFilePath() const override { return __FILE__; } +}; }