mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Add several groupArray implementations. [#CLICKHOUSE-3084]
This commit is contained in:
parent
03bbe9938a
commit
c2cc8d6147
@ -22,11 +22,44 @@ AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string & name,
|
||||
return res;
|
||||
}
|
||||
|
||||
AggregateFunctionPtr createAggregateFunctionGroupArray2(const std::string & name, const DataTypes & argument_types)
|
||||
{
|
||||
if (argument_types.size() != 1)
|
||||
throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 1",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
if (auto res = createWithNumericType<AggregateFunctionGroupArrayNumeric2>(*argument_types[0]))
|
||||
return AggregateFunctionPtr(res);
|
||||
|
||||
if (typeid_cast<const DataTypeString *>(argument_types[0].get()))
|
||||
return std::make_shared<AggregateFunctionGroupArrayStringListImpl<NodeString>>();
|
||||
else
|
||||
return std::make_shared<AggregateFunctionGroupArrayStringListImpl<NodeGeneral>>();
|
||||
}
|
||||
|
||||
|
||||
AggregateFunctionPtr createAggregateFunctionGroupArray4(const std::string & name, const DataTypes & argument_types)
|
||||
{
|
||||
if (argument_types.size() != 1)
|
||||
throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 2",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
if (auto res = createWithNumericType<AggregateFunctionGroupArrayNumeric2>(*argument_types[0]))
|
||||
return AggregateFunctionPtr(res);
|
||||
|
||||
if (typeid_cast<const DataTypeString *>(argument_types[0].get()))
|
||||
return std::make_shared<AggregateFunctionGroupArrayStringConcatImpl>();
|
||||
else
|
||||
return std::make_shared<AggregateFunctionGroupArrayGeneric_ColumnPtrImpl>();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void registerAggregateFunctionGroupArray(AggregateFunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction("groupArray", createAggregateFunctionGroupArray);
|
||||
factory.registerFunction("groupArray2", createAggregateFunctionGroupArray2);
|
||||
factory.registerFunction("groupArray4", createAggregateFunctionGroupArray4);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -5,12 +5,19 @@
|
||||
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
|
||||
#include <Common/PODArrayArena.h>
|
||||
|
||||
#include <AggregateFunctions/IUnaryAggregateFunction.h>
|
||||
|
||||
#include <common/likely.h>
|
||||
#include <type_traits>
|
||||
|
||||
#define AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE 0xFFFFFF
|
||||
|
||||
|
||||
@ -74,7 +81,7 @@ public:
|
||||
size_t size = 0;
|
||||
readVarUInt(size, buf);
|
||||
|
||||
if (size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE)
|
||||
if (unlikely(size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE))
|
||||
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
|
||||
|
||||
auto & value = this->data(place).value;
|
||||
@ -99,6 +106,81 @@ public:
|
||||
};
|
||||
|
||||
|
||||
/// A particular case is an implementation for numeric types.
|
||||
template <typename T>
|
||||
struct AggregateFunctionGroupArrayDataNumeric2
|
||||
{
|
||||
/// Memory is allocated to several elements immediately so that the state occupies 64 bytes.
|
||||
static constexpr size_t bytes_in_arena = 64 - sizeof(PODArrayArenaAllocator<T>);
|
||||
|
||||
using Array = PODArrayArenaAllocator<T, bytes_in_arena, ArenaAllocatorWithStackMemoty<bytes_in_arena>>;
|
||||
Array value;
|
||||
};
|
||||
|
||||
|
||||
template <typename T>
|
||||
class AggregateFunctionGroupArrayNumeric2 final
|
||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayDataNumeric2<T>, AggregateFunctionGroupArrayNumeric2<T>>
|
||||
{
|
||||
public:
|
||||
String getName() const override { return "groupArray"; }
|
||||
|
||||
DataTypePtr getReturnType() const override
|
||||
{
|
||||
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeNumber<T>>());
|
||||
}
|
||||
|
||||
void setArgument(const DataTypePtr & argument)
|
||||
{
|
||||
}
|
||||
|
||||
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena * arena) const
|
||||
{
|
||||
this->data(place).value.push_back(static_cast<const ColumnVector<T> &>(column).getData()[row_num], arena);
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
||||
{
|
||||
this->data(place).value.insert(this->data(rhs).value.begin(), this->data(rhs).value.end(), arena);
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
||||
{
|
||||
const auto & value = this->data(place).value;
|
||||
size_t size = value.size();
|
||||
writeVarUInt(size, buf);
|
||||
buf.write(reinterpret_cast<const char *>(&value[0]), size * sizeof(value[0]));
|
||||
}
|
||||
|
||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
|
||||
{
|
||||
size_t size = 0;
|
||||
readVarUInt(size, buf);
|
||||
|
||||
if (unlikely(size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE))
|
||||
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
|
||||
|
||||
auto & value = this->data(place).value;
|
||||
|
||||
value.resize(size, arena);
|
||||
buf.read(reinterpret_cast<char *>(&value[0]), size * sizeof(value[0]));
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
const auto & value = this->data(place).value;
|
||||
size_t size = value.size();
|
||||
|
||||
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();
|
||||
|
||||
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
|
||||
|
||||
typename ColumnVector<T>::Container_t & data_to = static_cast<ColumnVector<T> &>(arr_to.getData()).getData();
|
||||
data_to.insert(this->data(place).value.begin(), this->data(place).value.end());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// General case (inefficient). NOTE You can also implement a special case for strings.
|
||||
struct AggregateFunctionGroupArrayDataGeneric
|
||||
@ -153,7 +235,7 @@ public:
|
||||
size_t size = 0;
|
||||
readVarUInt(size, buf);
|
||||
|
||||
if (size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE)
|
||||
if (unlikely(size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE))
|
||||
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
|
||||
|
||||
Array & value = data(place).value;
|
||||
@ -170,6 +252,484 @@ public:
|
||||
};
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
struct NodeString;
|
||||
struct NodeGeneral;
|
||||
|
||||
template <typename Node>
|
||||
struct NodeBase
|
||||
{
|
||||
Node * next;
|
||||
UInt64 size;
|
||||
|
||||
char * data()
|
||||
{
|
||||
static_assert(sizeof(NodeBase) == sizeof(Node));
|
||||
return reinterpret_cast<char *>(this) + sizeof(Node);
|
||||
}
|
||||
|
||||
/// Clones existing node (does not modify next field)
|
||||
Node * clone(Arena * arena)
|
||||
{
|
||||
return reinterpret_cast<Node *>(const_cast<char *>(arena->insert(reinterpret_cast<char *>(this), sizeof(Node) + size)));
|
||||
}
|
||||
|
||||
/// Write node to buffer
|
||||
void write(WriteBuffer & buf)
|
||||
{
|
||||
writeVarUInt(size, buf);
|
||||
buf.write(data(), size);
|
||||
}
|
||||
|
||||
/// Reads and allocates node from ReadBuffer's data (doesn't set next)
|
||||
static Node * read(ReadBuffer & buf, Arena * arena)
|
||||
{
|
||||
UInt64 size;
|
||||
readVarUInt(size, buf);
|
||||
|
||||
Node * node = reinterpret_cast<Node *>(arena->alloc(sizeof(Node) + size));
|
||||
node->size = size;
|
||||
buf.read(node->data(), size);
|
||||
return node;
|
||||
}
|
||||
};
|
||||
|
||||
struct NodeString : public NodeBase<NodeString>
|
||||
{
|
||||
using Node = NodeString;
|
||||
|
||||
/// Create node from string
|
||||
static Node * allocate(const IColumn & column, size_t row_num, Arena * arena)
|
||||
{
|
||||
StringRef string = static_cast<const ColumnString &>(column).getDataAt(row_num);
|
||||
|
||||
Node * node = reinterpret_cast<Node *>(arena->alloc(sizeof(Node) + string.size));
|
||||
node->next = nullptr;
|
||||
node->size = string.size;
|
||||
memcpy(node->data(), string.data, string.size);
|
||||
|
||||
return node;
|
||||
}
|
||||
|
||||
void insertInto(IColumn & column)
|
||||
{
|
||||
static_cast<ColumnString &>(column).insertData(data(), size);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
struct NodeGeneral : public NodeBase<NodeGeneral>
|
||||
{
|
||||
using Node = NodeGeneral;
|
||||
|
||||
static Node * allocate(const IColumn & column, size_t row_num, Arena * arena)
|
||||
{
|
||||
const char * begin = arena->alloc(sizeof(Node));
|
||||
StringRef value = column.serializeValueIntoArena(row_num, *arena, begin);
|
||||
|
||||
Node * node = reinterpret_cast<Node *>(const_cast<char *>(begin));
|
||||
node->next = nullptr;
|
||||
node->size = value.size;
|
||||
|
||||
return node;
|
||||
}
|
||||
|
||||
void insertInto(IColumn & column)
|
||||
{
|
||||
column.deserializeAndInsertFromArena(data());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
template <typename Node>
|
||||
struct AggregateFunctionGroupArrayListImpl_Data
|
||||
{
|
||||
UInt64 elems = 0;
|
||||
Node * first = nullptr;
|
||||
Node * last = nullptr;
|
||||
};
|
||||
|
||||
|
||||
/// Implementation of groupArray(String or ComplexObject) via linked list
|
||||
/// It has poor performance in case of many small objects
|
||||
template <typename Node, bool limit_size=false>
|
||||
class AggregateFunctionGroupArrayStringListImpl final
|
||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayListImpl_Data<Node>, AggregateFunctionGroupArrayStringListImpl<Node>>
|
||||
{
|
||||
using Data = AggregateFunctionGroupArrayListImpl_Data<Node>;
|
||||
static Data & data(AggregateDataPtr place) { return *reinterpret_cast<Data*>(place); }
|
||||
static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast<const Data*>(place); }
|
||||
|
||||
DataTypePtr data_type;
|
||||
size_t max_elems = 0;
|
||||
|
||||
public:
|
||||
|
||||
String getName() const override { return "groupArray2"; }
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(data_type); }
|
||||
|
||||
void setParameters(const Array & params) override
|
||||
{
|
||||
if (!limit_size && !params.empty())
|
||||
throw Exception("Incorrect initialization of " + getName() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (!limit_size)
|
||||
return;
|
||||
|
||||
if (params.size() == 1)
|
||||
{
|
||||
throw Exception("Incorrect number of parameters for aggregate function " + getName() + ", should be 0 or 1",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
if (params[0].getType() == Field::Types::Int64)
|
||||
{
|
||||
if (params[0].get<Int64>() < 0)
|
||||
throw Exception("Parameter for aggregate function " + getName() + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
|
||||
max_elems = params[0].get<Int64>();
|
||||
}
|
||||
else if (params[0].getType() == Field::Types::UInt64)
|
||||
{
|
||||
if (params[0].get<UInt64>() == 0)
|
||||
throw Exception("Parameter for aggregate function " + getName() + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
|
||||
max_elems = params[0].get<UInt64>();
|
||||
}
|
||||
else
|
||||
throw Exception("Parameter for aggregate function " + getName() + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
}
|
||||
|
||||
void setArgument(const DataTypePtr & argument)
|
||||
{
|
||||
data_type = argument;
|
||||
}
|
||||
|
||||
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena * arena) const
|
||||
{
|
||||
Node * node = Node::allocate(column, row_num, arena);
|
||||
|
||||
if (unlikely(!data(place).first))
|
||||
{
|
||||
data(place).first = node;
|
||||
data(place).last = node;
|
||||
}
|
||||
else
|
||||
{
|
||||
data(place).last->next = node;
|
||||
data(place).last = node;
|
||||
}
|
||||
|
||||
++data(place).elems;
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
||||
{
|
||||
/// It is sadly, but rhs's Arena could be destroyed
|
||||
|
||||
if (!data(rhs).first) /// rhs state is empty
|
||||
return;
|
||||
|
||||
data(place).elems += data(rhs).elems;
|
||||
|
||||
Node * p_rhs = data(rhs).first;
|
||||
Node * p_lhs;
|
||||
|
||||
if (unlikely(!data(place).last)) /// lhs state is empty
|
||||
{
|
||||
p_lhs = p_rhs->clone(arena);
|
||||
data(place).first = data(place).last = p_lhs;
|
||||
p_rhs = p_rhs->next;
|
||||
}
|
||||
else
|
||||
{
|
||||
p_lhs = data(place).last;
|
||||
}
|
||||
|
||||
while (p_rhs)
|
||||
{
|
||||
Node * p_new = p_rhs->clone(arena);
|
||||
p_lhs->next = p_new;
|
||||
p_rhs = p_rhs->next;
|
||||
p_lhs = p_new;
|
||||
}
|
||||
|
||||
p_lhs->next = nullptr;
|
||||
data(place).last = p_lhs;
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
||||
{
|
||||
writeVarUInt(data(place).elems, buf);
|
||||
|
||||
Node * p = data(place).first;
|
||||
while (p)
|
||||
{
|
||||
p->write(buf);
|
||||
p = p->next;
|
||||
}
|
||||
}
|
||||
|
||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
|
||||
{
|
||||
UInt64 elems;
|
||||
readVarUInt(elems, buf);
|
||||
data(place).elems = elems;
|
||||
|
||||
if (unlikely(elems == 0))
|
||||
return;
|
||||
|
||||
if (unlikely(elems > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE))
|
||||
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
|
||||
|
||||
Node * prev = Node::read(buf, arena);
|
||||
data(place).first = prev;
|
||||
|
||||
for (UInt64 i = 1; i < elems; ++i)
|
||||
{
|
||||
Node * cur = Node::read(buf, arena);
|
||||
prev->next = cur;
|
||||
prev = cur;
|
||||
}
|
||||
|
||||
prev->next = nullptr;
|
||||
data(place).last = prev;
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
auto & column_array = static_cast<ColumnArray &>(to);
|
||||
|
||||
auto & offsets = column_array.getOffsets();
|
||||
offsets.push_back((offsets.size() == 0 ? 0 : offsets.back()) + data(place).elems);
|
||||
|
||||
auto & column_data = column_array.getData();
|
||||
|
||||
if (std::is_same<Node, NodeString>::value)
|
||||
{
|
||||
auto & string_offsets = static_cast<ColumnString &>(column_data).getOffsets();
|
||||
string_offsets.reserve(string_offsets.size() + data(place).elems);
|
||||
}
|
||||
|
||||
Node * p = data(place).first;
|
||||
while (p)
|
||||
{
|
||||
p->insertInto(column_data);
|
||||
p = p->next;
|
||||
}
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override
|
||||
{
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
struct AggregateFunctionGroupArrayStringConcatImpl_Data
|
||||
{
|
||||
static constexpr size_t target_size = 64;
|
||||
static constexpr size_t free_space = target_size - sizeof(PODArrayArenaAllocator<char>) - sizeof(PODArrayArenaAllocator<UInt64>);
|
||||
|
||||
PODArrayArenaAllocator<char, 64> chars;
|
||||
PODArrayArenaAllocator<UInt64, free_space, ArenaAllocatorWithStackMemoty<free_space>> offsets;
|
||||
};
|
||||
|
||||
|
||||
class AggregateFunctionGroupArrayStringConcatImpl final
|
||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayStringConcatImpl_Data, AggregateFunctionGroupArrayStringConcatImpl>
|
||||
{
|
||||
public:
|
||||
|
||||
String getName() const override { return "groupArray4"; }
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()); }
|
||||
|
||||
void setArgument(const DataTypePtr & argument) {}
|
||||
|
||||
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena * arena) const
|
||||
{
|
||||
StringRef string = static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num);
|
||||
|
||||
data(place).chars.insert(string.data, string.data + string.size, arena);
|
||||
data(place).offsets.push_back(string.size, arena);
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
||||
{
|
||||
}
|
||||
|
||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
|
||||
{
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
||||
{
|
||||
auto & cur_state = data(place);
|
||||
auto & rhs_state = data(rhs);
|
||||
|
||||
cur_state.chars.insert(rhs_state.chars.begin(), rhs_state.chars.end(), arena);
|
||||
cur_state.offsets.insert(rhs_state.offsets.begin(), rhs_state.offsets.end(), arena);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
auto & column_array = static_cast<ColumnArray &>(to);
|
||||
auto & column_string = static_cast<ColumnString &>(column_array.getData());
|
||||
auto & offsets = column_array.getOffsets();
|
||||
auto & cur_state = data(place);
|
||||
|
||||
offsets.push_back((offsets.size() == 0 ? 0 : offsets.back()) + cur_state.offsets.size());
|
||||
|
||||
auto pos = column_string.getChars().size();
|
||||
column_string.getChars().insert(cur_state.chars.begin(), cur_state.chars.end());
|
||||
|
||||
column_string.getOffsets().reserve(column_string.getOffsets().size() + cur_state.offsets.size());
|
||||
for (UInt64 i = 0; i < cur_state.offsets.size(); ++i)
|
||||
{
|
||||
pos += cur_state.offsets[i];
|
||||
column_string.getOffsets().push_back(pos);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
struct AggregateFunctionGroupArrayGeneric_SerializedData
|
||||
{
|
||||
PODArrayArenaAllocator<StringRef> values;
|
||||
};
|
||||
|
||||
class AggregateFunctionGroupArrayGeneric_SerializedDataImpl final
|
||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayGeneric_SerializedData, AggregateFunctionGroupArrayGeneric_SerializedDataImpl>
|
||||
{
|
||||
private:
|
||||
DataTypePtr type;
|
||||
|
||||
public:
|
||||
String getName() const override { return "groupArray"; }
|
||||
|
||||
DataTypePtr getReturnType() const override
|
||||
{
|
||||
return std::make_shared<DataTypeArray>(type);
|
||||
}
|
||||
|
||||
void setArgument(const DataTypePtr & argument)
|
||||
{
|
||||
type = argument;
|
||||
}
|
||||
|
||||
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena * arena) const
|
||||
{
|
||||
const char * begin = nullptr;
|
||||
data(place).values.push_back(column.serializeValueIntoArena(row_num, *arena, begin), arena);
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
||||
{
|
||||
for (const StringRef & elem : data(rhs).values)
|
||||
data(place).values.push_back(StringRef(arena->insert(elem.data, elem.size), elem.size), arena);
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
||||
{
|
||||
}
|
||||
|
||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
|
||||
{
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
ColumnArray & column_array = typeid_cast<ColumnArray &>(to);
|
||||
auto & column_data = column_array.getData();
|
||||
auto & offsets = column_array.getOffsets();
|
||||
auto & cur_state = data(place);
|
||||
|
||||
offsets.push_back((offsets.size() == 0 ? 0 : offsets.back()) + cur_state.values.size());
|
||||
|
||||
column_data.reserve(cur_state.values.size());
|
||||
|
||||
for (const StringRef & elem : cur_state.values)
|
||||
column_data.deserializeAndInsertFromArena(elem.data);
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override
|
||||
{
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
struct AggregateFunctionGroupArrayGeneric_ColumnPtrImpl_Data
|
||||
{
|
||||
ColumnPtr container;
|
||||
};
|
||||
|
||||
class AggregateFunctionGroupArrayGeneric_ColumnPtrImpl final
|
||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayGeneric_ColumnPtrImpl_Data, AggregateFunctionGroupArrayGeneric_ColumnPtrImpl>
|
||||
{
|
||||
private:
|
||||
DataTypePtr type;
|
||||
|
||||
public:
|
||||
String getName() const override { return "groupArray"; }
|
||||
|
||||
DataTypePtr getReturnType() const override
|
||||
{
|
||||
return std::make_shared<DataTypeArray>(type);
|
||||
}
|
||||
|
||||
void setArgument(const DataTypePtr & argument)
|
||||
{
|
||||
type = argument;
|
||||
}
|
||||
|
||||
void create(AggregateDataPtr place) const override
|
||||
{
|
||||
new (place) Data;
|
||||
data(place).container = type->createColumn();
|
||||
}
|
||||
|
||||
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
|
||||
{
|
||||
data(place).container->insertFrom(column, row_num);
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
||||
{
|
||||
data(place).container->insertRangeFrom(*data(rhs).container, 0, data(rhs).container->size());
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
||||
{
|
||||
UInt64 s = data(place).container->size();
|
||||
writeVarUInt(s, buf);
|
||||
type->serializeBinaryBulk(*data(place).container, buf, 0, s);
|
||||
}
|
||||
|
||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
|
||||
{
|
||||
UInt64 s;
|
||||
readVarUInt(s, buf);
|
||||
type->deserializeBinaryBulk(*data(place).container, buf, s, 0);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
ColumnArray & array_column = typeid_cast<ColumnArray &>(to);
|
||||
auto s = data(place).container->size();
|
||||
array_column.getOffsets().push_back(s);
|
||||
array_column.getData().insertRangeFrom(*data(place).container, 0, s);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE
|
||||
|
||||
}
|
||||
|
@ -98,6 +98,8 @@ private:
|
||||
size_in_bytes += head->size();
|
||||
}
|
||||
|
||||
friend class ArenaAllocator;
|
||||
|
||||
public:
|
||||
Arena(size_t initial_size_ = 4096, size_t growth_factor_ = 2, size_t linear_growth_threshold_ = 128 * 1024 * 1024)
|
||||
: growth_factor(growth_factor_), linear_growth_threshold(linear_growth_threshold_),
|
||||
|
@ -39,7 +39,7 @@ namespace DB
|
||||
template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>, size_t pad_right_ = 0>
|
||||
class PODArray : private boost::noncopyable, private TAllocator /// empty base optimization
|
||||
{
|
||||
private:
|
||||
protected:
|
||||
/// Round padding up to an whole number of elements to simplify arithmetic.
|
||||
static constexpr size_t pad_right = (pad_right_ + sizeof(T) - 1) / sizeof(T) * sizeof(T);
|
||||
|
||||
|
@ -10,3 +10,10 @@
|
||||
9 100
|
||||
|
||||
0 1000000
|
||||
1000000 500000500000 1000000
|
||||
1000000 500000500000 1000000
|
||||
1000000 500000500000 1000000 500000500000
|
||||
1000000 500000500000 1000000 500000500000
|
||||
2000000 1000001000000 1000000
|
||||
2000000 1000001000000 1000000
|
||||
2000000 1000001000000 1000000 1000001000000
|
||||
|
@ -1 +1,16 @@
|
||||
SELECT intDiv(number, 100) AS k, length(groupArray(number)) FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;
|
||||
SELECT intDiv(number, 100) AS k, length(groupArray2(number)) FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;
|
||||
|
||||
DROP TABLE IF EXISTS test.numbers_mt;
|
||||
CREATE TABLE test.numbers_mt (number UInt64) ENGINE = Log;
|
||||
INSERT INTO test.numbers_mt SELECT * FROM system.numbers LIMIT 1, 1000000;
|
||||
|
||||
SELECT count(), sum(ns), max(ns) FROM (SELECT intDiv(number, 100) AS k, groupArray2(number) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
||||
SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number, 100) AS k, groupArray2(toString(number)) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
||||
SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray2([toString(number), toString(number*10)]) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
||||
SELECT count(), sum(ns[1]), max(ns[1]), sum(ns[2])/10 FROM (SELECT intDiv(number, 100) AS k, groupArray2([number, number*10]) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
||||
|
||||
SELECT count(), sum(ns), max(ns) FROM (SELECT intDiv(number, 100) AS k, groupArray2(number) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns;
|
||||
SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number, 100) AS k, groupArray2(toString(number)) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns;
|
||||
SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray2([toString(number), toString(number*10)]) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns;
|
||||
|
||||
DROP TABLE test.numbers_mt;
|
||||
|
Loading…
Reference in New Issue
Block a user