dbms: added aggregate functions quantileTimingWeighted, quantilesTimingWeighted, medianTimingWeighted [#METR-15181].

This commit is contained in:
Alexey Milovidov 2015-02-27 20:38:21 +03:00
parent c20d5338f0
commit 03b1849bac
2 changed files with 226 additions and 7 deletions

View File

@ -160,12 +160,17 @@ namespace detail
void insert(UInt64 x)
{
++count;
insertWeighted(x, 1);
}
void insertWeighted(UInt64 x, size_t weight)
{
count += weight;
if (x < SMALL_THRESHOLD)
++count_small[x];
count_small[x] += weight;
else if (x < BIG_THRESHOLD)
++count_big[(x - SMALL_THRESHOLD) / BIG_PRECISION];
count_big[(x - SMALL_THRESHOLD) / BIG_PRECISION] += weight;
}
void merge(const QuantileTimingLarge & rhs)
@ -371,6 +376,23 @@ public:
}
}
void insertWeighted(UInt64 x, size_t weight)
{
/// NOTE: Первое условие - для того, чтобы избежать переполнения.
if (weight < TINY_MAX_ELEMS && tiny.count + weight <= TINY_MAX_ELEMS)
{
for (size_t i = 0; i < weight; ++i)
tiny.insert(x);
}
else
{
if (unlikely(tiny.count <= TINY_MAX_ELEMS))
toLarge();
large->insertWeighted(x, weight);
}
}
void merge(const QuantileTiming & rhs)
{
if (tiny.count + rhs.tiny.count <= TINY_MAX_ELEMS)
@ -567,6 +589,66 @@ public:
};
/** То же самое, но с двумя аргументами. Второй аргумент - "вес" (целое число) - сколько раз учитывать значение.
*/
template <typename ArgumentFieldType, typename WeightFieldType>
class AggregateFunctionQuantileTimingWeighted final : public IAggregateFunctionHelper<QuantileTiming>
{
private:
double level;
public:
AggregateFunctionQuantileTimingWeighted(double level_ = 0.5) : level(level_) {}
String getName() const { return "quantileTimingWeighted"; }
DataTypePtr getReturnType() const
{
return new DataTypeFloat32;
}
void setArguments(const DataTypes & arguments)
{
}
void setParameters(const Array & params)
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
level = apply_visitor(FieldVisitorConvertToNumber<Float64>(), params[0]);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
this->data(place).insertWeighted(
static_cast<const ColumnVector<ArgumentFieldType> &>(*columns[0]).getData()[row_num],
static_cast<const ColumnVector<WeightFieldType> &>(*columns[1]).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
this->data(place).serialize(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
this->data(place).deserializeMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
static_cast<ColumnFloat32 &>(to).getData().push_back(this->data(place).getFloat(level));
}
};
/** То же самое, но позволяет вычислить сразу несколько квантилей.
* Для этого, принимает в качестве параметров несколько уровней. Пример: quantilesTiming(0.5, 0.8, 0.9, 0.95)(ConnectTiming).
* Возвращает массив результатов.
@ -639,4 +721,75 @@ public:
}
};
template <typename ArgumentFieldType, typename WeightFieldType>
class AggregateFunctionQuantilesTimingWeighted final : public IAggregateFunctionHelper<QuantileTiming>
{
private:
typedef std::vector<double> Levels;
Levels levels;
public:
String getName() const { return "quantilesTimingWeighted"; }
DataTypePtr getReturnType() const
{
return new DataTypeArray(new DataTypeFloat32);
}
void setArguments(const DataTypes & arguments)
{
}
void setParameters(const Array & params)
{
if (params.empty())
throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
size_t size = params.size();
levels.resize(size);
for (size_t i = 0; i < size; ++i)
levels[i] = apply_visitor(FieldVisitorConvertToNumber<Float64>(), params[i]);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
this->data(place).insertWeighted(
static_cast<const ColumnVector<ArgumentFieldType> &>(*columns[0]).getData()[row_num],
static_cast<const ColumnVector<WeightFieldType> &>(*columns[1]).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
this->data(place).serialize(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
this->data(place).deserializeMerge(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();
size_t size = levels.size();
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
typename ColumnFloat32::Container_t & data_to = static_cast<ColumnFloat32 &>(arr_to.getData()).getData();
size_t old_size = data_to.size();
data_to.resize(data_to.size() + size);
this->data(place).getManyFloat(&levels[0], size, &data_to[old_size]);
}
};
}

View File

@ -34,7 +34,7 @@ AggregateFunctionFactory::AggregateFunctionFactory()
/** Создать агрегатную функцию с числовым типом в параметре шаблона, в зависимости от типа аргумента.
*/
template<template <typename> class AggregateFunctionTemplate>
template <template <typename> class AggregateFunctionTemplate>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8>;
@ -51,7 +51,7 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
return nullptr;
}
template<template <typename, typename> class AggregateFunctionTemplate, class Data>
template <template <typename, typename> class AggregateFunctionTemplate, class Data>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data>;
@ -69,7 +69,7 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
}
template<template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data>
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8, Data<UInt8> >;
@ -87,8 +87,45 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
}
/** Для шаблона с двумя аргументами.
*/
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate>
static IAggregateFunction * createWithTwoNumericTypesSecond(const IDataType & second_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt8>;
else if (typeid_cast<const DataTypeUInt16 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt16>;
else if (typeid_cast<const DataTypeUInt32 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt32>;
else if (typeid_cast<const DataTypeUInt64 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, UInt64>;
else if (typeid_cast<const DataTypeInt8 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int8>;
else if (typeid_cast<const DataTypeInt16 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int16>;
else if (typeid_cast<const DataTypeInt32 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int32>;
else if (typeid_cast<const DataTypeInt64 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Int64>;
else if (typeid_cast<const DataTypeFloat32 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Float32>;
else if (typeid_cast<const DataTypeFloat64 *>(&second_type)) return new AggregateFunctionTemplate<FirstType, Float64>;
else
return nullptr;
}
template <template <typename, typename> class AggregateFunctionTemplate>
static IAggregateFunction * createWithTwoNumericTypes(const IDataType & first_type, const IDataType & second_type)
{
if (typeid_cast<const DataTypeUInt8 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt8, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeUInt16 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt16, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeUInt32 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt32, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeUInt64 *>(&first_type)) return createWithTwoNumericTypesSecond<UInt64, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt8 *>(&first_type)) return createWithTwoNumericTypesSecond<Int8, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt16 *>(&first_type)) return createWithTwoNumericTypesSecond<Int16, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt32 *>(&first_type)) return createWithTwoNumericTypesSecond<Int32, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeInt64 *>(&first_type)) return createWithTwoNumericTypesSecond<Int64, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeFloat32 *>(&first_type)) return createWithTwoNumericTypesSecond<Float32, AggregateFunctionTemplate>(second_type);
else if (typeid_cast<const DataTypeFloat64 *>(&first_type)) return createWithTwoNumericTypesSecond<Float64, AggregateFunctionTemplate>(second_type);
else
return nullptr;
}
/// min, max, any, anyLast
template<template <typename> class AggregateFunctionTemplate, template <typename> class Data>
template <template <typename> class AggregateFunctionTemplate, template <typename> class Data>
static IAggregateFunction * createAggregateFunctionSingleValue(const String & name, const DataTypes & argument_types)
{
if (argument_types.size() != 1)
@ -326,6 +363,32 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
return res;
}
else if (name == "medianTimingWeighted" || name == "quantileTimingWeighted")
{
if (argument_types.size() != 2)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
AggregateFunctionPtr res = createWithTwoNumericTypes<AggregateFunctionQuantileTimingWeighted>(*argument_types[0], *argument_types[1]);
if (!res)
throw Exception("Illegal types " + argument_types[0]->getName() + " and " + argument_types[1]->getName()
+ " of arguments for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
else if (name == "quantilesTimingWeighted")
{
if (argument_types.size() != 2)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
AggregateFunctionPtr res = createWithTwoNumericTypes<AggregateFunctionQuantilesTimingWeighted>(*argument_types[0], *argument_types[1]);
if (!res)
throw Exception("Illegal types " + argument_types[0]->getName() + " and " + argument_types[1]->getName()
+ " of arguments for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
else if (name == "quantileDeterministic")
{
if (argument_types.size() != 2)
@ -486,6 +549,9 @@ bool AggregateFunctionFactory::isAggregateFunctionName(const String & name, int
"medianTiming",
"quantileTiming",
"quantilesTiming",
"quantileTimingWeighted",
"quantilesTimingWeighted",
"medianTimingWeighted",
"quantileDeterministic",
"quantilesDeterministic",
nullptr