Add weight support and rework the quantile computation logic

This commit is contained in:
RedClusive 2021-04-28 14:54:10 +00:00
parent 6ee46fe936
commit de2523cabd
2 changed files with 123 additions and 185 deletions

View File

@ -1,166 +0,0 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/VarInt.h>
#include <IO/WriteBuffer.h>
#include <Common/HashTable/HashMap.h>
#include <Common/PODArray.h>
#include <common/types.h>
#include <ext/bit_cast.h>
namespace DB
{
template <typename Value>
struct BFloat16Histogram
{
using bfloat16 = UInt16;
using Data = HashMap<bfloat16, size_t>;
using Array = PODArrayWithStackMemory<Float32, 64>;
Data data;
Array array;
void add(const Value & x, size_t to_add = 1)
{
if (isNaN(x))
return;
bfloat16 val = to_bfloat16(x);
if (!data.find(val))
{
sorted = false;
array.push_back(to_Float32(val));
}
count += to_add;
data[val] += to_add;
}
void merge(const BFloat16Histogram & rhs)
{
for (const Float32 & value : rhs.array)
{
add(value, rhs.data.find(to_bfloat16(value))->getMapped());
}
}
void write(WriteBuffer & buf) const
{
writeVarUInt(count, buf);
data.write(buf);
size_t size = array.size();
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(array.data()), size * sizeof(array[0]));
}
void read(ReadBuffer & buf)
{
count = 0;
readVarUInt(count, buf);
data.read(buf);
size_t size = 0;
readVarUInt(size, buf);
array.resize(size);
buf.read(reinterpret_cast<char *>(array.data()), size * sizeof(array[0]));
}
template <typename ResultType>
ResultType quantile(const Float64 & level)
{
if (array.empty())
{
return onEmpty<ResultType>();
}
sortIfNeeded();
size_t sum = 0;
size_t need = level * count;
for (const Float32 & value : array)
{
sum += data[to_bfloat16(value)];
if (sum >= need)
return value;
}
return array[array.size() - 1];
}
// levels[indices[i]] must be sorted
template <typename T>
void quantilesManySorted(const Float64 * levels, const size_t * indices, size_t size, T * result)
{
if (array.empty())
{
for (size_t i = 0; i < size; ++i)
{
result[indices[i]] = onEmpty<T>();
}
return;
}
sortIfNeeded();
size_t sum = 0;
size_t it = 0;
for (const Float32 & value : array)
{
sum += data[to_bfloat16(value)];
while (it < size && sum >= static_cast<size_t>(levels[indices[it]] * count))
{
result[indices[it++]] = value;
}
}
for (size_t i = it; i < size; ++i)
{
result[indices[i]] = array[array.size() - 1];
}
}
template <typename T>
void quantilesMany(const Float64 * levels, const size_t * indices, size_t size, T * result)
{
if (is_sorted_r(levels, indices, size))
{
quantilesManySorted(levels, indices, size, result);
}
else
{
for (size_t i = 0; i < size; ++i)
result[indices[i]] = quantile<T>(levels[indices[i]]);
}
}
private:
size_t count = 0;
bool sorted = false;
bfloat16 to_bfloat16(const Value & x) const { return ext::bit_cast<UInt32>(static_cast<Float32>(x)) >> 16; }
Float32 to_Float32(const bfloat16 & x) const { return ext::bit_cast<Float32>(x << 16); }
void sortIfNeeded()
{
if (sorted)
return;
sorted = true;
std::sort(array.begin(), array.end());
}
bool is_sorted_r(const Float64 * levels, const size_t * indices, size_t size) const
{
for (size_t i = 0; i < size - 1; ++i)
{
if (levels[indices[i]] > levels[indices[i + 1]])
return false;
}
return true;
}
template <typename ResultType>
ResultType onEmpty() const
{
return std::numeric_limits<ResultType>::quiet_NaN();
}
};
}

View File

