mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 01:30:51 +00:00
dbms: added aggregate function groupUniqArray [#CONV-8330].
This commit is contained in:
parent
0259b9ddb2
commit
43d9edeca1
@ -19,7 +19,7 @@ struct AggregateFunctionGroupArrayData
|
||||
};
|
||||
|
||||
|
||||
/// Складывает все значения в массив.
|
||||
/// Складывает все значения в массив. Реализовано неэффективно.
|
||||
class AggregateFunctionGroupArray : public IUnaryAggregateFunction<AggregateFunctionGroupArrayData>
|
||||
{
|
||||
private:
|
||||
|
@ -0,0 +1,171 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/IO/WriteHelpers.h>
|
||||
#include <DB/IO/ReadHelpers.h>
|
||||
|
||||
#include <DB/DataTypes/DataTypeArray.h>
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
|
||||
#include <DB/Columns/ColumnArray.h>
|
||||
|
||||
#include <DB/Interpreters/HashSet.h>
|
||||
|
||||
#include <DB/AggregateFunctions/AggregateFunctionGroupArray.h>
|
||||
|
||||
#define AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE 0xFFFFFF
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
template <typename T>
|
||||
struct AggregateFunctionGroupUniqArrayData
|
||||
{
|
||||
struct hash
|
||||
{
|
||||
size_t operator() (T key) const
|
||||
{
|
||||
return intHash32<0>(key);
|
||||
}
|
||||
};
|
||||
|
||||
struct GrowthTraits : public default_growth_traits
|
||||
{
|
||||
/// При создании, хэш-таблица должна быть небольшой.
|
||||
static const int INITIAL_SIZE_DEGREE = 4;
|
||||
};
|
||||
|
||||
/// NOTE: Можно сделать отдельную хэш-таблицу с оптимизацией для маленьких размеров.
|
||||
typedef HashSet<T, hash, default_zero_traits<T>, GrowthTraits> Set;
|
||||
Set value;
|
||||
};
|
||||
|
||||
template <> size_t AggregateFunctionGroupUniqArrayData<Float32>::hash::operator() (Float32 key) const
|
||||
{
|
||||
UInt64 res = 0;
|
||||
memcpy(reinterpret_cast<char *>(&res), reinterpret_cast<char *>(&key), sizeof(key));
|
||||
return intHash32<0>(res);
|
||||
}
|
||||
|
||||
template <> size_t AggregateFunctionGroupUniqArrayData<Float64>::hash::operator() (Float64 key) const
|
||||
{
|
||||
UInt64 res = 0;
|
||||
memcpy(reinterpret_cast<char *>(&res), reinterpret_cast<char *>(&key), sizeof(key));
|
||||
return intHash32<0>(res);
|
||||
}
|
||||
|
||||
|
||||
/// Складывает все значения в хэш-множество. Возвращает массив уникальных значений. Реализована для числовых типов.
|
||||
template <typename T>
|
||||
class AggregateFunctionGroupUniqArray : public IUnaryAggregateFunction<AggregateFunctionGroupUniqArrayData<T> >
|
||||
{
|
||||
private:
|
||||
typedef AggregateFunctionGroupUniqArrayData<T> State;
|
||||
|
||||
public:
|
||||
String getName() const { return "groupUniqArray"; }
|
||||
|
||||
DataTypePtr getReturnType() const
|
||||
{
|
||||
return new DataTypeArray(new typename DataTypeFromFieldType<T>::Type);
|
||||
}
|
||||
|
||||
void setArgument(const DataTypePtr & argument)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
|
||||
{
|
||||
this->data(place).value.insert(static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
|
||||
{
|
||||
typename State::Set & set = this->data(place).value;
|
||||
const typename State::Set & rhs_set = this->data(rhs).value;
|
||||
for (typename State::Set::const_iterator it = rhs_set.begin(); it != rhs_set.end(); ++it)
|
||||
set.insert(*it);
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
|
||||
{
|
||||
const typename State::Set & set = this->data(place).value;
|
||||
size_t size = set.size();
|
||||
writeVarUInt(size, buf);
|
||||
for (typename State::Set::const_iterator it = set.begin(); it != set.end(); ++it)
|
||||
writeIntBinary(*it, buf);
|
||||
}
|
||||
|
||||
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
|
||||
{
|
||||
size_t size = 0;
|
||||
readVarUInt(size, buf);
|
||||
|
||||
if (size > AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE)
|
||||
throw Exception("Too large hash set size for aggregate function groupUniqArray", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
|
||||
|
||||
typename State::Set & set = this->data(place).value;
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
T tmp;
|
||||
readIntBinary(tmp, buf);
|
||||
set.insert(tmp);
|
||||
}
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
|
||||
{
|
||||
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();
|
||||
|
||||
const typename State::Set & set = this->data(place).value;
|
||||
size_t size = set.size();
|
||||
|
||||
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
|
||||
|
||||
typename ColumnVector<T>::Container_t & data_to = static_cast<ColumnVector<T> &>(arr_to.getData()).getData();
|
||||
size_t old_size = data_to.size();
|
||||
data_to.resize(old_size + size);
|
||||
|
||||
std::cerr << size << std::endl;
|
||||
|
||||
size_t i = 0;
|
||||
for (typename State::Set::const_iterator it = set.begin(); it != set.end(); ++it, ++i)
|
||||
{
|
||||
std::cerr << i << std::endl;
|
||||
data_to[old_size + i] = *it;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/** То же самое, но в качестве аргумента - числовые массивы. Применяется ко всем элементам массивов.
|
||||
* То есть, выдаёт массив, содержащий уникальные значения из внутренностей массивов-аргументов.
|
||||
*/
|
||||
template <typename T>
|
||||
class AggregateFunctionGroupUniqArrays : public AggregateFunctionGroupUniqArray<T>
|
||||
{
|
||||
public:
|
||||
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
|
||||
{
|
||||
const ColumnArray & arr = static_cast<const ColumnArray &>(column);
|
||||
const ColumnArray::Offsets_t & offsets = arr.getOffsets();
|
||||
const typename ColumnVector<T>::Container_t & data = static_cast<const ColumnVector<T> &>(arr.getData()).getData();
|
||||
|
||||
IColumn::Offset_t begin = row_num ? offsets[row_num - 1] : 0;
|
||||
IColumn::Offset_t end = offsets[row_num];
|
||||
|
||||
typename AggregateFunctionGroupUniqArrayData<T>::Set & set = this->data(place).value;
|
||||
|
||||
for (IColumn::Offset_t i = begin; i != end; ++i)
|
||||
set.insert(data[i]);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE
|
||||
|
||||
}
|
@ -3,9 +3,10 @@
|
||||
#include <DB/AggregateFunctions/AggregateFunctionAvg.h>
|
||||
#include <DB/AggregateFunctions/AggregateFunctionAny.h>
|
||||
#include <DB/AggregateFunctions/AggregateFunctionAnyLast.h>
|
||||
#include <DB/AggregateFunctions/AggregateFunctionsMinMax.h>
|
||||
#include <DB/AggregateFunctions/AggregateFunctionUniq.h>
|
||||
#include <DB/AggregateFunctions/AggregateFunctionGroupArray.h>
|
||||
#include <DB/AggregateFunctions/AggregateFunctionsMinMax.h>
|
||||
#include <DB/AggregateFunctions/AggregateFunctionGroupUniqArray.h>
|
||||
#include <DB/AggregateFunctions/AggregateFunctionQuantile.h>
|
||||
#include <DB/AggregateFunctions/AggregateFunctionQuantileTiming.h>
|
||||
|
||||
@ -60,6 +61,25 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
|
||||
return new AggregateFunctionMax;
|
||||
else if (name == "groupArray")
|
||||
return new AggregateFunctionGroupArray;
|
||||
else if (name == "groupUniqArray")
|
||||
{
|
||||
if (argument_types.size() != 1)
|
||||
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
const DataTypeArray * arr = dynamic_cast<const DataTypeArray *>(&*argument_types[0]);
|
||||
|
||||
AggregateFunctionPtr res;
|
||||
|
||||
if (!arr)
|
||||
res = createWithNumericType<AggregateFunctionGroupUniqArray>(*argument_types[0]);
|
||||
else
|
||||
res = createWithNumericType<AggregateFunctionGroupUniqArrays>(*arr->getNestedType());
|
||||
|
||||
if (!res)
|
||||
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
return res;
|
||||
}
|
||||
else if (name == "sum")
|
||||
{
|
||||
if (argument_types.size() != 1)
|
||||
@ -266,6 +286,7 @@ bool AggregateFunctionFactory::isAggregateFunctionName(const String & name) cons
|
||||
"uniqIf",
|
||||
"uniqState",
|
||||
"groupArray",
|
||||
"groupUniqArray",
|
||||
"median",
|
||||
"quantile",
|
||||
"quantiles",
|
||||
|
Loading…
Reference in New Issue
Block a user