diff --git a/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.cpp b/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.cpp deleted file mode 100644 index c8711c257f8..00000000000 --- a/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.cpp +++ /dev/null @@ -1,35 +0,0 @@ -#include "AggregateFunctionTimeSeriesGroupSum.h" -#include "AggregateFunctionFactory.h" -#include "FactoryHelpers.h" -#include "Helpers.h" -#include "registerAggregateFunctions.h" - - -namespace DB -{ -namespace ErrorCodes -{ - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; -} -namespace -{ - template - AggregateFunctionPtr createAggregateFunctionTimeSeriesGroupSum(const std::string & name, const DataTypes & arguments, const Array & params) - { - assertNoParameters(name, params); - - if (arguments.size() < 3) - throw Exception("Not enough event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - return std::make_shared>(arguments); - } - -} - -void registerAggregateFunctionTimeSeriesGroupSum(AggregateFunctionFactory & factory) -{ - factory.registerFunction("timeSeriesGroupSum", createAggregateFunctionTimeSeriesGroupSum); - factory.registerFunction("timeSeriesGroupRateSum", createAggregateFunctionTimeSeriesGroupSum); -} - -} diff --git a/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h b/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h deleted file mode 100644 index 63dde3f1738..00000000000 --- a/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h +++ /dev/null @@ -1,291 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "IAggregateFunction.h" - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; - extern const int ILLEGAL_TYPE_OF_ARGUMENT; -} - -template -struct AggregateFunctionTimeSeriesGroupSumData -{ - using DataPoint = std::pair; - struct Points - { - using Dps = std::queue; - Dps dps; - void add(Int64 t, Float64 v) - { - dps.push(std::make_pair(t, v)); - if (dps.size() > 2) - dps.pop(); - } - Float64 getval(Int64 t) - { - Int64 t1, t2; - Float64 v1, v2; - if (rate) - { - if (dps.size() < 2) - return 0; - t1 = dps.back().first; - t2 = dps.front().first; - v1 = dps.back().second; - v2 = dps.front().second; - return (v1 - v2) / Float64(t1 - t2); - } - else - { - if (dps.size() == 1 && t == dps.front().first) - return dps.front().second; - t1 = dps.back().first; - t2 = dps.front().first; - v1 = dps.back().second; - v2 = dps.front().second; - return v2 + ((v1 - v2) * Float64(t - t2)) / Float64(t1 - t2); - } - } - }; - - typedef std::map Series; - typedef PODArrayWithStackMemory AggSeries; - Series ss; - AggSeries result; - - void add(UInt64 uid, Int64 t, Float64 v) - { //suppose t is coming asc - typename Series::iterator it_ss; - if (ss.count(uid) == 0) - { //time series not exist, insert new one - Points tmp; - tmp.add(t, v); - ss.emplace(uid, tmp); - it_ss = ss.find(uid); - } - else - { - it_ss = ss.find(uid); - it_ss->second.add(t, v); - } - if (result.size() > 0 && t < result.back().first) - throw Exception{"timeSeriesGroupSum or timeSeriesGroupRateSum must order by timestamp asc.", ErrorCodes::LOGICAL_ERROR}; - if (result.size() > 0 && t == result.back().first) - { - //do not add new point - if (rate) - result.back().second += it_ss->second.getval(t); - else - result.back().second += v; - } - else - { - if (rate) - result.emplace_back(std::make_pair(t, it_ss->second.getval(t))); - else - result.emplace_back(std::make_pair(t, v)); - } - ssize_t i = result.size() - 1; - //reverse find out the index of timestamp that more than previous timestamp of t - while (result[i].first > it_ss->second.dps.front().first && i >= 0) - i--; - - i++; - while (i < ssize_t(result.size()) - 1) - { - result[i].second += it_ss->second.getval(result[i].first); - i++; - } - } - - void merge(const AggregateFunctionTimeSeriesGroupSumData & other) - { - //if ts has overlap, then aggregate two series by interpolation; - AggSeries tmp; - tmp.reserve(other.result.size() + result.size()); - size_t i = 0, j = 0; - Int64 t1, t2; - Float64 v1, v2; - while (i < result.size() && j < other.result.size()) - { - if (result[i].first < other.result[j].first) - { - if (j == 0) - { - tmp.emplace_back(result[i]); - } - else - { - t1 = other.result[j].first; - t2 = other.result[j - 1].first; - v1 = other.result[j].second; - v2 = other.result[j - 1].second; - Float64 value = result[i].second + v2 + (v1 - v2) * (Float64(result[i].first - t2)) / Float64(t1 - t2); - tmp.emplace_back(std::make_pair(result[i].first, value)); - } - i++; - } - else if (result[i].first > other.result[j].first) - { - if (i == 0) - { - tmp.emplace_back(other.result[j]); - } - else - { - t1 = result[i].first; - t2 = result[i - 1].first; - v1 = result[i].second; - v2 = result[i - 1].second; - Float64 value = other.result[j].second + v2 + (v1 - v2) * (Float64(other.result[j].first - t2)) / Float64(t1 - t2); - tmp.emplace_back(std::make_pair(other.result[j].first, value)); - } - j++; - } - else - { - tmp.emplace_back(std::make_pair(result[i].first, result[i].second + other.result[j].second)); - i++; - j++; - } - } - while (i < result.size()) - { - tmp.emplace_back(result[i]); - i++; - } - while (j < other.result.size()) - { - tmp.push_back(other.result[j]); - j++; - } - swap(result, tmp); - } - - void serialize(WriteBuffer & buf) const - { - size_t size = result.size(); - writeVarUInt(size, buf); - if (size > 0) - { - buf.write(reinterpret_cast(result.data()), size * sizeof(result[0])); - } - } - - void deserialize(ReadBuffer & buf) - { - size_t size = 0; - readVarUInt(size, buf); - result.resize(size); - if (size > 0) - { - buf.read(reinterpret_cast(result.data()), size * sizeof(result[0])); - } - } -}; -template -class AggregateFunctionTimeSeriesGroupSum final - : public IAggregateFunctionDataHelper, AggregateFunctionTimeSeriesGroupSum> -{ -private: -public: - String getName() const override { return rate ? "timeSeriesGroupRateSum" : "timeSeriesGroupSum"; } - - AggregateFunctionTimeSeriesGroupSum(const DataTypes & arguments) - : IAggregateFunctionDataHelper, AggregateFunctionTimeSeriesGroupSum>(arguments, {}) - { - if (!WhichDataType(arguments[0].get()).isUInt64()) - throw Exception{"Illegal type " + arguments[0].get()->getName() + " of argument 1 of aggregate function " + getName() - + ", must be UInt64", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; - - if (!WhichDataType(arguments[1].get()).isInt64()) - throw Exception{"Illegal type " + arguments[1].get()->getName() + " of argument 2 of aggregate function " + getName() - + ", must be Int64", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; - - if (!WhichDataType(arguments[2].get()).isFloat64()) - throw Exception{"Illegal type " + arguments[2].get()->getName() + " of argument 3 of aggregate function " + getName() - + ", must be Float64", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; - } - - DataTypePtr getReturnType() const override - { - auto datatypes = std::vector(); - datatypes.push_back(std::make_shared()); - datatypes.push_back(std::make_shared()); - - return std::make_shared(std::make_shared(datatypes)); - } - - void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override - { - auto uid = assert_cast *>(columns[0])->getData()[row_num]; - auto ts = assert_cast *>(columns[1])->getData()[row_num]; - auto val = assert_cast *>(columns[2])->getData()[row_num]; - if (uid && ts && val) - { - this->data(place).add(uid, ts, val); - } - } - - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); } - - void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).serialize(buf); } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).deserialize(buf); } - - void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override - { - const auto & value = this->data(place).result; - size_t size = value.size(); - - ColumnArray & arr_to = assert_cast(to); - ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); - size_t old_size = offsets_to.back(); - - offsets_to.push_back(offsets_to.back() + size); - - if (size) - { - typename ColumnInt64::Container & ts_to - = assert_cast(assert_cast(arr_to.getData()).getColumn(0)).getData(); - typename ColumnFloat64::Container & val_to - = assert_cast(assert_cast(arr_to.getData()).getColumn(1)).getData(); - ts_to.reserve(old_size + size); - val_to.reserve(old_size + size); - size_t i = 0; - while (i < this->data(place).result.size()) - { - ts_to.push_back(this->data(place).result[i].first); - val_to.push_back(this->data(place).result[i].second); - i++; - } - } - } - - bool allocatesMemoryInArena() const override { return true; } -}; -} diff --git a/src/AggregateFunctions/registerAggregateFunctions.cpp b/src/AggregateFunctions/registerAggregateFunctions.cpp index 90109a98433..9e179ead0df 100644 --- a/src/AggregateFunctions/registerAggregateFunctions.cpp +++ b/src/AggregateFunctions/registerAggregateFunctions.cpp @@ -32,7 +32,6 @@ void registerAggregateFunctionsBitmap(AggregateFunctionFactory &); void registerAggregateFunctionsMaxIntersections(AggregateFunctionFactory &); void registerAggregateFunctionHistogram(AggregateFunctionFactory &); void registerAggregateFunctionRetention(AggregateFunctionFactory &); -void registerAggregateFunctionTimeSeriesGroupSum(AggregateFunctionFactory &); void registerAggregateFunctionMLMethod(AggregateFunctionFactory &); void registerAggregateFunctionEntropy(AggregateFunctionFactory &); void registerAggregateFunctionSimpleLinearRegression(AggregateFunctionFactory &); @@ -86,7 +85,6 @@ void registerAggregateFunctions() registerAggregateFunctionsMaxIntersections(factory); registerAggregateFunctionHistogram(factory); registerAggregateFunctionRetention(factory); - registerAggregateFunctionTimeSeriesGroupSum(factory); registerAggregateFunctionMLMethod(factory); registerAggregateFunctionEntropy(factory); registerAggregateFunctionSimpleLinearRegression(factory); diff --git a/src/AggregateFunctions/ya.make b/src/AggregateFunctions/ya.make index f5e64f1471b..8ba5655ace9 100644 --- a/src/AggregateFunctions/ya.make +++ b/src/AggregateFunctions/ya.make @@ -45,7 +45,6 @@ SRCS( AggregateFunctionStatisticsSimple.cpp AggregateFunctionSum.cpp AggregateFunctionSumMap.cpp - AggregateFunctionTimeSeriesGroupSum.cpp AggregateFunctionTopK.cpp AggregateFunctionUniq.cpp AggregateFunctionUniqCombined.cpp diff --git a/tests/fuzz/ast.dict b/tests/fuzz/ast.dict index fff02af1c4d..8327f276b31 100644 --- a/tests/fuzz/ast.dict +++ b/tests/fuzz/ast.dict @@ -344,8 +344,6 @@ "TABLE" "TABLES" "TEMPORARY" -"timeSeriesGroupRateSum" -"timeSeriesGroupSum" "TIMESTAMP" "TIMESTAMP_ADD" "TIMESTAMPADD" diff --git a/tests/queries/0_stateless/00910_aggregation_timeseriesgroupsum.reference b/tests/queries/0_stateless/00910_aggregation_timeseriesgroupsum.reference deleted file mode 100644 index dbcad97e743..00000000000 --- a/tests/queries/0_stateless/00910_aggregation_timeseriesgroupsum.reference +++ /dev/null @@ -1,2 +0,0 @@ -[(2,0.2),(3,0.8999999999999999),(7,2.0999999999999996),(8,2.4),(12,3.5999999999999996),(17,5.1000000000000005),(18,5.4),(24,7.199999999999999),(25,2.5)] -[(2,0),(3,0.09999999999999999),(7,0.3),(8,0.30000000000000004),(12,0.29999999999999993),(17,0.30000000000000004),(18,0.30000000000000004),(24,0.29999999999999993),(25,0.1)] diff --git a/tests/queries/0_stateless/00910_aggregation_timeseriesgroupsum.sql b/tests/queries/0_stateless/00910_aggregation_timeseriesgroupsum.sql deleted file mode 100644 index 3a1a334469c..00000000000 --- a/tests/queries/0_stateless/00910_aggregation_timeseriesgroupsum.sql +++ /dev/null @@ -1,10 +0,0 @@ -drop table if exists tsgroupsum_test; - -create table tsgroupsum_test (uid UInt64, ts Int64, value Float64) engine=Memory; -insert into tsgroupsum_test values (1,2,0.2),(1,7,0.7),(1,12,1.2),(1,17,1.7),(1,25,2.5); -insert into tsgroupsum_test values (2,3,0.6),(2,8,1.6),(2,12,2.4),(2,18,3.6),(2,24,4.8); - -select timeSeriesGroupSum(uid, ts, value) from (select * from tsgroupsum_test order by ts asc); -select timeSeriesGroupRateSum(uid, ts, value) from (select * from tsgroupsum_test order by ts asc); - -drop table tsgroupsum_test; diff --git a/tests/queries/0_stateless/01560_timeseriesgroupsum_segfault.reference b/tests/queries/0_stateless/01560_timeseriesgroupsum_segfault.reference deleted file mode 100644 index 814095e7818..00000000000 --- a/tests/queries/0_stateless/01560_timeseriesgroupsum_segfault.reference +++ /dev/null @@ -1,3 +0,0 @@ -[] -1 -server is still alive diff --git a/tests/queries/0_stateless/01560_timeseriesgroupsum_segfault.sql b/tests/queries/0_stateless/01560_timeseriesgroupsum_segfault.sql deleted file mode 100644 index eb7c91967e5..00000000000 --- a/tests/queries/0_stateless/01560_timeseriesgroupsum_segfault.sql +++ /dev/null @@ -1,23 +0,0 @@ -DROP TABLE IF EXISTS tsgs_local; -DROP TABLE IF EXISTS tsgs; - -CREATE TABLE tsgs_local ENGINE = MergeTree ORDER BY tuple() AS -SELECT - toUInt64(13820745146630357293) AS a, - toInt64(1604422500000000000) AS b, - toFloat64(0) AS c -FROM numbers(100); - --- the issue (https://github.com/ClickHouse/ClickHouse/issues/16862) happens during serialization of the state --- so happens only when Distributed tables are used or with -State modifier. - -CREATE TABLE tsgs AS tsgs_local ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), tsgs_local); - -SELECT timeSeriesGroupSum(a, b, c) FROM tsgs; - -SELECT count() FROM ( SELECT timeSeriesGroupSumState(a, b, c) as x FROM tsgs_local) WHERE NOT ignore(*); - -SELECT 'server is still alive'; - -DROP TABLE tsgs_local; -DROP TABLE tsgs;