Merge pull request #16883 from nikitamikhaylov/refactor-rank-corr

mannWhitneyUTest + studentTTest + welchTTest + small rankCorr refactor
This commit is contained in:
Nikita Mikhaylov 2020-11-26 15:17:18 +03:00 committed by GitHub
commit adb82649c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1415 additions and 478 deletions

View File

@ -288,6 +288,7 @@ TESTS_TO_SKIP=(
# Require python libraries like scipy, pandas and numpy
01322_ttest_scipy
01561_mann_whitney_scipy
01545_system_errors
# Checks system.errors

View File

@ -0,0 +1,37 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionMannWhitney.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
#include <AggregateFunctions/Helpers.h>
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionMannWhitneyUTest(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertBinary(name, argument_types);
if (!isNumber(argument_types[0]) || !isNumber(argument_types[1]))
throw Exception("Aggregate function " + name + " only supports numerical types", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<AggregateFunctionMannWhitney>(argument_types, parameters);
}
}
void registerAggregateFunctionMannWhitney(AggregateFunctionFactory & factory)
{
factory.registerFunction("mannWhitneyUTest", createAggregateFunctionMannWhitneyUTest);
}
}

View File

@ -0,0 +1,246 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/StatCommon.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnTuple.h>
#include <Common/assert_cast.h>
#include <Common/FieldVisitors.h>
#include <Common/PODArray_fwd.h>
#include <common/types.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <limits>
#include <DataTypes/DataTypeArray.h>
#include <Common/ArenaAllocator.h>
#include <iostream>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
struct MannWhitneyData : public StatisticalSample<Float64, Float64>
{
/*Since null hypothesis is "for randomly selected values X and Y from two populations,
*the probability of X being greater than Y is equal to the probability of Y being greater than X".
*Or "the distribution F of first sample equals to the distribution G of second sample".
*Then alternative for this hypothesis (H1) is "two-sided"(F != G), "less"(F < G), "greater" (F > G). */
enum class Alternative
{
TwoSided,
Less,
Greater
};
/// The behaviour equals to the similar function from scipy.
/// https://github.com/scipy/scipy/blob/ab9e9f17e0b7b2d618c4d4d8402cd4c0c200d6c0/scipy/stats/stats.py#L6978
std::pair<Float64, Float64> getResult(Alternative alternative, bool continuity_correction)
{
ConcatenatedSamples both(this->x, this->y);
RanksArray ranks;
Float64 tie_correction;
/// Compute ranks according to both samples.
std::tie(ranks, tie_correction) = computeRanksAndTieCorrection(both);
const Float64 n1 = this->size_x;
const Float64 n2 = this->size_y;
Float64 r1 = 0;
for (size_t i = 0; i < n1; ++i)
r1 += ranks[i];
const Float64 u1 = n1 * n2 + (n1 * (n1 + 1.)) / 2. - r1;
const Float64 u2 = n1 * n2 - u1;
/// The distribution of U-statistic under null hypothesis H0 is symmetric with respect to meanrank.
const Float64 meanrank = n1 * n2 /2. + 0.5 * continuity_correction;
const Float64 sd = std::sqrt(tie_correction * n1 * n2 * (n1 + n2 + 1) / 12.0);
Float64 u = 0;
if (alternative == Alternative::TwoSided)
/// There is no difference which u_i to take as u, because z will be differ only in sign and we take std::abs() from it.
u = std::max(u1, u2);
else if (alternative == Alternative::Less)
u = u1;
else if (alternative == Alternative::Greater)
u = u2;
Float64 z = (u - meanrank) / sd;
if (alternative == Alternative::TwoSided)
z = std::abs(z);
/// In fact cdf is a probability function, so it is intergral of density from (-inf, z].
/// But since standard normal distribution is symmetric, cdf(0) = 0.5 and we have to compute integral from [0, z].
const Float64 cdf = integrateSimpson(0, z, [] (Float64 t) { return std::pow(M_E, -0.5 * t * t) / std::sqrt(2 * M_PI);});
Float64 p_value = 0;
if (alternative == Alternative::TwoSided)
p_value = 1 - 2 * cdf;
else
p_value = 0.5 - cdf;
return {u2, p_value};
}
private:
using Sample = typename StatisticalSample<Float64, Float64>::SampleX;
/// We need to compute ranks according to all samples. Use this class to avoid extra copy and memory allocation.
class ConcatenatedSamples
{
public:
ConcatenatedSamples(const Sample & first_, const Sample & second_)
: first(first_), second(second_) {}
const Float64 & operator[](size_t ind) const
{
if (ind < first.size())
return first[ind];
return second[ind % first.size()];
}
size_t size() const
{
return first.size() + second.size();
}
private:
const Sample & first;
const Sample & second;
};
};
class AggregateFunctionMannWhitney final:
public IAggregateFunctionDataHelper<MannWhitneyData, AggregateFunctionMannWhitney>
{
private:
using Alternative = typename MannWhitneyData::Alternative;
Alternative alternative;
bool continuity_correction{true};
public:
explicit AggregateFunctionMannWhitney(const DataTypes & arguments, const Array & params)
:IAggregateFunctionDataHelper<MannWhitneyData, AggregateFunctionMannWhitney> ({arguments}, {})
{
if (params.size() > 2)
throw Exception("Aggregate function " + getName() + " require two parameter or less", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (params.empty())
{
alternative = Alternative::TwoSided;
return;
}
if (params[0].getType() != Field::Types::String)
throw Exception("Aggregate function " + getName() + " require require first parameter to be a String", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto param = params[0].get<String>();
if (param == "two-sided")
alternative = Alternative::TwoSided;
else if (param == "less")
alternative = Alternative::Less;
else if (param == "greater")
alternative = Alternative::Greater;
else
throw Exception("Unknown parameter in aggregate function " + getName() +
". It must be one of: 'two sided', 'less', 'greater'", ErrorCodes::BAD_ARGUMENTS);
if (params.size() != 2)
return;
if (params[1].getType() != Field::Types::UInt64)
throw Exception("Aggregate function " + getName() + " require require second parameter to be a UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
continuity_correction = static_cast<bool>(params[1].get<UInt64>());
}
String getName() const override
{
return "mannWhitneyUTest";
}
DataTypePtr getReturnType() const override
{
DataTypes types
{
std::make_shared<DataTypeNumber<Float64>>(),
std::make_shared<DataTypeNumber<Float64>>(),
};
Strings names
{
"u_statistic",
"p_value"
};
return std::make_shared<DataTypeTuple>(
std::move(types),
std::move(names)
);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
Float64 value = columns[0]->getFloat64(row_num);
UInt8 is_second = columns[1]->getUInt(row_num);
if (is_second)
this->data(place).addY(value, arena);
else
this->data(place).addX(value, arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & a = this->data(place);
auto & b = this->data(rhs);
a.merge(b, arena);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
this->data(place).read(buf, arena);
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
if (!this->data(place).size_x || !this->data(place).size_y)
throw Exception("Aggregate function " + getName() + " require both samples to be non empty", ErrorCodes::BAD_ARGUMENTS);
auto [u_statistic, p_value] = this->data(place).getResult(alternative, continuity_correction);
/// Because p-value is a probability.
p_value = std::min(1.0, std::max(0.0, p_value));
auto & column_tuple = assert_cast<ColumnTuple &>(to);
auto & column_stat = assert_cast<ColumnVector<Float64> &>(column_tuple.getColumn(0));
auto & column_value = assert_cast<ColumnVector<Float64> &>(column_tuple.getColumn(1));
column_stat.getData().push_back(u_statistic);
column_value.getData().push_back(p_value);
}
};
};

View File

@ -21,23 +21,10 @@ AggregateFunctionPtr createAggregateFunctionRankCorrelation(const std::string &
assertBinary(name, argument_types);
assertNoParameters(name, parameters);
AggregateFunctionPtr res;
if (isDecimal(argument_types[0]) || isDecimal(argument_types[1]))
{
if (!isNumber(argument_types[0]) || !isNumber(argument_types[1]))
throw Exception("Aggregate function " + name + " only supports numerical types", ErrorCodes::NOT_IMPLEMENTED);
}
else
{
res.reset(createWithTwoNumericTypes<AggregateFunctionRankCorrelation>(*argument_types[0], *argument_types[1], argument_types));
}
if (!res)
{
throw Exception("Aggregate function " + name + " only supports numerical types", ErrorCodes::NOT_IMPLEMENTED);
}
return res;
return std::make_shared<AggregateFunctionRankCorrelation>(argument_types);
}
}

View File

@ -1,73 +1,56 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/StatCommon.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnTuple.h>
#include <Common/assert_cast.h>
#include <Common/FieldVisitors.h>
#include <Common/PODArray_fwd.h>
#include <common/types.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <limits>
#include <DataTypes/DataTypeArray.h>
#include <Common/ArenaAllocator.h>
#include <type_traits>
namespace DB
{
template <template <typename> class Comparator>
struct ComparePairFirst final
struct RankCorrelationData : public StatisticalSample<Float64, Float64>
{
template <typename X, typename Y>
bool operator()(const std::pair<X, Y> & lhs, const std::pair<X, Y> & rhs) const
Float64 getResult()
{
return Comparator<X>{}(lhs.first, rhs.first);
RanksArray ranks_x;
std::tie(ranks_x, std::ignore) = computeRanksAndTieCorrection(this->x);
RanksArray ranks_y;
std::tie(ranks_y, std::ignore) = computeRanksAndTieCorrection(this->y);
/// In our case sizes of both samples are equal.
const auto size = this->size_x;
/// Count d^2 sum
Float64 answer = 0;
for (size_t j = 0; j < size; ++j)
answer += (ranks_x[j] - ranks_y[j]) * (ranks_x[j] - ranks_y[j]);
answer *= 6;
answer /= size * (size * size - 1);
answer = 1 - answer;
return answer;
}
};
template <template <typename> class Comparator>
struct ComparePairSecond final
{
template <typename X, typename Y>
bool operator()(const std::pair<X, Y> & lhs, const std::pair<X, Y> & rhs) const
{
return Comparator<Y>{}(lhs.second, rhs.second);
}
};
template <typename X = Float64, typename Y = Float64>
struct AggregateFunctionRankCorrelationData final
{
size_t size_x = 0;
using Allocator = MixedAlignedArenaAllocator<alignof(std::pair<X, Y>), 4096>;
using Array = PODArray<std::pair<X, Y>, 32, Allocator>;
Array values;
};
template <typename X, typename Y>
class AggregateFunctionRankCorrelation :
public IAggregateFunctionDataHelper<AggregateFunctionRankCorrelationData<X, Y>, AggregateFunctionRankCorrelation<X, Y>>
public IAggregateFunctionDataHelper<RankCorrelationData, AggregateFunctionRankCorrelation>
{
using Data = AggregateFunctionRankCorrelationData<X, Y>;
using Allocator = MixedAlignedArenaAllocator<alignof(std::pair<Float64, Float64>), 4096>;
using Array = PODArray<std::pair<Float64, Float64>, 32, Allocator>;
public:
explicit AggregateFunctionRankCorrelation(const DataTypes & arguments)
:IAggregateFunctionDataHelper<AggregateFunctionRankCorrelationData<X, Y>,AggregateFunctionRankCorrelation<X, Y>> ({arguments}, {})
:IAggregateFunctionDataHelper<RankCorrelationData, AggregateFunctionRankCorrelation> ({arguments}, {})
{}
String getName() const override
@ -80,24 +63,12 @@ public:
return std::make_shared<DataTypeNumber<Float64>>();
}
void insert(Data & a, const std::pair<X, Y> & x, Arena * arena) const
{
++a.size_x;
a.values.push_back(x, arena);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
auto & a = this->data(place);
auto new_x = assert_cast<const ColumnVector<X> &>(*columns[0]).getData()[row_num];
auto new_y = assert_cast<const ColumnVector<Y> &>(*columns[1]).getData()[row_num];
auto new_arg = std::make_pair(new_x, new_y);
a.size_x += 1;
a.values.push_back(new_arg, arena);
Float64 new_x = columns[0]->getFloat64(row_num);
Float64 new_y = columns[1]->getFloat64(row_num);
this->data(place).addX(new_x, arena);
this->data(place).addY(new_y, arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
@ -105,116 +76,22 @@ public:
auto & a = this->data(place);
auto & b = this->data(rhs);
if (b.size_x)
for (size_t i = 0; i < b.size_x; ++i)
insert(a, b.values[i], arena);
a.merge(b, arena);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
const auto & value = this->data(place).values;
size_t size = this->data(place).size_x;
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(value.data()), size * sizeof(value[0]));
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
size_t size = 0;
readVarUInt(size, buf);
auto & value = this->data(place).values;
value.resize(size, arena);
buf.read(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
this->data(place).read(buf, arena);
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * /*arena*/) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
const auto & value = this->data(place).values;
size_t size = this->data(place).size_x;
// create a copy of values not to format data
PODArrayWithStackMemory<std::pair<Float64, Float64>, 32> tmp_values;
tmp_values.resize(size);
for (size_t j = 0; j < size; ++ j)
tmp_values[j] = static_cast<std::pair<Float64, Float64>>(value[j]);
// sort x_values
std::sort(std::begin(tmp_values), std::end(tmp_values), ComparePairFirst<std::greater>{});
for (size_t j = 0; j < size;)
{
// replace x_values with their ranks
size_t rank = j + 1;
size_t same = 1;
size_t cur_sum = rank;
size_t cur_start = j;
while (j < size - 1)
{
if (tmp_values[j].first == tmp_values[j + 1].first)
{
// rank of (j + 1)th number
rank += 1;
++same;
cur_sum += rank;
++j;
}
else
break;
}
// insert rank is calculated as average of ranks of equal values
Float64 insert_rank = static_cast<Float64>(cur_sum) / same;
for (size_t i = cur_start; i <= j; ++i)
tmp_values[i].first = insert_rank;
++j;
}
// sort y_values
std::sort(std::begin(tmp_values), std::end(tmp_values), ComparePairSecond<std::greater>{});
// replace y_values with their ranks
for (size_t j = 0; j < size;)
{
// replace x_values with their ranks
size_t rank = j + 1;
size_t same = 1;
size_t cur_sum = rank;
size_t cur_start = j;
while (j < size - 1)
{
if (tmp_values[j].second == tmp_values[j + 1].second)
{
// rank of (j + 1)th number
rank += 1;
++same;
cur_sum += rank;
++j;
}
else
{
break;
}
}
// insert rank is calculated as average of ranks of equal values
Float64 insert_rank = static_cast<Float64>(cur_sum) / same;
for (size_t i = cur_start; i <= j; ++i)
tmp_values[i].second = insert_rank;
++j;
}
// count d^2 sum
Float64 answer = static_cast<Float64>(0);
for (size_t j = 0; j < size; ++ j)
answer += (tmp_values[j].first - tmp_values[j].second) * (tmp_values[j].first - tmp_values[j].second);
answer *= 6;
answer /= size * (size * size - 1);
answer = 1 - answer;
auto answer = this->data(place).getResult();
auto & column = static_cast<ColumnVector<Float64> &>(to);
column.getData().push_back(answer);

View File

@ -8,6 +8,7 @@
#include <IO/ReadHelpers.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/Moments.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
@ -30,310 +31,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int DECIMAL_OVERFLOW;
}
/**
Calculating univariate central moments
Levels:
level 2 (pop & samp): var, stddev
level 3: skewness
level 4: kurtosis
References:
https://en.wikipedia.org/wiki/Moment_(mathematics)
https://en.wikipedia.org/wiki/Skewness
https://en.wikipedia.org/wiki/Kurtosis
*/
template <typename T, size_t _level>
struct VarMoments
{
T m[_level + 1]{};
void add(T x)
{
++m[0];
m[1] += x;
m[2] += x * x;
if constexpr (_level >= 3) m[3] += x * x * x;
if constexpr (_level >= 4) m[4] += x * x * x * x;
}
void merge(const VarMoments & rhs)
{
m[0] += rhs.m[0];
m[1] += rhs.m[1];
m[2] += rhs.m[2];
if constexpr (_level >= 3) m[3] += rhs.m[3];
if constexpr (_level >= 4) m[4] += rhs.m[4];
}
void write(WriteBuffer & buf) const
{
writePODBinary(*this, buf);
}
void read(ReadBuffer & buf)
{
readPODBinary(*this, buf);
}
T getPopulation() const
{
if (m[0] == 0)
return std::numeric_limits<T>::quiet_NaN();
/// Due to numerical errors, the result can be slightly less than zero,
/// but it should be impossible. Trim to zero.
return std::max(T{}, (m[2] - m[1] * m[1] / m[0]) / m[0]);
}
T getSample() const
{
if (m[0] <= 1)
return std::numeric_limits<T>::quiet_NaN();
return std::max(T{}, (m[2] - m[1] * m[1] / m[0]) / (m[0] - 1));
}
T getMoment3() const
{
if (m[0] == 0)
return std::numeric_limits<T>::quiet_NaN();
// to avoid accuracy problem
if (m[0] == 1)
return 0;
return (m[3]
- (3 * m[2]
- 2 * m[1] * m[1] / m[0]
) * m[1] / m[0]
) / m[0];
}
T getMoment4() const
{
if (m[0] == 0)
return std::numeric_limits<T>::quiet_NaN();
// to avoid accuracy problem
if (m[0] == 1)
return 0;
return (m[4]
- (4 * m[3]
- (6 * m[2]
- 3 * m[1] * m[1] / m[0]
) * m[1] / m[0]
) * m[1] / m[0]
) / m[0];
}
};
template <typename T, size_t _level>
class VarMomentsDecimal
{
public:
using NativeType = typename T::NativeType;
void add(NativeType x)
{
++m0;
getM(1) += x;
NativeType tmp;
bool overflow = common::mulOverflow(x, x, tmp) || common::addOverflow(getM(2), tmp, getM(2));
if constexpr (_level >= 3)
overflow = overflow || common::mulOverflow(tmp, x, tmp) || common::addOverflow(getM(3), tmp, getM(3));
if constexpr (_level >= 4)
overflow = overflow || common::mulOverflow(tmp, x, tmp) || common::addOverflow(getM(4), tmp, getM(4));
if (overflow)
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
}
void merge(const VarMomentsDecimal & rhs)
{
m0 += rhs.m0;
getM(1) += rhs.getM(1);
bool overflow = common::addOverflow(getM(2), rhs.getM(2), getM(2));
if constexpr (_level >= 3)
overflow = overflow || common::addOverflow(getM(3), rhs.getM(3), getM(3));
if constexpr (_level >= 4)
overflow = overflow || common::addOverflow(getM(4), rhs.getM(4), getM(4));
if (overflow)
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
}
void write(WriteBuffer & buf) const { writePODBinary(*this, buf); }
void read(ReadBuffer & buf) { readPODBinary(*this, buf); }
Float64 getPopulation(UInt32 scale) const
{
if (m0 == 0)
return std::numeric_limits<Float64>::infinity();
NativeType tmp;
if (common::mulOverflow(getM(1), getM(1), tmp) ||
common::subOverflow(getM(2), NativeType(tmp / m0), tmp))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return std::max(Float64{}, DecimalUtils::convertTo<Float64>(T(tmp / m0), scale));
}
Float64 getSample(UInt32 scale) const
{
if (m0 == 0)
return std::numeric_limits<Float64>::quiet_NaN();
if (m0 == 1)
return std::numeric_limits<Float64>::infinity();
NativeType tmp;
if (common::mulOverflow(getM(1), getM(1), tmp) ||
common::subOverflow(getM(2), NativeType(tmp / m0), tmp))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return std::max(Float64{}, DecimalUtils::convertTo<Float64>(T(tmp / (m0 - 1)), scale));
}
Float64 getMoment3(UInt32 scale) const
{
if (m0 == 0)
return std::numeric_limits<Float64>::infinity();
NativeType tmp;
if (common::mulOverflow(2 * getM(1), getM(1), tmp) ||
common::subOverflow(3 * getM(2), NativeType(tmp / m0), tmp) ||
common::mulOverflow(tmp, getM(1), tmp) ||
common::subOverflow(getM(3), NativeType(tmp / m0), tmp))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return DecimalUtils::convertTo<Float64>(T(tmp / m0), scale);
}
Float64 getMoment4(UInt32 scale) const
{
if (m0 == 0)
return std::numeric_limits<Float64>::infinity();
NativeType tmp;
if (common::mulOverflow(3 * getM(1), getM(1), tmp) ||
common::subOverflow(6 * getM(2), NativeType(tmp / m0), tmp) ||
common::mulOverflow(tmp, getM(1), tmp) ||
common::subOverflow(4 * getM(3), NativeType(tmp / m0), tmp) ||
common::mulOverflow(tmp, getM(1), tmp) ||
common::subOverflow(getM(4), NativeType(tmp / m0), tmp))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return DecimalUtils::convertTo<Float64>(T(tmp / m0), scale);
}
private:
UInt64 m0{};
NativeType m[_level]{};
NativeType & getM(size_t i) { return m[i - 1]; }
const NativeType & getM(size_t i) const { return m[i - 1]; }
};
/**
Calculating multivariate central moments
Levels:
level 2 (pop & samp): covar
References:
https://en.wikipedia.org/wiki/Moment_(mathematics)
*/
template <typename T>
struct CovarMoments
{
T m0{};
T x1{};
T y1{};
T xy{};
void add(T x, T y)
{
++m0;
x1 += x;
y1 += y;
xy += x * y;
}
void merge(const CovarMoments & rhs)
{
m0 += rhs.m0;
x1 += rhs.x1;
y1 += rhs.y1;
xy += rhs.xy;
}
void write(WriteBuffer & buf) const
{
writePODBinary(*this, buf);
}
void read(ReadBuffer & buf)
{
readPODBinary(*this, buf);
}
T NO_SANITIZE_UNDEFINED getPopulation() const
{
return (xy - x1 * y1 / m0) / m0;
}
T NO_SANITIZE_UNDEFINED getSample() const
{
if (m0 == 0)
return std::numeric_limits<T>::quiet_NaN();
return (xy - x1 * y1 / m0) / (m0 - 1);
}
};
template <typename T>
struct CorrMoments
{
T m0{};
T x1{};
T y1{};
T xy{};
T x2{};
T y2{};
void add(T x, T y)
{
++m0;
x1 += x;
y1 += y;
xy += x * y;
x2 += x * x;
y2 += y * y;
}
void merge(const CorrMoments & rhs)
{
m0 += rhs.m0;
x1 += rhs.x1;
y1 += rhs.y1;
xy += rhs.xy;
x2 += rhs.x2;
y2 += rhs.y2;
}
void write(WriteBuffer & buf) const
{
writePODBinary(*this, buf);
}
void read(ReadBuffer & buf)
{
readPODBinary(*this, buf);
}
T NO_SANITIZE_UNDEFINED get() const
{
return (m0 * xy - x1 * y1) / sqrt((m0 * x2 - x1 * x1) * (m0 * y2 - y1 * y1));
}
};
enum class StatisticsFunctionKind
{
varPop, varSamp,

View File

@ -0,0 +1,77 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionTTest.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Moments.h>
#include "registerAggregateFunctions.h"
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace DB
{
namespace
{
/** Student T-test applies to two samples of independent random variables
* that have normal distributions with equal (but unknown) variances.
* It allows to answer the question whether means of the distributions differ.
*
* If variances are not considered equal, Welch T-test should be used instead.
*/
struct StudentTTestData : public TTestMoments<Float64>
{
static constexpr auto name = "studentTTest";
std::pair<Float64, Float64> getResult() const
{
Float64 mean_x = x1 / nx;
Float64 mean_y = y1 / ny;
/// To estimate the variance we first estimate two means.
/// That's why the number of degrees of freedom is the total number of values of both samples minus 2.
Float64 degrees_of_freedom = nx + ny - 2;
/// Calculate s^2
/// The original formulae looks like
/// \frac{\sum_{i = 1}^{n_x}{(x_i - \bar{x}) ^ 2} + \sum_{i = 1}^{n_y}{(y_i - \bar{y}) ^ 2}}{n_x + n_y - 2}
/// But we made some mathematical transformations not to store original sequences.
/// Also we dropped sqrt, because later it will be squared later.
Float64 all_x = x2 + nx * mean_x * mean_x - 2 * mean_x * x1;
Float64 all_y = y2 + ny * mean_y * mean_y - 2 * mean_y * y1;
Float64 s2 = (all_x + all_y) / degrees_of_freedom;
Float64 std_err2 = s2 * (1. / nx + 1. / ny);
/// t-statistic
Float64 t_stat = (mean_x - mean_y) / sqrt(std_err2);
return {t_stat, getPValue(degrees_of_freedom, t_stat * t_stat)};
}
};
AggregateFunctionPtr createAggregateFunctionStudentTTest(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertBinary(name, argument_types);
assertNoParameters(name, parameters);
if (!isNumber(argument_types[0]) || !isNumber(argument_types[1]))
throw Exception("Aggregate function " + name + " only supports numerical types", ErrorCodes::BAD_ARGUMENTS);
return std::make_shared<AggregateFunctionTTest<StudentTTestData>>(argument_types);
}
}
void registerAggregateFunctionStudentTTest(AggregateFunctionFactory & factory)
{
factory.registerFunction("studentTTest", createAggregateFunctionStudentTTest);
}
}

View File

@ -0,0 +1,154 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/StatCommon.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnTuple.h>
#include <Common/assert_cast.h>
#include <Core/Types.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <cmath>
/// This function is used in implementations of different T-Tests.
/// On Darwin it's unavailable in math.h but actually exists in the library (can be linked successfully).
#if defined(OS_DARWIN)
extern "C"
{
double lgamma_r(double x, int * signgamp);
}
#endif
namespace DB
{
class ReadBuffer;
class WriteBuffer;
/**
* If you have a cumulative distribution function F, then calculating the p-value for given statistic T is simply 1F(T)
* In our case p-value is two-sided, so we multiply it by 2.
* So cumulative distribution function F equals to
* \[ F(t) = \int_{-\infty}^{t} f(u)du = 1 - \frac{1}{2} I_{x(t)}(\frac{v}{2}, \frac{1}{2}) \]
* where \[ x(t) = \frac{v}{t^2 + v} \]: https://en.wikipedia.org/wiki/Student%27s_t-distribution#Cumulative_distribution_function
*
* so our resulting \[ p-value = I_{x(t)}(\frac{v}{2}, \frac{1}{2}) \].
*
* And I is regularized incomplete beta function: https://en.wikipedia.org/wiki/Beta_function#Incomplete_beta_function
*
* Keepenig in mind that \[ \mathrm {B} (x;a,b)=\int _{0}^{x}r^{a-1}\,(1-r)^{b-1}\,\mathrm {d} r.\! \]
* and
* \[ \mathrm {B} (x,y)={\dfrac {\Gamma (x)\,\Gamma (y)}{\Gamma (x+y)}}=\
* \exp(\ln {\dfrac {\Gamma (x)\,\Gamma (y)}{\Gamma (x+y)}})=\exp((\ln(\Gamma (x))+\ln(\Gamma (y))-\ln(\Gamma (x+y))) \]
*
* p-value can be calculated in terms of gamma functions and integrals more simply:
* \[ {\frac {\int _{0}^{\frac {\nu }{t^{2}+\nu }}r^{{\frac {\nu }{2}}-1}\,(1-r)^{-0.5}\,\mathrm {d} r}\
* {\exp((\ln(\Gamma ({\frac {\nu }{2}}))+\ln(\Gamma (0.5))-\ln(\Gamma ({\frac {\nu }{2}}+0.5)))}} \]
*
* which simplifies to:
*
* \[ {\frac {\int _{0}^{\frac {\nu }{t^{2}+\nu }}{\frac {r^{{\frac {\nu }{2}}-1}}{\sqrt {1-r}}}\,\mathrm {d} r}\
* {\exp((\ln(\Gamma ({\frac {\nu }{2}}))+\ln(\Gamma (0.5))-\ln(\Gamma ({\frac {\nu }{2}}+0.5)))}} \]
*
* Read here for details https://rosettacode.org/wiki/Welch%27s_t-test#
*
* Both WelchTTest and StudentTTest have t-statistric with Student distribution but with different degrees of freedom.
* So the procedure of computing p-value is the same.
*/
static inline Float64 getPValue(Float64 degrees_of_freedom, Float64 t_stat2)
{
Float64 numerator = integrateSimpson(0, degrees_of_freedom / (t_stat2 + degrees_of_freedom),
[degrees_of_freedom](double x) { return std::pow(x, degrees_of_freedom / 2 - 1) / std::sqrt(1 - x); });
int unused;
Float64 denominator = std::exp(
lgamma_r(degrees_of_freedom / 2, &unused)
+ lgamma_r(0.5, &unused)
- lgamma_r(degrees_of_freedom / 2 + 0.5, &unused));
return std::min(1.0, std::max(0.0, numerator / denominator));
}
/// Returns tuple of (t-statistic, p-value)
/// https://cpb-us-w2.wpmucdn.com/voices.uchicago.edu/dist/9/1193/files/2016/01/05b-TandP.pdf
template <typename Data>
class AggregateFunctionTTest :
public IAggregateFunctionDataHelper<Data, AggregateFunctionTTest<Data>>
{
public:
AggregateFunctionTTest(const DataTypes & arguments)
: IAggregateFunctionDataHelper<Data, AggregateFunctionTTest<Data>>({arguments}, {})
{
}
String getName() const override
{
return Data::name;
}
DataTypePtr getReturnType() const override
{
DataTypes types
{
std::make_shared<DataTypeNumber<Float64>>(),
std::make_shared<DataTypeNumber<Float64>>(),
};
Strings names
{
"t_statistic",
"p_value"
};
return std::make_shared<DataTypeTuple>(
std::move(types),
std::move(names)
);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
Float64 value = columns[0]->getFloat64(row_num);
UInt8 is_second = columns[1]->getUInt(row_num);
if (is_second)
this->data(place).addY(value);
else
this->data(place).addX(value);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto [t_statistic, p_value] = this->data(place).getResult();
/// Because p-value is a probability.
p_value = std::min(1.0, std::max(0.0, p_value));
auto & column_tuple = assert_cast<ColumnTuple &>(to);
auto & column_stat = assert_cast<ColumnVector<Float64> &>(column_tuple.getColumn(0));
auto & column_value = assert_cast<ColumnVector<Float64> &>(column_tuple.getColumn(1));
column_stat.getData().push_back(t_statistic);
column_value.getData().push_back(p_value);
}
};
};

View File

@ -0,0 +1,74 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionTTest.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Moments.h>
#include "registerAggregateFunctions.h"
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace DB
{
namespace
{
struct WelchTTestData : public TTestMoments<Float64>
{
static constexpr auto name = "welchTTest";
std::pair<Float64, Float64> getResult() const
{
Float64 mean_x = x1 / nx;
Float64 mean_y = y1 / ny;
/// s_x^2, s_y^2
/// The original formulae looks like \frac{1}{size_x - 1} \sum_{i = 1}^{size_x}{(x_i - \bar{x}) ^ 2}
/// But we made some mathematical transformations not to store original sequences.
/// Also we dropped sqrt, because later it will be squared later.
Float64 sx2 = (x2 + nx * mean_x * mean_x - 2 * mean_x * x1) / (nx - 1);
Float64 sy2 = (y2 + ny * mean_y * mean_y - 2 * mean_y * y1) / (ny - 1);
/// t-statistic
Float64 t_stat = (mean_x - mean_y) / sqrt(sx2 / nx + sy2 / ny);
/// degrees of freedom
Float64 numerator_sqrt = sx2 / nx + sy2 / ny;
Float64 numerator = numerator_sqrt * numerator_sqrt;
Float64 denominator_x = sx2 * sx2 / (nx * nx * (nx - 1));
Float64 denominator_y = sy2 * sy2 / (ny * ny * (ny - 1));
Float64 degrees_of_freedom = numerator / (denominator_x + denominator_y);
return {t_stat, getPValue(degrees_of_freedom, t_stat * t_stat)};
}
};
AggregateFunctionPtr createAggregateFunctionWelchTTest(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertBinary(name, argument_types);
assertNoParameters(name, parameters);
if (!isNumber(argument_types[0]) || !isNumber(argument_types[1]))
throw Exception("Aggregate function " + name + " only supports numerical types", ErrorCodes::BAD_ARGUMENTS);
return std::make_shared<AggregateFunctionTTest<WelchTTestData>>(argument_types);
}
}
void registerAggregateFunctionWelchTTest(AggregateFunctionFactory & factory)
{
factory.registerFunction("welchTTest", createAggregateFunctionWelchTTest);
}
}

View File

@ -0,0 +1,361 @@
#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int DECIMAL_OVERFLOW;
}
/**
Calculating univariate central moments
Levels:
level 2 (pop & samp): var, stddev
level 3: skewness
level 4: kurtosis
References:
https://en.wikipedia.org/wiki/Moment_(mathematics)
https://en.wikipedia.org/wiki/Skewness
https://en.wikipedia.org/wiki/Kurtosis
*/
template <typename T, size_t _level>
struct VarMoments
{
T m[_level + 1]{};
void add(T x)
{
++m[0];
m[1] += x;
m[2] += x * x;
if constexpr (_level >= 3) m[3] += x * x * x;
if constexpr (_level >= 4) m[4] += x * x * x * x;
}
void merge(const VarMoments & rhs)
{
m[0] += rhs.m[0];
m[1] += rhs.m[1];
m[2] += rhs.m[2];
if constexpr (_level >= 3) m[3] += rhs.m[3];
if constexpr (_level >= 4) m[4] += rhs.m[4];
}
void write(WriteBuffer & buf) const
{
writePODBinary(*this, buf);
}
void read(ReadBuffer & buf)
{
readPODBinary(*this, buf);
}
T getPopulation() const
{
if (m[0] == 0)
return std::numeric_limits<T>::quiet_NaN();
/// Due to numerical errors, the result can be slightly less than zero,
/// but it should be impossible. Trim to zero.
return std::max(T{}, (m[2] - m[1] * m[1] / m[0]) / m[0]);
}
T getSample() const
{
if (m[0] <= 1)
return std::numeric_limits<T>::quiet_NaN();
return std::max(T{}, (m[2] - m[1] * m[1] / m[0]) / (m[0] - 1));
}
T getMoment3() const
{
if (m[0] == 0)
return std::numeric_limits<T>::quiet_NaN();
// to avoid accuracy problem
if (m[0] == 1)
return 0;
/// \[ \frac{1}{m_0} (m_3 - (3 * m_2 - \frac{2 * {m_1}^2}{m_0}) * \frac{m_1}{m_0});\]
return (m[3]
- (3 * m[2]
- 2 * m[1] * m[1] / m[0]
) * m[1] / m[0]
) / m[0];
}
T getMoment4() const
{
if (m[0] == 0)
return std::numeric_limits<T>::quiet_NaN();
// to avoid accuracy problem
if (m[0] == 1)
return 0;
/// \[ \frac{1}{m_0}(m_4 - (4 * m_3 - (6 * m_2 - \frac{3 * m_1^2}{m_0} ) \frac{m_1}{m_0})\frac{m_1}{m_0})\]
return (m[4]
- (4 * m[3]
- (6 * m[2]
- 3 * m[1] * m[1] / m[0]
) * m[1] / m[0]
) * m[1] / m[0]
) / m[0];
}
};
template <typename T, size_t _level>
class VarMomentsDecimal
{
public:
using NativeType = typename T::NativeType;
void add(NativeType x)
{
++m0;
getM(1) += x;
NativeType tmp;
bool overflow = common::mulOverflow(x, x, tmp) || common::addOverflow(getM(2), tmp, getM(2));
if constexpr (_level >= 3)
overflow = overflow || common::mulOverflow(tmp, x, tmp) || common::addOverflow(getM(3), tmp, getM(3));
if constexpr (_level >= 4)
overflow = overflow || common::mulOverflow(tmp, x, tmp) || common::addOverflow(getM(4), tmp, getM(4));
if (overflow)
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
}
void merge(const VarMomentsDecimal & rhs)
{
m0 += rhs.m0;
getM(1) += rhs.getM(1);
bool overflow = common::addOverflow(getM(2), rhs.getM(2), getM(2));
if constexpr (_level >= 3)
overflow = overflow || common::addOverflow(getM(3), rhs.getM(3), getM(3));
if constexpr (_level >= 4)
overflow = overflow || common::addOverflow(getM(4), rhs.getM(4), getM(4));
if (overflow)
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
}
void write(WriteBuffer & buf) const { writePODBinary(*this, buf); }
void read(ReadBuffer & buf) { readPODBinary(*this, buf); }
Float64 getPopulation(UInt32 scale) const
{
if (m0 == 0)
return std::numeric_limits<Float64>::infinity();
NativeType tmp;
if (common::mulOverflow(getM(1), getM(1), tmp) ||
common::subOverflow(getM(2), NativeType(tmp / m0), tmp))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return std::max(Float64{}, DecimalUtils::convertTo<Float64>(T(tmp / m0), scale));
}
Float64 getSample(UInt32 scale) const
{
if (m0 == 0)
return std::numeric_limits<Float64>::quiet_NaN();
if (m0 == 1)
return std::numeric_limits<Float64>::infinity();
NativeType tmp;
if (common::mulOverflow(getM(1), getM(1), tmp) ||
common::subOverflow(getM(2), NativeType(tmp / m0), tmp))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return std::max(Float64{}, DecimalUtils::convertTo<Float64>(T(tmp / (m0 - 1)), scale));
}
Float64 getMoment3(UInt32 scale) const
{
if (m0 == 0)
return std::numeric_limits<Float64>::infinity();
NativeType tmp;
if (common::mulOverflow(2 * getM(1), getM(1), tmp) ||
common::subOverflow(3 * getM(2), NativeType(tmp / m0), tmp) ||
common::mulOverflow(tmp, getM(1), tmp) ||
common::subOverflow(getM(3), NativeType(tmp / m0), tmp))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return DecimalUtils::convertTo<Float64>(T(tmp / m0), scale);
}
Float64 getMoment4(UInt32 scale) const
{
if (m0 == 0)
return std::numeric_limits<Float64>::infinity();
NativeType tmp;
if (common::mulOverflow(3 * getM(1), getM(1), tmp) ||
common::subOverflow(6 * getM(2), NativeType(tmp / m0), tmp) ||
common::mulOverflow(tmp, getM(1), tmp) ||
common::subOverflow(4 * getM(3), NativeType(tmp / m0), tmp) ||
common::mulOverflow(tmp, getM(1), tmp) ||
common::subOverflow(getM(4), NativeType(tmp / m0), tmp))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
return DecimalUtils::convertTo<Float64>(T(tmp / m0), scale);
}
private:
UInt64 m0{};
NativeType m[_level]{};
NativeType & getM(size_t i) { return m[i - 1]; }
const NativeType & getM(size_t i) const { return m[i - 1]; }
};
/**
Calculating multivariate central moments
Levels:
level 2 (pop & samp): covar
References:
https://en.wikipedia.org/wiki/Moment_(mathematics)
*/
template <typename T>
struct CovarMoments
{
T m0{};
T x1{};
T y1{};
T xy{};
void add(T x, T y)
{
++m0;
x1 += x;
y1 += y;
xy += x * y;
}
void merge(const CovarMoments & rhs)
{
m0 += rhs.m0;
x1 += rhs.x1;
y1 += rhs.y1;
xy += rhs.xy;
}
void write(WriteBuffer & buf) const
{
writePODBinary(*this, buf);
}
void read(ReadBuffer & buf)
{
readPODBinary(*this, buf);
}
T NO_SANITIZE_UNDEFINED getPopulation() const
{
return (xy - x1 * y1 / m0) / m0;
}
T NO_SANITIZE_UNDEFINED getSample() const
{
if (m0 == 0)
return std::numeric_limits<T>::quiet_NaN();
return (xy - x1 * y1 / m0) / (m0 - 1);
}
};
template <typename T>
struct CorrMoments
{
T m0{};
T x1{};
T y1{};
T xy{};
T x2{};
T y2{};
void add(T x, T y)
{
++m0;
x1 += x;
y1 += y;
xy += x * y;
x2 += x * x;
y2 += y * y;
}
void merge(const CorrMoments & rhs)
{
m0 += rhs.m0;
x1 += rhs.x1;
y1 += rhs.y1;
xy += rhs.xy;
x2 += rhs.x2;
y2 += rhs.y2;
}
void write(WriteBuffer & buf) const
{
writePODBinary(*this, buf);
}
void read(ReadBuffer & buf)
{
readPODBinary(*this, buf);
}
T NO_SANITIZE_UNDEFINED get() const
{
return (m0 * xy - x1 * y1) / sqrt((m0 * x2 - x1 * x1) * (m0 * y2 - y1 * y1));
}
};
/// Data for calculation of Student and Welch T-Tests.
template <typename T>
struct TTestMoments
{
T nx{};
T ny{};
T x1{};
T y1{};
T x2{};
T y2{};
void addX(T value)
{
++nx;
x1 += value;
x2 += value * value;
}
void addY(T value)
{
++ny;
y1 += value;
y2 += value * value;
}
void merge(const TTestMoments & rhs)
{
nx += rhs.nx;
ny += rhs.ny;
x1 += rhs.x1;
y1 += rhs.y1;
x2 += rhs.x2;
y2 += rhs.y2;
}
void write(WriteBuffer & buf) const
{
writePODBinary(*this, buf);
}
void read(ReadBuffer & buf)
{
readPODBinary(*this, buf);
}
};
}

View File

@ -0,0 +1,114 @@
#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/ArenaAllocator.h>
#include <numeric>
#include <algorithm>
#include <utility>
namespace DB
{
template <typename F>
static Float64 integrateSimpson(Float64 a, Float64 b, F && func)
{
const size_t iterations = std::max(1e6, 1e4 * std::abs(std::round(b) - std::round(a)));
const long double h = (b - a) / iterations;
Float64 sum_odds = 0.0;
for (size_t i = 1; i < iterations; i += 2)
sum_odds += func(a + i * h);
Float64 sum_evens = 0.0;
for (size_t i = 2; i < iterations; i += 2)
sum_evens += func(a + i * h);
return (func(a) + func(b) + 2 * sum_evens + 4 * sum_odds) * h / 3;
}
/// Because ranks are adjusted, we have to store each of them in Float type.
using RanksArray = std::vector<Float64>;
template <typename Values>
std::pair<RanksArray, Float64> computeRanksAndTieCorrection(const Values & values)
{
const size_t size = values.size();
/// Save initial positions, than sort indices according to the values.
std::vector<size_t> indexes(size);
std::iota(indexes.begin(), indexes.end(), 0);
std::sort(indexes.begin(), indexes.end(),
[&] (size_t lhs, size_t rhs) { return values[lhs] < values[rhs]; });
size_t left = 0;
Float64 tie_numenator = 0;
RanksArray out(size);
while (left < size)
{
size_t right = left;
while (right < size && values[indexes[left]] == values[indexes[right]])
++right;
auto adjusted = (left + right + 1.) / 2.;
auto count_equal = right - left;
tie_numenator += std::pow(count_equal, 3) - count_equal;
for (size_t iter = left; iter < right; ++iter)
out[indexes[iter]] = adjusted;
left = right;
}
return {out, 1 - (tie_numenator / (std::pow(size, 3) - size))};
}
template <typename X, typename Y>
struct StatisticalSample
{
using AllocatorXSample = MixedAlignedArenaAllocator<alignof(X), 4096>;
using SampleX = PODArray<X, 32, AllocatorXSample>;
using AllocatorYSample = MixedAlignedArenaAllocator<alignof(Y), 4096>;
using SampleY = PODArray<Y, 32, AllocatorYSample>;
SampleX x{};
SampleY y{};
size_t size_x{0};
size_t size_y{0};
void addX(X value, Arena * arena)
{
++size_x;
x.push_back(value, arena);
}
void addY(Y value, Arena * arena)
{
++size_y;
y.push_back(value, arena);
}
void merge(const StatisticalSample & rhs, Arena * arena)
{
size_x += rhs.size_x;
size_y += rhs.size_y;
x.insert(rhs.x.begin(), rhs.x.end(), arena);
y.insert(rhs.y.begin(), rhs.y.end(), arena);
}
void write(WriteBuffer & buf) const
{
writeVarUInt(size_x, buf);
writeVarUInt(size_y, buf);
buf.write(reinterpret_cast<const char *>(x.data()), size_x * sizeof(x[0]));
buf.write(reinterpret_cast<const char *>(y.data()), size_y * sizeof(y[0]));
}
void read(ReadBuffer & buf, Arena * arena)
{
readVarUInt(size_x, buf);
readVarUInt(size_y, buf);
x.resize(size_x, arena);
y.resize(size_y, arena);
buf.read(reinterpret_cast<char *>(x.data()), size_x * sizeof(x[0]));
buf.read(reinterpret_cast<char *>(y.data()), size_y * sizeof(y[0]));
}
};
}

View File

@ -39,9 +39,10 @@ void registerAggregateFunctionSimpleLinearRegression(AggregateFunctionFactory &)
void registerAggregateFunctionMoving(AggregateFunctionFactory &);
void registerAggregateFunctionCategoricalIV(AggregateFunctionFactory &);
void registerAggregateFunctionAggThrow(AggregateFunctionFactory &);
void registerAggregateFunctionRankCorrelation(AggregateFunctionFactory &);
void registerAggregateFunctionMannWhitney(AggregateFunctionFactory &);
void registerAggregateFunctionWelchTTest(AggregateFunctionFactory &);
void registerAggregateFunctionStudentTTest(AggregateFunctionFactory &);
void registerAggregateFunctionRankCorrelation(AggregateFunctionFactory &);
class AggregateFunctionCombinatorFactory;
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
@ -94,6 +95,9 @@ void registerAggregateFunctions()
registerAggregateFunctionCategoricalIV(factory);
registerAggregateFunctionAggThrow(factory);
registerAggregateFunctionRankCorrelation(factory);
registerAggregateFunctionMannWhitney(factory);
registerAggregateFunctionWelchTTest(factory);
registerAggregateFunctionStudentTTest(factory);
}
{

View File

@ -0,0 +1,27 @@
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <Common/PODArray.h>
#include <AggregateFunctions/StatCommon.h>
#include <iostream>
#include <gtest/gtest.h>
TEST(Ranks, Simple)
{
using namespace DB;
RanksArray sample = {310, 195, 480, 530, 155, 530, 245, 385, 450, 450, 465, 545, 170, 180, 125, 180, 230, 170, 75, 430, 480, 495, 295};
RanksArray ranks;
Float64 t = 0;
std::tie(ranks, t) = computeRanksAndTieCorrection(sample);
RanksArray expected{12.0, 8.0, 18.5, 21.5, 3.0, 21.5, 10.0, 13.0, 15.5, 15.5, 17.0, 23.0, 4.5, 6.5, 2.0, 6.5, 9.0, 4.5, 1.0, 14.0, 18.5, 20.0, 11.0};
ASSERT_EQ(ranks.size(), expected.size());
for (size_t i = 0; i < ranks.size(); ++i)
ASSERT_DOUBLE_EQ(ranks[i], expected[i]);
ASSERT_DOUBLE_EQ(t, 0.9975296442687747);
}

View File

@ -29,6 +29,7 @@ SRCS(
AggregateFunctionHistogram.cpp
AggregateFunctionIf.cpp
AggregateFunctionMLMethod.cpp
AggregateFunctionMannWhitney.cpp
AggregateFunctionMaxIntersections.cpp
AggregateFunctionMerge.cpp
AggregateFunctionMinMaxAny.cpp
@ -43,6 +44,7 @@ SRCS(
AggregateFunctionState.cpp
AggregateFunctionStatistics.cpp
AggregateFunctionStatisticsSimple.cpp
AggregateFunctionStudentTTest.cpp
AggregateFunctionSum.cpp
AggregateFunctionSumMap.cpp
AggregateFunctionTimeSeriesGroupSum.cpp
@ -50,6 +52,7 @@ SRCS(
AggregateFunctionUniq.cpp
AggregateFunctionUniqCombined.cpp
AggregateFunctionUniqUpTo.cpp
AggregateFunctionWelchTTest.cpp
AggregateFunctionWindowFunnel.cpp
UniqCombinedBiasData.cpp
UniqVariadicHash.cpp

View File

@ -7,7 +7,7 @@ SELECT '1';
SELECT rankCorr(number, number) FROM numbers(100);
SELECT '-1';
SELECT rankCorr(number, -1 * number) FROM numbers(100);
SELECT rankCorr(number, -1 * CAST(number AS Int64)) FROM numbers(100);
SELECT '-0.037';
SELECT roundBankers(rankCorr(exp(number), sin(number)), 3) FROM numbers(100);

View File

@ -0,0 +1,14 @@
0.021378001462867
0.0213780014628671
0.090773324285671
0.0907733242891952
0.00339907162713746
0.0033990715715539
-0.5028215369186904 0.6152361677168877
-0.5028215369187079 0.6152361677171103
14.971190998235835 5.898143508382202e-44
14.971190998235837 0
-2.610898982580138 0.00916587538237954
-2.610898982580134 0.0091658753823834
-28.740781574102936 7.667329672103986e-133
-28.74078157410298 0

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,75 @@
#!/usr/bin/env python3
import os
import sys
from scipy import stats
import pandas as pd
import numpy as np
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from pure_http_client import ClickHouseClient
def test_and_check(name, a, b, t_stat, p_value, precision=1e-2):
client = ClickHouseClient()
client.query("DROP TABLE IF EXISTS ttest;")
client.query("CREATE TABLE ttest (left Float64, right UInt8) ENGINE = Memory;");
client.query("INSERT INTO ttest VALUES {};".format(", ".join(['({},{})'.format(i, 0) for i in a])))
client.query("INSERT INTO ttest VALUES {};".format(", ".join(['({},{})'.format(j, 1) for j in b])))
real = client.query_return_df(
"SELECT roundBankers({}(left, right).1, 16) as t_stat, ".format(name) +
"roundBankers({}(left, right).2, 16) as p_value ".format(name) +
"FROM ttest FORMAT TabSeparatedWithNames;")
real_t_stat = real['t_stat'][0]
real_p_value = real['p_value'][0]
assert(abs(real_t_stat - np.float64(t_stat)) < precision), "clickhouse_t_stat {}, scipy_t_stat {}".format(real_t_stat, t_stat)
assert(abs(real_p_value - np.float64(p_value)) < precision), "clickhouse_p_value {}, scipy_p_value {}".format(real_p_value, p_value)
client.query("DROP TABLE IF EXISTS ttest;")
def test_student():
rvs1 = np.round(stats.norm.rvs(loc=1, scale=5,size=500), 2)
rvs2 = np.round(stats.norm.rvs(loc=10, scale=5,size=500), 2)
s, p = stats.ttest_ind(rvs1, rvs2, equal_var = True)
test_and_check("studentTTest", rvs1, rvs2, s, p)
rvs1 = np.round(stats.norm.rvs(loc=0, scale=5,size=500), 2)
rvs2 = np.round(stats.norm.rvs(loc=0, scale=5,size=500), 2)
s, p = stats.ttest_ind(rvs1, rvs2, equal_var = True)
test_and_check("studentTTest", rvs1, rvs2, s, p)
rvs1 = np.round(stats.norm.rvs(loc=2, scale=10,size=512), 2)
rvs2 = np.round(stats.norm.rvs(loc=5, scale=20,size=1024), 2)
s, p = stats.ttest_ind(rvs1, rvs2, equal_var = True)
test_and_check("studentTTest", rvs1, rvs2, s, p)
rvs1 = np.round(stats.norm.rvs(loc=0, scale=10,size=1024), 2)
rvs2 = np.round(stats.norm.rvs(loc=0, scale=10,size=512), 2)
s, p = stats.ttest_ind(rvs1, rvs2, equal_var = True)
test_and_check("studentTTest", rvs1, rvs2, s, p)
def test_welch():
rvs1 = np.round(stats.norm.rvs(loc=1, scale=15,size=500), 2)
rvs2 = np.round(stats.norm.rvs(loc=10, scale=5,size=500), 2)
s, p = stats.ttest_ind(rvs1, rvs2, equal_var = False)
test_and_check("welchTTest", rvs1, rvs2, s, p)
rvs1 = np.round(stats.norm.rvs(loc=0, scale=7,size=500), 2)
rvs2 = np.round(stats.norm.rvs(loc=0, scale=3,size=500), 2)
s, p = stats.ttest_ind(rvs1, rvs2, equal_var = False)
test_and_check("welchTTest", rvs1, rvs2, s, p)
rvs1 = np.round(stats.norm.rvs(loc=0, scale=10,size=1024), 2)
rvs2 = np.round(stats.norm.rvs(loc=5, scale=1,size=512), 2)
s, p = stats.ttest_ind(rvs1, rvs2, equal_var = False)
test_and_check("welchTTest", rvs1, rvs2, s, p)
rvs1 = np.round(stats.norm.rvs(loc=5, scale=10,size=512), 2)
rvs2 = np.round(stats.norm.rvs(loc=5, scale=10,size=1024), 2)
s, p = stats.ttest_ind(rvs1, rvs2, equal_var = False)
test_and_check("welchTTest", rvs1, rvs2, s, p)
if __name__ == "__main__":
test_student()
test_welch()
print("Ok.")

View File

@ -0,0 +1 @@
Ok.

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python3 "$CURDIR"/01558_ttest_scipy.python

View File

@ -0,0 +1,5 @@
(223,0.5426959774289524)
223.0 0.5426959774289482
223 0.5426959774289524
223 0.5426959774289524
223 0.5426959774289524

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS mann_whitney_test;
CREATE TABLE mann_whitney_test (left Float64, right UInt8) ENGINE = Memory;
INSERT INTO mann_whitney_test VALUES (310, 0), (195, 0), (530, 0), (155, 0), (530, 0), (245, 0), (385, 0), (450, 0), (465, 0), (545, 0), (170, 0), (180, 0), (125, 0), (180, 0), (230, 0), (75, 0), (430, 0), (480, 0), (495, 0), (295, 0), (116, 1), (171, 1), (176, 1), (421, 1), (111, 1), (326, 1), (481, 1), (111, 1), (346, 1), (441, 1), (261, 1), (411, 1), (206, 1), (521, 1), (456, 1), (446, 1), (296, 1), (51, 1), (426, 1), (261, 1);
SELECT mannWhitneyUTest(left, right) from mann_whitney_test;
SELECT '223.0', '0.5426959774289482';
WITH mannWhitneyUTest(left, right) AS pair SELECT roundBankers(pair.1, 16) as t_stat, roundBankers(pair.2, 16) as p_value from mann_whitney_test;
WITH mannWhitneyUTest('two-sided', 1)(left, right) as pair SELECT roundBankers(pair.1, 16) as t_stat, roundBankers(pair.2, 16) as p_value from mann_whitney_test;
WITH mannWhitneyUTest('two-sided')(left, right) as pair SELECT roundBankers(pair.1, 16) as t_stat, roundBankers(pair.2, 16) as p_value from mann_whitney_test;
DROP TABLE IF EXISTS mann_whitney_test;

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
import os
import sys
from scipy import stats
import pandas as pd
import numpy as np
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from pure_http_client import ClickHouseClient
def test_and_check(name, a, b, t_stat, p_value):
client = ClickHouseClient()
client.query("DROP TABLE IF EXISTS mann_whitney;")
client.query("CREATE TABLE mann_whitney (left Float64, right UInt8) ENGINE = Memory;");
client.query("INSERT INTO mann_whitney VALUES {};".format(", ".join(['({},{}), ({},{})'.format(i, 0, j, 1) for i,j in zip(a, b)])))
real = client.query_return_df(
"SELECT roundBankers({}(left, right).1, 16) as t_stat, ".format(name) +
"roundBankers({}(left, right).2, 16) as p_value ".format(name) +
"FROM mann_whitney FORMAT TabSeparatedWithNames;")
real_t_stat = real['t_stat'][0]
real_p_value = real['p_value'][0]
assert(abs(real_t_stat - np.float64(t_stat) < 1e-2)), "clickhouse_t_stat {}, scipy_t_stat {}".format(real_t_stat, t_stat)
assert(abs(real_p_value - np.float64(p_value)) < 1e-2), "clickhouse_p_value {}, scipy_p_value {}".format(real_p_value, p_value)
client.query("DROP TABLE IF EXISTS mann_whitney;")
def test_mann_whitney():
rvs1 = np.round(stats.norm.rvs(loc=1, scale=5,size=500), 5)
rvs2 = np.round(stats.expon.rvs(scale=0.2,size=500), 5)
s, p = stats.mannwhitneyu(rvs1, rvs2, alternative='two-sided')
test_and_check("mannWhitneyUTest", rvs1, rvs2, s, p)
test_and_check("mannWhitneyUTest('two-sided')", rvs1, rvs2, s, p)
equal = np.round(stats.cauchy.rvs(scale=5, size=500), 5)
s, p = stats.mannwhitneyu(equal, equal, alternative='two-sided')
test_and_check("mannWhitneyUTest('two-sided')", equal, equal, s, p)
s, p = stats.mannwhitneyu(equal, equal, alternative='less', use_continuity=False)
test_and_check("mannWhitneyUTest('less', 0)", equal, equal, s, p)
rvs1 = np.round(stats.cauchy.rvs(scale=10,size=65536), 5)
rvs2 = np.round(stats.norm.rvs(loc=0, scale=10,size=65536), 5)
s, p = stats.mannwhitneyu(rvs1, rvs2, alternative='greater')
test_and_check("mannWhitneyUTest('greater')", rvs1, rvs2, s, p)
if __name__ == "__main__":
test_mann_whitney()
print("Ok.")

View File

@ -0,0 +1 @@
Ok.

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python3 "$CURDIR"/01561_mann_whitney_scipy.python

View File

@ -0,0 +1,49 @@
import os
import io
import sys
import requests
import time
import pandas as pd
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')
CLICKHOUSE_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in [CLICKHOUSE_HOST, CLICKHOUSE_PORT_HTTP]) + "/"
class ClickHouseClient:
def __init__(self, host = CLICKHOUSE_SERVER_URL_STR):
self.host = host
def query(self, query, connection_timeout = 1500):
NUMBER_OF_TRIES = 30
DELAY = 10
for i in range(NUMBER_OF_TRIES):
r = requests.post(
self.host,
params = {'timeout_before_checking_execution_speed': 120, 'max_execution_time': 6000},
timeout = connection_timeout,
data = query)
if r.status_code == 200:
return r.text
else:
print('ATTENTION: try #%d failed' % i)
if i != (NUMBER_OF_TRIES-1):
print(query)
print(r.text)
time.sleep(DELAY*(i+1))
else:
raise ValueError(r.text)
def query_return_df(self, query, connection_timeout = 1500):
data = self.query(query, connection_timeout)
df = pd.read_csv(io.StringIO(data), sep = '\t')
return df
def query_with_data(self, query, content):
content = content.encode('utf-8')
r = requests.post(self.host, data=content)
result = r.text
if r.status_code == 200:
return result
else:
raise ValueError(r.text)