ClickHouse/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h

1010 lines
27 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#pragma once
#include <limits>
#include <DB/Common/MemoryTracker.h>
#include <DB/Common/HashTable/Hash.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/AggregateFunctions/IUnaryAggregateFunction.h>
#include <DB/AggregateFunctions/IBinaryAggregateFunction.h>
#include <DB/AggregateFunctions/QuantilesCommon.h>
#include <DB/Columns/ColumnArray.h>
#include <ext/range.hpp>
namespace DB
{
/** Вычисляет квантиль для времени в миллисекундах, меньшего 30 сек.
* Если значение больше 30 сек, то значение приравнивается к 30 сек.
*
* Если всего значений не больше 32, то вычисление точное.
*
* Иначе:
* Если время меньше 1024 мс., то вычисление точное.
* Иначе вычисление идёт с округлением до числа, кратного 16 мс.
*/
#define TINY_MAX_ELEMS 31
#define BIG_THRESHOLD 30000
namespace detail
{
/** Вспомогательная структура для оптимизации в случае маленького количества значений
* - плоский массив фиксированного размера "на стеке", в который кладутся все встреченные значения подряд.
* Размер - 64 байта. Должна быть POD-типом (используется в union).
*/
struct QuantileTimingTiny
{
mutable UInt16 elems[TINY_MAX_ELEMS]; /// mutable потому что сортировка массива не считается изменением состояния.
UInt16 count; /// Важно, чтобы count был в конце структуры, так как начало структуры будет впоследствии перезатёрто другими объектами.
/// Вы должны сами инициализировать его нулём.
/// Можно использовать только пока count < TINY_MAX_ELEMS.
void insert(UInt64 x)
{
if (unlikely(x > BIG_THRESHOLD))
x = BIG_THRESHOLD;
elems[count] = x;
++count;
}
/// Можно использовать только пока count + rhs.count <= TINY_MAX_ELEMS.
void merge(const QuantileTimingTiny & rhs)
{
for (size_t i = 0; i < rhs.count; ++i)
{
elems[count] = rhs.elems[i];
++count;
}
}
void serialize(WriteBuffer & buf) const
{
writeBinary(count, buf);
buf.write(reinterpret_cast<const char *>(elems), count * sizeof(elems[0]));
}
void deserialize(ReadBuffer & buf)
{
readBinary(count, buf);
buf.readStrict(reinterpret_cast<char *>(elems), count * sizeof(elems[0]));
}
/** Эту функцию обязательно нужно позвать перед get-функциями. */
void prepare() const
{
std::sort(elems, elems + count);
}
UInt16 get(double level) const
{
return level != 1
? elems[static_cast<size_t>(count * level)]
: elems[count - 1];
}
template <typename ResultType>
void getMany(const double * levels, size_t size, ResultType * result) const
{
const double * levels_end = levels + size;
while (levels != levels_end)
{
*result = get(*levels);
++levels;
++result;
}
}
/// То же самое, но в случае пустого состояния возвращается NaN.
float getFloat(double level) const
{
return count
? get(level)
: std::numeric_limits<float>::quiet_NaN();
}
void getManyFloat(const double * levels, size_t size, float * result) const
{
if (count)
getMany(levels, size, result);
else
for (size_t i = 0; i < size; ++i)
result[i] = std::numeric_limits<float>::quiet_NaN();
}
};
/** Вспомогательная структура для оптимизации в случае среднего количества значений
* - плоский массив, выделенный отдельно, в который кладутся все встреченные значения подряд.
*/
struct QuantileTimingMedium
{
/// sizeof - 24 байта.
using Array = PODArray<UInt16, 64>;
mutable Array elems; /// mutable потому что сортировка массива не считается изменением состояния.
QuantileTimingMedium() {}
QuantileTimingMedium(const UInt16 * begin, const UInt16 * end) : elems(begin, end) {}
void insert(UInt64 x)
{
if (unlikely(x > BIG_THRESHOLD))
x = BIG_THRESHOLD;
elems.emplace_back(x);
}
void merge(const QuantileTimingMedium & rhs)
{
elems.insert(rhs.elems.begin(), rhs.elems.end());
}
void serialize(WriteBuffer & buf) const
{
writeBinary(elems.size(), buf);
buf.write(reinterpret_cast<const char *>(&elems[0]), elems.size() * sizeof(elems[0]));
}
void deserialize(ReadBuffer & buf)
{
size_t size = 0;
readBinary(size, buf);
elems.resize(size);
buf.readStrict(reinterpret_cast<char *>(&elems[0]), size * sizeof(elems[0]));
}
UInt16 get(double level) const
{
UInt16 quantile = 0;
if (!elems.empty())
{
size_t n = level < 1
? level * elems.size()
: (elems.size() - 1);
/// Сортировка массива не будет считаться нарушением константности.
auto & array = const_cast<Array &>(elems);
std::nth_element(array.begin(), array.begin() + n, array.end());
quantile = array[n];
}
return quantile;
}
template <typename ResultType>
void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) const
{
size_t prev_n = 0;
auto & array = const_cast<Array &>(elems);
for (size_t i = 0; i < size; ++i)
{
auto level_index = levels_permutation[i];
auto level = levels[level_index];
size_t n = level < 1
? level * elems.size()
: (elems.size() - 1);
std::nth_element(array.begin() + prev_n, array.begin() + n, array.end());
result[level_index] = array[n];
prev_n = n;
}
}
/// То же самое, но в случае пустого состояния возвращается NaN.
float getFloat(double level) const
{
return !elems.empty()
? get(level)
: std::numeric_limits<float>::quiet_NaN();
}
void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
{
if (!elems.empty())
getMany(levels, levels_permutation, size, result);
else
for (size_t i = 0; i < size; ++i)
result[i] = std::numeric_limits<float>::quiet_NaN();
}
};
#define SMALL_THRESHOLD 1024
#define BIG_SIZE ((BIG_THRESHOLD - SMALL_THRESHOLD) / BIG_PRECISION)
#define BIG_PRECISION 16
#define SIZE_OF_LARGE_WITHOUT_COUNT ((SMALL_THRESHOLD + BIG_SIZE) * sizeof(UInt64))
/** Для большого количества значений. Размер около 20 КБ.
* TODO: Есть off-by-one ошибки - может возвращаться значение на 1 больше нужного.
*/
class QuantileTimingLarge
{
private:
/// Общее число значений.
UInt64 count;
/// Число значений для каждого значения меньше small_threshold.
UInt64 count_small[SMALL_THRESHOLD];
/// Число значений для каждого значения от small_threshold до big_threshold, округлённого до big_precision.
UInt64 count_big[BIG_SIZE];
/// Получить значение квантиля по индексу в массиве count_big.
static inline UInt16 indexInBigToValue(size_t i)
{
return (i * BIG_PRECISION) + SMALL_THRESHOLD
+ (intHash32<0>(i) % BIG_PRECISION - (BIG_PRECISION / 2)); /// Небольшая рандомизация, чтобы не было заметно, что все значения чётные.
}
public:
QuantileTimingLarge()
{
memset(this, 0, sizeof(*this));
}
QuantileTimingLarge(ReadBuffer & buf)
{
deserialize(buf, true);
}
void insert(UInt64 x)
{
insertWeighted(x, 1);
}
void insertWeighted(UInt64 x, size_t weight)
{
count += weight;
if (x < SMALL_THRESHOLD)
count_small[x] += weight;
else if (x < BIG_THRESHOLD)
count_big[(x - SMALL_THRESHOLD) / BIG_PRECISION] += weight;
}
void merge(const QuantileTimingLarge & rhs)
{
count += rhs.count;
for (size_t i = 0; i < SMALL_THRESHOLD; ++i)
count_small[i] += rhs.count_small[i];
for (size_t i = 0; i < BIG_SIZE; ++i)
count_big[i] += rhs.count_big[i];
}
void serialize(WriteBuffer & buf) const
{
writeBinary(count, buf);
if (count * 2 > SMALL_THRESHOLD + BIG_SIZE)
{
/// Простая сериализация для сильно заполненного случая.
buf.write(reinterpret_cast<const char *>(this) + sizeof(count), SIZE_OF_LARGE_WITHOUT_COUNT);
}
else
{
/// Более компактная сериализация для разреженного случая.
for (size_t i = 0; i < SMALL_THRESHOLD; ++i)
{
if (count_small[i])
{
writeBinary(UInt16(i), buf);
writeBinary(count_small[i], buf);
}
}
for (size_t i = 0; i < BIG_SIZE; ++i)
{
if (count_big[i])
{
writeBinary(UInt16(i + SMALL_THRESHOLD), buf);
writeBinary(count_big[i], buf);
}
}
/// Символизирует конец данных.
writeBinary(UInt16(BIG_THRESHOLD), buf);
}
}
void deserialize(ReadBuffer & buf, bool need_memset = false)
{
readBinary(count, buf);
if (count * 2 > SMALL_THRESHOLD + BIG_SIZE)
{
buf.readStrict(reinterpret_cast<char *>(this) + sizeof(count), SIZE_OF_LARGE_WITHOUT_COUNT);
}
else
{
/// Используется, если в конструкторе ещё не был сделан memset.
if (need_memset)
memset(reinterpret_cast<char *>(this) + sizeof(count), 0, SIZE_OF_LARGE_WITHOUT_COUNT);
while (true)
{
UInt16 index = 0;
readBinary(index, buf);
if (index == BIG_THRESHOLD)
break;
UInt64 count = 0;
readBinary(count, buf);
if (index < SMALL_THRESHOLD)
count_small[index] = count;
else
count_big[index - SMALL_THRESHOLD] = count;
}
}
}
void deserializeMerge(ReadBuffer & buf)
{
merge(QuantileTimingLarge(buf));
}
/// Получить значение квантиля уровня level. Уровень должен быть от 0 до 1.
UInt16 get(double level) const
{
UInt64 pos = count * level;
UInt64 accumulated = 0;
size_t i = 0;
while (i < SMALL_THRESHOLD && accumulated < pos)
{
accumulated += count_small[i];
++i;
}
if (i < SMALL_THRESHOLD)
return i;
i = 0;
while (i < BIG_SIZE && accumulated < pos)
{
accumulated += count_big[i];
++i;
}
if (i < BIG_SIZE)
return indexInBigToValue(i);
return BIG_THRESHOLD;
}
/// Получить значения size квантилей уровней levels. Записать size результатов начиная с адреса result.
/// indices - массив индексов levels такой, что соответствующие элементы будут идти в порядке по возрастанию.
template <typename ResultType>
void getMany(const double * levels, const size_t * indices, size_t size, ResultType * result) const
{
const auto indices_end = indices + size;
auto index = indices;
UInt64 pos = count * levels[*index];
UInt64 accumulated = 0;
size_t i = 0;
while (i < SMALL_THRESHOLD)
{
while (i < SMALL_THRESHOLD && accumulated < pos)
{
accumulated += count_small[i];
++i;
}
if (i < SMALL_THRESHOLD)
{
result[*index] = i;
++index;
if (index == indices_end)
return;
pos = count * levels[*index];
}
}
i = 0;
while (i < BIG_SIZE)
{
while (i < BIG_SIZE && accumulated < pos)
{
accumulated += count_big[i];
++i;
}
if (i < BIG_SIZE)
{
result[*index] = indexInBigToValue(i);
++index;
if (index == indices_end)
return;
pos = count * levels[*index];
}
}
while (index < indices_end)
{
result[*index] = BIG_THRESHOLD;
++index;
}
}
/// То же самое, но в случае пустого состояния возвращается NaN.
float getFloat(double level) const
{
return count
? get(level)
: std::numeric_limits<float>::quiet_NaN();
}
void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
{
if (count)
getMany(levels, levels_permutation, size, result);
else
for (size_t i = 0; i < size; ++i)
result[i] = std::numeric_limits<float>::quiet_NaN();
}
};
}
/** sizeof - 64 байта.
* Если их не хватает - выделяет дополнительно около 20 КБ памяти.
*/
class QuantileTiming : private boost::noncopyable
{
private:
union
{
detail::QuantileTimingTiny tiny;
detail::QuantileTimingMedium medium;
detail::QuantileTimingLarge * large;
};
enum class Kind : UInt8
{
Tiny = 1,
Medium = 2,
Large = 3
};
Kind which() const
{
if (tiny.count <= TINY_MAX_ELEMS)
return Kind::Tiny;
if (tiny.count == TINY_MAX_ELEMS + 1)
return Kind::Medium;
return Kind::Large;
}
void tinyToMedium()
{
detail::QuantileTimingTiny tiny_copy = tiny;
new (&medium) detail::QuantileTimingMedium(tiny_copy.elems, tiny_copy.elems + tiny_copy.count);
tiny.count = TINY_MAX_ELEMS + 1;
}
void mediumToLarge()
{
if (current_memory_tracker)
current_memory_tracker->alloc(sizeof(detail::QuantileTimingLarge));
/// На время копирования данных из medium, устанавливать значение large ещё нельзя (иначе оно перезатрёт часть данных).
detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;
for (const auto & elem : medium.elems)
tmp_large->insert(elem);
large = tmp_large;
tiny.count = TINY_MAX_ELEMS + 2;
}
void tinyToLarge()
{
if (current_memory_tracker)
current_memory_tracker->alloc(sizeof(detail::QuantileTimingLarge));
/// На время копирования данных из medium, устанавливать значение large ещё нельзя (иначе оно перезатрёт часть данных).
detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;
for (size_t i = 0; i < tiny.count; ++i)
tmp_large->insert(tiny.elems[i]);
large = tmp_large;
tiny.count = TINY_MAX_ELEMS + 2;
}
public:
QuantileTiming()
{
tiny.count = 0;
}
~QuantileTiming()
{
Kind kind = which();
if (kind == Kind::Medium)
{
medium.~QuantileTimingMedium();
}
else if (kind == Kind::Large)
{
delete large;
if (current_memory_tracker)
current_memory_tracker->free(sizeof(detail::QuantileTimingLarge));
}
}
void insert(UInt64 x)
{
if (tiny.count < TINY_MAX_ELEMS)
{
tiny.insert(x);
}
else
{
if (unlikely(tiny.count == TINY_MAX_ELEMS))
tinyToMedium();
if (which() == Kind::Medium)
{
if (unlikely(medium.elems.size() >= sizeof(detail::QuantileTimingLarge) / sizeof(medium.elems[0]) / 2))
{
mediumToLarge();
large->insert(x);
}
else
medium.insert(x);
}
else
large->insert(x);
}
}
void insertWeighted(UInt64 x, size_t weight)
{
/// NOTE: Первое условие - для того, чтобы избежать переполнения.
if (weight < TINY_MAX_ELEMS && tiny.count + weight <= TINY_MAX_ELEMS)
{
for (size_t i = 0; i < weight; ++i)
tiny.insert(x);
}
else
{
if (unlikely(tiny.count <= TINY_MAX_ELEMS))
tinyToLarge(); /// Для weighted варианта medium не используем - предположительно, нецелесообразно.
large->insertWeighted(x, weight);
}
}
/// TODO Слишком сложный код.
void merge(const QuantileTiming & rhs)
{
if (tiny.count + rhs.tiny.count <= TINY_MAX_ELEMS)
{
tiny.merge(rhs.tiny);
}
else
{
auto kind = which();
auto rhs_kind = rhs.which();
if (kind == Kind::Tiny && rhs_kind == Kind::Medium)
tinyToMedium();
else if (kind == Kind::Tiny && rhs_kind == Kind::Large)
tinyToLarge();
else if (kind == Kind::Medium && rhs_kind == Kind::Large)
mediumToLarge();
if (kind == Kind::Medium && rhs_kind == Kind::Medium)
{
medium.merge(rhs.medium);
}
else if (kind == Kind::Large && rhs_kind == Kind::Large)
{
large->merge(*rhs.large);
}
else if (kind == Kind::Medium && rhs_kind == Kind::Tiny)
{
medium.elems.insert(rhs.tiny.elems, rhs.tiny.elems + rhs.tiny.count);
}
else if (kind == Kind::Large && rhs_kind == Kind::Tiny)
{
for (size_t i = 0; i < rhs.tiny.count; ++i)
large->insert(rhs.tiny.elems[i]);
}
else if (kind == Kind::Large && rhs_kind == Kind::Medium)
{
for (const auto & elem : rhs.medium.elems)
large->insert(elem);
}
}
}
void serialize(WriteBuffer & buf) const
{
auto kind = which();
DB::writePODBinary(kind, buf);
if (kind == Kind::Tiny)
tiny.serialize(buf);
else if (kind == Kind::Medium)
medium.serialize(buf);
else
large->serialize(buf);
}
/// Вызывается для пустого объекта.
void deserialize(ReadBuffer & buf)
{
Kind kind;
DB::readPODBinary(kind, buf);
if (kind == Kind::Tiny)
{
tiny.deserialize(buf);
}
else if (kind == Kind::Medium)
{
tinyToMedium();
medium.deserialize(buf);
}
else if (kind == Kind::Large)
{
tinyToLarge();
large->deserialize(buf);
}
}
void deserializeMerge(ReadBuffer & buf)
{
QuantileTiming rhs;
rhs.deserialize(buf);
merge(rhs);
}
/// Получить значение квантиля уровня level. Уровень должен быть от 0 до 1.
UInt16 get(double level) const
{
Kind kind = which();
if (kind == Kind::Tiny)
{
tiny.prepare();
return tiny.get(level);
}
else if (kind == Kind::Medium)
{
return medium.get(level);
}
else
{
return large->get(level);
}
}
/// Получить значения size квантилей уровней levels. Записать size результатов начиная с адреса result.
template <typename ResultType>
void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) const
{
Kind kind = which();
if (kind == Kind::Tiny)
{
tiny.prepare();
tiny.getMany(levels, size, result);
}
else if (kind == Kind::Medium)
{
medium.getMany(levels, levels_permutation, size, result);
}
else if (kind == Kind::Large)
{
large->getMany(levels, levels_permutation, size, result);
}
}
/// То же самое, но в случае пустого состояния возвращается NaN.
float getFloat(double level) const
{
return tiny.count
? get(level)
: std::numeric_limits<float>::quiet_NaN();
}
void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
{
if (tiny.count)
getMany(levels, levels_permutation, size, result);
else
for (size_t i = 0; i < size; ++i)
result[i] = std::numeric_limits<float>::quiet_NaN();
}
};
#undef SMALL_THRESHOLD
#undef BIG_THRESHOLD
#undef BIG_SIZE
#undef BIG_PRECISION
#undef TINY_MAX_ELEMS
template <typename ArgumentFieldType>
class AggregateFunctionQuantileTiming final : public IUnaryAggregateFunction<QuantileTiming, AggregateFunctionQuantileTiming<ArgumentFieldType> >
{
private:
double level;
public:
AggregateFunctionQuantileTiming(double level_ = 0.5) : level(level_) {}
String getName() const override { return "quantileTiming"; }
DataTypePtr getReturnType() const override
{
return new DataTypeFloat32;
}
void setArgument(const DataTypePtr & argument)
{
}
void setParameters(const Array & params) override
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
level = apply_visitor(FieldVisitorConvertToNumber<Float64>(), params[0]);
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
this->data(place).insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).deserializeMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnFloat32 &>(to).getData().push_back(this->data(place).getFloat(level));
}
};
/** То же самое, но с двумя аргументами. Второй аргумент - "вес" (целое число) - сколько раз учитывать значение.
*/
template <typename ArgumentFieldType, typename WeightFieldType>
class AggregateFunctionQuantileTimingWeighted final
: public IBinaryAggregateFunction<QuantileTiming, AggregateFunctionQuantileTimingWeighted<ArgumentFieldType, WeightFieldType>>
{
private:
double level;
public:
AggregateFunctionQuantileTimingWeighted(double level_ = 0.5) : level(level_) {}
String getName() const override { return "quantileTimingWeighted"; }
DataTypePtr getReturnType() const override
{
return new DataTypeFloat32;
}
void setArgumentsImpl(const DataTypes & arguments)
{
}
void setParameters(const Array & params) override
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
level = apply_visitor(FieldVisitorConvertToNumber<Float64>(), params[0]);
}
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num) const
{
this->data(place).insertWeighted(
static_cast<const ColumnVector<ArgumentFieldType> &>(column_value).getData()[row_num],
static_cast<const ColumnVector<WeightFieldType> &>(column_weight).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).deserializeMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnFloat32 &>(to).getData().push_back(this->data(place).getFloat(level));
}
};
/** То же самое, но позволяет вычислить сразу несколько квантилей.
* Для этого, принимает в качестве параметров несколько уровней. Пример: quantilesTiming(0.5, 0.8, 0.9, 0.95)(ConnectTiming).
* Возвращает массив результатов.
*/
template <typename ArgumentFieldType>
class AggregateFunctionQuantilesTiming final : public IUnaryAggregateFunction<QuantileTiming, AggregateFunctionQuantilesTiming<ArgumentFieldType> >
{
private:
QuantileLevels<double> levels;
public:
String getName() const override { return "quantilesTiming"; }
DataTypePtr getReturnType() const override
{
return new DataTypeArray(new DataTypeFloat32);
}
void setArgument(const DataTypePtr & argument)
{
}
void setParameters(const Array & params) override
{
levels.set(params);
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
this->data(place).insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).deserializeMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();
size_t size = levels.size();
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
typename ColumnFloat32::Container_t & data_to = static_cast<ColumnFloat32 &>(arr_to.getData()).getData();
size_t old_size = data_to.size();
data_to.resize(data_to.size() + size);
this->data(place).getManyFloat(&levels.levels[0], &levels.permutation[0], size, &data_to[old_size]);
}
};
template <typename ArgumentFieldType, typename WeightFieldType>
class AggregateFunctionQuantilesTimingWeighted final
: public IBinaryAggregateFunction<QuantileTiming, AggregateFunctionQuantilesTimingWeighted<ArgumentFieldType, WeightFieldType>>
{
private:
QuantileLevels<double> levels;
public:
String getName() const override { return "quantilesTimingWeighted"; }
DataTypePtr getReturnType() const override
{
return new DataTypeArray(new DataTypeFloat32);
}
void setArgumentsImpl(const DataTypes & arguments)
{
}
void setParameters(const Array & params) override
{
levels.set(params);
}
void addImpl(AggregateDataPtr place, const IColumn & column_value, const IColumn & column_weight, size_t row_num) const
{
this->data(place).insertWeighted(
static_cast<const ColumnVector<ArgumentFieldType> &>(column_value).getData()[row_num],
static_cast<const ColumnVector<WeightFieldType> &>(column_weight).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
{
this->data(place).deserializeMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();
size_t size = levels.size();
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
typename ColumnFloat32::Container_t & data_to = static_cast<ColumnFloat32 &>(arr_to.getData()).getData();
size_t old_size = data_to.size();
data_to.resize(data_to.size() + size);
this->data(place).getManyFloat(&levels.levels[0], &levels.permutation[0], size, &data_to[old_size]);
}
};
}