Alexander Kuzmenkov f3bde19b74 Do not use iterators in find() and emplace() methods of hash tables.
Instead, these methods return a pointer to the required data as they are
stored inside the hash table. The caller uses overloaded functions to
get the key and "mapped" values from this pointer. Such an interface
avoids the need for constructing iterator-like wrapper objects, which is
especially important for compound hash tables such as the future
2019-09-24 17:44:35 +03:00

277 lines
8.8 KiB

#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnArray.h>
#include <Common/HashTable/HashSet.h>
#include <Common/HashTable/HashTableKeyHolder.h>
#include <Common/assert_cast.h>
#include <AggregateFunctions/IAggregateFunction.h>
namespace DB
template <typename T>
struct AggregateFunctionGroupUniqArrayData
/// When creating, the hash table must be small.
using Set = HashSet<
HashTableAllocatorWithStackMemory<sizeof(T) * (1 << 4)>
Set value;
/// Puts all values to the hash set. Returns an array of unique values. Implemented for numeric types.
///
/// Tlimit_num_elem is a tag type exposing a static boolean `value`. When it is true, the set stops
/// accepting new values once it holds `max_elems` entries; when it is false, `max_elems` is ignored.
template <typename T, typename Tlimit_num_elem>
class AggregateFunctionGroupUniqArray
    : public IAggregateFunctionDataHelper<AggregateFunctionGroupUniqArrayData<T>, AggregateFunctionGroupUniqArray<T, Tlimit_num_elem>>
{
    static constexpr bool limit_num_elems = Tlimit_num_elem::value;
    UInt64 max_elems;

private:
    using State = AggregateFunctionGroupUniqArrayData<T>;

public:
    /// @param argument_type  type of the single aggregated argument.
    /// @param max_elems_     upper bound on the number of collected values (used only when limit_num_elems is true).
    AggregateFunctionGroupUniqArray(const DataTypePtr & argument_type, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
        : IAggregateFunctionDataHelper<AggregateFunctionGroupUniqArrayData<T>,
          AggregateFunctionGroupUniqArray<T, Tlimit_num_elem>>({argument_type}, {}),
          max_elems(max_elems_) {}

    String getName() const override { return "groupUniqArray"; }

    /// The result is an array whose elements have the numeric type T.
    DataTypePtr getReturnType() const override
    {
        return std::make_shared<DataTypeArray>(std::make_shared<DataTypeNumber<T>>());
    }

    /// Inserts the value at `row_num` of the first argument column, unless the size limit is already reached.
    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        /// Once the limit is reached, further values are dropped.
        if (limit_num_elems && this->data(place).value.size() >= max_elems)
            return;

        this->data(place).value.insert(assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num]);
    }

    /// Adds the values of the `rhs` state to the `place` state, honouring the size limit if it is enabled.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        auto & cur_set = this->data(place).value;
        auto & rhs_set = this->data(rhs).value;

        /// NOTE(review): the original branch taken when no limit applies and the body of this loop
        /// were missing from the reviewed text. Both cases are handled by one loop here, mirroring
        /// the generic implementation - confirm against upstream (which may use a bulk set merge).
        for (auto & rhs_elem : rhs_set)
        {
            if (limit_num_elems && cur_set.size() >= max_elems)
                return;

            cur_set.insert(rhs_elem.getValue());
        }
    }

    /// Writes the number of elements as a varint, followed by every element in binary form.
    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
    {
        auto & set = this->data(place).value;
        size_t size = set.size();
        writeVarUInt(size, buf);
        for (const auto & elem : set)
            writeIntBinary(elem, buf);
    }

    /// Restores the state written by serialize().
    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
    {
        /// NOTE(review): the body was missing from the reviewed text. The hash table's own read()
        /// is assumed to consume the same "varint count + raw elements" layout - confirm.
        this->data(place).value.read(buf);
    }

    /// Appends one array (all values of the set, in the set's iteration order) to the result column.
    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
        ColumnArray::Offsets & offsets_to = arr_to.getOffsets();

        const typename State::Set & set = this->data(place).value;
        size_t size = set.size();

        /// Offsets are cumulative: the new array ends `size` elements after the previous one.
        offsets_to.push_back(offsets_to.back() + size);

        typename ColumnVector<T>::Container & data_to = assert_cast<ColumnVector<T> &>(arr_to.getData()).getData();
        size_t old_size = data_to.size();
        data_to.resize(old_size + size);

        size_t i = 0;
        for (auto it = set.begin(); it != set.end(); ++it, ++i)
            data_to[old_size + i] = it->getValue();
    }

    const char * getHeaderFilePath() const override { return __FILE__; }
};
/// Generic implementation, it uses serialized representation as object descriptor.
/// The state is a hash set of StringRef keys that also stores the hash of every key.
struct AggregateFunctionGroupUniqArrayGenericData
{
    static constexpr size_t INIT_ELEMS = 2; /// adjustable
    /// Size of one cell; used to size the inline buffer so that INIT_ELEMS cells fit without a heap allocation.
    static constexpr size_t ELEM_SIZE = sizeof(HashSetCellWithSavedHash<StringRef, StringRefHash>);
    using Set = HashSetWithSavedHash<StringRef, StringRefHash, HashTableGrower<INIT_ELEMS>, HashTableAllocatorWithStackMemory<INIT_ELEMS * ELEM_SIZE>>;

