Kahan summation: development [#CLICKHOUSE-2].

This commit is contained in:
Alexey Milovidov 2017-12-23 01:02:52 +03:00
parent eebdac1f0e
commit 2f8a79eee6

View File

@ -1,5 +1,7 @@
#pragma once
#include <type_traits>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
@ -16,12 +18,81 @@ template <typename T>
struct AggregateFunctionSumData
{
T sum{};
void add(T value)
{
sum += value;
}
void merge(const AggregateFunctionSumData & rhs)
{
sum += rhs.sum;
}
void write(WriteBuffer & buf) const
{
writeBinary(sum, buf);
}
void read(ReadBuffer & buf)
{
readBinary(sum, buf);
}
T get() const
{
return sum;
}
};
template <typename T>
struct AggregateFunctionSumKahanData
{
static_assert(std::is_floating_point_v<T>,
"It doesn't make sense to use Kahan Summation algorithm for non floating point types");
T sum{};
T compensation{};
void add(T value)
{
auto compensated_value = value - compensation;
auto new_sum = sum + compensated_value;
compensation = (new_sum - sum) - compensated_value;
sum = new_sum;
}
void merge(const AggregateFunctionSumKahanData & rhs)
{
auto raw_sum = sum + rhs.sum;
auto rhs_compensated = raw_sum - sum;
auto compensations = ((rhs.sum - rhs_compensated) + (sum - (raw_sum - rhs_compensated))) + compensation + rhs.compensation;
sum = raw_sum + compensations;
compensation = compensations - (sum - raw_sum);
}
void write(WriteBuffer & buf) const
{
writeBinary(sum, buf);
writeBinary(compensation, buf);
}
void read(ReadBuffer & buf)
{
readBinary(sum, buf);
readBinary(compensation, buf);
}
T get() const
{
return sum;
}
};
/// Counts the sum of the numbers.
template <typename T, typename TResult = T>
class AggregateFunctionSum final : public IAggregateFunctionDataHelper<AggregateFunctionSumData<TResult>, AggregateFunctionSum<T, TResult>>
template <typename T, typename TResult, typename Data>
class AggregateFunctionSum final : public IAggregateFunctionDataHelper<Data<TResult>, AggregateFunctionSum<T, TResult, Data>>
{
public:
String getName() const override { return "sum"; }
@ -33,27 +104,27 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
this->data(place).sum += static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
this->data(place).add(static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).sum += this->data(rhs).sum;
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
writeBinary(this->data(place).sum, buf);
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
readBinary(this->data(place).sum, buf);
this->data(place).read(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnVector<TResult> &>(to).getData().push_back(this->data(place).sum);
static_cast<ColumnVector<TResult> &>(to).getData().push_back(this->data(place).get());
}
const char * getHeaderFilePath() const override { return __FILE__; }