ClickHouse/dbms/include/DB/AggregateFunctions/AggregateFunctionUniq.h
2013-06-30 11:40:30 +00:00

202 lines
5.3 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#pragma once
#include <city.h>
#include <stats/UniquesHashSet.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/Columns/ColumnString.h>
#include <DB/AggregateFunctions/IUnaryAggregateFunction.h>
namespace DB
{
template <typename T> struct AggregateFunctionUniqTraits
{
static UInt64 hash(T x) { return x; }
};
template <> struct AggregateFunctionUniqTraits<Float32>
{
static UInt64 hash(Float32 x)
{
UInt64 res = 0;
memcpy(reinterpret_cast<char *>(&res), reinterpret_cast<char *>(&x), sizeof(x));
return res;
}
};
template <> struct AggregateFunctionUniqTraits<Float64>
{
static UInt64 hash(Float64 x)
{
UInt64 res = 0;
memcpy(reinterpret_cast<char *>(&res), reinterpret_cast<char *>(&x), sizeof(x));
return res;
}
};
struct AggregateFunctionUniqData
{
UniquesHashSet set;
};
/// Приближённо вычисляет количество различных значений.
template <typename T>
class AggregateFunctionUniq : public IUnaryAggregateFunction<AggregateFunctionUniqData>
{
public:
AggregateFunctionUniq() {}
String getName() const { return "uniq"; }
DataTypePtr getReturnType() const
{
return new DataTypeUInt64;
}
void setArgument(const DataTypePtr & argument)
{
}
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
data(place).set.insert(AggregateFunctionUniqTraits<T>::hash(static_cast<const ColumnVector<T> &>(column).getData()[row_num]));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
data(place).set.merge(data(rhs).set);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
data(place).set.write(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
UniquesHashSet tmp_set;
tmp_set.read(buf);
data(place).set.merge(tmp_set);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
static_cast<ColumnUInt64 &>(to).getData().push_back(data(place).set.size());
}
};
template <>
inline void AggregateFunctionUniq<String>::addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
/// Имейте ввиду, что вычисление приближённое.
StringRef value = column.getDataAt(row_num);
data(place).set.insert(CityHash64(value.data, value.size));
}
/** То же самое, но выводит состояние вычислений в строке в текстовом виде.
* Используется, если какой-то внешней программе (сейчас это ███████████)
* надо получить это состояние и потом использовать по-своему.
*/
template <typename T>
class AggregateFunctionUniqState : public AggregateFunctionUniq<T>
{
public:
String getName() const { return "uniqState"; }
DataTypePtr getReturnType() const
{
return new DataTypeString;
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
String res;
{
WriteBufferFromString wb(res);
this->data(place).set.writeText(wb);
}
static_cast<ColumnString &>(to).insertDataWithTerminatingZero(res.data(), res.size() + 1);
}
};
/** Принимает два аргумента - значение и условие.
* Приближённо считает количество различных значений, когда выполнено это условие.
*/
template <typename T>
class AggregateFunctionUniqIf : public IAggregateFunctionHelper<AggregateFunctionUniqData>
{
public:
AggregateFunctionUniqIf() {}
String getName() const { return "uniqIf"; }
DataTypePtr getReturnType() const
{
return new DataTypeUInt64;
}
void setArguments(const DataTypes & arguments)
{
if (!dynamic_cast<const DataTypeUInt8 *>(&*arguments[1]))
throw Exception("Incorrect type " + arguments[1]->getName() + " of second argument for aggregate function " + getName() + ". Must be UInt8.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
if (static_cast<const ColumnUInt8 &>(*columns[1]).getData()[row_num])
data(place).set.insert(AggregateFunctionUniqTraits<T>::hash(static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num]));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
data(place).set.merge(data(rhs).set);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
data(place).set.write(buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
UniquesHashSet tmp_set;
tmp_set.read(buf);
data(place).set.merge(tmp_set);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
static_cast<ColumnUInt64 &>(to).getData().push_back(data(place).set.size());
}
};
template <>
inline void AggregateFunctionUniqIf<String>::add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
if (static_cast<const ColumnUInt8 &>(*columns[1]).getData()[row_num])
{
/// Имейте ввиду, что вычисление приближённое.
StringRef value = columns[0]->getDataAt(row_num);
data(place).set.insert(CityHash64(value.data, value.size));
}
}
}