#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { template struct AggregateFunctionRankCorrelationData final { size_t size_x = 0; using AllocatorFirstSample = MixedAlignedArenaAllocator; using FirstSample = PODArray; using AllocatorSecondSample = MixedAlignedArenaAllocator; using SecondSample = PODArray; FirstSample first; SecondSample second; }; template class AggregateFunctionRankCorrelation : public IAggregateFunctionDataHelper, AggregateFunctionRankCorrelation> { using Data = AggregateFunctionRankCorrelationData; using FirstSample = typename Data::FirstSample; using SecondSample = typename Data::SecondSample; public: explicit AggregateFunctionRankCorrelation(const DataTypes & arguments) :IAggregateFunctionDataHelper,AggregateFunctionRankCorrelation> ({arguments}, {}) {} String getName() const override { return "rankCorr"; } DataTypePtr getReturnType() const override { return std::make_shared>(); } void insert(Data & a, const std::pair & x, Arena * arena) const { ++a.size_x; a.first.push_back(x.first, arena); a.second.push_back(x.second, arena); } void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override { auto & a = this->data(place); auto new_x = assert_cast &>(*columns[0]).getData()[row_num]; auto new_y = assert_cast &>(*columns[1]).getData()[row_num]; a.size_x += 1; a.first.push_back(new_x, arena); a.second.push_back(new_y, arena); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override { auto & a = this->data(place); auto & b = this->data(rhs); if (b.size_x) for (size_t i = 0; i < b.size_x; ++i) insert(a, std::make_pair(b.first[i], b.second[i]), arena); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { const auto & first = this->data(place).first; const auto & second = this->data(place).second; size_t size = this->data(place).size_x; writeVarUInt(size, buf); buf.write(reinterpret_cast(first.data()), size * sizeof(first[0])); buf.write(reinterpret_cast(second.data()), size * sizeof(second[0])); } void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override { size_t size = 0; readVarUInt(size, buf); auto & first = this->data(place).first; first.resize(size, arena); buf.read(reinterpret_cast(first.data()), size * sizeof(first[0])); auto & second = this->data(place).second; second.resize(size, arena); buf.read(reinterpret_cast(second.data()), size * sizeof(second[0])); } void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { /// Because ranks are adjusted, we have to store each of them in Float type. using RanksArray = PODArrayWithStackMemory; const auto & first = this->data(place).first; const auto & second = this->data(place).second; size_t size = this->data(place).size_x; RanksArray first_ranks; first_ranks.resize(first.size()); computeRanks(first, first_ranks); RanksArray second_ranks; second_ranks.resize(second.size()); computeRanks(second, second_ranks); // count d^2 sum Float64 answer = static_cast(0); for (size_t j = 0; j < size; ++ j) answer += (first_ranks[j] - second_ranks[j]) * (first_ranks[j] - second_ranks[j]); answer *= 6; answer /= size * (size * size - 1); answer = 1 - answer; auto & column = static_cast &>(to); column.getData().push_back(answer); } }; };