add quantileBfloat16

This commit is contained in:
redclusive 2021-04-14 23:38:56 +03:00
parent 021cafff49
commit 043af0a5ca
4 changed files with 247 additions and 0 deletions

View File

@ -52,6 +52,9 @@ template <typename Value, bool float_return> using FuncQuantilesTDigest = Aggreg
template <typename Value, bool float_return> using FuncQuantileTDigestWeighted = AggregateFunctionQuantile<Value, QuantileTDigest<Value>, NameQuantileTDigestWeighted, true, std::conditional_t<float_return, Float32, void>, false>;
template <typename Value, bool float_return> using FuncQuantilesTDigestWeighted = AggregateFunctionQuantile<Value, QuantileTDigest<Value>, NameQuantilesTDigestWeighted, true, std::conditional_t<float_return, Float32, void>, true>;
template <typename Value, bool float_return> using FuncQuantileBfloat16 = AggregateFunctionQuantile<Value, QuantileBfloat16Histogram<Value>, NameQuantileBfloat16, false, std::conditional_t<float_return, Float64, void>, false>;
template <typename Value, bool float_return> using FuncQuantilesBfloat16 = AggregateFunctionQuantile<Value, QuantileBfloat16Histogram<Value>, NameQuantilesBfloat16, false, std::conditional_t<float_return, Float64, void>, true>;
template <template <typename, bool> class Function>
static constexpr bool supportDecimal()
@ -155,6 +158,9 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
factory.registerFunction(NameQuantileTDigestWeighted::name, createAggregateFunctionQuantile<FuncQuantileTDigestWeighted>);
factory.registerFunction(NameQuantilesTDigestWeighted::name, createAggregateFunctionQuantile<FuncQuantilesTDigestWeighted>);
factory.registerFunction(NameQuantileBfloat16::name, createAggregateFunctionQuantile<FuncQuantileBfloat16>);
factory.registerFunction(NameQuantilesBfloat16::name, createAggregateFunctionQuantile<FuncQuantilesBfloat16>);
/// 'median' is an alias for 'quantile'
factory.registerAlias("median", NameQuantile::name);
factory.registerAlias("medianDeterministic", NameQuantileDeterministic::name);
@ -166,6 +172,7 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
factory.registerAlias("medianTimingWeighted", NameQuantileTimingWeighted::name);
factory.registerAlias("medianTDigest", NameQuantileTDigest::name);
factory.registerAlias("medianTDigestWeighted", NameQuantileTDigestWeighted::name);
factory.registerAlias("medianBfloat16", NameQuantileBfloat16::name);
}
}

View File

@ -9,6 +9,7 @@
#include <AggregateFunctions/QuantileExactWeighted.h>
#include <AggregateFunctions/QuantileTiming.h>
#include <AggregateFunctions/QuantileTDigest.h>
#include <AggregateFunctions/QuantileBfloat16Histogram.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/QuantilesCommon.h>
@ -228,4 +229,7 @@ struct NameQuantileTDigestWeighted { static constexpr auto name = "quantileTDige
struct NameQuantilesTDigest { static constexpr auto name = "quantilesTDigest"; };
struct NameQuantilesTDigestWeighted { static constexpr auto name = "quantilesTDigestWeighted"; };
struct NameQuantileBfloat16 { static constexpr auto name = "quantileBfloat16"; };
struct NameQuantilesBfloat16 { static constexpr auto name = "quantilesBfloat16"; };
}

View File

