#pragma once #include #include #include #include #include #include #include #include #include /** This is simple, not numerically stable * implementations of variance/covariance/correlation functions. * * It is about two times faster than stable variants. * Numerical errors may occur during summation. * * This implementation is selected as default, * because "you don't pay for what you don't need" principle. * * For more sophisticated implementation, look at AggregateFunctionStatistics.h */ namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int DECIMAL_OVERFLOW; } template struct VarMoments { T m0{}; T m1{}; T m2{}; void add(T x) { ++m0; m1 += x; m2 += x * x; } void merge(const VarMoments & rhs) { m0 += rhs.m0; m1 += rhs.m1; m2 += rhs.m2; } void write(WriteBuffer & buf) const { writePODBinary(*this, buf); } void read(ReadBuffer & buf) { readPODBinary(*this, buf); } T getPopulation() const { return (m2 - m1 * m1 / m0) / m0; } T getSample() const { if (m0 == 0) return std::numeric_limits::quiet_NaN(); return (m2 - m1 * m1 / m0) / (m0 - 1); } T get() const { throw Exception("Unexpected call", ErrorCodes::LOGICAL_ERROR); } }; template struct VarMomentsDecimal { using NativeType = typename T::NativeType; UInt64 m0{}; NativeType m1{}; NativeType m2{}; void add(NativeType x) { ++m0; m1 += x; NativeType tmp; /// scale' = 2 * scale if (common::mulOverflow(x, x, tmp) || common::addOverflow(m2, tmp, m2)) throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW); } void merge(const VarMomentsDecimal & rhs) { m0 += rhs.m0; m1 += rhs.m1; if (common::addOverflow(m2, rhs.m2, m2)) throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW); } void write(WriteBuffer & buf) const { writePODBinary(*this, buf); } void read(ReadBuffer & buf) { readPODBinary(*this, buf); } Float64 getPopulation(UInt32 scale) const { NativeType tmp; if (common::mulOverflow(m1, m1, tmp) || common::subOverflow(m2, NativeType(tmp/m0), tmp)) throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW); return convertFromDecimal, DataTypeNumber>(tmp / m0, scale); } Float64 getSample(UInt32 scale) const { if (m0 == 0) return std::numeric_limits::quiet_NaN(); NativeType tmp; if (common::mulOverflow(m1, m1, tmp) || common::subOverflow(m2, NativeType(tmp/m0), tmp)) throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW); return convertFromDecimal, DataTypeNumber>(tmp / (m0 - 1), scale); } Float64 get() const { throw Exception("Unexpected call", ErrorCodes::LOGICAL_ERROR); } }; template struct CovarMoments { T m0{}; T x1{}; T y1{}; T xy{}; void add(T x, T y) { ++m0; x1 += x; y1 += y; xy += x * y; } void merge(const CovarMoments & rhs) { m0 += rhs.m0; x1 += rhs.x1; y1 += rhs.y1; xy += rhs.xy; } void write(WriteBuffer & buf) const { writePODBinary(*this, buf); } void read(ReadBuffer & buf) { readPODBinary(*this, buf); } T getPopulation() const { return (xy - x1 * y1 / m0) / m0; } T getSample() const { if (m0 == 0) return std::numeric_limits::quiet_NaN(); return (xy - x1 * y1 / m0) / (m0 - 1); } T get() const { throw Exception("Unexpected call", ErrorCodes::LOGICAL_ERROR); } }; template struct CorrMoments { T m0{}; T x1{}; T y1{}; T xy{}; T x2{}; T y2{}; void add(T x, T y) { ++m0; x1 += x; y1 += y; xy += x * y; x2 += x * x; y2 += y * y; } void merge(const CorrMoments & rhs) { m0 += rhs.m0; x1 += rhs.x1; y1 += rhs.y1; xy += rhs.xy; x2 += rhs.x2; y2 += rhs.y2; } void write(WriteBuffer & buf) const { writePODBinary(*this, buf); } void read(ReadBuffer & buf) { readPODBinary(*this, buf); } T get() const { return (m0 * xy - x1 * y1) / sqrt((m0 * x2 - x1 * x1) * (m0 * y2 - y1 * y1)); } T getPopulation() const { throw Exception("Unexpected call", ErrorCodes::LOGICAL_ERROR); } T getSample() const { throw Exception("Unexpected call", ErrorCodes::LOGICAL_ERROR); } }; enum class StatisticsFunctionKind { varPop, varSamp, stddevPop, stddevSamp, covarPop, covarSamp, corr }; template struct StatFuncOneArg { using Type1 = T; using Type2 = T; using ResultType = std::conditional_t, Float32, Float64>; using Data = std::conditional_t, VarMomentsDecimal, VarMoments>; static constexpr StatisticsFunctionKind kind = _kind; static constexpr UInt32 num_args = 1; }; template struct StatFuncTwoArg { using Type1 = T1; using Type2 = T2; using ResultType = std::conditional_t && std::is_same_v, Float32, Float64>; using Data = std::conditional_t<_kind == StatisticsFunctionKind::corr, CorrMoments, CovarMoments>; static constexpr StatisticsFunctionKind kind = _kind; static constexpr UInt32 num_args = 2; }; template class AggregateFunctionVarianceSimple final : public IAggregateFunctionDataHelper> { public: using T1 = typename StatFunc::Type1; using T2 = typename StatFunc::Type2; using ColVecT1 = std::conditional_t, ColumnDecimal, ColumnVector>; using ColVecT2 = std::conditional_t, ColumnDecimal, ColumnVector>; using ResultType = typename StatFunc::ResultType; using ColVecResult = ColumnVector; AggregateFunctionVarianceSimple() : src_scale(0) {} AggregateFunctionVarianceSimple(const IDataType & data_type) : src_scale(getDecimalScale(data_type)) {} String getName() const override { switch (StatFunc::kind) { case StatisticsFunctionKind::varPop: return "varPop"; case StatisticsFunctionKind::varSamp: return "varSamp"; case StatisticsFunctionKind::stddevPop: return "stddevPop"; case StatisticsFunctionKind::stddevSamp: return "stddevSamp"; case StatisticsFunctionKind::covarPop: return "covarPop"; case StatisticsFunctionKind::covarSamp: return "covarSamp"; case StatisticsFunctionKind::corr: return "corr"; } __builtin_unreachable(); } DataTypePtr getReturnType() const override { return std::make_shared>(); } void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override { if constexpr (StatFunc::num_args == 2) this->data(place).add( static_cast(*columns[0]).getData()[row_num], static_cast(*columns[1]).getData()[row_num]); else this->data(place).add( static_cast(*columns[0]).getData()[row_num]); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).write(buf); } void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).read(buf); } void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override { const auto & data = this->data(place); auto & dst = static_cast(to).getData(); if constexpr (IsDecimalNumber) { switch (StatFunc::kind) { case StatisticsFunctionKind::varPop: dst.push_back(data.getPopulation(src_scale * 2)); break; case StatisticsFunctionKind::varSamp: dst.push_back(data.getSample(src_scale * 2)); break; case StatisticsFunctionKind::stddevPop: dst.push_back(sqrt(data.getPopulation(src_scale * 2))); break; case StatisticsFunctionKind::stddevSamp: dst.push_back(sqrt(data.getSample(src_scale * 2))); break; } } else { switch (StatFunc::kind) { case StatisticsFunctionKind::varPop: dst.push_back(data.getPopulation()); break; case StatisticsFunctionKind::varSamp: dst.push_back(data.getSample()); break; case StatisticsFunctionKind::stddevPop: dst.push_back(sqrt(data.getPopulation())); break; case StatisticsFunctionKind::stddevSamp: dst.push_back(sqrt(data.getSample())); break; case StatisticsFunctionKind::covarPop: dst.push_back(data.getPopulation()); break; case StatisticsFunctionKind::covarSamp: dst.push_back(data.getSample()); break; case StatisticsFunctionKind::corr: dst.push_back(data.get()); break; } } } const char * getHeaderFilePath() const override { return __FILE__; } private: UInt32 src_scale; }; template using AggregateFunctionVarPopSimple = AggregateFunctionVarianceSimple>; template using AggregateFunctionVarSampSimple = AggregateFunctionVarianceSimple>; template using AggregateFunctionStddevPopSimple = AggregateFunctionVarianceSimple>; template using AggregateFunctionStddevSampSimple = AggregateFunctionVarianceSimple>; template using AggregateFunctionCovarPopSimple = AggregateFunctionVarianceSimple>; template using AggregateFunctionCovarSampSimple = AggregateFunctionVarianceSimple>; template using AggregateFunctionCorrSimple = AggregateFunctionVarianceSimple>; }