This commit is contained in:
Amos Bird 2019-12-27 11:40:12 +08:00
parent b4578c4be0
commit a61a62e0b2
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4

View File

@ -1,30 +1,29 @@
#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <common/likely.h>
#include <type_traits>
#include <common/likely.h>
#define AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE 0xFFFFFF
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_LARGE_ARRAY_SIZE;
@ -57,22 +56,8 @@ static constexpr const char * getNameByTrait()
__builtin_unreachable();
}
/// A particular case is an implementation for numeric types.
template <typename T, bool has_sampler>
struct GroupArrayNumericData;
template <typename T>
struct GroupArrayNumericData<T, false>
{
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using Array = PODArray<T, 32, Allocator>;
Array value;
};
template <typename T>
struct GroupArrayNumericData<T, true>
struct GroupArraySamplerData
{
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
@ -101,6 +86,25 @@ struct GroupArrayNumericData<T, true>
}
};
/// A particular case is an implementation for numeric types.
template <typename T, bool has_sampler>
struct GroupArrayNumericData;
template <typename T>
struct GroupArrayNumericData<T, false>
{
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using Array = PODArray<T, 32, Allocator>;
Array value;
};
template <typename T>
struct GroupArrayNumericData<T, true> : public GroupArraySamplerData<T>
{
};
template <typename T, typename Trait>
class GroupArrayNumericImpl final
: public IAggregateFunctionDataHelper<GroupArrayNumericData<T, Trait::sampler != Sampler::NONE>, GroupArrayNumericImpl<T, Trait>>
@ -112,7 +116,8 @@ class GroupArrayNumericImpl final
UInt64 seed;
public:
explicit GroupArrayNumericImpl(const DataTypePtr & data_type_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max(), UInt64 seed_ = 123456)
explicit GroupArrayNumericImpl(
const DataTypePtr & data_type_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max(), UInt64 seed_ = 123456)
: IAggregateFunctionDataHelper<GroupArrayNumericData<T, Trait::sampler != Sampler::NONE>, GroupArrayNumericImpl<T, Trait>>(
{data_type_}, {})
, data_type(this->argument_types[0])
@ -123,10 +128,7 @@ public:
String getName() const override { return getNameByTrait<Trait>(); }
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeArray>(data_type);
}
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(data_type); }
void insert(Data & a, const T & v, Arena * arena) const
{
@ -141,6 +143,13 @@ public:
}
}
void create(AggregateDataPtr place) const override
{
[[maybe_unused]] auto a = new (place) Data;
if constexpr (Trait::sampler == Sampler::RNG)
a->rng.seed(seed);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
if constexpr (Trait::sampler == Sampler::NONE)
@ -155,8 +164,6 @@ public:
{
auto & a = this->data(place);
++a.total_values;
if (a.value.empty())
a.rng.seed(seed);
if (a.value.size() < max_elems)
a.value.push_back(assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num], arena);
else
@ -294,10 +301,7 @@ public:
}
}
bool allocatesMemoryInArena() const override
{
return true;
}
bool allocatesMemoryInArena() const override { return true; }
};
@ -312,15 +316,9 @@ struct GroupArrayNodeBase
UInt64 size; // size of payload
/// Returns pointer to actual payload
char * data()
{
return reinterpret_cast<char *>(this) + sizeof(Node);
}
char * data() { return reinterpret_cast<char *>(this) + sizeof(Node); }
const char * data() const
{
return reinterpret_cast<const char *>(this) + sizeof(Node);
}
const char * data() const { return reinterpret_cast<const char *>(this) + sizeof(Node); }
/// Clones existing node (does not modify next field)
Node * clone(Arena * arena) const
@ -386,33 +384,9 @@ struct GroupArrayNodeGeneral : public GroupArrayNodeBase<GroupArrayNodeGeneral>
return node;
}
void insertInto(IColumn & column)
{
column.deserializeAndInsertFromArena(data());
}
void insertInto(IColumn & column) { column.deserializeAndInsertFromArena(data()); }
};
class MyAllocator : protected Allocator<false>
{
using Base = Allocator<false>;
public:
void * alloc(size_t size, Arena *)
{
return Base::alloc(size, 8);
}
void free(void * buf, size_t size)
{
Base::free(buf, size);
}
void * realloc(void * buf, size_t old_size, size_t new_size, Arena *)
{
return Base::realloc(buf, old_size, new_size, 8);
}
};
template <typename Node, bool has_sampler>
struct GroupArrayGeneralData;
@ -420,54 +394,26 @@ template <typename Node>
struct GroupArrayGeneralData<Node, false>
{
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(Node*), 4096>;
// using Allocator = MyAllocator;
using Array = PODArray<Node*, 32, Allocator>;
using Allocator = MixedAlignedArenaAllocator<alignof(Node *), 4096>;
using Array = PODArray<Node *, 32, Allocator>;
Array value;
};
template <typename Node>
struct GroupArrayGeneralData<Node, true>
struct GroupArrayGeneralData<Node, true> : public GroupArraySamplerData<Node *>
{
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(Node*), 4096>;
// using Allocator = MyAllocator;
using Array = PODArray<Node*, 32, Allocator>;
Array value;
size_t total_values = 0;
pcg32_fast rng;
UInt64 genRandom(size_t lim)
{
/// With a large number of values, we will generate random numbers several times slower.
if (lim <= static_cast<UInt64>(rng.max()))
return static_cast<UInt32>(rng()) % static_cast<UInt32>(lim);
else
return (static_cast<UInt64>(rng()) * (static_cast<UInt64>(rng.max()) + 1ULL) + static_cast<UInt64>(rng())) % lim;
}
void randomShuffle()
{
for (size_t i = 1; i < value.size(); ++i)
{
size_t j = genRandom(i + 1);
std::swap(value[i], value[j]);
}
}
};
/// Implementation of groupArray for String or any ComplexObject via Array
template <typename Node, typename Trait>
class GroupArrayGeneralImpl final : public IAggregateFunctionDataHelper<
GroupArrayGeneralData<Node, Trait::sampler != Sampler::NONE>,
GroupArrayGeneralImpl<Node, Trait>>
class GroupArrayGeneralImpl final
: public IAggregateFunctionDataHelper<GroupArrayGeneralData<Node, Trait::sampler != Sampler::NONE>, GroupArrayGeneralImpl<Node, Trait>>
{
static constexpr bool limit_num_elems = Trait::has_limit;
using Data = GroupArrayGeneralData<Node, Trait::sampler != Sampler::NONE>;
static Data & data(AggregateDataPtr place) { return *reinterpret_cast<Data*>(place); }
static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast<const Data*>(place); }
static Data & data(AggregateDataPtr place) { return *reinterpret_cast<Data *>(place); }
static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast<const Data *>(place); }
DataTypePtr & data_type;
UInt64 max_elems;
@ -475,9 +421,8 @@ class GroupArrayGeneralImpl final : public IAggregateFunctionDataHelper<
public:
GroupArrayGeneralImpl(const DataTypePtr & data_type_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max(), UInt64 seed_ = 123456)
: IAggregateFunctionDataHelper<
GroupArrayGeneralData<Node, Trait::sampler != Sampler::NONE>,
GroupArrayGeneralImpl<Node, Trait>>({data_type_}, {})
: IAggregateFunctionDataHelper<GroupArrayGeneralData<Node, Trait::sampler != Sampler::NONE>, GroupArrayGeneralImpl<Node, Trait>>(
{data_type_}, {})
, data_type(this->argument_types[0])
, max_elems(max_elems_)
, seed(seed_)
@ -501,6 +446,13 @@ public:
}
}
void create(AggregateDataPtr place) const override
{
[[maybe_unused]] auto a = new (place) Data;
if constexpr (Trait::sampler == Sampler::RNG)
a->rng.seed(seed);
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
if constexpr (Trait::sampler == Sampler::NONE)
@ -516,8 +468,6 @@ public:
{
auto & a = data(place);
++a.total_values;
if (a.value.empty())
a.rng.seed(seed);
if (a.value.size() < max_elems)
a.value.push_back(Node::allocate(*columns[0], row_num, arena), arena);
else
@ -672,12 +622,7 @@ public:
node->insertInto(column_data);
}
bool allocatesMemoryInArena() const override
{
return true;
}
const char * getHeaderFilePath() const override { return __FILE__; }
bool allocatesMemoryInArena() const override { return true; }
};
template <typename Node>
@ -703,10 +648,7 @@ struct GroupArrayListNodeString : public GroupArrayListNodeBase<GroupArrayListNo
return node;
}
void insertInto(IColumn & column)
{
assert_cast<ColumnString &>(column).insertData(data(), size);
}
void insertInto(IColumn & column) { assert_cast<ColumnString &>(column).insertData(data(), size); }
};
struct GroupArrayListNodeGeneral : public GroupArrayListNodeBase<GroupArrayListNodeGeneral>
@ -725,10 +667,7 @@ struct GroupArrayListNodeGeneral : public GroupArrayListNodeBase<GroupArrayListN
return node;
}
void insertInto(IColumn & column)
{
column.deserializeAndInsertFromArena(data());
}
void insertInto(IColumn & column) { column.deserializeAndInsertFromArena(data()); }
};
@ -744,23 +683,20 @@ struct GroupArrayGeneralListData
/// Implementation of groupArray for String or any ComplexObject via linked list
/// It has poor performance in case of many small objects
template <typename Node, typename Trait>
class GroupArrayGeneralListImpl final : public IAggregateFunctionDataHelper<
GroupArrayGeneralListData<Node>,
GroupArrayGeneralListImpl<Node, Trait>>
class GroupArrayGeneralListImpl final
: public IAggregateFunctionDataHelper<GroupArrayGeneralListData<Node>, GroupArrayGeneralListImpl<Node, Trait>>
{
static constexpr bool limit_num_elems = Trait::has_limit;
using Data = GroupArrayGeneralListData<Node>;
static Data & data(AggregateDataPtr place) { return *reinterpret_cast<Data*>(place); }
static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast<const Data*>(place); }
static Data & data(AggregateDataPtr place) { return *reinterpret_cast<Data *>(place); }
static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast<const Data *>(place); }
DataTypePtr & data_type;
UInt64 max_elems;
public:
GroupArrayGeneralListImpl(const DataTypePtr & data_type_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<
GroupArrayGeneralListData<Node>,
GroupArrayGeneralListImpl<Node, Trait>>({data_type_}, {})
: IAggregateFunctionDataHelper<GroupArrayGeneralListData<Node>, GroupArrayGeneralListImpl<Node, Trait>>({data_type_}, {})
, data_type(this->argument_types[0])
, max_elems(max_elems_)
{
@ -904,10 +840,7 @@ public:
}
}
bool allocatesMemoryInArena() const override
{
return true;
}
bool allocatesMemoryInArena() const override { return true; }
};
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE