mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Get rid of old groupArray() implementations. [#CLICKHOUSE-3084]
This commit is contained in:
parent
792faaa2db
commit
fefb4dad67
@ -8,22 +8,7 @@ namespace DB
|
|||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
|
|
||||||
AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string & name, const DataTypes & argument_types, const Array & parameters)
|
static AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string & name, const DataTypes & argument_types, const Array & parameters)
|
||||||
{
|
|
||||||
if (argument_types.size() != 1)
|
|
||||||
throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 2",
|
|
||||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
|
||||||
|
|
||||||
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionGroupArrayNumeric>(*argument_types[0]));
|
|
||||||
|
|
||||||
if (!res)
|
|
||||||
res = std::make_shared<AggregateFunctionGroupArrayGeneric>();
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
AggregateFunctionPtr createAggregateFunctionGroupArray2(const std::string & name, const DataTypes & argument_types, const Array & parameters)
|
|
||||||
{
|
{
|
||||||
if (argument_types.size() != 1)
|
if (argument_types.size() != 1)
|
||||||
throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 1",
|
throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 1",
|
||||||
@ -38,67 +23,47 @@ AggregateFunctionPtr createAggregateFunctionGroupArray2(const std::string & name
|
|||||||
}
|
}
|
||||||
else if (parameters.size() == 1)
|
else if (parameters.size() == 1)
|
||||||
{
|
{
|
||||||
if (parameters[0].getType() == Field::Types::Int64 || parameters[0].getType() == Field::Types::UInt64)
|
auto type = parameters[0].getType();
|
||||||
{
|
if (type != Field::Types::Int64 && type != Field::Types::UInt64)
|
||||||
if ((parameters[0].getType() == Field::Types::Int64 && parameters[0].get<Int64>() < 0) ||
|
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
|
||||||
(parameters[0].getType() == Field::Types::UInt64 && parameters[0].get<UInt64>() == 0))
|
|
||||||
|
if ((type == Field::Types::Int64 && parameters[0].get<Int64>() < 0) ||
|
||||||
|
(type == Field::Types::UInt64 && parameters[0].get<UInt64>() == 0))
|
||||||
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
|
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
|
||||||
|
|
||||||
limit_size = true;
|
limit_size = true;
|
||||||
max_elems = parameters[0].get<UInt64>();
|
max_elems = parameters[0].get<UInt64>();
|
||||||
}
|
}
|
||||||
else
|
|
||||||
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
|
|
||||||
}
|
|
||||||
else
|
else
|
||||||
throw Exception("Incorrect number of parameters for aggregate function " + name + ", should be 0 or 1",
|
throw Exception("Incorrect number of parameters for aggregate function " + name + ", should be 0 or 1",
|
||||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||||
|
|
||||||
if (!limit_size)
|
if (!limit_size)
|
||||||
{
|
{
|
||||||
if (auto res = createWithNumericType<AggregateFunctionGroupArrayNumeric2, std::false_type>(*argument_types[0]))
|
if (auto res = createWithNumericType<GroupArrayNumericImpl, std::false_type>(*argument_types[0]))
|
||||||
return AggregateFunctionPtr(res);
|
return AggregateFunctionPtr(res);
|
||||||
else if (typeid_cast<const DataTypeString *>(argument_types[0].get()))
|
else if (typeid_cast<const DataTypeString *>(argument_types[0].get()))
|
||||||
return std::make_shared<AggregateFunctionGroupArrayStringListImpl<NodeString, false>>();
|
return std::make_shared<GroupArrayGeneralListImpl<NodeString, false>>();
|
||||||
else
|
else
|
||||||
return std::make_shared<AggregateFunctionGroupArrayStringListImpl<NodeGeneral, false>>();
|
return std::make_shared<GroupArrayGeneralListImpl<NodeGeneral, false>>();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (auto res = createWithNumericType<AggregateFunctionGroupArrayNumeric2, std::true_type>(*argument_types[0], max_elems))
|
if (auto res = createWithNumericType<GroupArrayNumericImpl, std::true_type>(*argument_types[0], max_elems))
|
||||||
{
|
|
||||||
return AggregateFunctionPtr(res);
|
return AggregateFunctionPtr(res);
|
||||||
}
|
|
||||||
else if (typeid_cast<const DataTypeString *>(argument_types[0].get()))
|
else if (typeid_cast<const DataTypeString *>(argument_types[0].get()))
|
||||||
return std::make_shared<AggregateFunctionGroupArrayStringListImpl<NodeString, true>>(max_elems);
|
return std::make_shared<GroupArrayGeneralListImpl<NodeString, true>>(max_elems);
|
||||||
else
|
else
|
||||||
return std::make_shared<AggregateFunctionGroupArrayStringListImpl<NodeGeneral, true>>(max_elems);
|
return std::make_shared<GroupArrayGeneralListImpl<NodeGeneral, true>>(max_elems);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
AggregateFunctionPtr createAggregateFunctionGroupArray4(const std::string & name, const DataTypes & argument_types, const Array & parameters)
|
|
||||||
{
|
|
||||||
if (argument_types.size() != 1)
|
|
||||||
throw Exception("Incorrect number of arguments for aggregate function " + name + ", should be 2",
|
|
||||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
|
||||||
|
|
||||||
// if (auto res = createWithNumericType<AggregateFunctionGroupArrayNumeric2>(*argument_types[0]))
|
|
||||||
// return AggregateFunctionPtr(res);
|
|
||||||
|
|
||||||
if (typeid_cast<const DataTypeString *>(argument_types[0].get()))
|
|
||||||
return std::make_shared<AggregateFunctionGroupArrayStringConcatImpl>();
|
|
||||||
else
|
|
||||||
return std::make_shared<AggregateFunctionGroupArrayGeneric_ColumnPtrImpl>();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
void registerAggregateFunctionGroupArray(AggregateFunctionFactory & factory)
|
void registerAggregateFunctionGroupArray(AggregateFunctionFactory & factory)
|
||||||
{
|
{
|
||||||
factory.registerFunction("groupArray", createAggregateFunctionGroupArray);
|
factory.registerFunction("groupArray", createAggregateFunctionGroupArray);
|
||||||
factory.registerFunction("groupArray2", createAggregateFunctionGroupArray2);
|
|
||||||
factory.registerFunction("groupArray4", createAggregateFunctionGroupArray4);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -30,106 +30,32 @@ namespace ErrorCodes
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// A particular case is an implementation for numeric types.
|
namespace
|
||||||
template <typename T>
|
|
||||||
struct AggregateFunctionGroupArrayDataNumeric
|
|
||||||
{
|
{
|
||||||
/// Memory is allocated to several elements immediately so that the state occupies 64 bytes.
|
|
||||||
static constexpr size_t bytes_in_arena = 64 - sizeof(PODArray<T>);
|
|
||||||
|
|
||||||
using Array = PODArray<T, bytes_in_arena, AllocatorWithStackMemory<Allocator<false>, bytes_in_arena>>;
|
|
||||||
Array value;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
template <typename T>
|
|
||||||
class AggregateFunctionGroupArrayNumeric final
|
|
||||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayDataNumeric<T>, AggregateFunctionGroupArrayNumeric<T>>
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
String getName() const override { return "groupArray"; }
|
|
||||||
|
|
||||||
DataTypePtr getReturnType() const override
|
|
||||||
{
|
|
||||||
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeNumber<T>>());
|
|
||||||
}
|
|
||||||
|
|
||||||
void setArgument(const DataTypePtr & argument)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
|
|
||||||
{
|
|
||||||
this->data(place).value.push_back(static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
|
|
||||||
}
|
|
||||||
|
|
||||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
|
||||||
{
|
|
||||||
this->data(place).value.insert(this->data(rhs).value.begin(), this->data(rhs).value.end());
|
|
||||||
}
|
|
||||||
|
|
||||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
|
||||||
{
|
|
||||||
const auto & value = this->data(place).value;
|
|
||||||
size_t size = value.size();
|
|
||||||
writeVarUInt(size, buf);
|
|
||||||
buf.write(reinterpret_cast<const char *>(&value[0]), size * sizeof(value[0]));
|
|
||||||
}
|
|
||||||
|
|
||||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
|
|
||||||
{
|
|
||||||
size_t size = 0;
|
|
||||||
readVarUInt(size, buf);
|
|
||||||
|
|
||||||
if (unlikely(size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE))
|
|
||||||
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
|
|
||||||
|
|
||||||
auto & value = this->data(place).value;
|
|
||||||
|
|
||||||
value.resize(size);
|
|
||||||
buf.read(reinterpret_cast<char *>(&value[0]), size * sizeof(value[0]));
|
|
||||||
}
|
|
||||||
|
|
||||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
|
||||||
{
|
|
||||||
const auto & value = this->data(place).value;
|
|
||||||
size_t size = value.size();
|
|
||||||
|
|
||||||
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
|
|
||||||
ColumnArray::Offsets_t & offsets_to = arr_to.getOffsets();
|
|
||||||
|
|
||||||
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
|
|
||||||
|
|
||||||
typename ColumnVector<T>::Container_t & data_to = static_cast<ColumnVector<T> &>(arr_to.getData()).getData();
|
|
||||||
data_to.insert(this->data(place).value.begin(), this->data(place).value.end());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
/// A particular case is an implementation for numeric types.
|
/// A particular case is an implementation for numeric types.
|
||||||
template <typename T>
|
template <typename T>
|
||||||
struct AggregateFunctionGroupArrayDataNumeric2
|
struct GroupArrayNumericData
|
||||||
{
|
{
|
||||||
/// Memory is allocated to several elements immediately so that the state occupies 64 bytes.
|
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
|
||||||
static constexpr size_t bytes_in_arena = 64 - sizeof(PODArrayArenaAllocator<T>);
|
using Allocator = MixedArenaAllocator<4096>;
|
||||||
|
using Array = PODArrayArenaAllocator<T, 32, Allocator>;
|
||||||
|
|
||||||
//using Array = PODArrayArenaAllocator<T, bytes_in_arena, ArenaAllocatorWithStackMemoty<bytes_in_arena>>;
|
|
||||||
using Array = PODArrayArenaAllocator<T, bytes_in_arena, MixedArenaAllocator<4096>>;
|
|
||||||
Array value;
|
Array value;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
template <typename T, typename Tlimit_num_elems>
|
template <typename T, typename Tlimit_num_elems>
|
||||||
class AggregateFunctionGroupArrayNumeric2 final
|
class GroupArrayNumericImpl final
|
||||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayDataNumeric2<T>, AggregateFunctionGroupArrayNumeric2<T, Tlimit_num_elems>>
|
: public IUnaryAggregateFunction<GroupArrayNumericData<T>, GroupArrayNumericImpl<T, Tlimit_num_elems>>
|
||||||
{
|
{
|
||||||
static constexpr bool limit_num_elems = Tlimit_num_elems::value;
|
static constexpr bool limit_num_elems = Tlimit_num_elems::value;
|
||||||
UInt64 max_elems;
|
UInt64 max_elems;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
AggregateFunctionGroupArrayNumeric2(UInt64 max_elems_ = std::numeric_limits<UInt64>::max()) : max_elems(max_elems_) {}
|
GroupArrayNumericImpl(UInt64 max_elems_ = std::numeric_limits<UInt64>::max()) : max_elems(max_elems_) {}
|
||||||
|
|
||||||
String getName() const override { return "groupArray2"; }
|
String getName() const override { return "groupArray"; }
|
||||||
|
|
||||||
DataTypePtr getReturnType() const override
|
DataTypePtr getReturnType() const override
|
||||||
{
|
{
|
||||||
@ -206,82 +132,18 @@ public:
|
|||||||
typename ColumnVector<T>::Container_t & data_to = static_cast<ColumnVector<T> &>(arr_to.getData()).getData();
|
typename ColumnVector<T>::Container_t & data_to = static_cast<ColumnVector<T> &>(arr_to.getData()).getData();
|
||||||
data_to.insert(this->data(place).value.begin(), this->data(place).value.end());
|
data_to.insert(this->data(place).value.begin(), this->data(place).value.end());
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
|
bool allocatesMemoryInArena() const override
|
||||||
/// General case (inefficient). NOTE You can also implement a special case for strings.
|
|
||||||
struct AggregateFunctionGroupArrayDataGeneric
|
|
||||||
{
|
|
||||||
Array value; /// TODO Add MemoryTracker
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
/// Puts all values to an array, general case. Implemented inefficiently.
|
|
||||||
class AggregateFunctionGroupArrayGeneric final
|
|
||||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayDataGeneric, AggregateFunctionGroupArrayGeneric>
|
|
||||||
{
|
|
||||||
private:
|
|
||||||
DataTypePtr type;
|
|
||||||
|
|
||||||
public:
|
|
||||||
String getName() const override { return "groupArray"; }
|
|
||||||
|
|
||||||
DataTypePtr getReturnType() const override
|
|
||||||
{
|
{
|
||||||
return std::make_shared<DataTypeArray>(type);
|
return true;
|
||||||
}
|
|
||||||
|
|
||||||
void setArgument(const DataTypePtr & argument)
|
|
||||||
{
|
|
||||||
type = argument;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
|
|
||||||
{
|
|
||||||
data(place).value.push_back(Array::value_type());
|
|
||||||
column.get(row_num, data(place).value.back());
|
|
||||||
}
|
|
||||||
|
|
||||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
|
||||||
{
|
|
||||||
data(place).value.insert(data(place).value.end(), data(rhs).value.begin(), data(rhs).value.end());
|
|
||||||
}
|
|
||||||
|
|
||||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
|
||||||
{
|
|
||||||
const Array & value = data(place).value;
|
|
||||||
size_t size = value.size();
|
|
||||||
writeVarUInt(size, buf);
|
|
||||||
for (size_t i = 0; i < size; ++i)
|
|
||||||
type->serializeBinary(value[i], buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
|
|
||||||
{
|
|
||||||
size_t size = 0;
|
|
||||||
readVarUInt(size, buf);
|
|
||||||
|
|
||||||
if (unlikely(size > AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE))
|
|
||||||
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
|
|
||||||
|
|
||||||
Array & value = data(place).value;
|
|
||||||
|
|
||||||
value.resize(size);
|
|
||||||
for (size_t i = 0; i < size; ++i)
|
|
||||||
type->deserializeBinary(value[i], buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
|
||||||
{
|
|
||||||
to.insert(data(place).value);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
namespace
|
/// General case
|
||||||
{
|
|
||||||
|
|
||||||
|
|
||||||
|
/// Nodes used to implement linked list for stoarge of groupArray states
|
||||||
struct NodeString;
|
struct NodeString;
|
||||||
struct NodeGeneral;
|
struct NodeGeneral;
|
||||||
|
|
||||||
@ -289,8 +151,9 @@ template <typename Node>
|
|||||||
struct NodeBase
|
struct NodeBase
|
||||||
{
|
{
|
||||||
Node * next;
|
Node * next;
|
||||||
UInt64 size;
|
UInt64 size; // size of payload
|
||||||
|
|
||||||
|
/// Returns pointer to actual payload
|
||||||
char * data()
|
char * data()
|
||||||
{
|
{
|
||||||
static_assert(sizeof(NodeBase) == sizeof(Node));
|
static_assert(sizeof(NodeBase) == sizeof(Node));
|
||||||
@ -346,7 +209,6 @@ struct NodeString : public NodeBase<NodeString>
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
struct NodeGeneral : public NodeBase<NodeGeneral>
|
struct NodeGeneral : public NodeBase<NodeGeneral>
|
||||||
{
|
{
|
||||||
using Node = NodeGeneral;
|
using Node = NodeGeneral;
|
||||||
@ -371,7 +233,7 @@ struct NodeGeneral : public NodeBase<NodeGeneral>
|
|||||||
|
|
||||||
|
|
||||||
template <typename Node>
|
template <typename Node>
|
||||||
struct AggregateFunctionGroupArrayListImpl_Data
|
struct GroupArrayGeneralListData
|
||||||
{
|
{
|
||||||
UInt64 elems = 0;
|
UInt64 elems = 0;
|
||||||
Node * first = nullptr;
|
Node * first = nullptr;
|
||||||
@ -379,13 +241,13 @@ struct AggregateFunctionGroupArrayListImpl_Data
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/// Implementation of groupArray(String or ComplexObject) via linked list
|
/// Implementation of groupArray for String or any ComplexObject via linked list
|
||||||
/// It has poor performance in case of many small objects
|
/// It has poor performance in case of many small objects
|
||||||
template <typename Node, bool limit_num_elems>
|
template <typename Node, bool limit_num_elems>
|
||||||
class AggregateFunctionGroupArrayStringListImpl final
|
class GroupArrayGeneralListImpl final
|
||||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayListImpl_Data<Node>, AggregateFunctionGroupArrayStringListImpl<Node, limit_num_elems>>
|
: public IUnaryAggregateFunction<GroupArrayGeneralListData<Node>, GroupArrayGeneralListImpl<Node, limit_num_elems>>
|
||||||
{
|
{
|
||||||
using Data = AggregateFunctionGroupArrayListImpl_Data<Node>;
|
using Data = GroupArrayGeneralListData<Node>;
|
||||||
static Data & data(AggregateDataPtr place) { return *reinterpret_cast<Data*>(place); }
|
static Data & data(AggregateDataPtr place) { return *reinterpret_cast<Data*>(place); }
|
||||||
static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast<const Data*>(place); }
|
static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast<const Data*>(place); }
|
||||||
|
|
||||||
@ -393,9 +255,9 @@ class AggregateFunctionGroupArrayStringListImpl final
|
|||||||
UInt64 max_elems;
|
UInt64 max_elems;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
AggregateFunctionGroupArrayStringListImpl(UInt64 max_elems_ = std::numeric_limits<UInt64>::max()) : max_elems(max_elems_) {}
|
GroupArrayGeneralListImpl(UInt64 max_elems_ = std::numeric_limits<UInt64>::max()) : max_elems(max_elems_) {}
|
||||||
|
|
||||||
String getName() const override { return "groupArray2"; }
|
String getName() const override { return "groupArray"; }
|
||||||
|
|
||||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(data_type); }
|
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(data_type); }
|
||||||
|
|
||||||
@ -553,207 +415,6 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
struct AggregateFunctionGroupArrayStringConcatImpl_Data
|
|
||||||
{
|
|
||||||
static constexpr size_t target_size = 64;
|
|
||||||
static constexpr size_t free_space = target_size - sizeof(PODArrayArenaAllocator<char>) - sizeof(PODArrayArenaAllocator<UInt64>);
|
|
||||||
|
|
||||||
PODArrayArenaAllocator<char, 64> chars;
|
|
||||||
PODArrayArenaAllocator<UInt64, free_space, ArenaAllocatorWithStackMemoty<free_space>> offsets;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
class AggregateFunctionGroupArrayStringConcatImpl final
|
|
||||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayStringConcatImpl_Data, AggregateFunctionGroupArrayStringConcatImpl>
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
|
|
||||||
String getName() const override { return "groupArray4"; }
|
|
||||||
|
|
||||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()); }
|
|
||||||
|
|
||||||
void setArgument(const DataTypePtr & argument) {}
|
|
||||||
|
|
||||||
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena * arena) const
|
|
||||||
{
|
|
||||||
StringRef string = static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num);
|
|
||||||
|
|
||||||
data(place).chars.insert(string.data, string.data + string.size, arena);
|
|
||||||
data(place).offsets.push_back(string.size, arena);
|
|
||||||
}
|
|
||||||
|
|
||||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
|
||||||
{
|
|
||||||
auto & cur_state = data(place);
|
|
||||||
auto & rhs_state = data(rhs);
|
|
||||||
|
|
||||||
cur_state.chars.insert(rhs_state.chars.begin(), rhs_state.chars.end(), arena);
|
|
||||||
cur_state.offsets.insert(rhs_state.offsets.begin(), rhs_state.offsets.end(), arena);
|
|
||||||
}
|
|
||||||
|
|
||||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
|
||||||
{
|
|
||||||
auto & column_array = static_cast<ColumnArray &>(to);
|
|
||||||
auto & column_string = static_cast<ColumnString &>(column_array.getData());
|
|
||||||
auto & offsets = column_array.getOffsets();
|
|
||||||
auto & cur_state = data(place);
|
|
||||||
|
|
||||||
offsets.push_back((offsets.size() == 0 ? 0 : offsets.back()) + cur_state.offsets.size());
|
|
||||||
|
|
||||||
auto pos = column_string.getChars().size();
|
|
||||||
column_string.getChars().insert(cur_state.chars.begin(), cur_state.chars.end());
|
|
||||||
|
|
||||||
column_string.getOffsets().reserve(column_string.getOffsets().size() + cur_state.offsets.size());
|
|
||||||
for (UInt64 i = 0; i < cur_state.offsets.size(); ++i)
|
|
||||||
{
|
|
||||||
pos += cur_state.offsets[i];
|
|
||||||
column_string.getOffsets().push_back(pos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
struct AggregateFunctionGroupArrayGeneric_SerializedData
|
|
||||||
{
|
|
||||||
PODArrayArenaAllocator<StringRef> values;
|
|
||||||
};
|
|
||||||
|
|
||||||
class AggregateFunctionGroupArrayGeneric_SerializedDataImpl final
|
|
||||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayGeneric_SerializedData, AggregateFunctionGroupArrayGeneric_SerializedDataImpl>
|
|
||||||
{
|
|
||||||
private:
|
|
||||||
DataTypePtr type;
|
|
||||||
|
|
||||||
public:
|
|
||||||
String getName() const override { return "groupArray"; }
|
|
||||||
|
|
||||||
DataTypePtr getReturnType() const override
|
|
||||||
{
|
|
||||||
return std::make_shared<DataTypeArray>(type);
|
|
||||||
}
|
|
||||||
|
|
||||||
void setArgument(const DataTypePtr & argument)
|
|
||||||
{
|
|
||||||
type = argument;
|
|
||||||
}
|
|
||||||
|
|
||||||
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena * arena) const
|
|
||||||
{
|
|
||||||
const char * begin = nullptr;
|
|
||||||
data(place).values.push_back(column.serializeValueIntoArena(row_num, *arena, begin), arena);
|
|
||||||
}
|
|
||||||
|
|
||||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
|
||||||
{
|
|
||||||
for (const StringRef & elem : data(rhs).values)
|
|
||||||
data(place).values.push_back(StringRef(arena->insert(elem.data, elem.size), elem.size), arena);
|
|
||||||
}
|
|
||||||
|
|
||||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
|
||||||
{
|
|
||||||
ColumnArray & column_array = static_cast<ColumnArray &>(to);
|
|
||||||
auto & column_data = column_array.getData();
|
|
||||||
auto & offsets = column_array.getOffsets();
|
|
||||||
auto & cur_state = data(place);
|
|
||||||
|
|
||||||
offsets.push_back((offsets.size() == 0 ? 0 : offsets.back()) + cur_state.values.size());
|
|
||||||
|
|
||||||
column_data.reserve(cur_state.values.size());
|
|
||||||
|
|
||||||
for (const StringRef & elem : cur_state.values)
|
|
||||||
column_data.deserializeAndInsertFromArena(elem.data);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool allocatesMemoryInArena() const override
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
struct AggregateFunctionGroupArrayGeneric_ColumnPtrImpl_Data
|
|
||||||
{
|
|
||||||
ColumnPtr container;
|
|
||||||
};
|
|
||||||
|
|
||||||
class AggregateFunctionGroupArrayGeneric_ColumnPtrImpl final
|
|
||||||
: public IUnaryAggregateFunction<AggregateFunctionGroupArrayGeneric_ColumnPtrImpl_Data, AggregateFunctionGroupArrayGeneric_ColumnPtrImpl>
|
|
||||||
{
|
|
||||||
private:
|
|
||||||
DataTypePtr type;
|
|
||||||
|
|
||||||
public:
|
|
||||||
String getName() const override { return "groupArray4"; }
|
|
||||||
|
|
||||||
DataTypePtr getReturnType() const override
|
|
||||||
{
|
|
||||||
return std::make_shared<DataTypeArray>(type);
|
|
||||||
}
|
|
||||||
|
|
||||||
void setArgument(const DataTypePtr & argument)
|
|
||||||
{
|
|
||||||
type = argument;
|
|
||||||
}
|
|
||||||
|
|
||||||
void create(AggregateDataPtr place) const override
|
|
||||||
{
|
|
||||||
new (place) Data;
|
|
||||||
data(place).container = type->createColumn();
|
|
||||||
}
|
|
||||||
|
|
||||||
void addImpl(AggregateDataPtr place, const IColumn & column, size_t row_num, Arena *) const
|
|
||||||
{
|
|
||||||
data(place).container->insertFrom(column, row_num);
|
|
||||||
}
|
|
||||||
|
|
||||||
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
|
|
||||||
{
|
|
||||||
data(place).container->insertRangeFrom(*data(rhs).container, 0, data(rhs).container->size());
|
|
||||||
}
|
|
||||||
|
|
||||||
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
|
|
||||||
{
|
|
||||||
UInt64 s = data(place).container->size();
|
|
||||||
writeVarUInt(s, buf);
|
|
||||||
type->serializeBinaryBulk(*data(place).container, buf, 0, s);
|
|
||||||
}
|
|
||||||
|
|
||||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
|
|
||||||
{
|
|
||||||
UInt64 s;
|
|
||||||
readVarUInt(s, buf);
|
|
||||||
type->deserializeBinaryBulk(*data(place).container, buf, s, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
|
||||||
{
|
|
||||||
ColumnArray & array_column = static_cast<ColumnArray &>(to);
|
|
||||||
auto s = data(place).container->size();
|
|
||||||
array_column.getOffsets().push_back(s);
|
|
||||||
array_column.getData().insertRangeFrom(*data(place).container, 0, s);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE
|
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
#include <Common/PODArray.h>
|
|
||||||
#include <Common/Arena.h>
|
#include <Common/Arena.h>
|
||||||
#include <Common/Allocator.h>
|
#include <Common/Allocator.h>
|
||||||
|
|
||||||
@ -6,8 +5,7 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
/// Fake Allocator which proxies all allocations to Arena
|
/// Fake Allocator which proxies all allocations to Arena. Used in aggregate functions.
|
||||||
/// Used in aggregate functions
|
|
||||||
struct ArenaAllocator
|
struct ArenaAllocator
|
||||||
{
|
{
|
||||||
static void * alloc(size_t size, Arena * arena)
|
static void * alloc(size_t size, Arena * arena)
|
||||||
@ -32,10 +30,14 @@ struct ArenaAllocator
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void free(void * buf, size_t size) {}
|
static void free(void * buf, size_t size)
|
||||||
|
{
|
||||||
|
// Remains trash in arena
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/// Switches to ordinary Allocator after REAL_ALLOCATION_TRESHOLD bytes to avoid fragmentation and trash in Arena.
|
||||||
template <size_t REAL_ALLOCATION_TRESHOLD = 4096, typename TRealAllocator = Allocator<false>, typename TArenaAllocator = ArenaAllocator>
|
template <size_t REAL_ALLOCATION_TRESHOLD = 4096, typename TRealAllocator = Allocator<false>, typename TArenaAllocator = ArenaAllocator>
|
||||||
class MixedArenaAllocator : private TRealAllocator
|
class MixedArenaAllocator : private TRealAllocator
|
||||||
{
|
{
|
||||||
@ -102,6 +104,8 @@ public:
|
|||||||
|
|
||||||
|
|
||||||
/// Similar to PODArray, but allocates memory using ArenaAllocator
|
/// Similar to PODArray, but allocates memory using ArenaAllocator
|
||||||
|
/// Uses additional Arena * arena parameter in all modification methods to not store it
|
||||||
|
/// TODO: avoid copypaste from PODArray
|
||||||
template <typename T, size_t INITIAL_SIZE = 32, typename TAllocator = ArenaAllocator>
|
template <typename T, size_t INITIAL_SIZE = 32, typename TAllocator = ArenaAllocator>
|
||||||
class PODArrayArenaAllocator : private TAllocator
|
class PODArrayArenaAllocator : private TAllocator
|
||||||
{
|
{
|
||||||
|
@ -1,17 +1,17 @@
|
|||||||
SELECT intDiv(number, 100) AS k, length(groupArray2(number)) FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;
|
SELECT intDiv(number, 100) AS k, length(groupArray(number)) FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;
|
||||||
|
|
||||||
DROP TABLE IF EXISTS test.numbers_mt;
|
DROP TABLE IF EXISTS test.numbers_mt;
|
||||||
CREATE TABLE test.numbers_mt (number UInt64) ENGINE = Log;
|
CREATE TABLE test.numbers_mt (number UInt64) ENGINE = Log;
|
||||||
INSERT INTO test.numbers_mt SELECT * FROM system.numbers LIMIT 1, 1000000;
|
INSERT INTO test.numbers_mt SELECT * FROM system.numbers LIMIT 1, 1000000;
|
||||||
|
|
||||||
SELECT count(), sum(ns), max(ns) FROM (SELECT intDiv(number, 100) AS k, groupArray2(number) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
SELECT count(), sum(ns), max(ns) FROM (SELECT intDiv(number, 100) AS k, groupArray(number) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
||||||
SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number, 100) AS k, groupArray2(toString(number)) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number, 100) AS k, groupArray(toString(number)) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
||||||
SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray2([toString(number), toString(number*10)]) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray([toString(number), toString(number*10)]) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
||||||
SELECT count(), sum(ns[1]), max(ns[1]), sum(ns[2])/10 FROM (SELECT intDiv(number, 100) AS k, groupArray2([number, number*10]) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
SELECT count(), sum(ns[1]), max(ns[1]), sum(ns[2])/10 FROM (SELECT intDiv(number, 100) AS k, groupArray([number, number*10]) AS ns FROM test.numbers_mt GROUP BY k) ARRAY JOIN ns;
|
||||||
|
|
||||||
SELECT count(), sum(ns), max(ns) FROM (SELECT intDiv(number, 100) AS k, groupArray2(number) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns;
|
SELECT count(), sum(ns), max(ns) FROM (SELECT intDiv(number, 100) AS k, groupArray(number) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns;
|
||||||
SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number, 100) AS k, groupArray2(toString(number)) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns;
|
SELECT count(), sum(toUInt64(ns)), max(toUInt64(ns)) FROM (SELECT intDiv(number, 100) AS k, groupArray(toString(number)) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns;
|
||||||
SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray2([toString(number), toString(number*10)]) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns;
|
SELECT count(), sum(toUInt64(ns[1])), max(toUInt64(ns[1])), sum(toUInt64(ns[2]))/10 FROM (SELECT intDiv(number, 100) AS k, groupArray([toString(number), toString(number*10)]) AS ns FROM remote('127.0.0.{1,2}', 'test', 'numbers_mt') GROUP BY k) ARRAY JOIN ns;
|
||||||
|
|
||||||
DROP TABLE test.numbers_mt;
|
DROP TABLE test.numbers_mt;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user