Vectorize "sum" function

This commit is contained in:
Alexey Milovidov 2020-05-18 06:50:53 +03:00
parent 7efd2a825f
commit 6f0c78dfdd

View File

@ -25,6 +25,36 @@ struct AggregateFunctionSumData
sum += value;
}
/// Vectorized version
template <typename Value>
void addMany(const Value * __restrict ptr, size_t count)
{
/// Compiler cannot unroll this loop, do it manually.
/// Something around the number of SSE registers * the number of elements fit in register.
constexpr size_t unroll_count = 128 / sizeof(T);
T partial_sums[unroll_count]{};
const auto * end = ptr + count;
const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);
while (ptr < unrolled_end)
{
for (size_t i = 0; i < unroll_count; ++i)
partial_sums[i] += ptr[i];
ptr += unroll_count;
}
for (size_t i = 0; i < unroll_count; ++i)
sum += partial_sums[i];
while (ptr < end)
{
sum += *ptr;
++ptr;
}
}
void merge(const AggregateFunctionSumData & rhs)
{
sum += rhs.sum;
@ -55,21 +85,60 @@ struct AggregateFunctionSumKahanData
T sum{};
T compensation{};
template <typename Value>
ALWAYS_INLINE void addImpl(Value value, T & out_sum, T & out_compensation)
{
auto compensated_value = value - out_compensation;
auto new_sum = out_sum + compensated_value;
out_compensation = (new_sum - out_sum) - compensated_value;
out_sum = new_sum;
}
void add(T value)
{
auto compensated_value = value - compensation;
auto new_sum = sum + compensated_value;
compensation = (new_sum - sum) - compensated_value;
sum = new_sum;
addImpl(value, sum, compensation);
}
/// Vectorized version
template <typename Value>
void addMany(const Value * __restrict ptr, size_t count)
{
constexpr size_t unroll_count = 4; // 128 / sizeof(T);
T partial_sums[unroll_count]{};
T partial_compensations[unroll_count]{};
const auto * end = ptr + count;
const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);
while (ptr < unrolled_end)
{
for (size_t i = 0; i < unroll_count; ++i)
addImpl(ptr[i], partial_sums[i], partial_compensations[i]);
ptr += unroll_count;
}
for (size_t i = 0; i < unroll_count; ++i)
mergeImpl(sum, compensation, partial_sums[i], partial_compensations[i]);
while (ptr < end)
{
addImpl(*ptr, sum, compensation);
++ptr;
}
}
void ALWAYS_INLINE mergeImpl(T & to_sum, T & to_compensation, T from_sum, T from_compensation)
{
auto raw_sum = to_sum + from_sum;
auto rhs_compensated = raw_sum - to_sum;
auto compensations = ((from_sum - rhs_compensated) + (to_sum - (raw_sum - rhs_compensated))) + compensation + from_compensation;
to_sum = raw_sum + compensations;
to_compensation = compensations - (to_sum - raw_sum);
}
void merge(const AggregateFunctionSumKahanData & rhs)
{
auto raw_sum = sum + rhs.sum;
auto rhs_compensated = raw_sum - sum;
auto compensations = ((rhs.sum - rhs_compensated) + (sum - (raw_sum - rhs_compensated))) + compensation + rhs.compensation;
sum = raw_sum + compensations;
compensation = compensations - (sum - raw_sum);
mergeImpl(sum, compensation, rhs.sum, rhs.compensation);
}
void write(WriteBuffer & buf) const
@ -141,6 +210,13 @@ public:
this->data(place).add(column.getData()[row_num]);
}
/// Vectorized version when there is no GROUP BY keys.
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *) const override
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addMany(column.getData().data(), batch_size);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));