@ -1,46 +1,150 @@
#pragma once
#include <AggregateFunctions/BFloat16Histogram.h>
#include <IO/ReadBuffer.h>
#include <IO/VarInt.h>
#include <IO/WriteBuffer.h>
#include <Common/HashTable/HashMap.h>
#include <Common/NaNUtils.h>
#include <common/types.h>
#include <ext/bit_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
template <typename Value>
struct QuantileBFloat16Histogram
{
using Histogram = BFloat16Histogram<Value>;
Histogram data;
using bfloat16 = UInt16;
using Weight = UInt64;
using Data = HashMap<bfloat16, Weight>;
Data data;
void add(const Value & x) { data.add(x); }
void add(const Value & x) { add(x, 1); }
template <typename Weight>
void add(const Value &, const Weight &)
void add(const Value & x, Weight w)
{
throw Exception("Method add with weight is not implemented for QuantileBFloat16Histogram", ErrorCodes::NOT_IMPLEMENTED);
if (!isNaN(x))
data[to_bfloat16(x)] += w;
}
void merge(const QuantileBFloat16Histogram & rhs) { data.merge(rhs.data); }
void merge(const QuantileBFloat16Histogram & rhs)
{
for (const auto & pair : rhs.data)
data[pair.getKey()] += pair.getMapped();
}
void serialize(WriteBuffer & buf) const { data.write(buf); }
void deserialize(ReadBuffer & buf) { data.read(buf); }
Value get(Float64 level) { return data.template quantile<Value>(level); }
Value get(Float64 level) const { return getImpl<Value>(level); }
void getMany(const Float64 * levels, const size_t * indices, size_t size, Value * result)
void getMany(const Float64 * levels, const size_t * indices, size_t size, Value * result) const
{
data.quantilesMany(levels, indices, size, result);
getManyImpl(levels, indices, size, result);
}
Float64 getFloat(Float64 level) { return data.template quantile<Float64>(level); }
Float64 getFloat(Float64 level) const { return getImpl<Float64>(level); }
void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result)
void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result) const
{
data.quantilesMany(levels, indices, size, result);
getManyImpl(levels, indices, size, result);
}
private:
bfloat16 to_bfloat16(const Value & x) const { return ext::bit_cast<UInt32>(static_cast<Float32>(x)) >> 16; }
Float32 to_Float32(const bfloat16 & x) const { return ext::bit_cast<Float32>(x << 16); }
using Pair = PairNoInit<Float32, Weight>;
template <typename T>
T getImpl(Float64 level) const
{
size_t size = data.size();
if (0 == size)
return std::numeric_limits<T>::quiet_NaN();
std::unique_ptr<Pair[]> array_holder(new Pair[size]);
Pair * array = array_holder.get();
Float64 sum_weight = 0;
Pair * arr_it = array;
for (const auto & pair : data)
{
sum_weight += pair.getMapped();
*arr_it = {to_Float32(pair.getKey()), pair.getMapped()};
++arr_it;
}
std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
Float64 threshold = std::ceil(sum_weight * level);
Float64 accumulated = 0;
for (const Pair * p = array; p != (array + size); ++p)
{
accumulated += p->second;
if (accumulated >= threshold)
return p->first;
}
return array[size - 1].first;
}
template <typename T>
void getManyImpl(const Float64 * levels, const size_t * indices, size_t num_levels, T * result) const
{
size_t size = data.size();
if (0 == size)
{
for (size_t i = 0; i < num_levels; ++i)
result[i] = std::numeric_limits<T>::quiet_NaN();
return;
}
std::unique_ptr<Pair[]> array_holder(new Pair[size]);
Pair * array = array_holder.get();
Float64 sum_weight = 0;
Pair * arr_it = array;
for (const auto & pair : data)
{
sum_weight += pair.getMapped();
*arr_it = {to_Float32(pair.getKey()), pair.getMapped()};
++arr_it;
}
std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
size_t level_index = 0;
Float64 accumulated = 0;
Float64 threshold = std::ceil(sum_weight * levels[indices[level_index]]);
for (const Pair * p = array; p != (array + size); ++p)
{
accumulated += p->second;
while (accumulated >= threshold)
{
result[indices[level_index]] = p->first;
++level_index;
if (level_index == num_levels)
return;
threshold = std::ceil(sum_weight * levels[indices[level_index]]);
}
}
while (level_index < num_levels)
{
result[indices[level_index]] = array[size - 1].first;
++level_index;
}
}
};