mann-whitney

This commit is contained in:
nikitamikhaylov 2020-11-12 01:52:07 +03:00
parent 9afe579225
commit 13081370ba
10 changed files with 461 additions and 169 deletions

View File

@ -0,0 +1,37 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionMannWhitney.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
#include <AggregateFunctions/Helpers.h>
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionMannWhitneyUTest(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertBinary(name, argument_types);
if (!isNumber(argument_types[0]) || !isNumber(argument_types[1]))
throw Exception("Aggregate function " + name + " only supports numerical types", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<AggregateFunctionMannWhitney<Float64>>(argument_types, parameters);
}
}
void registerAggregateFunctionMannWhitney(AggregateFunctionFactory & factory)
{
factory.registerFunction("mannWhitneyUTest", createAggregateFunctionMannWhitneyUTest);
}
}

View File

@ -0,0 +1,234 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/StatCommon.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnTuple.h>
#include <Common/assert_cast.h>
#include <Common/FieldVisitors.h>
#include <Common/PODArray_fwd.h>
#include <common/types.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <limits>
#include <DataTypes/DataTypeArray.h>
#include <Common/ArenaAllocator.h>
#include <iostream>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
/// Required two samples be of the same type. Because we need to compute ranks of all observations from both samples.
template <typename T>
struct MannWhitneyData : public StatisticalSample<T, T>
{
enum class Alternative
{
TwoSided,
Less,
Greater
};
using Sample = typename StatisticalSample<T, T>::SampleX;
std::pair<Float64, Float64> getResult(Alternative alternative, bool continuity_correction)
{
ConcatenatedSamples both(this->x, this->y);
RanksArray ranks;
Float64 tie_correction;
std::tie(ranks, tie_correction) = computeRanksAndTieCorrection(both);
const Float64 n1 = this->size_x;
const Float64 n2 = this->size_y;
Float64 r1 = 0;
for (size_t i = 0; i < n1; ++i)
r1 += ranks[i];
const Float64 u1 = n1 * n2 + (n1 * (n1 + 1.)) / 2. - r1;
const Float64 u2 = n1 * n2 - u1;
const Float64 meanrank = n1 * n2 /2. + 0.5 * continuity_correction;
const Float64 sd = std::sqrt(tie_correction * n1 * n2 * (n1 + n2 + 1) / 12.0);
Float64 u = 0;
if (alternative == Alternative::TwoSided)
u = std::max(u1, u2);
else if (alternative == Alternative::Less)
u = u1;
else if (alternative == Alternative::Greater)
u = u2;
const Float64 z = (u - meanrank) / sd;
const Float64 cdf = integrateSimpson(0, z, [] (Float64 t) { return std::pow(M_E, -0.5 * t * t) / std::sqrt(2 * M_PI);});
Float64 p_value = 0;
if (alternative == Alternative::TwoSided)
p_value = 1 - 2 * cdf;
else
p_value = 0.5 - cdf;
return {u2, p_value};
}
private:
/// We need to compute ranks according to all samples. Use this class to avoid extra copy and memory allocation.
class ConcatenatedSamples {
public:
ConcatenatedSamples(const Sample & first_, const Sample & second_)
: first(first_), second(second_) {}
const T & operator[](size_t ind) const
{
if (ind < first.size())
return first[ind];
return second[ind % first.size()];
}
size_t size() const
{
return first.size() + second.size();
}
private:
const Sample & first;
const Sample & second;
};
};
template <typename T>
class AggregateFunctionMannWhitney :
public IAggregateFunctionDataHelper<MannWhitneyData<T>, AggregateFunctionMannWhitney<T>>
{
private:
using Alternative = typename MannWhitneyData<T>::Alternative;
typename MannWhitneyData<T>::Alternative alternative;
bool continuity_correction{true};
public:
explicit AggregateFunctionMannWhitney(const DataTypes & arguments, const Array & params)
:IAggregateFunctionDataHelper<MannWhitneyData<T>, AggregateFunctionMannWhitney<T>> ({arguments}, {})
{
if (params.size() > 2)
throw Exception("Aggregate function " + getName() + " require two parameter or less", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (params.empty()) {
alternative = Alternative::TwoSided;
return;
}
if (params[0].getType() != Field::Types::String)
throw Exception("Aggregate function " + getName() + " require require first parameter to be a String", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto param = params[0].get<String>();
if (param == "two-sided")
alternative = Alternative::TwoSided;
else if (param == "less")
alternative = Alternative::Less;
else if (param == "greater")
alternative = Alternative::Greater;
else
throw Exception("Unknown parameter in aggregate function " + getName() +
". It must be one of: 'two sided', 'less', 'greater'", ErrorCodes::BAD_ARGUMENTS);
if (params.size() != 2) {
return;
}
if (params[1].getType() != Field::Types::UInt64)
throw Exception("Aggregate function " + getName() + " require require second parameter to be a UInt64", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
continuity_correction = static_cast<bool>(params[1].get<UInt64>());
}
String getName() const override
{
return "mannWhitneyUTest";
}
DataTypePtr getReturnType() const override
{
DataTypes types
{
std::make_shared<DataTypeNumber<Float64>>(),
std::make_shared<DataTypeNumber<Float64>>(),
};
Strings names
{
"u_statistic",
"p_value"
};
return std::make_shared<DataTypeTuple>(
std::move(types),
std::move(names)
);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
Float64 value = columns[0]->getFloat64(row_num);
UInt8 is_second = columns[1]->getUInt(row_num);
if (is_second)
this->data(place).addY(value, arena);
else
this->data(place).addX(value, arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & a = this->data(place);
auto & b = this->data(rhs);
a.merge(b, arena);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
this->data(place).read(buf, arena);
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
if (!this->data(place).size_x || !this->data(place).size_y)
throw Exception("Aggregate function " + getName() + " require both samples to be non empty", ErrorCodes::BAD_ARGUMENTS);
auto [u_statistic, p_value] = this->data(place).getResult(alternative, continuity_correction);
/// Because p-value is a probability.
p_value = std::min(1.0, std::max(0.0, p_value));
auto & column_tuple = assert_cast<ColumnTuple &>(to);
auto & column_stat = assert_cast<ColumnVector<Float64> &>(column_tuple.getColumn(0));
auto & column_value = assert_cast<ColumnVector<Float64> &>(column_tuple.getColumn(1));
column_stat.getData().push_back(u_statistic);
column_value.getData().push_back(p_value);
}
};
};

View File

@ -21,23 +21,10 @@ AggregateFunctionPtr createAggregateFunctionRankCorrelation(const std::string &
assertBinary(name, argument_types);
assertNoParameters(name, parameters);
AggregateFunctionPtr res;
if (isDecimal(argument_types[0]) || isDecimal(argument_types[1]))
{
if (!isNumber(argument_types[0]) || !isNumber(argument_types[1]))
throw Exception("Aggregate function " + name + " only supports numerical types", ErrorCodes::NOT_IMPLEMENTED);
}
else
{
res.reset(createWithTwoNumericTypes<AggregateFunctionRankCorrelation>(*argument_types[0], *argument_types[1], argument_types));
}
if (!res)
{
throw Exception("Aggregate function " + name + " only supports numerical types", ErrorCodes::NOT_IMPLEMENTED);
}
return res;
return std::make_shared<AggregateFunctionRankCorrelation>(argument_types);
}
}

View File

@ -1,58 +1,55 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/StatSample.h>
#include <AggregateFunctions/StatCommon.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnTuple.h>
#include <Common/assert_cast.h>
#include <Common/FieldVisitors.h>
#include <Common/PODArray_fwd.h>
#include <common/types.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <limits>
#include <DataTypes/DataTypeArray.h>
#include <Common/ArenaAllocator.h>
#include <type_traits>
#include <iostream>
namespace DB
{
template <typename X = Float64, typename Y = Float64>
struct AggregateFunctionRankCorrelationData final
struct RankCorrelationData : public StatisticalSample<Float64, Float64>
{
size_t size_x = 0;
Float64 getResult() {
RanksArray ranks_x;
std::tie(ranks_x, std::ignore) = computeRanksAndTieCorrection(this->x);
using AllocatorFirstSample = MixedAlignedArenaAllocator<alignof(X), 4096>;
using FirstSample = PODArray<X, 32, AllocatorFirstSample>;
RanksArray ranks_y;
std::tie(ranks_y, std::ignore) = computeRanksAndTieCorrection(this->y);
using AllocatorSecondSample = MixedAlignedArenaAllocator<alignof(Y), 4096>;
using SecondSample = PODArray<Y, 32, AllocatorSecondSample>;
/// In our case sizes of both samples are equal.
const auto size = this->size_x;
FirstSample first;
SecondSample second;
/// Count d^2 sum
Float64 answer = 0;
for (size_t j = 0; j < size; ++j)
answer += (ranks_x[j] - ranks_y[j]) * (ranks_x[j] - ranks_y[j]);
answer *= 6;
answer /= size * (size * size - 1);
answer = 1 - answer;
return answer;
}
};
template <typename X, typename Y>
class AggregateFunctionRankCorrelation :
public IAggregateFunctionDataHelper<AggregateFunctionRankCorrelationData<X, Y>, AggregateFunctionRankCorrelation<X, Y>>
public IAggregateFunctionDataHelper<RankCorrelationData, AggregateFunctionRankCorrelation>
{
using Data = AggregateFunctionRankCorrelationData<X, Y>;
using FirstSample = typename Data::FirstSample;
using SecondSample = typename Data::SecondSample;
public:
explicit AggregateFunctionRankCorrelation(const DataTypes & arguments)
:IAggregateFunctionDataHelper<AggregateFunctionRankCorrelationData<X, Y>,AggregateFunctionRankCorrelation<X, Y>> ({arguments}, {})
:IAggregateFunctionDataHelper<RankCorrelationData, AggregateFunctionRankCorrelation> ({arguments}, {})
{}
String getName() const override
@ -65,24 +62,12 @@ public:
return std::make_shared<DataTypeNumber<Float64>>();
}
void insert(Data & a, const std::pair<X, Y> & x, Arena * arena) const
{
++a.size_x;
a.first.push_back(x.first, arena);
a.second.push_back(x.second, arena);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
auto & a = this->data(place);
auto new_x = assert_cast<const ColumnVector<X> &>(*columns[0]).getData()[row_num];
auto new_y = assert_cast<const ColumnVector<Y> &>(*columns[1]).getData()[row_num];
a.size_x += 1;
a.first.push_back(new_x, arena);
a.second.push_back(new_y, arena);
Float64 new_x = columns[0]->getFloat64(row_num);
Float64 new_y = columns[1]->getFloat64(row_num);
this->data(place).addX(new_x, arena);
this->data(place).addY(new_y, arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
@ -90,62 +75,22 @@ public:
auto & a = this->data(place);
auto & b = this->data(rhs);
if (b.size_x)
for (size_t i = 0; i < b.size_x; ++i)
insert(a, std::make_pair(b.first[i], b.second[i]), arena);
a.merge(b, arena);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
const auto & first = this->data(place).first;
const auto & second = this->data(place).second;
size_t size = this->data(place).size_x;
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(first.data()), size * sizeof(first[0]));
buf.write(reinterpret_cast<const char *>(second.data()), size * sizeof(second[0]));
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
size_t size = 0;
readVarUInt(size, buf);
auto & first = this->data(place).first;
first.resize(size, arena);
buf.read(reinterpret_cast<char *>(first.data()), size * sizeof(first[0]));
auto & second = this->data(place).second;
second.resize(size, arena);
buf.read(reinterpret_cast<char *>(second.data()), size * sizeof(second[0]));
this->data(place).read(buf, arena);
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
/// Because ranks are adjusted, we have to store each of them in Float type.
using RanksArray = PODArrayWithStackMemory<Float64, 32>;
const auto & first = this->data(place).first;
const auto & second = this->data(place).second;
size_t size = this->data(place).size_x;
RanksArray first_ranks;
first_ranks.resize(first.size());
computeRanks<FirstSample, RanksArray>(first, first_ranks);
RanksArray second_ranks;
second_ranks.resize(second.size());
computeRanks<SecondSample, RanksArray>(second, second_ranks);
// count d^2 sum
Float64 answer = static_cast<Float64>(0);
for (size_t j = 0; j < size; ++ j)
answer += (first_ranks[j] - second_ranks[j]) * (first_ranks[j] - second_ranks[j]);
answer *= 6;
answer /= size * (size * size - 1);
answer = 1 - answer;
auto answer = this->data(place).getResult();
auto & column = static_cast<ColumnVector<Float64> &>(to);
column.getData().push_back(answer);

View File

@ -0,0 +1,114 @@
#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/ArenaAllocator.h>
#include <numeric>
#include <algorithm>
#include <utility>
namespace DB
{
template <typename F>
static Float64 integrateSimpson(Float64 a, Float64 b, F && func)
{
const size_t iterations = std::max(1e6, 1e4 * std::abs(std::round(b)));
const long double h = (b - a) / iterations;
Float64 sum_odds = 0.0;
for (size_t i = 1; i < iterations; i += 2)
sum_odds += func(a + i * h);
Float64 sum_evens = 0.0;
for (size_t i = 2; i < iterations; i += 2)
sum_evens += func(a + i * h);
return (func(a) + func(b) + 2 * sum_evens + 4 * sum_odds) * h / 3;
}
/// Because ranks are adjusted, we have to store each of them in Float type.
using RanksArray = std::vector<Float64>;
template <typename Values>
std::pair<RanksArray, Float64> computeRanksAndTieCorrection(const Values & values) {
const size_t size = values.size();
/// Save initial positions, than sort indices according to the values.
std::vector<size_t> indexes(size);
std::iota(indexes.begin(), indexes.end(), 0);
std::sort(indexes.begin(), indexes.end(),
[&] (size_t lhs, size_t rhs) { return values[lhs] < values[rhs]; });
size_t left = 0;
Float64 tie_numenator = 0;
RanksArray out(size);
while (left < size) {
size_t right = left;
while (right < size && values[indexes[left]] == values[indexes[right]]) {
++right;
}
auto adjusted = (left + right + 1.) / 2.;
auto count_equal = right - left;
tie_numenator += std::pow(count_equal, 3) - count_equal;
for (size_t iter = left; iter < right; ++iter) {
out[indexes[iter]] = adjusted;
}
left = right;
}
std::cout << "tie_numenator " << tie_numenator << std::endl;
std::cout << "size " << size << std::endl;
return {out, 1 - (tie_numenator / (std::pow(size, 3) - size))};
}
template <typename X, typename Y>
struct StatisticalSample
{
using AllocatorXSample = MixedAlignedArenaAllocator<alignof(X), 4096>;
using SampleX = PODArray<X, 32, AllocatorXSample>;
using AllocatorYSample = MixedAlignedArenaAllocator<alignof(Y), 4096>;
using SampleY = PODArray<Y, 32, AllocatorYSample>;
SampleX x;
SampleY y;
size_t size_x;
size_t size_y;
void addX(X value, Arena * arena) {
++size_x;
x.push_back(value, arena);
}
void addY(Y value, Arena * arena) {
++size_y;
y.push_back(value, arena);
}
void merge(const StatisticalSample & rhs, Arena * arena)
{
size_x += rhs.size_x;
size_y += rhs.size_y;
x.insert(rhs.x.begin(), rhs.x.end(), arena);
y.insert(rhs.y.begin(), rhs.y.end(), arena);
}
void write(WriteBuffer & buf) const
{
writeVarUInt(size_x, buf);
writeVarUInt(size_y, buf);
buf.write(reinterpret_cast<const char *>(x.data()), size_x * sizeof(x[0]));
buf.write(reinterpret_cast<const char *>(y.data()), size_y * sizeof(y[0]));
}
void read(ReadBuffer & buf, Arena * arena)
{
readVarUInt(size_x, buf);
readVarUInt(size_y, buf);
x.resize(size_x, arena);
y.resize(size_y, arena);
buf.read(reinterpret_cast<char *>(x.data()), size_x * sizeof(x[0]));
buf.read(reinterpret_cast<char *>(y.data()), size_y * sizeof(y[0]));
}
};
}

View File

@ -1,68 +0,0 @@
#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/ArenaAllocator.h>
#include <numeric>
#include <algorithm>
namespace DB
{
template <typename Values, typename Ranks>
void computeRanks(const Values & values, Ranks & out) {
std::vector<size_t> indexes(values.size());
std::iota(indexes.begin(), indexes.end(), 0);
std::sort(indexes.begin(), indexes.end(),
[&] (size_t lhs, size_t rhs) { return values[lhs] < values[rhs]; });
size_t left = 0;
while (left < indexes.size()) {
size_t right = left;
while (right < indexes.size() && values[indexes[left]] == values[indexes[right]]) {
++right;
}
auto adjusted = (left + right + 1.) / 2.;
for (size_t iter = left; iter < right; ++iter) {
out[indexes[iter]] = adjusted;
}
left = right;
}
}
template <typename T = double>
struct StatisticalSampleData
{
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using SampleArray = PODArray<T, 32, Allocator>;
SampleArray first;
SampleArray second;
void add(T x, T y)
{
first.push_back(x);
second.push_back(y);
}
void merge(const StatisticalSampleData & rhs)
{
first.insert(first.end(), rhs.first.begin(), rhs.first.end());
second.insert(second.end(), rhs.second.begin(), rhs.second.end());
}
void write(WriteBuffer & buf) const
{
writePODBinary(*this, buf);
}
void read(ReadBuffer & buf) const
{
readPODBinary(*this, buf);
}
};
}

View File

@ -42,6 +42,7 @@ void registerAggregateFunctionAggThrow(AggregateFunctionFactory &);
void registerAggregateFunctionWelchTTest(AggregateFunctionFactory &);
void registerAggregateFunctionStudentTTest(AggregateFunctionFactory &);
void registerAggregateFunctionRankCorrelation(AggregateFunctionFactory &);
void registerAggregateFunctionMannWhitney(AggregateFunctionFactory &);
class AggregateFunctionCombinatorFactory;
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
@ -94,6 +95,7 @@ void registerAggregateFunctions()
registerAggregateFunctionCategoricalIV(factory);
registerAggregateFunctionAggThrow(factory);
registerAggregateFunctionRankCorrelation(factory);
registerAggregateFunctionMannWhitney(factory);
}
{

View File

@ -0,0 +1,27 @@
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <Common/PODArray.h>
#include <AggregateFunctions/StatCommon.h>
#include <iostream>
#include <gtest/gtest.h>
TEST(Ranks, Simple)
{
using namespace DB;
RanksArray sample = {310, 195, 480, 530, 155, 530, 245, 385, 450, 450, 465, 545, 170, 180, 125, 180, 230, 170, 75, 430, 480, 495, 295};
RanksArray ranks;
Float64 t = 0;
std::tie(ranks, t) = computeRanksAndTieCorrection(sample);
RanksArray expected{12.0, 8.0, 18.5, 21.5, 3.0, 21.5, 10.0, 13.0, 15.5, 15.5, 17.0, 23.0, 4.5, 6.5, 2.0, 6.5, 9.0, 4.5, 1.0, 14.0, 18.5, 20.0, 11.0};
ASSERT_EQ(ranks.size(), expected.size());
for (size_t i = 0; i < ranks.size(); ++i) {
ASSERT_DOUBLE_EQ(ranks[i], expected[i]);
}
ASSERT_DOUBLE_EQ(t, 0.9975296442687747);
}

View File

@ -0,0 +1,5 @@
(223,0.5426959774289524)
223.0 0.5426959774289482
223 0.5426959774289524
223 0.5426959774289524
223 0.5426959774289524

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS mann_whitney_test;
CREATE TABLE mann_whitney_test (left Float64, right UInt8) ENGINE = Memory;
INSERT INTO mann_whitney_test VALUES (310, 0), (195, 0), (530, 0), (155, 0), (530, 0), (245, 0), (385, 0), (450, 0), (465, 0), (545, 0), (170, 0), (180, 0), (125, 0), (180, 0), (230, 0), (75, 0), (430, 0), (480, 0), (495, 0), (295, 0), (116, 1), (171, 1), (176, 1), (421, 1), (111, 1), (326, 1), (481, 1), (111, 1), (346, 1), (441, 1), (261, 1), (411, 1), (206, 1), (521, 1), (456, 1), (446, 1), (296, 1), (51, 1), (426, 1), (261, 1);
SELECT mannWhitneyUTest(left, right) from mann_whitney_test;
SELECT '223.0', '0.5426959774289482';
WITH mannWhitneyUTest(left, right) AS pair SELECT roundBankers(pair.1, 16) as t_stat, roundBankers(pair.2, 16) as p_value from mann_whitney_test;
WITH mannWhitneyUTest('two-sided', 1)(left, right) as pair SELECT roundBankers(pair.1, 16) as t_stat, roundBankers(pair.2, 16) as p_value from mann_whitney_test;
WITH mannWhitneyUTest('two-sided')(left, right) as pair SELECT roundBankers(pair.1, 16) as t_stat, roundBankers(pair.2, 16) as p_value from mann_whitney_test;
DROP TABLE IF EXISTS mann_whitney_test;