From 30eef142b33749b3579b014d071d99270008eb02 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Tue, 19 May 2015 16:23:13 +0300 Subject: [PATCH 1/4] Merge --- dbms/include/DB/Functions/FunctionsRound.h | 277 ++++++++++++--------- 1 file changed, 156 insertions(+), 121 deletions(-) diff --git a/dbms/include/DB/Functions/FunctionsRound.h b/dbms/include/DB/Functions/FunctionsRound.h index 567379ef5f7..93e328ed94b 100644 --- a/dbms/include/DB/Functions/FunctionsRound.h +++ b/dbms/include/DB/Functions/FunctionsRound.h @@ -92,28 +92,152 @@ namespace DB } }; - template class Op, typename PowersTable> - struct FunctionApproximatingImpl + /// Реализация функций округления на низком уровне. + + template + struct RoundingComputation { - template - static inline U apply(U val, UInt8 precision, - typename std::enable_if::value>::type * = nullptr) + }; + + template + struct RoundingComputation + { + using Data = std::array; + using Scale = __m128; + + static inline void prepareScale(size_t scale, Scale & mm_scale) + { + Float32 fscale = static_cast(scale); + mm_scale = _mm_load1_ps(&fscale); + } + + static inline void compute(const Data & in, const Scale & mm_scale, Data & out) + { + Float32 input[4] __attribute__((aligned(16))) = {in[0], in[1], in[2], in[3]}; + __m128 mm_value = _mm_load_ps(input); + + mm_value = _mm_mul_ps(mm_value, mm_scale); + mm_value = _mm_round_ps(mm_value, rounding_mode); + mm_value = _mm_div_ps(mm_value, mm_scale); + + Float32 res[4] __attribute__((aligned(16))); + _mm_store_ps(res, mm_value); + out = {res[0], res[1], res[2], res[3]}; + } + }; + + template + struct RoundingComputation + { + using Data = std::array; + using Scale = __m128d; + + static inline void prepareScale(size_t scale, Scale & mm_scale) + { + Float64 fscale = static_cast(scale); + mm_scale = _mm_load1_pd(&fscale); + } + + static inline void compute(const Data & in, const Scale & mm_scale, Data & out) + { + Float64 input[2] __attribute__((aligned(16))) = { in[0], in[1] }; + __m128d mm_value = _mm_load_pd(input); + + mm_value = _mm_mul_pd(mm_value, mm_scale); + mm_value = _mm_round_pd(mm_value, rounding_mode); + mm_value = _mm_div_pd(mm_value, mm_scale); + + Float64 res[2] __attribute__((aligned(16))); + _mm_store_pd(res, mm_value); + out = {res[0], res[1]}; + } + }; + + /// Реализация функций округления на высоком уровне. + + template + struct FunctionRoundingImpl + { + }; + + /// В случае целочисленных значений не выполяется округления. + template + struct FunctionRoundingImpl::value>::type> + { + static inline void apply(const PODArray & in, size_t scale, typename ColumnVector::Container_t & out) + { + size_t size = in.size(); + for (size_t i = 0; i < size; ++i) + out[i] = in[i]; + } + + static inline T apply(T val, size_t scale) + { + return val; + } + }; + + template + struct FunctionRoundingImpl::value>::type> + { + private: + using Op = RoundingComputation; + using Data = typename Op::Data; + using Scale = typename Op::Scale; + + public: + static inline void apply(const PODArray & in, size_t scale, typename ColumnVector::Container_t & out) + { + Scale mm_scale; + Op::prepareScale(scale, mm_scale); + + const size_t size = in.size(); + const size_t data_size = std::tuple_size(); + + size_t i; + for (i = 0; i < (size - data_size + 1); i += data_size) + { + Data tmp; + for (size_t j = 0; j < data_size; ++j) + tmp[j] = in[i + j]; + + Data res; + Op::compute(tmp, mm_scale, res); + + for (size_t j = 0; j < data_size; ++j) + out[i + j] = res[j]; + } + + if (i < size) + { + Data tmp{0}; + for (size_t j = 0; (j < data_size) && (i + j) < size; ++j) + tmp[j] = in[i + j]; + + Data res; + Op::compute(tmp, mm_scale, res); + + for (size_t j = 0; (j < data_size) && (i + j) < size; ++j) + out[i + j] = in[i + j]; + } + } + + static inline T apply(T val, size_t scale) { if (val == 0) return val; else { - size_t power = PowersTable::values[precision]; - return Op::apply(val * power) / power; - } - } + Scale mm_scale; + Op::prepareScale(scale, mm_scale); - /// Для целых чисел ничего не надо делать. - template - static inline U apply(U val, UInt8 precision, - typename std::enable_if::value>::type * = nullptr) - { - return val; + Data tmp{0}; + tmp[0] = val; + + Data res; + Op::compute(tmp, mm_scale, res); + return res[0]; + } } }; @@ -164,7 +288,7 @@ namespace { /// Отдельные степени числа 10. - template + template struct PowerOf10 { static const size_t value = 10 * PowerOf10::value; @@ -179,49 +303,49 @@ namespace /// Объявление и определение контейнера содержащего таблицу степеней числа 10. - template + template struct TableContainer { static const std::array values; }; - template + template const std::array TableContainer::values = { TArgs... }; /// Генератор первых N степеней. - template + template struct FillArrayImpl { using result = typename FillArrayImpl::value, TArgs...>::result; }; - template + template struct FillArrayImpl<0, TArgs...> { using result = TableContainer::value, TArgs...>; }; - template + template struct FillArray { - using result = typename FillArrayImpl::result; + using result = typename FillArrayImpl::result; }; - /** Шаблон для функцией, которые вычисляют приближенное значение входного параметра + /** Шаблон для функций, которые вычисляют приближенное значение входного параметра * типа (U)Int8/16/32/64 или Float32/64 и принимают дополнительный необязятельный * параметр указывающий сколько знаков после запятой оставить (по умолчанию - 0). * Op - функция (round/floor/ceil) */ - template class Op, typename Name> - class FunctionApproximating : public IFunction + template + class FunctionRounding : public IFunction { public: static constexpr auto name = Name::name; - static IFunction * create(const Context & context) { return new FunctionApproximating; } + static IFunction * create(const Context & context) { return new FunctionRounding; } private: - using PowersOf10 = FillArray::digits10 + 1>::result; + using PowersOf10 = FillArray::digits10 + 1>::result; private: template @@ -245,10 +369,7 @@ namespace typename ColumnVector::Container_t & vec_res = col_res->getData(); vec_res.resize(col->getData().size()); - const PODArray & a = col->getData(); - size_t size = a.size(); - for (size_t i = 0; i < size; ++i) - vec_res[i] = FunctionApproximatingImpl::apply(a[i], precision); + FunctionRoundingImpl::apply(col->getData(), PowersOf10::values[precision], vec_res); return true; } @@ -258,7 +379,7 @@ namespace if (arguments.size() == 2) precision = getPrecision(block.getByPosition(arguments[1]).column); - T res = FunctionApproximatingImpl::apply(col->getData(), precision); + T res = FunctionRoundingImpl::apply(col->getData(), PowersOf10::values[precision]); ColumnConst * col_res = new ColumnConst(col->size(), res); block.getByPosition(result).column = col_res; @@ -355,92 +476,6 @@ namespace } }; -namespace -{ - /// Определение функцией для использования в шаблоне FunctionApproximating. - - template - struct RoundImpl - { - static inline T apply(T val) - { - return val; - } - }; - - template<> - struct RoundImpl - { - static inline Float32 apply(Float32 val) - { - return roundf(val); - } - }; - - template<> - struct RoundImpl - { - static inline Float64 apply(Float64 val) - { - return round(val); - } - }; - - template - struct CeilImpl - { - static inline T apply(T val) - { - return val; - } - }; - - template<> - struct CeilImpl - { - static inline Float32 apply(Float32 val) - { - return ceilf(val); - } - }; - - template<> - struct CeilImpl - { - static inline Float64 apply(Float64 val) - { - return ceil(val); - } - }; - - template - struct FloorImpl - { - static inline T apply(T val) - { - return val; - } - }; - - template<> - struct FloorImpl - { - static inline Float32 apply(Float32 val) - { - return floorf(val); - } - }; - - template<> - struct FloorImpl - { - static inline Float64 apply(Float64 val) - { - return floor(val); - } - }; -} - struct NameRoundToExp2 { static constexpr auto name = "roundToExp2"; }; struct NameRoundDuration { static constexpr auto name = "roundDuration"; }; struct NameRoundAge { static constexpr auto name = "roundAge"; }; @@ -451,7 +486,7 @@ namespace typedef FunctionUnaryArithmetic FunctionRoundToExp2; typedef FunctionUnaryArithmetic FunctionRoundDuration; typedef FunctionUnaryArithmetic FunctionRoundAge; - typedef FunctionApproximating FunctionRound; - typedef FunctionApproximating FunctionCeil; - typedef FunctionApproximating FunctionFloor; + typedef FunctionRounding FunctionRound; + typedef FunctionRounding FunctionCeil; + typedef FunctionRounding FunctionFloor; } From 8a36ba31c8f0f56ef5d8564e7c27051729e056b0 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Tue, 19 May 2015 16:26:23 +0300 Subject: [PATCH 2/4] dbms: Server: small cosmetic change. [#METR-15210] --- dbms/include/DB/Functions/FunctionsRound.h | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dbms/include/DB/Functions/FunctionsRound.h b/dbms/include/DB/Functions/FunctionsRound.h index 93e328ed94b..4b96bf5b9d9 100644 --- a/dbms/include/DB/Functions/FunctionsRound.h +++ b/dbms/include/DB/Functions/FunctionsRound.h @@ -288,7 +288,7 @@ namespace { /// Отдельные степени числа 10. - template + template struct PowerOf10 { static const size_t value = 10 * PowerOf10::value; @@ -303,33 +303,33 @@ namespace /// Объявление и определение контейнера содержащего таблицу степеней числа 10. - template + template struct TableContainer { static const std::array values; }; - template + template const std::array TableContainer::values = { TArgs... }; /// Генератор первых N степеней. - template + template struct FillArrayImpl { using result = typename FillArrayImpl::value, TArgs...>::result; }; - template + template struct FillArrayImpl<0, TArgs...> { using result = TableContainer::value, TArgs...>; }; - template + template struct FillArray { - using result = typename FillArrayImpl::result; + using result = typename FillArrayImpl::result; }; /** Шаблон для функций, которые вычисляют приближенное значение входного параметра From 2421d530334e510f3b697220845e84e920cd8756 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Tue, 19 May 2015 16:31:14 +0300 Subject: [PATCH 3/4] dbms: Server: small cosmetic change. [#METR-15210] --- dbms/include/DB/Functions/FunctionsRound.h | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dbms/include/DB/Functions/FunctionsRound.h b/dbms/include/DB/Functions/FunctionsRound.h index 4b96bf5b9d9..5c795cbd54f 100644 --- a/dbms/include/DB/Functions/FunctionsRound.h +++ b/dbms/include/DB/Functions/FunctionsRound.h @@ -211,13 +211,13 @@ namespace DB if (i < size) { Data tmp{0}; - for (size_t j = 0; (j < data_size) && (i + j) < size; ++j) + for (size_t j = 0; (j < data_size) && ((i + j) < size); ++j) tmp[j] = in[i + j]; Data res; Op::compute(tmp, mm_scale, res); - for (size_t j = 0; (j < data_size) && (i + j) < size; ++j) + for (size_t j = 0; (j < data_size) && ((i + j) < size); ++j) out[i + j] = in[i + j]; } } @@ -357,6 +357,8 @@ namespace template bool executeForType(Block & block, const ColumnNumbers & arguments, size_t result) { + using Op = FunctionRoundingImpl; + if (ColumnVector * col = typeid_cast *>(&*block.getByPosition(arguments[0]).column)) { UInt8 precision = 0; @@ -369,7 +371,7 @@ namespace typename ColumnVector::Container_t & vec_res = col_res->getData(); vec_res.resize(col->getData().size()); - FunctionRoundingImpl::apply(col->getData(), PowersOf10::values[precision], vec_res); + Op::apply(col->getData(), PowersOf10::values[precision], vec_res); return true; } @@ -379,7 +381,7 @@ namespace if (arguments.size() == 2) precision = getPrecision(block.getByPosition(arguments[1]).column); - T res = FunctionRoundingImpl::apply(col->getData(), PowersOf10::values[precision]); + T res = Op::apply(col->getData(), PowersOf10::values[precision]); ColumnConst * col_res = new ColumnConst(col->size(), res); block.getByPosition(result).column = col_res; From b46ce05dbe35fba90f6809614b3218522d8518c4 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Tue, 19 May 2015 18:06:12 +0300 Subject: [PATCH 4/4] Merge --- .../AggregateFunctionsStatistics.h | 426 ++++++++++++++++++ .../AggregateFunctionFactory.cpp | 94 +++- 2 files changed, 519 insertions(+), 1 deletion(-) create mode 100644 dbms/include/DB/AggregateFunctions/AggregateFunctionsStatistics.h diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionsStatistics.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionsStatistics.h new file mode 100644 index 00000000000..2ed3b05deea --- /dev/null +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionsStatistics.h @@ -0,0 +1,426 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +/** Статистические аггрегатные функции: + * varSamp - выборочная дисперсия + * stddevSamp - среднее выборочное квадратичное отклонение + * varPop - дисперсия + * stddevPop - среднее квадратичное отклонение + * covarSamp - выборочная ковариация + * covarPop - ковариация + * corr - корреляция + */ + +/** Параллельный и инкрементальный алгоритм для вычисления дисперсии. + * Источник: "Updating formulae and a pairwise algorithm for computing sample variances" + * (Chan et al., Stanford University, 12.1979) + */ +template +class AggregateFunctionVarianceData +{ +public: + AggregateFunctionVarianceData() = default; + + void update(const IColumn & column, size_t row_num) + { + T received = static_cast &>(column).getData()[row_num]; + Float64 val = static_cast(received); + Float64 delta = val - mean; + + ++count; + mean += delta / count; + m2 += delta * (val - mean); + } + + void mergeWith(const AggregateFunctionVarianceData & source) + { + UInt64 total_count = count + source.count; + if (total_count == 0) + return; + + Float64 factor = static_cast(count * source.count) / total_count; + Float64 delta = mean - source.mean; + + auto res = std::minmax(count, source.count); + if (((1 - static_cast(res.first) / res.second) < 0.001) && (res.first > 10000)) + { + /// Эта формула более стабильная, когда размеры обоих источников велики и сравнимы. + mean = (source.count * source.mean + count * mean) / total_count; + } + else + mean = source.mean + delta * (static_cast(count) / total_count); + + m2 += source.m2 + delta * delta * factor; + count = total_count; + } + + void serialize(WriteBuffer & buf) const + { + writeVarUInt(count, buf); + writeBinary(mean, buf); + writeBinary(m2, buf); + } + + void deserialize(ReadBuffer & buf) + { + readVarUInt(count, buf); + readBinary(mean, buf); + readBinary(m2, buf); + } + + void publish(IColumn & to) const + { + static_cast(to).getData().push_back(Op::apply(m2, count)); + } + +private: + UInt64 count = 0; + Float64 mean = 0.0; + Float64 m2 = 0.0; +}; + +/** Основной код для реализации функций varSamp, stddevSamp, varPop, stddevPop. + */ +template +class AggregateFunctionVariance final : public IUnaryAggregateFunction, AggregateFunctionVariance > +{ +public: + String getName() const override { return Op::name; } + + DataTypePtr getReturnType() const override + { + return new DataTypeFloat64; + } + + void setArgument(const DataTypePtr & argument) override + { + if (!argument->behavesAsNumber()) + throw Exception("Illegal type " + argument->getName() + " of argument for aggregate function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + + void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const + { + this->data(place).update(column, row_num); + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override + { + this->data(place).mergeWith(this->data(rhs)); + } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override + { + this->data(place).serialize(buf); + } + + void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override + { + AggregateFunctionVarianceData source; + source.deserialize(buf); + + this->data(place).mergeWith(source); + } + + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + { + this->data(place).publish(to); + } +}; + +namespace +{ + +/** Реализации функции varSamp. + */ +struct VarSampImpl +{ + static constexpr auto name = "varSamp"; + + static inline Float64 apply(Float64 m2, UInt64 count) + { + if (count < 2) + return 0.0; + else + return m2 / (count - 1); + } +}; + +/** Реализация функции stddevSamp. + */ +struct StdDevSampImpl +{ + static constexpr auto name = "stddevSamp"; + + static inline Float64 apply(Float64 m2, UInt64 count) + { + return sqrt(VarSampImpl::apply(m2, count)); + } +}; + +/** Реализация функции varPop. + */ +struct VarPopImpl +{ + static constexpr auto name = "varPop"; + + static inline Float64 apply(Float64 m2, UInt64 count) + { + if (count < 2) + return 0.0; + else + return m2 / count; + } +}; + +/** Реализация функции stddevPop. + */ +struct StdDevPopImpl +{ + static constexpr auto name = "stddevPop"; + + static inline Float64 apply(Float64 m2, UInt64 count) + { + return sqrt(VarPopImpl::apply(m2, count)); + } +}; + +} + +/** Параллельный и инкрементальный алгоритм для вычисления ковариации. + * Источник: "Numerically Stable, Single-Pass, Parallel Statistics Algorithms" + * (J. Bennett et al., Sandia National Laboratories, + * 2009 IEEE International Conference on Cluster Computing) + */ +template +class CovarianceData +{ +public: + CovarianceData() = default; + + void update(const IColumn & column_left, const IColumn & column_right, size_t row_num) + { + T left_received = static_cast &>(column_left).getData()[row_num]; + Float64 val_left = static_cast(left_received); + Float64 left_delta = val_left - left_mean; + + U right_received = static_cast &>(column_right).getData()[row_num]; + Float64 val_right = static_cast(right_received); + Float64 right_delta = val_right - right_mean; + + Float64 old_right_mean = right_mean; + + ++count; + + left_mean += left_delta / count; + right_mean += right_delta / count; + co_moment += (val_left - left_mean) * (val_right - old_right_mean); + + if (compute_marginal_moments) + { + left_m2 += left_delta * (val_left - left_mean); + right_m2 += right_delta * (val_right - right_mean); + } + } + + void mergeWith(const CovarianceData & source) + { + UInt64 total_count = count + source.count; + if (total_count == 0) + return; + + Float64 factor = static_cast(count * source.count) / total_count; + Float64 left_delta = left_mean - source.left_mean; + Float64 right_delta = right_mean - source.right_mean; + + left_mean = source.left_mean + left_delta * (static_cast(count) / total_count); + right_mean = source.right_mean + right_delta * (static_cast(count) / total_count); + co_moment += source.co_moment + left_delta * right_delta * factor; + count = total_count; + + if (compute_marginal_moments) + { + left_m2 += source.left_m2 + left_delta * left_delta * factor; + right_m2 += source.right_m2 + right_delta * right_delta * factor; + } + } + + void serialize(WriteBuffer & buf) const + { + writeVarUInt(count, buf); + writeBinary(left_mean, buf); + writeBinary(right_mean, buf); + writeBinary(co_moment, buf); + + if (compute_marginal_moments) + { + writeBinary(left_m2, buf); + writeBinary(right_m2, buf); + } + } + + void deserialize(ReadBuffer & buf) + { + readVarUInt(count, buf); + readBinary(left_mean, buf); + readBinary(right_mean, buf); + readBinary(co_moment, buf); + + if (compute_marginal_moments) + { + readBinary(left_m2, buf); + readBinary(right_m2, buf); + } + } + + void publish(IColumn & to) const + { + static_cast(to).getData().push_back(Op::apply(co_moment, left_m2, right_m2, count)); + } + +private: + UInt64 count = 0; + Float64 left_mean = 0.0; + Float64 right_mean = 0.0; + Float64 co_moment = 0.0; + Float64 left_m2 = 0.0; + Float64 right_m2 = 0.0; +}; + +template +class AggregateFunctionCovariance final + : public IBinaryAggregateFunction< + CovarianceData, + AggregateFunctionCovariance > +{ +public: + String getName() const override { return Op::name; } + + DataTypePtr getReturnType() const override + { + return new DataTypeFloat64; + } + + void setArgumentsImpl(const DataTypes & arguments) + { + if (!arguments[0]->behavesAsNumber()) + throw Exception("Illegal type " + arguments[0]->getName() + " of first argument to function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + if (!arguments[1]->behavesAsNumber()) + throw Exception("Illegal type " + arguments[1]->getName() + " of second argument to function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + + void addOne(AggregateDataPtr place, const IColumn & column_left, const IColumn & column_right, size_t row_num) const + { + this->data(place).update(column_left, column_right, row_num); + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override + { + this->data(place).mergeWith(this->data(rhs)); + } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override + { + this->data(place).serialize(buf); + } + + void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override + { + CovarianceData source; + source.deserialize(buf); + + this->data(place).mergeWith(source); + } + + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + { + this->data(place).publish(to); + } +}; + +namespace +{ + +/** Реализация функции covarSamp. + */ +struct CovarSampImpl +{ + static constexpr auto name = "covarSamp"; + + static inline Float64 apply(Float64 co_moment, Float64 left_m2, Float64 right_m2, UInt64 count) + { + if (count < 2) + return 0.0; + else + return co_moment / (count - 1); + } +}; + +/** Реализация функции covarPop. + */ +struct CovarPopImpl +{ + static constexpr auto name = "covarPop"; + + static inline Float64 apply(Float64 co_moment, Float64 left_m2, Float64 right_m2, UInt64 count) + { + if (count < 2) + return 0.0; + else + return co_moment / count; + } +}; + +/** Реализация функции corr. + */ +struct CorrImpl +{ + static constexpr auto name = "corr"; + + static inline Float64 apply(Float64 co_moment, Float64 left_m2, Float64 right_m2, UInt64 count) + { + if (count < 2) + return 0.0; + else + return co_moment / sqrt(left_m2 * right_m2); + } +}; + +} + +template +using AggregateFunctionVarSamp = AggregateFunctionVariance; + +template +using AggregateFunctionStdDevSamp = AggregateFunctionVariance; + +template +using AggregateFunctionVarPop = AggregateFunctionVariance; + +template +using AggregateFunctionStdDevPop = AggregateFunctionVariance; + +template +using AggregateFunctionCovarSamp = AggregateFunctionCovariance; + +template +using AggregateFunctionCovarPop = AggregateFunctionCovariance; + +template +using AggregateFunctionCorr = AggregateFunctionCovariance; + +} diff --git a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp index 5db9268e4db..61b81fc37a5 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp +++ b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include @@ -544,6 +545,90 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da return new AggregateFunctionSequenceMatch; } + else if (name == "varSamp") + { + if (argument_types.size() != 1) + throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + AggregateFunctionPtr res = createWithNumericType(*argument_types[0]); + + if (!res) + throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return res; + } + else if (name == "varPop") + { + if (argument_types.size() != 1) + throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + AggregateFunctionPtr res = createWithNumericType(*argument_types[0]); + + if (!res) + throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return res; + } + else if (name == "stddevSamp") + { + if (argument_types.size() != 1) + throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + AggregateFunctionPtr res = createWithNumericType(*argument_types[0]); + + if (!res) + throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return res; + } + else if (name == "stddevPop") + { + if (argument_types.size() != 1) + throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + AggregateFunctionPtr res = createWithNumericType(*argument_types[0]); + + if (!res) + throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return res; + } + else if (name == "covarSamp") + { + if (argument_types.size() != 2) + throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + AggregateFunctionPtr res = createWithTwoNumericTypes(*argument_types[0], *argument_types[1]); + if (!res) + throw Exception("Illegal types " + argument_types[0]->getName() + " and " + argument_types[1]->getName() + + " of arguments for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return res; + } + else if (name == "covarPop") + { + if (argument_types.size() != 2) + throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + AggregateFunctionPtr res = createWithTwoNumericTypes(*argument_types[0], *argument_types[1]); + if (!res) + throw Exception("Illegal types " + argument_types[0]->getName() + " and " + argument_types[1]->getName() + + " of arguments for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return res; + } + else if (name == "corr") + { + if (argument_types.size() != 2) + throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + AggregateFunctionPtr res = createWithTwoNumericTypes(*argument_types[0], *argument_types[1]); + if (!res) + throw Exception("Illegal types " + argument_types[0]->getName() + " and " + argument_types[1]->getName() + + " of arguments for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return res; + } else if (recursion_level == 0 && name.size() > strlen("State") && !(strcmp(name.data() + name.size() - strlen("State"), "State"))) { /// Для агрегатных функций вида aggState, где agg - имя другой агрегатной функции. @@ -639,7 +724,14 @@ const AggregateFunctionFactory::FunctionNames & AggregateFunctionFactory::getFun "medianTimingWeighted", "quantileDeterministic", "quantilesDeterministic", - "sequenceMatch" + "sequenceMatch", + "varSamp", + "varPop", + "stddevSamp", + "stddevPop", + "covarSamp", + "covarPop", + "corr" }; return names;