    Set value;
};
/// Appends the value described by `str` to the column `data_to`.
/// Only the declaration lives here: the function is explicitly specialized further below for
/// is_plain_column == false and is_plain_column == true, and is called through
/// AggregateFunctionGroupUniqArrayGeneric::deserializeAndInsert().
template <bool is_plain_column>
static void deserializeAndInsertImpl(StringRef str, IColumn & data_to);
/** Template parameter with true value should be used for columns that store their elements in memory continuously.
* For such columns groupUniqArray() can be implemented more efficiently (especially for small numeric arrays).
template <bool is_plain_column = false, typename Tlimit_num_elem = std::false_type>
class AggregateFunctionGroupUniqArrayGeneric
: public IAggregateFunctionDataHelper<AggregateFunctionGroupUniqArrayGenericData, AggregateFunctionGroupUniqArrayGeneric<is_plain_column, Tlimit_num_elem>>
DataTypePtr & input_data_type;
static constexpr bool limit_num_elems = Tlimit_num_elem::value;
UInt64 max_elems;
using State = AggregateFunctionGroupUniqArrayGenericData;
static auto getKeyHolder(const IColumn & column, size_t row_num, Arena & arena)
if constexpr (is_plain_column)
return ArenaKeyHolder{column.getDataAt(row_num), arena};
const char * begin = nullptr;
StringRef serialized = column.serializeValueIntoArena(row_num, arena, begin);
return SerializedKeyHolder{serialized, arena};
static void deserializeAndInsert(StringRef str, IColumn & data_to)
return deserializeAndInsertImpl<is_plain_column>(str, data_to);
AggregateFunctionGroupUniqArrayGeneric(const DataTypePtr & input_data_type_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<AggregateFunctionGroupUniqArrayGenericData, AggregateFunctionGroupUniqArrayGeneric<is_plain_column, Tlimit_num_elem>>({input_data_type_}, {})
, input_data_type(this->argument_types[0])
, max_elems(max_elems_) {}
String getName() const override { return "groupUniqArray"; }
DataTypePtr getReturnType() const override
return std::make_shared<DataTypeArray>(input_data_type);
bool allocatesMemoryInArena() const override
return true;
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
auto & set = this->data(place).value;
writeVarUInt(set.size(), buf);
for (const auto & elem : set)
writeStringBinary(elem.getValue(), buf);
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
auto & set = this->data(place).value;
size_t size;
readVarUInt(size, buf);
//TODO: set.reserve(size);
for (size_t i = 0; i < size; ++i)
set.insert(readStringBinaryInto(*arena, buf));
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
auto & set = this->data(place).value;
if (limit_num_elems && set.size() >= max_elems)
bool inserted;
State::Set::LookupResult it;
auto key_holder = getKeyHolder(*columns[0], row_num, *arena);
set.emplace(key_holder, it, inserted);
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
auto & cur_set = this->data(place).value;
auto & rhs_set = this->data(rhs).value;
bool inserted;
State::Set::LookupResult it;
for (auto & rhs_elem : rhs_set)
if (limit_num_elems && cur_set.size() >= max_elems)
// We have to copy the keys to our arena.
assert(arena != nullptr);
cur_set.emplace(ArenaKeyHolder{rhs_elem.getValue(), *arena}, it, inserted);
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
IColumn & data_to = arr_to.getData();
auto & set = this->data(place).value;
offsets_to.push_back(offsets_to.back() + set.size());
for (auto & elem : set)
deserializeAndInsert(elem.getValue(), data_to);
const char * getHeaderFilePath() const override { return __FILE__; }
/// Specialization for columns that are not stored contiguously: `str` is a key that was produced by
/// IColumn::serializeValueIntoArena(), so the value is rebuilt from that serialized form.
/// NOTE(review): the body was missing from the reviewed text; it is restored as the counterpart of
/// serializeValueIntoArena() - confirm against upstream.
template <>
inline void deserializeAndInsertImpl<false>(StringRef str, IColumn & data_to)
{
    data_to.deserializeAndInsertFromArena(str.data);
}
/// Specialization for plain columns: `str` holds the raw bytes of the value
/// (as obtained from IColumn::getDataAt()), so they are appended to the column as-is.
template <>
inline void deserializeAndInsertImpl<true>(StringRef str, IColumn & data_to)
{
    data_to.insertData(str.data, str.size);
}