#pragma once #include #include #include #include #include #include #include #include namespace DB { template struct AggregateFunctionUniqTraits; template <> struct AggregateFunctionUniqTraits { static UInt64 hash(UInt64 x) { return x; } }; template <> struct AggregateFunctionUniqTraits { static UInt64 hash(Int64 x) { return x; } }; template <> struct AggregateFunctionUniqTraits { static UInt64 hash(Float64 x) { UInt64 res = 0; memcpy(reinterpret_cast(&res), reinterpret_cast(&x), sizeof(x)); return res; } }; template <> struct AggregateFunctionUniqTraits { /// Имейте ввиду, что вычисление приближённое. static UInt64 hash(const String & x) { return CityHash64(x.data(), x.size()); } }; struct AggregateFunctionUniqData { UniquesHashSet set; }; /// Приближённо вычисляет количество различных значений. template class AggregateFunctionUniq : public IUnaryAggregateFunction { public: AggregateFunctionUniq() {} String getName() const { return "uniq"; } String getTypeID() const { return "uniq_" + TypeName::get(); } DataTypePtr getReturnType() const { return new DataTypeUInt64; } void setArgument(const DataTypePtr & argument) { } void addOne(AggregateDataPtr place, const Field & value) const { data(place).set.insert(AggregateFunctionUniqTraits::hash(get(value))); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const { data(place).set.merge(data(rhs).set); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const { data(place).set.write(buf); } void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const { UniquesHashSet tmp_set; tmp_set.read(buf); data(place).set.merge(tmp_set); } Field getResult(ConstAggregateDataPtr place) const { return data(place).set.size(); } }; /** То же самое, но выводит состояние вычислений в строке в текстовом виде. * Используется, если какой-то внешней программе (сейчас это ███████████) * надо получить это состояние и потом использовать по-своему. */ template class AggregateFunctionUniqState : public AggregateFunctionUniq { public: String getName() const { return "uniqState"; } String getTypeID() const { return "uniqState_" + TypeName::get(); } DataTypePtr getReturnType() const { return new DataTypeString; } Field getResult(ConstAggregateDataPtr place) const { Field res = String(); WriteBufferFromString wb(get(res)); this->data(place).set.writeText(wb); return res; } }; /** Принимает два аргумента - значение и условие. * Приближённо считает количество различных значений, когда выполнено это условие. */ template class AggregateFunctionUniqIf : public IAggregateFunctionHelper { public: AggregateFunctionUniqIf() {} String getName() const { return "uniqIf"; } String getTypeID() const { return "uniqIf_" + TypeName::get(); } DataTypePtr getReturnType() const { return new DataTypeUInt64; } void setArguments(const DataTypes & arguments) { if (!dynamic_cast(&*arguments[1])) throw Exception("Incorrect type " + arguments[1]->getName() + " of second argument for aggregate function " + getName() + ". Must be UInt8.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); } void add(AggregateDataPtr place, const Row & row) const { if (get(row[1])) data(place).set.insert(AggregateFunctionUniqTraits::hash(get(row[0]))); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const { data(place).set.merge(data(rhs).set); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const { data(place).set.write(buf); } void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const { UniquesHashSet tmp_set; tmp_set.read(buf); data(place).set.merge(tmp_set); } Field getResult(ConstAggregateDataPtr place) const { return data(place).set.size(); } }; }