@ -0,0 +1,171 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/VarInt.h>
#include <IO/WriteBuffer.h>
#include <Common/HashTable/HashMap.h>
#include <Common/PODArray.h>
#include <common/types.h>
#include <ext/bit_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
template <typename Value>
struct Bfloat16Histogram
{
using bfloat16 = UInt16;
using Data = HashMap<bfloat16, size_t>;
using Array = PODArrayWithStackMemory<Value, 64>;
Data data;
Array array;
void add(const Value & x, size_t to_add = 1)
{
if (isNaN(x))
return;
bfloat16 val = to_bfloat16(x);
if (!data.find(val))
{
sorted = false;
count += to_add;
array.push_back(x);
}
data[val] += to_add;
}
void merge(const Bfloat16Histogram & rhs)
{
for (const Value & value : rhs.array)
{
add(value, rhs.data.find(to_bfloat16(value))->getMapped());
}
}
void write(WriteBuffer & buf) const
{
writeVarUInt(count, buf);
data.write(buf);
size_t size = array.size();
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(array.data()), size * sizeof(array[0]));
}
void read(ReadBuffer & buf)
{
count = 0;
readVarUInt(count, buf);
data.read(buf);
size_t size = 0;
readVarUInt(size, buf);
array.resize(size);
buf.read(reinterpret_cast<char *>(array.data()), size * sizeof(array[0]));
}
template <typename T>
T quantile(const Float64 & level)
{
if (array.empty())
{
return onEmpty<T>();
}
sortIfNeeded();
size_t sum = 0;
size_t need = level * count;
for (const Value & value : array)
{
sum += data.find(to_bfloat16(value))->getMapped();
if (sum >= need)
return value;
}
return array[array.size() - 1];
}
// levels[indicies[i]] must be sorted
template <typename T>
void quantilesManySorted(const Float64 * levels, const size_t * indices, size_t size, T * result)
{
if (array.empty())
{
for (size_t i = 0; i < size; ++i)
{
result[indices[i]] = onEmpty<T>();
}
return;
}
sortIfNeeded();
size_t sum = 0;
size_t it = 0;
for (const auto & value : array)
{
sum += data.find(to_bfloat16(value))->getMapped();
while (it < size && sum >= static_cast<size_t>(levels[indices[it]] * count))
{
result[indices[it++]] = value;
}
}
for (size_t i = it; i < size; ++i)
{
result[indices[i]] = array[array.size() - 1];
}
}
template <typename T>
void quantilesMany(const Float64 * levels, const size_t * indices, size_t size, T * result)
{
if (is_sorted_r(levels, indices, size))
{
quantilesManySorted(levels, indices, size, result);
}
else
{
for (size_t i = 0; i < size; ++i)
result[indices[i]] = quantile<T>(levels[indices[i]]);
}
}
private:
size_t count = 0;
bool sorted = false;
bfloat16 to_bfloat16(const Value & x) const
{
return ext::bit_cast<UInt32>(static_cast<Float32>(x)) >> 16;
}
void sortIfNeeded()
{
if (sorted)
return;
sorted = true;
std::sort(array.begin(), array.end());
}
bool is_sorted_r(const Float64 * levels, const size_t * indices, size_t size) const
{
for (size_t i = 0; i < size - 1; ++i)
{
if (levels[indices[i]] > levels[indices[i + 1]])
return false;
}
return true;
}
template <typename ResultType>
ResultType onEmpty() const
{
return std::numeric_limits<ResultType>::quiet_NaN();
}
};
}

View File

@ -0,0 +1,65 @@
#pragma once
#include <AggregateFunctions/Bfloat16Histogram.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
template<typename Value>
struct QuantileBfloat16Histogram
{
using Hist = Bfloat16Histogram<Value>;
Hist data;
void add(const Value & x)
{
data.add(x);
}
template <typename Weight>
void add(const Value &, const Weight &)
{
throw Exception("Method add with weight is not implemented for QuantileBfloat16Histogram", ErrorCodes::NOT_IMPLEMENTED);
}
void merge(const QuantileBfloat16Histogram & rhs)
{
data.merge(rhs.data);
}
void serialize(WriteBuffer & buf) const
{
data.write(buf);
}
void deserialize(ReadBuffer & buf)
{
data.read(buf);
}
Value get(Float64 level)
{
return data.template quantile<Value>(level);
}
void getMany(const Float64 * levels, const size_t * indices, size_t size, Value * result)
{
data.quantilesMany(levels, indices, size, result);
}
Float64 getFloat(Float64 level) {
return data.template quantile<Float64>(level);
}
void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result)
{
data.quantilesMany(levels, indices, size, result);
}
};
}