From 57e8a8fdbba8dcee818e3ee6a963c0720c946080 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 21 Nov 2015 15:47:41 +0300 Subject: [PATCH] dbms: added t-digest algorithm [#METR-19024]. --- .../AggregateFunctionQuantileTDigest.h | 349 ++++++++++++++++++ dbms/include/DB/Common/RadixSort.h | 252 +++++++++++++ .../AggregateFunctionFactory.cpp | 2 + .../AggregateFunctionsQuantileTDigest.cpp | 66 ++++ dbms/src/Common/tests/radix_sort.cpp | 116 ++++++ 5 files changed, 785 insertions(+) create mode 100644 dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTDigest.h create mode 100644 dbms/include/DB/Common/RadixSort.h create mode 100644 dbms/src/AggregateFunctions/AggregateFunctionsQuantileTDigest.cpp create mode 100644 dbms/src/Common/tests/radix_sort.cpp diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTDigest.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTDigest.h new file mode 100644 index 00000000000..83d5977cf3b --- /dev/null +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTDigest.h @@ -0,0 +1,349 @@ +#pragma once + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + + +/** Алгоритм реализовал Алексей Борзенков https://███████████.yandex-team.ru/snaury + * Ему принадлежит авторство кода и комментариев в данном namespace, + * за исключением слияния, сериализации и сортировки, а также выбора типов и других изменений. + * Мы благодарим Алексея Борзенкова за написание изначального кода. + */ +namespace tdigest +{ + +/** + * Центроид хранит вес точек вокруг их среднего значения + */ +template +struct Centroid +{ + Value mean; + Count count; + + Centroid() = default; + + explicit Centroid(Value mean, Count count = 1) + : mean(mean) + , count(count) + {} + + Centroid & operator+=(const Centroid & other) + { + count += other.count; + mean += other.count * (other.mean - mean) / count; + return *this; + } + + bool operator<(const Centroid & other) const + { + return mean < other.mean; + } +}; + + +/** :param epsilon: значение \delta из статьи - погрешность в районе + * квантиля 0.5 (по-умолчанию 0.01, т.е. 1%) + * :param max_unmerged: при накоплении кол-ва новых точек сверх этого + * значения запускается компрессия центроидов + * (по-умолчанию 2048, чем выше значение - тем + * больше требуется памяти, но повышается + * амортизация времени выполнения) + */ +template +struct Params +{ + Value epsilon = 0.01; + size_t max_unmerged = 2048; +}; + + +/** Реализация алгоритма t-digest (https://github.com/tdunning/t-digest). + * Этот вариант очень похож на MergingDigest на java, однако решение об + * объединении принимается на основе оригинального условия из статьи + * (через ограничение на размер, используя апроксимацию квантиля каждого + * центроида, а не расстояние на кривой положения их границ). MergingDigest + * на java даёт значительно меньше центроидов, чем данный вариант, что + * негативно влияет на точность при том же факторе компрессии, но даёт + * гарантии размера. Сам автор на предложение об этом варианте сказал, что + * размер дайжеста растёт как O(log(n)), в то время как вариант на java + * не зависит от предполагаемого кол-ва точек. Кроме того вариант на java + * использует asin, чем немного замедляет алгоритм. + */ +template +class MergingDigest +{ + using Params = tdigest::Params; + using Centroid = tdigest::Centroid; + + /// Сразу будет выделена память на несколько элементов так, чтобы состояние занимало 64 байта. + static constexpr size_t bytes_in_arena = 64 - sizeof(DB::PODArray) - sizeof(TotalCount) * 2; + + using Summary = DB::PODArray, bytes_in_arena>>; + + Summary summary; + TotalCount count = 0; + uint32_t unmerged = 0; + + /** Линейная интерполяция в точке x на прямой (x1, y1)..(x2, y2) + */ + static Value interpolate(Value x, Value x1, Value y1, Value x2, Value y2) + { + Value delta = x2 - x1; + Value w1 = (x2 - x) / delta; + Value w2 = (x - x1) / delta; + return w1 * y1 + w2 * y2; + } + + struct RadixSortTraits + { + using Element = Centroid; + using Key = Value; + using CountType = uint32_t; + using KeyBits = uint32_t; + + static constexpr size_t PART_SIZE_BITS = 11; + + using Transform = RadixSortFloatTransform; + using Allocator = RadixSortMallocAllocator; + + /// Функция получения ключа из элемента массива. + static Key & extractKey(Element & elem) { return elem.mean; } + }; + +public: + /** Добавляет к дайджесту изменение x с весом cnt (по-умолчанию 1) + */ + void add(const Params & params, Value x, CentroidCount cnt = 1) + { + add(params, Centroid(x, cnt)); + } + + /** Добавляет к дайджесту центроид c + */ + void add(const Params & params, const Centroid & c) + { + summary.push_back(c); + count += c.count; + ++unmerged; + if (unmerged >= params.max_unmerged) + compress(params); + } + + /** Выполняет компрессию накопленных центроидов + * При объединении сохраняется инвариант на максимальный размер каждого + * центроида, не превышающий 4 q (1 - q) \delta N. + */ + void compress(const Params & params) + { + if (unmerged > 0) + { + if (summary.size() > 3) + { + RadixSort::execute(&summary[0], summary.size()); + + /// Пара подряд идущих столбиков гистограммы. + auto l = summary.begin(); + auto r = std::next(l); + + TotalCount sum = 1; + while (r != summary.end()) + { + // we use quantile which gives us the smallest error + + /// Отношение части гистограммы до l, включая половинку l ко всей гистограмме. То есть, какого уровня квантиль в позиции l. + Value ql = (sum + (l->count - 1) * 0.5) / count; + Value err = ql * (1 - ql); + + /// Отношение части гистограммы до l, включая l и половинку r ко всей гистограмме. То есть, какого уровня квантиль в позиции r. + Value qr = (sum + l->count + (r->count - 1) * 0.5) / count; + Value err2 = qr * (1 - qr); + + if (err > err2) + err = err2; + + Value k = 4 * count * err * params.epsilon; + + /** Отношение веса склеенной пары столбиков ко всем значениям не больше, + * чем epsilon умножить на некий квадратичный коэффициент, который в медиане равен 1 (4 * 1/2 * 1/2), + * а по краям убывает и примерно равен расстоянию до края * 4. + */ + + if (l->count + r->count <= k) + { + // it is possible to merge left and right + /// Левый столбик "съедает" правый. + *l += *r; + } + else + { + // not enough capacity, check the next pair + sum += l->count; + ++l; + + /// Пропускаем все "съеденные" ранее значения. + if (l != r) + *l = *r; + } + ++r; + } + + /// По окончании цикла, все значения правее l были "съедены". + summary.resize(l - summary.begin() + 1); + } + + unmerged = 0; + } + } + + /** Вычисляет квантиль q [0, 1] на основе дайджеста + * Для пустого дайджеста возвращает NaN. + */ + Value quantile(const Params & params, Value q) + { + if (summary.empty()) + return NAN; + + compress(params); + + if (summary.size() == 1) + return summary[0].mean; + + Value index = q * count; + TotalCount sum = 1; + Value a_mean = summary[0].mean; + Value a_index = 0.0; + Value b_mean = summary[0].mean; + Value b_index = sum + (summary[0].count - 1) * 0.5; + + for (size_t i = 1; i < summary.size(); ++i) + { + if (index <= b_index) + break; + + sum += summary[i-1].count; + a_mean = b_mean; + a_index = b_index; + b_mean = summary[i].mean; + b_index = sum + (summary[i].count - 1) * 0.5; + } + + return interpolate(index, a_index, a_mean, b_index, b_mean); + } + + /** Объединить с другим состоянием. + */ + void merge(const Params & params, const MergingDigest & other) + { + for (const auto & c : other.summary) + add(params, c); + } + + /** Записать в поток. + */ + void write(const Params & params, DB::WriteBuffer & buf) + { + compress(params); + DB::writeVarUInt(summary.size(), buf); + buf.write(reinterpret_cast(&summary[0]), summary.size() * sizeof(summary[0])); + } + + /** Прочитать из потока и объединить с текущим состоянием. + */ + void readAndMerge(const Params & params, DB::ReadBuffer & buf) + { + size_t size = 0; + DB::readVarUInt(size, buf); + + if (size > params.max_unmerged) + throw DB::Exception("Too large t-digest summary size", DB::ErrorCodes::TOO_LARGE_ARRAY_SIZE); + + for (size_t i = 0; i < size; ++i) + { + Centroid c; + DB::readPODBinary(c, buf); + add(params, c); + } + } +}; + +} + + +namespace DB +{ + +struct AggregateFunctionQuantileTDigestData +{ + tdigest::MergingDigest digest; +}; + + +template +class AggregateFunctionQuantileTDigest final + : public IUnaryAggregateFunction> +{ +private: + double level; + tdigest::Params params; + +public: + AggregateFunctionQuantileTDigest(double level_ = 0.5) : level(level_) {} + + String getName() const override { return "quantileTDigest"; } + + DataTypePtr getReturnType() const override + { + return new DataTypeFloat64; + } + + void setArgument(const DataTypePtr & argument) + { + } + + void setParameters(const Array & params) override + { + if (params.size() != 1) + throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + level = apply_visitor(FieldVisitorConvertToNumber(), params[0]); + } + + void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num) const + { + this->data(place).digest.add(params, static_cast &>(column).getData()[row_num]); + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override + { + this->data(place).digest.merge(params, this->data(rhs).digest); + } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override + { + this->data(const_cast(place)).digest.write(params, buf); + } + + void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override + { + this->data(place).digest.readAndMerge(params, buf); + } + + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + { + static_cast(to).getData().push_back( + this->data(const_cast(place)).digest.quantile(params, level)); + } +}; + +} diff --git a/dbms/include/DB/Common/RadixSort.h b/dbms/include/DB/Common/RadixSort.h new file mode 100644 index 00000000000..bf4cc748a8f --- /dev/null +++ b/dbms/include/DB/Common/RadixSort.h @@ -0,0 +1,252 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + + +/** Поразрядная сортировка, обладает следующей функциональностью: + * Может сортировать unsigned, signed числа, а также float-ы. + * Может сортировать массив элементов фиксированной длины, которые содержат что-то ещё кроме ключа. + * Настраиваемый размер разряда. + */ + + +/** Используется в качестве параметра шаблона. См. ниже. + */ +struct RadixSortMallocAllocator +{ + void * allocate(size_t size) + { + return malloc(size); + } + + void deallocate(void * ptr, size_t size) + { + return free(ptr); + } +}; + + +/** Преобразование, которое переводит битовое представление ключа в такое целое беззнаковое число, + * что отношение порядка над ключами будет соответствовать отношению порядка над полученными беззнаковыми числами. + * Для float-ов это преобразование делает следующее: + * если выставлен знаковый бит, то переворачивает все остальные биты. + */ +template +struct RadixSortFloatTransform +{ + /// Стоит ли записывать результат в память, или лучше делать его каждый раз заново? + static constexpr bool transform_is_simple = false; + + static KeyBits forward(KeyBits x) + { + return x ^ (-((x >> (sizeof(KeyBits) * 8 - 1) | (KeyBits(1) << (sizeof(KeyBits) * 8 - 1))))); + } + + static KeyBits backward(KeyBits x) + { + return x ^ (((x >> (sizeof(KeyBits) * 8 - 1)) - 1) | (KeyBits(1) << (sizeof(KeyBits) * 8 - 1))); + } +}; + + +template +struct RadixSortFloatTraits +{ + using Element = Float; /// Тип элемента. Это может быть структура с ключём и ещё каким-то payload-ом. Либо просто ключ. + using Key = Float; /// Ключ, по которому нужно сортировать. + using CountType = uint32_t; /// Тип для подсчёта гистограмм. В случае заведомо маленького количества элементов, может быть меньше чем size_t. + + /// Тип, в который переводится ключ, чтобы делать битовые операции. Это UInt такого же размера, как ключ. + using KeyBits = typename std::conditional::type; + + static constexpr size_t PART_SIZE_BITS = 8; /// Какими кусочками ключа в количестве бит делать один проход - перестановку массива. + + /// Преобразования ключа в KeyBitsType такое, что отношение порядка над ключём соответствует отношению порядка над KeyBitsType. + using Transform = RadixSortFloatTransform; + + /// Объект с функциями allocate и deallocate. + /// Может быть использован, например, чтобы выделить память для временного массива на стеке. + /// Для этого сам аллокатор создаётся на стеке. + using Allocator = RadixSortMallocAllocator; + + /// Функция получения ключа из элемента массива. + static Key & extractKey(Element & elem) { return elem; } +}; + + +template +struct RadixSortIdentityTransform +{ + static constexpr bool transform_is_simple = true; + + static KeyBits forward(KeyBits x) { return x; } + static KeyBits backward(KeyBits x) { return x; } +}; + + +template +struct RadixSortSignedTransform +{ + static constexpr bool transform_is_simple = true; + + static KeyBits forward(KeyBits x) { return x ^ (KeyBits(1) << (sizeof(KeyBits) * 8 - 1)); } + static KeyBits backward(KeyBits x) { return x ^ (KeyBits(1) << (sizeof(KeyBits) * 8 - 1)); } +}; + + +template +struct RadixSortUIntTraits +{ + using Element = UInt; + using Key = UInt; + using CountType = uint32_t; + using KeyBits = UInt; + + static constexpr size_t PART_SIZE_BITS = 8; + + using Transform = RadixSortIdentityTransform; + using Allocator = RadixSortMallocAllocator; + + /// Функция получения ключа из элемента массива. + static Key & extractKey(Element & elem) { return elem; } +}; + +template +struct RadixSortIntTraits +{ + using Element = Int; + using Key = Int; + using CountType = uint32_t; + using KeyBits = typename std::make_unsigned::type; + + static constexpr size_t PART_SIZE_BITS = 8; + + using Transform = RadixSortSignedTransform; + using Allocator = RadixSortMallocAllocator; + + /// Функция получения ключа из элемента массива. + static Key & extractKey(Element & elem) { return elem; } +}; + + +template +struct RadixSort +{ +private: + using Element = typename Traits::Element; + using Key = typename Traits::Key; + using CountType = typename Traits::CountType; + using KeyBits = typename Traits::KeyBits; + + static constexpr size_t HISTOGRAM_SIZE = 1 << Traits::PART_SIZE_BITS; + static constexpr size_t PART_BITMASK = HISTOGRAM_SIZE - 1; + static constexpr size_t KEY_BITS = sizeof(Key) * 8; + static constexpr size_t NUM_PASSES = (KEY_BITS + (Traits::PART_SIZE_BITS - 1)) / Traits::PART_SIZE_BITS; + + static ALWAYS_INLINE KeyBits getPart(size_t N, KeyBits x) + { + if (Traits::Transform::transform_is_simple) + x = Traits::Transform::forward(x); + + return (x >> (N * Traits::PART_SIZE_BITS)) & PART_BITMASK; + } + + static KeyBits keyToBits(Key x) { return ext::bit_cast(x); } + static Key bitsToKey(KeyBits x) { return ext::bit_cast(x); } + +public: + static void execute(Element * arr, size_t size) + { + /// Если массив имеет размер меньше 256, то лучше использовать другой алгоритм. + + /// Здесь есть циклы по NUM_PASSES. Очень важно, что они разворачиваются в compile-time. + + /// Для каждого из NUM_PASSES кусков бит ключа, считаем, сколько раз каждое значение этого куска встретилось. + CountType histograms[HISTOGRAM_SIZE * NUM_PASSES] = {0}; + + typename Traits::Allocator allocator; + + /// Будем делать несколько проходов по массиву. На каждом проходе, данные перекладываются в другой массив. Выделим этот временный массив. + Element * swap_buffer = reinterpret_cast(allocator.allocate(size * sizeof(Element))); + + /// Трансформируем массив и вычисляем гистограмму. + for (size_t i = 0; i < size; ++i) + { + if (!Traits::Transform::transform_is_simple) + Traits::extractKey(arr[i]) = bitsToKey(Traits::Transform::forward(keyToBits(Traits::extractKey(arr[i])))); + + for (size_t j = 0; j < NUM_PASSES; ++j) + ++histograms[j * HISTOGRAM_SIZE + getPart(j, keyToBits(Traits::extractKey(arr[i])))]; + } + + { + /// Заменяем гистограммы на суммы с накоплением: значение в позиции i равно сумме в предыдущих позициях минус один. + size_t sums[NUM_PASSES] = {0}; + + for (size_t i = 0; i < HISTOGRAM_SIZE; ++i) + { + for (size_t j = 0; j < NUM_PASSES; ++j) + { + size_t tmp = histograms[j * HISTOGRAM_SIZE + i] + sums[j]; + histograms[j * HISTOGRAM_SIZE + i] = sums[j] - 1; + sums[j] = tmp; + } + } + } + + /// Перекладываем элементы в порядке начиная от младшего куска бит, и далее делаем несколько проходов по количеству кусков. + for (size_t j = 0; j < NUM_PASSES; ++j) + { + Element * writer = j % 2 ? arr : swap_buffer; + Element * reader = j % 2 ? swap_buffer : arr; + + for (size_t i = 0; i < size; ++i) + { + size_t pos = getPart(j, keyToBits(Traits::extractKey(reader[i]))); + + /// Размещаем элемент на следующей свободной позиции. + auto & dest = writer[++histograms[j * HISTOGRAM_SIZE + pos]]; + dest = reader[i]; + + /// На последнем перекладывании, делаем обратную трансформацию. + if (!Traits::Transform::transform_is_simple && j == NUM_PASSES - 1) + Traits::extractKey(dest) = bitsToKey(Traits::Transform::backward(keyToBits(Traits::extractKey(reader[i])))); + } + } + + /// Если число проходов нечётное, то результирующий массив находится во временном буфере. Скопируем его на место исходного массива. + if (NUM_PASSES % 2) + memcpy(arr, swap_buffer, size * sizeof(Element)); + + allocator.deallocate(swap_buffer, size * sizeof(Element)); + } +}; + + +template +typename std::enable_if::value && std::is_integral::value, void>::type +radixSort(T * arr, size_t size) +{ + return RadixSort>::execute(arr, size); +} + +template +typename std::enable_if::value && std::is_integral::value, void>::type +radixSort(T * arr, size_t size) +{ + return RadixSort>::execute(arr, size); +} + +template +typename std::enable_if::value, void>::type +radixSort(T * arr, size_t size) +{ + return RadixSort>::execute(arr, size); +} + diff --git a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp index aeae933b040..8abe34a780e 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp +++ b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp @@ -64,6 +64,7 @@ void registerAggregateFunctionsQuantileExact(AggregateFunctionFactory & factory) void registerAggregateFunctionsQuantileExactWeighted(AggregateFunctionFactory & factory); void registerAggregateFunctionsQuantileDeterministic(AggregateFunctionFactory & factory); void registerAggregateFunctionsQuantileTiming(AggregateFunctionFactory & factory); +void registerAggregateFunctionsQuantileTDigest(AggregateFunctionFactory & factory); void registerAggregateFunctionsSequenceMatch(AggregateFunctionFactory & factory); void registerAggregateFunctionsMinMaxAny(AggregateFunctionFactory & factory); void registerAggregateFunctionsStatistics(AggregateFunctionFactory & factory); @@ -88,6 +89,7 @@ AggregateFunctionFactory::AggregateFunctionFactory() registerAggregateFunctionsQuantileExactWeighted(*this); registerAggregateFunctionsQuantileDeterministic(*this); registerAggregateFunctionsQuantileTiming(*this); + registerAggregateFunctionsQuantileTDigest(*this); registerAggregateFunctionsSequenceMatch(*this); registerAggregateFunctionsMinMaxAny(*this); registerAggregateFunctionsStatistics(*this); diff --git a/dbms/src/AggregateFunctions/AggregateFunctionsQuantileTDigest.cpp b/dbms/src/AggregateFunctions/AggregateFunctionsQuantileTDigest.cpp new file mode 100644 index 00000000000..d337a15ab7f --- /dev/null +++ b/dbms/src/AggregateFunctions/AggregateFunctionsQuantileTDigest.cpp @@ -0,0 +1,66 @@ +#include +#include +#include + +namespace DB +{ + +namespace +{ + +AggregateFunctionPtr createAggregateFunctionQuantileTDigest(const std::string & name, const DataTypes & argument_types) +{ + if (argument_types.size() != 1) + throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + const IDataType & argument_type = *argument_types[0]; + + if (typeid_cast(&argument_type)) return new AggregateFunctionQuantileTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantileTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantileTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantileTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantileTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantileTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantileTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantileTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantileTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantileTDigest; +/* else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantile; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantile;*/ + else + throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); +} + +/* +AggregateFunctionPtr createAggregateFunctionQuantilesTDigest(const std::string & name, const DataTypes & argument_types) +{ + if (argument_types.size() != 1) + throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + const IDataType & argument_type = *argument_types[0]; + + if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else if (typeid_cast(&argument_type)) return new AggregateFunctionQuantilesTDigest; + else + throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); +}*/ + +} + +void registerAggregateFunctionsQuantileTDigest(AggregateFunctionFactory & factory) +{ + factory.registerFunction({"quantileTDigest", "medianTDigest"}, createAggregateFunctionQuantileTDigest); +// factory.registerFunction({"quantilesTDigest"}, createAggregateFunctionQuantilesTDigest); +} + +} diff --git a/dbms/src/Common/tests/radix_sort.cpp b/dbms/src/Common/tests/radix_sort.cpp new file mode 100644 index 00000000000..48321eae3d2 --- /dev/null +++ b/dbms/src/Common/tests/radix_sort.cpp @@ -0,0 +1,116 @@ +#include +#include +#include +#include +#include +#include + +using Key = double; + +void NO_INLINE sort1(Key * data, size_t size) +{ + std::sort(data, data + size); +} + +void NO_INLINE sort2(Key * data, size_t size) +{ + radixSort(data, size); +} + +void NO_INLINE sort3(Key * data, size_t size) +{ + std::sort(data, data + size, [](Key a, Key b) + { + return RadixSortFloatTransform::forward(ext::bit_cast(a)) + < RadixSortFloatTransform::forward(ext::bit_cast(b)); + }); +} + + +int main(int argc, char ** argv) +{ + size_t n = DB::parse(argv[1]); + size_t method = DB::parse(argv[2]); + + std::vector data(n); + +// srand(time(0)); + + { + Stopwatch watch; + + for (auto & elem : data) + elem = rand(); + + watch.stop(); + double elapsed = watch.elapsedSeconds(); + std::cerr + << "Filled in " << elapsed + << " (" << n / elapsed << " elem/sec., " + << n * sizeof(Key) / elapsed / 1048576 << " MB/sec.)" + << std::endl; + } + + if (n <= 100) + { + std::cerr << std::endl; + for (const auto & elem : data) + std::cerr << elem << ' '; + std::cerr << std::endl; + } + + + { + Stopwatch watch; + + if (method == 1) sort1(&data[0], n); + if (method == 2) sort2(&data[0], n); + if (method == 3) sort3(&data[0], n); + + watch.stop(); + double elapsed = watch.elapsedSeconds(); + std::cerr + << "Sorted in " << elapsed + << " (" << n / elapsed << " elem/sec., " + << n * sizeof(Key) / elapsed / 1048576 << " MB/sec.)" + << std::endl; + } + + { + Stopwatch watch; + + size_t i = 1; + while (i < n) + { + if (!(data[i - 1] <= data[i])) + break; + ++i; + } + + watch.stop(); + double elapsed = watch.elapsedSeconds(); + std::cerr + << "Checked in " << elapsed + << " (" << n / elapsed << " elem/sec., " + << n * sizeof(Key) / elapsed / 1048576 << " MB/sec.)" + << std::endl + << "Result: " << (i == n ? "Ok." : "Fail!") << std::endl; + } + + if (n <= 1000) + { + std::cerr << std::endl; + + std::cerr << data[0] << ' '; + for (size_t i = 1; i < n; ++i) + { + if (!(data[i - 1] <= data[i])) + std::cerr << "*** "; + std::cerr << data[i] << ' '; + } + + std::cerr << std::endl; + } + + return 0; +}