From 03b1849baca18eed27a6b72f5ec83fddc7d59d50 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 27 Feb 2015 20:38:21 +0300 Subject: [PATCH] dbms: added aggregate functions quantileTimingWeighted, quantilesTimingWeighted, medianTimingWeighted [#METR-15181]. --- .../AggregateFunctionQuantileTiming.h | 159 +++++++++++++++++- .../AggregateFunctionFactory.cpp | 74 +++++++- 2 files changed, 226 insertions(+), 7 deletions(-) diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h index af5db39b9b2..fcc47a9b8dc 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h @@ -160,12 +160,17 @@ namespace detail void insert(UInt64 x) { - ++count; + insertWeighted(x, 1); + } + + void insertWeighted(UInt64 x, size_t weight) + { + count += weight; if (x < SMALL_THRESHOLD) - ++count_small[x]; + count_small[x] += weight; else if (x < BIG_THRESHOLD) - ++count_big[(x - SMALL_THRESHOLD) / BIG_PRECISION]; + count_big[(x - SMALL_THRESHOLD) / BIG_PRECISION] += weight; } void merge(const QuantileTimingLarge & rhs) @@ -371,6 +376,23 @@ public: } } + void insertWeighted(UInt64 x, size_t weight) + { + /// NOTE: Первое условие - для того, чтобы избежать переполнения. + if (weight < TINY_MAX_ELEMS && tiny.count + weight <= TINY_MAX_ELEMS) + { + for (size_t i = 0; i < weight; ++i) + tiny.insert(x); + } + else + { + if (unlikely(tiny.count <= TINY_MAX_ELEMS)) + toLarge(); + + large->insertWeighted(x, weight); + } + } + void merge(const QuantileTiming & rhs) { if (tiny.count + rhs.tiny.count <= TINY_MAX_ELEMS) @@ -567,6 +589,66 @@ public: }; +/** То же самое, но с двумя аргументами. Второй аргумент - "вес" (целое число) - сколько раз учитывать значение. + */ +template +class AggregateFunctionQuantileTimingWeighted final : public IAggregateFunctionHelper +{ +private: + double level; + +public: + AggregateFunctionQuantileTimingWeighted(double level_ = 0.5) : level(level_) {} + + String getName() const { return "quantileTimingWeighted"; } + + DataTypePtr getReturnType() const + { + return new DataTypeFloat32; + } + + void setArguments(const DataTypes & arguments) + { + } + + void setParameters(const Array & params) + { + if (params.size() != 1) + throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + level = apply_visitor(FieldVisitorConvertToNumber(), params[0]); + } + + + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const + { + this->data(place).insertWeighted( + static_cast &>(*columns[0]).getData()[row_num], + static_cast &>(*columns[1]).getData()[row_num]); + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const + { + this->data(place).merge(this->data(rhs)); + } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const + { + this->data(place).serialize(buf); + } + + void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const + { + this->data(place).deserializeMerge(buf); + } + + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const + { + static_cast(to).getData().push_back(this->data(place).getFloat(level)); + } +}; + + /** То же самое, но позволяет вычислить сразу несколько квантилей. * Для этого, принимает в качестве параметров несколько уровней. Пример: quantilesTiming(0.5, 0.8, 0.9, 0.95)(ConnectTiming). * Возвращает массив результатов. @@ -639,4 +721,75 @@ public: } }; + +template +class AggregateFunctionQuantilesTimingWeighted final : public IAggregateFunctionHelper +{ +private: + typedef std::vector Levels; + Levels levels; + +public: + String getName() const { return "quantilesTimingWeighted"; } + + DataTypePtr getReturnType() const + { + return new DataTypeArray(new DataTypeFloat32); + } + + void setArguments(const DataTypes & arguments) + { + } + + void setParameters(const Array & params) + { + if (params.empty()) + throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + size_t size = params.size(); + levels.resize(size); + + for (size_t i = 0; i < size; ++i) + levels[i] = apply_visitor(FieldVisitorConvertToNumber(), params[i]); + } + + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const + { + this->data(place).insertWeighted( + static_cast &>(*columns[0]).getData()[row_num], + static_cast &>(*columns[1]).getData()[row_num]); + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const + { + this->data(place).merge(this->data(rhs)); + } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const + { + this->data(place).serialize(buf); + } + + void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const + { + this->data(place).deserializeMerge(buf); + } + + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const + { + ColumnArray & arr_to = static_cast(to); + ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets(); + + size_t size = levels.size(); + offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size); + + typename ColumnFloat32::Container_t & data_to = static_cast(arr_to.getData()).getData(); + size_t old_size = data_to.size(); + data_to.resize(data_to.size() + size); + + this->data(place).getManyFloat(&levels[0], size, &data_to[old_size]); + } +}; + + } diff --git a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp index f39889e2187..1ce1c1f2083 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp +++ b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp @@ -34,7 +34,7 @@ AggregateFunctionFactory::AggregateFunctionFactory() /** Создать агрегатную функцию с числовым типом в параметре шаблона, в зависимости от типа аргумента. */ -template