added uniq functions with HLL support [#CONV-8544]

This commit is contained in:
Vyacheslav Alipov 2013-08-21 13:26:42 +00:00
parent ece838895f
commit 602bea808d
3 changed files with 124 additions and 29 deletions

View File

@ -3,6 +3,7 @@
#include <city.h>
#include <stats/UniquesHashSet.h>
#include <statdaemons/HyperLogLogCounter.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
@ -46,20 +47,35 @@ template <> struct AggregateFunctionUniqTraits<Float64>
};
struct AggregateFunctionUniqData
struct AggregateFunctionUniqUniquesHashSetData
{
UniquesHashSet set;
typedef UniquesHashSet Set;
Set set;
static String getName() { return "uniq"; }
};
/// State holder for the aggregate function reported as "uniqHLL12".
/// The accumulated values live in a HyperLogLogCounter<12>, exposed through
/// the nested `Set` typedef so generic code can name the container type.
struct AggregateFunctionUniqHLL12Data
{
    /// Name under which functions built on this state identify themselves.
    static String getName()
    {
        return "uniqHLL12";
    }

    typedef HyperLogLogCounter<12> Set;

    /// The counter that values are inserted into and merged through.
    Set set;
};
/// Helper structure that delegates the work of adding a single element
/// to the uniq aggregate function. It exists so that adding strings
/// can be handled through partial specialization.
template<typename T, typename Data> struct OneAdder;
/// Approximately computes the number of distinct values.
template <typename T>
class AggregateFunctionUniq : public IUnaryAggregateFunction<AggregateFunctionUniqData>
template <typename T, typename Data>
class AggregateFunctionUniq : public IUnaryAggregateFunction<Data>
{
public:
AggregateFunctionUniq() {}
String getName() const { return "uniq"; }
String getName() const { return Data::getName(); }
DataTypePtr getReturnType() const
{
@ -72,50 +88,66 @@ public:
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
data(place).set.insert(AggregateFunctionUniqTraits<T>::hash(static_cast<const ColumnVector<T> &>(column).getData()[row_num]));
OneAdder<T, Data>::addOne(*this, place, column, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
data(place).set.merge(data(rhs).set);
this->data(place).set.merge(this->data(rhs).set);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
data(place).set.write(buf);
this->data(place).set.write(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
UniquesHashSet tmp_set;
typename Data::Set tmp_set;
tmp_set.read(buf);
data(place).set.merge(tmp_set);
this->data(place).set.merge(tmp_set);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
static_cast<ColumnUInt64 &>(to).getData().push_back(data(place).set.size());
static_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
private:
template<typename T0, typename Data0> friend struct OneAdder;
};
template<typename T, typename Data>
struct OneAdder
{
static void addOne(const AggregateFunctionUniq<T, Data> & aggregate_function, AggregateDataPtr place, const IColumn & column, size_t row_num)
{
aggregate_function.data(place).set.insert(
AggregateFunctionUniqTraits<T>::hash(static_cast<const ColumnVector<T> &>(column).getData()[row_num]));
}
};
template <>
inline void AggregateFunctionUniq<String>::addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
template<typename Data>
struct OneAdder<String, Data>
{
/// Имейте ввиду, что вычисление приближённое.
StringRef value = column.getDataAt(row_num);
data(place).set.insert(CityHash64(value.data, value.size));
}
static void addOne(const AggregateFunctionUniq<String, Data> & aggregate_function, AggregateDataPtr place, const IColumn & column, size_t row_num)
{
/// Имейте ввиду, что вычисление приближённое.
StringRef value = column.getDataAt(row_num);
aggregate_function.data(place).set.insert(CityHash64(value.data, value.size));
}
};
/** The same, but outputs the computation state as a string in text form.
  * Used when some external program
  * needs to obtain this state and then use it in its own way.
  */
template <typename T>
class AggregateFunctionUniqState : public AggregateFunctionUniq<T>
template <typename T, typename Data>
class AggregateFunctionUniqState : public AggregateFunctionUniq<T, Data>
{
public:
String getName() const { return "uniqState"; }
String getName() const { return Data::getName() + "State"; }
DataTypePtr getReturnType() const
{
@ -139,7 +171,7 @@ public:
* Приближённо считает количество различных значений, когда выполнено это условие.
*/
template <typename T>
class AggregateFunctionUniqIf : public IAggregateFunctionHelper<AggregateFunctionUniqData>
class AggregateFunctionUniqIf : public IAggregateFunctionHelper<AggregateFunctionUniqUniquesHashSetData>
{
public:
AggregateFunctionUniqIf() {}

View File

@ -46,6 +46,23 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
return NULL;
}
/// Instantiates AggregateFunctionTemplate<NumericType, Data> for the numeric
/// type that matches the dynamic type of `argument_type`.
/// Returns a newly allocated function object, or NULL when `argument_type`
/// is none of the numeric data types checked below.
template<template <typename, typename> class AggregateFunctionTemplate, class Data>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type)
{
    const IDataType * type = &argument_type;

    /// Unsigned integers.
    if (dynamic_cast<const DataTypeUInt8 *>(type))
        return new AggregateFunctionTemplate<UInt8, Data>;
    if (dynamic_cast<const DataTypeUInt16 *>(type))
        return new AggregateFunctionTemplate<UInt16, Data>;
    if (dynamic_cast<const DataTypeUInt32 *>(type))
        return new AggregateFunctionTemplate<UInt32, Data>;
    if (dynamic_cast<const DataTypeUInt64 *>(type))
        return new AggregateFunctionTemplate<UInt64, Data>;

    /// Signed integers.
    if (dynamic_cast<const DataTypeInt8 *>(type))
        return new AggregateFunctionTemplate<Int8, Data>;
    if (dynamic_cast<const DataTypeInt16 *>(type))
        return new AggregateFunctionTemplate<Int16, Data>;
    if (dynamic_cast<const DataTypeInt32 *>(type))
        return new AggregateFunctionTemplate<Int32, Data>;
    if (dynamic_cast<const DataTypeInt64 *>(type))
        return new AggregateFunctionTemplate<Int64, Data>;

    /// Floating point.
    if (dynamic_cast<const DataTypeFloat32 *>(type))
        return new AggregateFunctionTemplate<Float32, Data>;
    if (dynamic_cast<const DataTypeFloat64 *>(type))
        return new AggregateFunctionTemplate<Float64, Data>;

    /// Not a numeric type handled here; the caller decides what to do next.
    return NULL;
}
AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const DataTypes & argument_types) const
{
@ -135,16 +152,36 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
const IDataType & argument_type = *argument_types[0];
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniq>(*argument_types[0]);
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniq, AggregateFunctionUniqUniquesHashSetData>(*argument_types[0]);
if (res)
return res;
else if (dynamic_cast<const DataTypeDate *>(&argument_type))
return new AggregateFunctionUniq<DataTypeDate::FieldType>;
return new AggregateFunctionUniq<DataTypeDate::FieldType, AggregateFunctionUniqUniquesHashSetData>;
else if (dynamic_cast<const DataTypeDateTime*>(&argument_type))
return new AggregateFunctionUniq<DataTypeDateTime::FieldType>;
return new AggregateFunctionUniq<DataTypeDateTime::FieldType, AggregateFunctionUniqUniquesHashSetData>;
else if (dynamic_cast<const DataTypeString*>(&argument_type) || dynamic_cast<const DataTypeFixedString*>(&argument_type))
return new AggregateFunctionUniq<String>;
return new AggregateFunctionUniq<String, AggregateFunctionUniqUniquesHashSetData>;
else
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (name == "uniqHLL12")
{
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const IDataType & argument_type = *argument_types[0];
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniq, AggregateFunctionUniqHLL12Data>(*argument_types[0]);
if (res)
return res;
else if (dynamic_cast<const DataTypeDate *>(&argument_type))
return new AggregateFunctionUniq<DataTypeDate::FieldType, AggregateFunctionUniqHLL12Data>;
else if (dynamic_cast<const DataTypeDateTime*>(&argument_type))
return new AggregateFunctionUniq<DataTypeDateTime::FieldType, AggregateFunctionUniqHLL12Data>;
else if (dynamic_cast<const DataTypeString*>(&argument_type) || dynamic_cast<const DataTypeFixedString*>(&argument_type))
return new AggregateFunctionUniq<String, AggregateFunctionUniqHLL12Data>;
else
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
@ -175,16 +212,36 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
const IDataType & argument_type = *argument_types[0];
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniqState>(*argument_types[0]);
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniqState, AggregateFunctionUniqUniquesHashSetData>(*argument_types[0]);
if (res)
return res;
else if (dynamic_cast<const DataTypeDate *>(&argument_type))
return new AggregateFunctionUniqState<DataTypeDate::FieldType>;
return new AggregateFunctionUniqState<DataTypeDate::FieldType, AggregateFunctionUniqUniquesHashSetData>;
else if (dynamic_cast<const DataTypeDateTime*>(&argument_type))
return new AggregateFunctionUniqState<DataTypeDateTime::FieldType>;
return new AggregateFunctionUniqState<DataTypeDateTime::FieldType, AggregateFunctionUniqUniquesHashSetData>;
else if (dynamic_cast<const DataTypeString*>(&argument_type) || dynamic_cast<const DataTypeFixedString*>(&argument_type))
return new AggregateFunctionUniqState<String>;
return new AggregateFunctionUniqState<String, AggregateFunctionUniqUniquesHashSetData>;
else
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (name == "uniqHLL12State")
{
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const IDataType & argument_type = *argument_types[0];
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniqState, AggregateFunctionUniqHLL12Data>(*argument_types[0]);
if (res)
return res;
else if (dynamic_cast<const DataTypeDate *>(&argument_type))
return new AggregateFunctionUniqState<DataTypeDate::FieldType, AggregateFunctionUniqHLL12Data>;
else if (dynamic_cast<const DataTypeDateTime*>(&argument_type))
return new AggregateFunctionUniqState<DataTypeDateTime::FieldType, AggregateFunctionUniqHLL12Data>;
else if (dynamic_cast<const DataTypeString*>(&argument_type) || dynamic_cast<const DataTypeFixedString*>(&argument_type))
return new AggregateFunctionUniqState<String, AggregateFunctionUniqHLL12Data>;
else
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}

View File

@ -198,6 +198,12 @@ std::string QueryConverter::convertAggregateFunction(const std::string & attribu
if (name == "uniq_state")
return "uniqState(" + numeric + ")";
if (name == "uniq_hll12")
return "uniqHLL12(" + numeric + ")";
if (name == "uniq_hll12_state")
return "uniqHLL12State(" + numeric + ")";
if (name == "count_non_zero")
return "sum((" + numeric + ") == 0 ? toInt64(0) : toInt64(Sign))";
if (name == "count_non_minus_one")