mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
dbms: improved performance of aggregate function groupArray [#METR-18778].
This commit is contained in:
parent
8a0892f2fb
commit
8732e7a3f1
@ -4,6 +4,10 @@
|
||||
#include <DB/IO/ReadHelpers.h>
|
||||
|
||||
#include <DB/DataTypes/DataTypeArray.h>
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
|
||||
#include <DB/Columns/ColumnVector.h>
|
||||
#include <DB/Columns/ColumnArray.h>
|
||||
|
||||
#include <DB/AggregateFunctions/IUnaryAggregateFunction.h>
|
||||
|
||||
@ -13,14 +17,94 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct AggregateFunctionGroupArrayData
|
||||
|
||||
/// Частный случай - реализация для числовых типов.
|
||||
template <typename T>
|
||||
struct AggregateFunctionGroupArrayDataNumeric
|
||||
{
|
||||
Array value; /// TODO Добавить MemoryTracker /// TODO Оптимизация для распространённых типов.
|
||||
/// Сразу будет выделена память на несколько элементов так, чтобы состояние занимало 64 байта.
|
||||
static constexpr size_t bytes_in_arena = 64 - sizeof(PODArray<T>);
|
||||
|
||||
using Array = PODArray<T, bytes_in_arena / sizeof(T), AllocatorWithStackMemory<Allocator<false>, bytes_in_arena>>;
|
||||
Array value;
|
||||
};
|
||||
|
||||
|
||||
/// Складывает все значения в массив. Реализовано неэффективно.
|
||||
class AggregateFunctionGroupArray final : public IUnaryAggregateFunction<AggregateFunctionGroupArrayData, AggregateFunctionGroupArray>
|
||||
/// Общий случай (неэффективно). NOTE Можно ещё реализовать частный случай для строк.
|
||||
struct AggregateFunctionGroupArrayDataGeneric
|
||||
{
|
||||
Array value; /// TODO Добавить MemoryTracker
|
||||
};
|
||||
|
||||
|
||||
template <typename T>
|
||||
class AggregateFunctionGroupArrayNumeric final
|
||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayDataNumeric<T>, AggregateFunctionGroupArrayNumeric<T>>
|
||||
{
|
||||
public:
|
||||
String getName() const override { return "groupArray"; }
|
||||
|
||||
DataTypePtr getReturnType() const override
|
||||
{
|
||||
return new DataTypeArray(new typename DataTypeFromFieldType<T>::Type);
|
||||
}
|
||||
|
||||
void setArgument(const DataTypePtr & argument) override
|
||||
{
|
||||
}
|
||||
|
||||
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
|
||||
{
|
||||
this->data(place).value.push_back(static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const override
|
||||
{
|
||||
this->data(place).value.insert(this->data(rhs).value.begin(), this->data(rhs).value.end());
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
||||
{
|
||||
const auto & value = this->data(place).value;
|
||||
size_t size = value.size();
|
||||
writeVarUInt(size, buf);
|
||||
buf.write(reinterpret_cast<const char *>(&value[0]), size * sizeof(value[0]));
|
||||
}
|
||||
|
||||
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const override
|
||||
{
|
||||
size_t size = 0;
|
||||
readVarUInt(size, buf);
|
||||
|
||||
if (size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE)
|
||||
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
|
||||
|
||||
auto & value = this->data(place).value;
|
||||
|
||||
size_t old_size = value.size();
|
||||
value.resize(old_size + size);
|
||||
buf.read(reinterpret_cast<char *>(&value[old_size]), size * sizeof(value[0]));
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
const auto & value = this->data(place).value;
|
||||
size_t size = value.size();
|
||||
|
||||
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();
|
||||
|
||||
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
|
||||
|
||||
typename ColumnVector<T>::Container_t & data_to = static_cast<ColumnVector<T> &>(arr_to.getData()).getData();
|
||||
data_to.insert(this->data(place).value.begin(), this->data(place).value.end());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// Складывает все значения в массив, общий случай. Реализовано неэффективно.
|
||||
class AggregateFunctionGroupArrayGeneric final
|
||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayDataGeneric, AggregateFunctionGroupArrayGeneric>
|
||||
{
|
||||
private:
|
||||
DataTypePtr type;
|
||||
|
@ -10,7 +10,16 @@ namespace
|
||||
|
||||
/** Creates the groupArray aggregate function for the given argument types.
  *
  * Exactly one argument is required, otherwise NUMBER_OF_ARGUMENTS_DOESNT_MATCH is thrown.
  * The numeric implementation is tried first; if createWithNumericType yields no function
  * for the argument type, the generic implementation is used.
  */
AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string & name, const DataTypes & argument_types)
{
    if (argument_types.size() != 1)
        throw Exception("Incorrect number of arguments for aggregate function " + name,
            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    AggregateFunctionPtr res = createWithNumericType<AggregateFunctionGroupArrayNumeric>(*argument_types[0]);

    if (!res)
        res = new AggregateFunctionGroupArrayGeneric;

    return res;
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,4 @@
|
||||
1000000 1000000
|
||||
[0,0] 2
|
||||
100000 100000
|
||||
['0','0'] 2
|
5
dbms/tests/queries/0_stateless/00274_group_array.sql
Normal file
5
dbms/tests/queries/0_stateless/00274_group_array.sql
Normal file
@ -0,0 +1,5 @@
|
||||
-- groupArray over a numeric column: array length and row count for 1000000 rows of system.numbers_mt.
SELECT length(groupArray(number)), count() FROM (SELECT number FROM system.numbers_mt LIMIT 1000000);
|
||||
-- groupArray over a numeric column read through remote() from the addresses 127.0.0.1 and 127.0.0.2.
SELECT groupArray(dummy), count() FROM remote('127.0.0.{1,2}', system.one);
|
||||
|
||||
-- The same two checks with the values converted to strings (non-numeric argument), over 100000 rows of system.numbers.
SELECT length(groupArray(toString(number))), count() FROM (SELECT number FROM system.numbers LIMIT 100000);
|
||||
-- String values read through remote() from the addresses 127.0.0.1 and 127.0.0.2.
SELECT groupArray(toString(dummy)), count() FROM remote('127.0.0.{1,2}', system.one);
|
Loading…
Reference in New Issue
Block a user