// ClickHouse/dbms/include/DB/AggregateFunctions/AggregateFunctionUniqUpTo.h
#pragma once
#include <DB/Core/FieldVisitors.h>
#include <DB/AggregateFunctions/IUnaryAggregateFunction.h>
#include <DB/AggregateFunctions/UniqVariadicHash.h>
#include <DB/DataTypes/DataTypesNumber.h>
#include <DB/DataTypes/DataTypeTuple.h>
#include <DB/Columns/ColumnsNumber.h>
namespace DB
{
/** Counts the number of unique values up to no more than specified in the parameter.
*
2017-03-09 00:56:38 +00:00
* Example: uniqUpTo(3)(UserID)
* - will count the number of unique visitors, return 1, 2, 3 or 4 if visitors > = 4.
*
2017-03-09 00:56:38 +00:00
* For strings, a non-cryptographic hash function is used, due to which the calculation may be a bit inaccurate.
*/
template <typename T>
struct __attribute__((__packed__)) AggregateFunctionUniqUpToData
{
/** If count == threshold + 1 - this means that it is "overflowed" (values greater than threshold).
2017-03-09 00:56:38 +00:00
* In this case (for example, after calling the merge function), the `data` array does not necessarily contain the initialized values
* - example: combine a state in which there are few values, with another state that has overflowed;
* then set count to `threshold + 1`, and values from another state are not copied.
2017-03-09 00:56:38 +00:00
*/
UInt8 count = 0;
T data[0];
size_t size() const
{
return count;
}
2017-03-09 00:56:38 +00:00
/// threshold - for how many elements there is room in a `data`.
void insert(T x, UInt8 threshold)
{
2017-03-09 04:26:17 +00:00
/// The state is already full - nothing needs to be done.
if (count > threshold)
return;
2017-03-09 00:56:38 +00:00
/// Linear search for the matching element.
for (size_t i = 0; i < count; ++i)
if (data[i] == x)
return;
2017-03-09 04:26:17 +00:00
/// Did not find the matching element. If there is room for one more element, insert it.
if (count < threshold)
data[count] = x;
2017-03-09 04:26:17 +00:00
/// After increasing count, the state may be overflowed.
++count;
}
void merge(const AggregateFunctionUniqUpToData<T> & rhs, UInt8 threshold)
{
if (count > threshold)
return;
if (rhs.count > threshold)
{
2017-03-09 04:26:17 +00:00
/// If `rhs` is overflowed, then set `count` too also overflowed for the current state.
count = rhs.count;
return;
}
for (size_t i = 0; i < rhs.count; ++i)
insert(rhs.data[i], threshold);
}
void write(WriteBuffer & wb, UInt8 threshold) const
{
writeBinary(count, wb);
2017-03-09 04:26:17 +00:00
/// Write values only if the state is not overflowed. Otherwise, they are not needed, and only the fact that the state is overflowed is important.
if (count <= threshold)
wb.write(reinterpret_cast<const char *>(&data[0]), count * sizeof(data[0]));
}
2016-03-12 04:01:03 +00:00
void read(ReadBuffer & rb, UInt8 threshold)
{
2016-03-12 04:01:03 +00:00
readBinary(count, rb);
2016-03-12 04:01:03 +00:00
if (count <= threshold)
rb.read(reinterpret_cast<char *>(&data[0]), count * sizeof(data[0]));
}
2015-11-15 06:23:44 +00:00
void addImpl(const IColumn & column, size_t row_num, UInt8 threshold)
{
insert(static_cast<const ColumnVector<T> &>(column).getData()[row_num], threshold);
}
};
2017-03-09 00:56:38 +00:00
/// For strings, their hashes are remembered.
template <>
struct AggregateFunctionUniqUpToData<String> : AggregateFunctionUniqUpToData<UInt64>
{
2015-11-15 06:23:44 +00:00
void addImpl(const IColumn & column, size_t row_num, UInt8 threshold)
{
2017-03-09 00:56:38 +00:00
/// Keep in mind that calculations are approximate.
StringRef value = column.getDataAt(row_num);
insert(CityHash64(value.data, value.size), threshold);
}
};
/// Largest accepted value of the uniqUpTo parameter. The per-state `count` is a UInt8 that can reach
/// threshold + 1 (the "overflowed" marker), so this bound must stay below 255.
constexpr UInt8 uniq_upto_max_threshold{100};
template <typename T>
class AggregateFunctionUniqUpTo final : public IUnaryAggregateFunction<AggregateFunctionUniqUpToData<T>, AggregateFunctionUniqUpTo<T> >
{
private:
2017-03-09 04:26:17 +00:00
UInt8 threshold = 5; /// Default value if the parameter is not specified.
public:
2015-11-11 02:04:23 +00:00
size_t sizeOfData() const override
{
return sizeof(AggregateFunctionUniqUpToData<T>) + sizeof(T) * threshold;
}
2015-11-11 02:04:23 +00:00
String getName() const override { return "uniqUpTo"; }
2015-11-11 02:04:23 +00:00
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeUInt64>();
}
void setArgument(const DataTypePtr & argument)
{
}
2015-11-11 02:04:23 +00:00
void setParameters(const Array & params) override
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
Squashed commit of the following: commit c567d4e1fe8d54e6363e47548f1e3927cc5ee78f Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:35:01 2017 +0300 Style [#METR-2944]. commit 26bf3e1228e03f46c29b13edb0e3770bd453e3f1 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:33:11 2017 +0300 Miscellaneous [#METR-2944]. commit eb946f4c6fd4bb0e9e5c7fb1468d36be3dfca5a5 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:30:19 2017 +0300 Miscellaneous [#METR-2944]. commit 78c867a14744b5af2db8d37caf7804fc2057ea51 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:11:41 2017 +0300 Miscellaneous [#METR-2944]. commit 6604c5c83cfcedc81c8da4da026711920d5963b4 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:56:15 2017 +0300 Miscellaneous [#METR-2944]. commit 23fbf05c1d4bead636458ec21b05a101b1152e33 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:47:52 2017 +0300 Miscellaneous [#METR-2944]. commit 98772faf11a7d450d473f7fa84f8a9ae24f7b59b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:46:05 2017 +0300 Miscellaneous [#METR-2944]. commit 3dc636ab9f9359dbeac2e8d997ae563d4ca147e2 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:39:46 2017 +0300 Miscellaneous [#METR-2944]. commit 3e16aee95482f374ee3eda1a4dbe9ba5cdce02e8 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:38:03 2017 +0300 Miscellaneous [#METR-2944]. commit ae7e7e90eb1f82bd0fe0f887708d08b9e7755612 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:34:15 2017 +0300 Miscellaneous [#METR-2944].
2017-01-06 17:41:19 +00:00
UInt64 threshold_param = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), params[0]);
if (threshold_param > uniq_upto_max_threshold)
throw Exception("Too large parameter for aggregate function " + getName() + ". Maximum: " + toString(uniq_upto_max_threshold),
ErrorCodes::ARGUMENT_OUT_OF_BOUND);
threshold = threshold_param;
}
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
{
2015-11-15 06:23:44 +00:00
this->data(place).addImpl(column, row_num, threshold);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs), threshold);
}
2015-11-11 02:04:23 +00:00
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf, threshold);
}
2016-09-22 23:26:08 +00:00
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
2016-03-12 04:01:03 +00:00
this->data(place).read(buf, threshold);
}
2015-11-11 02:04:23 +00:00
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
};
2017-03-09 00:56:38 +00:00
/** For multiple arguments. To compute, hashes them.
* You can pass multiple arguments as is; You can also pass one argument - a tuple.
* But (for the possibility of effective implementation), you can not pass several arguments, among which there are tuples.
*/
template <bool argument_is_tuple>
class AggregateFunctionUniqUpToVariadic final : public IAggregateFunctionHelper<AggregateFunctionUniqUpToData<UInt64>>
{
private:
size_t num_args = 0;
2017-03-09 04:26:17 +00:00
UInt8 threshold = 5; /// Default value if the parameter is not specified.
public:
2015-11-11 02:04:23 +00:00
size_t sizeOfData() const override
2015-10-30 02:34:24 +00:00
{
return sizeof(AggregateFunctionUniqUpToData<UInt64>) + sizeof(UInt64) * threshold;
}
2015-11-11 02:04:23 +00:00
String getName() const override { return "uniqUpTo"; }
2015-11-11 02:04:23 +00:00
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeUInt64>();
}
2015-11-11 02:04:23 +00:00
void setArguments(const DataTypes & arguments) override
{
if (argument_is_tuple)
num_args = typeid_cast<const DataTypeTuple &>(*arguments[0]).getElements().size();
else
num_args = arguments.size();
}
2015-11-11 02:04:23 +00:00
void setParameters(const Array & params) override
{
if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
Squashed commit of the following: commit c567d4e1fe8d54e6363e47548f1e3927cc5ee78f Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:35:01 2017 +0300 Style [#METR-2944]. commit 26bf3e1228e03f46c29b13edb0e3770bd453e3f1 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:33:11 2017 +0300 Miscellaneous [#METR-2944]. commit eb946f4c6fd4bb0e9e5c7fb1468d36be3dfca5a5 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:30:19 2017 +0300 Miscellaneous [#METR-2944]. commit 78c867a14744b5af2db8d37caf7804fc2057ea51 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:11:41 2017 +0300 Miscellaneous [#METR-2944]. commit 6604c5c83cfcedc81c8da4da026711920d5963b4 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:56:15 2017 +0300 Miscellaneous [#METR-2944]. commit 23fbf05c1d4bead636458ec21b05a101b1152e33 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:47:52 2017 +0300 Miscellaneous [#METR-2944]. commit 98772faf11a7d450d473f7fa84f8a9ae24f7b59b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:46:05 2017 +0300 Miscellaneous [#METR-2944]. commit 3dc636ab9f9359dbeac2e8d997ae563d4ca147e2 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:39:46 2017 +0300 Miscellaneous [#METR-2944]. commit 3e16aee95482f374ee3eda1a4dbe9ba5cdce02e8 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:38:03 2017 +0300 Miscellaneous [#METR-2944]. commit ae7e7e90eb1f82bd0fe0f887708d08b9e7755612 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:34:15 2017 +0300 Miscellaneous [#METR-2944].
2017-01-06 17:41:19 +00:00
UInt64 threshold_param = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), params[0]);
if (threshold_param > uniq_upto_max_threshold)
throw Exception("Too large parameter for aggregate function " + getName() + ". Maximum: " + toString(uniq_upto_max_threshold),
ErrorCodes::ARGUMENT_OUT_OF_BOUND);
threshold = threshold_param;
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
this->data(place).insert(UniqVariadicHash<false, argument_is_tuple>::apply(num_args, columns, row_num), threshold);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs), threshold);
}
2015-11-11 02:04:23 +00:00
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf, threshold);
}
2016-09-22 23:26:08 +00:00
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
2016-03-12 04:01:03 +00:00
this->data(place).read(buf, threshold);
}
2015-11-11 02:04:23 +00:00
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *arena)
{
static_cast<const AggregateFunctionUniqUpToVariadic &>(*that).add(place, columns, row_num, arena);
}
IAggregateFunction::AddFunc getAddressOfAddFunction() const override final { return &addFree; }
};
}