mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 09:32:01 +00:00
Added ColumnsHashing
This commit is contained in:
parent
08f4e792b1
commit
d207498573
881
dbms/src/Common/ColumnsHashing.h
Normal file
881
dbms/src/Common/ColumnsHashing.h
Normal file
@ -0,0 +1,881 @@
|
||||
#pragma once
|
||||
#include <memory>
|
||||
#include <Core/Defines.h>
|
||||
#include <Columns/IColumn.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnFixedString.h>
|
||||
#include <Common/Arena.h>
|
||||
#include <Common/HashTable/HashMap.h>
|
||||
#include <Common/LRUCache.h>
|
||||
#include <common/unaligned.h>
|
||||
#include <Interpreters/AggregationCommon.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ColumnsHashing
|
||||
{
|
||||
|
||||
/// Generic context for HashMethod. Context is shared between multiple threads, all methods must be thread-safe.
|
||||
/// Is used for caching.
|
||||
class HashMethodContext
|
||||
{
|
||||
public:
|
||||
virtual ~HashMethodContext() = default;
|
||||
|
||||
struct Settings
|
||||
{
|
||||
size_t max_threads;
|
||||
};
|
||||
};
|
||||
|
||||
using HashMethodContextPtr = std::shared_ptr<HashMethodContext>;
|
||||
|
||||
|
||||
template <typename T>
|
||||
struct MappedTraits
|
||||
{
|
||||
using Type = void *;
|
||||
static Type getMapped(T &) { return nullptr; }
|
||||
static T & getKey(T & key) { return key; }
|
||||
};
|
||||
|
||||
template <typename First, typename Second>
|
||||
struct MappedTraits<PairNoInit<First, Second>>
|
||||
{
|
||||
using Type = Second *;
|
||||
static Type getMapped(PairNoInit<First, Second> & value) { return &value.second; }
|
||||
static First & getKey(PairNoInit<First, Second> & value) { return value.first; }
|
||||
};
|
||||
|
||||
template <typename Data>
|
||||
struct HashTableTraits
|
||||
{
|
||||
using Value = typename Data::value_type;
|
||||
using Mapped = typename MappedTraits<Value>::Type;
|
||||
|
||||
static Mapped getMapped(Value & value) { return MappedTraits<Value>::getMapped(value); }
|
||||
static auto & getKey(Value & value) { return MappedTraits<Value>::getKey(value); }
|
||||
};
|
||||
|
||||
template <typename Data, bool consecutive_keys_optimization_>
|
||||
struct LastElementCache
|
||||
{
|
||||
static constexpr bool consecutive_keys_optimization = consecutive_keys_optimization_;
|
||||
using Value = typename HashTableTraits<Data>::Value;
|
||||
Value value;
|
||||
bool empty = true;
|
||||
bool found = false;
|
||||
|
||||
auto getMapped() { return HashTableTraits<Data>::getMapped(value); }
|
||||
auto & getKey() { return HashTableTraits<Data>::getKey(value); }
|
||||
};
|
||||
|
||||
template <typename Data>
|
||||
struct LastElementCache<Data, false>
|
||||
{
|
||||
static constexpr bool consecutive_keys_optimization = false;
|
||||
};
|
||||
|
||||
template <typename Data, typename Key, typename Cache>
|
||||
inline ALWAYS_INLINE typename HashTableTraits<Data>::Value & emplaceKeyImpl(
|
||||
Key key, Data & data, bool & inserted, Cache & cache [[maybe_unused]])
|
||||
{
|
||||
if constexpr (Cache::consecutive_keys_optimization)
|
||||
{
|
||||
if (!cache.empty && cache.found && cache.getKey() == key)
|
||||
{
|
||||
inserted = false;
|
||||
return cache.value;
|
||||
}
|
||||
}
|
||||
|
||||
typename Data::iterator it;
|
||||
data.emplace(key, it, inserted);
|
||||
auto & value = *it;
|
||||
|
||||
if constexpr (Cache::consecutive_keys_optimization)
|
||||
{
|
||||
cache.value = value;
|
||||
cache.empty = false;
|
||||
cache.found = true;
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
template <typename Data, typename Key, typename Cache>
|
||||
inline ALWAYS_INLINE typename HashTableTraits<Data>::Mapped findKeyImpl(
|
||||
Key key, Data & data, bool & found, Cache & cache [[maybe_unused]])
|
||||
{
|
||||
if constexpr (Cache::consecutive_keys_optimization)
|
||||
{
|
||||
if (!cache.empty && cache.getKey() == key)
|
||||
{
|
||||
found = cache.found;
|
||||
return found ? cache.getMapped() : nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
auto it = data.find(key);
|
||||
|
||||
found = it != data.end();
|
||||
auto mapped = found ? HashTableTraits<Data>::getMapped(*it)
|
||||
: nullptr;
|
||||
|
||||
if constexpr (Cache::consecutive_keys_optimization)
|
||||
{
|
||||
if (found)
|
||||
cache.value = *it;
|
||||
else
|
||||
cache.getKey() = key;
|
||||
|
||||
cache.empty = false;
|
||||
cache.found = found;
|
||||
}
|
||||
|
||||
return mapped;
|
||||
}
|
||||
|
||||
|
||||
/// For the case where there is one numeric key.
|
||||
template <typename TData, typename FieldType> /// UInt8/16/32/64 for any type with corresponding bit width.
|
||||
struct HashMethodOneNumber
|
||||
{
|
||||
const char * vec;
|
||||
LastElementCache<TData, true> last_elem_cache;
|
||||
|
||||
/// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise.
|
||||
HashMethodOneNumber(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
|
||||
{
|
||||
vec = key_columns[0]->getRawData().data;
|
||||
}
|
||||
|
||||
/// Creates context. Method is called once and result context is used in all threads.
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
FieldType getKey(size_t row) const { return unalignedLoad<FieldType>(vec + row * sizeof(FieldType)); }
|
||||
|
||||
/// Emplace key into HashTable or HashMap. If Data is HashMap, returns ptr to value, otherwise nullptr.
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped emplaceKey(
|
||||
Data & data, /// HashTable
|
||||
size_t row, /// From which row of the block insert the key
|
||||
bool & inserted,
|
||||
Arena & /*pool*/) /// For Serialized method, key may be placed in pool.
|
||||
{
|
||||
return HashTableTraits<Data>::getMapped(emplaceKeyImpl(getKey(row), data, inserted, last_elem_cache));
|
||||
}
|
||||
|
||||
/// Find key into HashTable or HashMap. If Data is HashMap and key was found, returns ptr to value, otherwise nullptr.
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped findKey(Data & data, size_t row, bool & found, Arena & /*pool*/)
|
||||
{
|
||||
return findKeyImpl(getKey(row), data, found, last_elem_cache);
|
||||
}
|
||||
|
||||
/// Insert the key from the hash table into columns.
|
||||
template <typename Value>
|
||||
static void insertKeyIntoColumns(const Value & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
|
||||
{
|
||||
static_cast<ColumnVectorHelper *>(key_columns[0].get())->insertRawData<sizeof(FieldType)>(reinterpret_cast<const char *>(&value.first));
|
||||
}
|
||||
|
||||
/// Get hash value of row.
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return data.hash(getKey(row));
|
||||
}
|
||||
|
||||
/// Get StringRef from value which can be inserted into column.
|
||||
template <typename Value>
|
||||
static StringRef getValueRef(const Value & value)
|
||||
{
|
||||
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
|
||||
}
|
||||
|
||||
/// Cache last result if key was inserted.
|
||||
template <typename Mapped>
|
||||
ALWAYS_INLINE void cacheData(size_t /*row*/, Mapped mapped)
|
||||
{
|
||||
*last_elem_cache.getMapped() = mapped;
|
||||
}
|
||||
|
||||
protected:
|
||||
template <typename Value>
|
||||
static ALWAYS_INLINE void onNewKey(Value & /*value*/, Arena & /*pool*/) {}
|
||||
};
|
||||
|
||||
|
||||
/// For the case where there is one string key.
|
||||
template <typename TData>
|
||||
struct HashMethodString
|
||||
{
|
||||
const IColumn::Offset * offsets;
|
||||
const UInt8 * chars;
|
||||
|
||||
LastElementCache<TData, true> last_elem_cache;
|
||||
|
||||
HashMethodString(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
|
||||
{
|
||||
const IColumn & column = *key_columns[0];
|
||||
const ColumnString & column_string = static_cast<const ColumnString &>(column);
|
||||
offsets = column_string.getOffsets().data();
|
||||
chars = column_string.getChars().data();
|
||||
}
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
StringRef getKey(size_t row) const { return StringRef(chars + offsets[row - 1], offsets[row] - offsets[row - 1] - 1); }
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped emplaceKey(Data & data, size_t row, bool & inserted, Arena & pool)
|
||||
{
|
||||
auto & value = emplaceKeyImpl(getKey(row), data, inserted, last_elem_cache);
|
||||
if (inserted)
|
||||
{
|
||||
auto & key = HashTableTraits<Data>::getKey(value);
|
||||
if (key.size)
|
||||
key.data = pool.insert(key.data, key.size);
|
||||
}
|
||||
return HashTableTraits<Data>::getMapped(value);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped findKey(Data & data, size_t row, bool & found, Arena & /*pool*/)
|
||||
{
|
||||
return findKeyImpl(getKey(row), data, found, last_elem_cache);
|
||||
}
|
||||
|
||||
template <typename Value>
|
||||
static void insertKeyIntoColumns(const Value & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
|
||||
{
|
||||
key_columns[0]->insertData(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return data.hash(getKey(row));
|
||||
}
|
||||
|
||||
template <typename Value>
|
||||
static StringRef getValueRef(const Value & value)
|
||||
{
|
||||
return StringRef(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
template <typename Mapped>
|
||||
ALWAYS_INLINE void cacheData(size_t /*row*/, Mapped mapped)
|
||||
{
|
||||
*last_elem_cache.getMapped() = mapped;
|
||||
}
|
||||
|
||||
protected:
|
||||
template <typename Value>
|
||||
static ALWAYS_INLINE void onNewKey(Value & value, Arena & pool)
|
||||
{
|
||||
if (value.first.size)
|
||||
value.first.data = pool.insert(value.first.data, value.first.size);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// For the case where there is one fixed-length string key.
|
||||
template <typename TData>
|
||||
struct HashMethodFixedString
|
||||
{
|
||||
size_t n;
|
||||
const ColumnFixedString::Chars * chars;
|
||||
|
||||
LastElementCache<TData, true> last_elem_cache;
|
||||
|
||||
HashMethodFixedString(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
|
||||
{
|
||||
const IColumn & column = *key_columns[0];
|
||||
const ColumnFixedString & column_string = static_cast<const ColumnFixedString &>(column);
|
||||
n = column_string.getN();
|
||||
chars = &column_string.getChars();
|
||||
}
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
StringRef getKey(size_t row) const { return StringRef(&(*chars)[row * n], n); }
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped emplaceKey(Data & data, size_t row, bool & inserted, Arena & pool)
|
||||
{
|
||||
auto & value = emplaceKeyImpl(getKey(row), data, inserted, last_elem_cache);
|
||||
if (inserted)
|
||||
{
|
||||
auto & key = HashTableTraits<Data>::getKey(value);
|
||||
key.data = pool.insert(key.data, key.size);
|
||||
}
|
||||
return HashTableTraits<Data>::getMapped(value);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped findKey(Data & data, size_t row, bool & found, Arena & /*pool*/)
|
||||
{
|
||||
return findKeyImpl(getKey(row), data, found, last_elem_cache);
|
||||
}
|
||||
|
||||
template <typename Value>
|
||||
static void insertKeyIntoColumns(const Value & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
|
||||
{
|
||||
key_columns[0]->insertData(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return data.hash(getKey(row));
|
||||
}
|
||||
|
||||
template <typename Value>
|
||||
static StringRef getValueRef(const Value & value)
|
||||
{
|
||||
return StringRef(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
template <typename Mapped>
|
||||
ALWAYS_INLINE void cacheData(size_t /*row*/, Mapped mapped)
|
||||
{
|
||||
*last_elem_cache.getMapped() = mapped;
|
||||
}
|
||||
|
||||
protected:
|
||||
template <typename Value>
|
||||
static ALWAYS_INLINE void onNewKey(Value & value, Arena & pool)
|
||||
{
|
||||
value.first.data = pool.insert(value.first.data, value.first.size);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// Cache stores dictionaries and saved_hash per dictionary key.
|
||||
class LowCardinalityDictionaryCache : public HashMethodContext
|
||||
{
|
||||
public:
|
||||
/// Will assume that dictionaries with same hash has the same keys.
|
||||
/// Just in case, check that they have also the same size.
|
||||
struct DictionaryKey
|
||||
{
|
||||
UInt128 hash;
|
||||
UInt64 size;
|
||||
|
||||
bool operator== (const DictionaryKey & other) const { return hash == other.hash && size == other.size; }
|
||||
};
|
||||
|
||||
struct DictionaryKeyHash
|
||||
{
|
||||
size_t operator()(const DictionaryKey & key) const
|
||||
{
|
||||
SipHash hash;
|
||||
hash.update(key.hash.low);
|
||||
hash.update(key.hash.high);
|
||||
hash.update(key.size);
|
||||
return hash.get64();
|
||||
}
|
||||
};
|
||||
|
||||
struct CachedValues
|
||||
{
|
||||
/// Store ptr to dictionary to be sure it won't be deleted.
|
||||
ColumnPtr dictionary_holder;
|
||||
/// Hashes for dictionary keys.
|
||||
const UInt64 * saved_hash = nullptr;
|
||||
};
|
||||
|
||||
using CachedValuesPtr = std::shared_ptr<CachedValues>;
|
||||
|
||||
explicit LowCardinalityDictionaryCache(const HashMethodContext::Settings & settings) : cache(settings.max_threads) {}
|
||||
|
||||
CachedValuesPtr get(const DictionaryKey & key) { return cache.get(key); }
|
||||
void set(const DictionaryKey & key, const CachedValuesPtr & mapped) { cache.set(key, mapped); }
|
||||
|
||||
private:
|
||||
using Cache = LRUCache<DictionaryKey, CachedValues, DictionaryKeyHash>;
|
||||
Cache cache;
|
||||
};
|
||||
|
||||
/// Single low cardinality column.
|
||||
template <typename SingleColumnMethod, bool use_cache>
|
||||
struct HashMethodSingleLowCardinalityColumn : public SingleColumnMethod
|
||||
{
|
||||
using Base = SingleColumnMethod;
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings & settings)
|
||||
{
|
||||
return std::make_shared<LowCardinalityDictionaryCache>(settings);
|
||||
}
|
||||
|
||||
ColumnRawPtrs key_columns;
|
||||
const IColumn * positions = nullptr;
|
||||
size_t size_of_index_type = 0;
|
||||
|
||||
/// saved hash is from current column or from cache.
|
||||
const UInt64 * saved_hash = nullptr;
|
||||
/// Hold dictionary in case saved_hash is from cache to be sure it won't be deleted.
|
||||
ColumnPtr dictionary_holder;
|
||||
|
||||
/// Cache AggregateDataPtr for current column in order to decrease the number of hash table usages.
|
||||
PaddedPODArray<AggregateDataPtr> aggregate_data_cache;
|
||||
|
||||
/// If initialized column is nullable.
|
||||
bool is_nullable = false;
|
||||
|
||||
static const ColumnLowCardinality & getLowCardinalityColumn(const IColumn * low_cardinality_column)
|
||||
{
|
||||
auto column = typeid_cast<const ColumnLowCardinality *>(low_cardinality_column);
|
||||
if (!column)
|
||||
throw Exception("Invalid aggregation key type for HashMethodSingleLowCardinalityColumn method. "
|
||||
"Excepted LowCardinality, got " + column->getName(), ErrorCodes::LOGICAL_ERROR);
|
||||
return *column;
|
||||
}
|
||||
|
||||
HashMethodSingleLowCardinalityColumn(
|
||||
const ColumnRawPtrs & key_columns_low_cardinality, const Sizes & key_sizes, const HashMethodContextPtr & context)
|
||||
: Base({getLowCardinalityColumn(key_columns_low_cardinality[0]).getDictionary().getNestedNotNullableColumn().get()}, key_sizes, context)
|
||||
{
|
||||
auto column = &getLowCardinalityColumn(key_columns_low_cardinality[0]);
|
||||
|
||||
if (!context)
|
||||
throw Exception("Cache wasn't created for HashMethodSingleLowCardinalityColumn",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
LowCardinalityDictionaryCache * cache;
|
||||
if constexpr (use_cache)
|
||||
{
|
||||
cache = typeid_cast<LowCardinalityDictionaryCache *>(context.get());
|
||||
if (!cache)
|
||||
{
|
||||
const auto & cached_val = *context;
|
||||
throw Exception("Invalid type for HashMethodSingleLowCardinalityColumn cache: "
|
||||
+ demangle(typeid(cached_val).name()), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
auto * dict = column->getDictionary().getNestedNotNullableColumn().get();
|
||||
is_nullable = column->getDictionary().nestedColumnIsNullable();
|
||||
key_columns = {dict};
|
||||
bool is_shared_dict = column->isSharedDictionary();
|
||||
|
||||
typename LowCardinalityDictionaryCache::DictionaryKey dictionary_key;
|
||||
typename LowCardinalityDictionaryCache::CachedValuesPtr cached_values;
|
||||
|
||||
if (is_shared_dict)
|
||||
{
|
||||
dictionary_key = {column->getDictionary().getHash(), dict->size()};
|
||||
if constexpr (use_cache)
|
||||
cached_values = cache->get(dictionary_key);
|
||||
}
|
||||
|
||||
if (cached_values)
|
||||
{
|
||||
saved_hash = cached_values->saved_hash;
|
||||
dictionary_holder = cached_values->dictionary_holder;
|
||||
}
|
||||
else
|
||||
{
|
||||
saved_hash = column->getDictionary().tryGetSavedHash();
|
||||
dictionary_holder = column->getDictionaryPtr();
|
||||
|
||||
if constexpr (use_cache)
|
||||
{
|
||||
if (is_shared_dict)
|
||||
{
|
||||
cached_values = std::make_shared<typename LowCardinalityDictionaryCache::CachedValues>();
|
||||
cached_values->saved_hash = saved_hash;
|
||||
cached_values->dictionary_holder = dictionary_holder;
|
||||
|
||||
cache->set(dictionary_key, cached_values);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
AggregateDataPtr default_data = nullptr;
|
||||
aggregate_data_cache.assign(key_columns[0]->size(), default_data);
|
||||
|
||||
size_of_index_type = column->getSizeOfIndexType();
|
||||
positions = column->getIndexesPtr().get();
|
||||
}
|
||||
|
||||
ALWAYS_INLINE size_t getIndexAt(size_t row) const
|
||||
{
|
||||
switch (size_of_index_type)
|
||||
{
|
||||
case sizeof(UInt8): return static_cast<const ColumnUInt8 *>(positions)->getElement(row);
|
||||
case sizeof(UInt16): return static_cast<const ColumnUInt16 *>(positions)->getElement(row);
|
||||
case sizeof(UInt32): return static_cast<const ColumnUInt32 *>(positions)->getElement(row);
|
||||
case sizeof(UInt64): return static_cast<const ColumnUInt64 *>(positions)->getElement(row);
|
||||
default: throw Exception("Unexpected size of index type for low cardinality column.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the key from the key columns for insertion into the hash table.
|
||||
ALWAYS_INLINE auto getKey(size_t row) const
|
||||
{
|
||||
return Base::getKey(getIndexAt(row));
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped emplaceKey(Data & data, size_t row_, bool & inserted, Arena & pool)
|
||||
{
|
||||
size_t row = getIndexAt(row_);
|
||||
|
||||
if (is_nullable && row == 0)
|
||||
{
|
||||
inserted = !data.hasNullKeyData();
|
||||
data.hasNullKeyData() = true;
|
||||
return &data.getNullKeyData();
|
||||
}
|
||||
|
||||
if constexpr (use_cache)
|
||||
{
|
||||
if (aggregate_data_cache[row])
|
||||
{
|
||||
inserted = false;
|
||||
return &aggregate_data_cache[row];
|
||||
}
|
||||
}
|
||||
|
||||
Sizes key_sizes;
|
||||
auto key = getKey(row_);
|
||||
|
||||
typename Data::iterator it;
|
||||
if (saved_hash)
|
||||
data.emplace(key, it, inserted, saved_hash[row]);
|
||||
else
|
||||
data.emplace(key, it, inserted);
|
||||
|
||||
if (inserted)
|
||||
Base::onNewKey(*it, pool);
|
||||
else if constexpr (use_cache)
|
||||
aggregate_data_cache[row] = it->second;
|
||||
|
||||
return HashTableTraits<Data>::getMapped(*it);
|
||||
}
|
||||
|
||||
ALWAYS_INLINE bool isNullAt(size_t i)
|
||||
{
|
||||
if (!is_nullable)
|
||||
return false;
|
||||
|
||||
return getIndexAt(i) == 0;
|
||||
}
|
||||
|
||||
template <typename Mapped>
|
||||
ALWAYS_INLINE void cacheData(size_t i, Mapped mapped)
|
||||
{
|
||||
size_t row = getIndexAt(i);
|
||||
aggregate_data_cache[row] = mapped;
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped findFromRow(Data & data, size_t row_, bool & found, Arena &)
|
||||
{
|
||||
size_t row = getIndexAt(row_);
|
||||
|
||||
if (is_nullable && row == 0)
|
||||
return data.hasNullKeyData() ? &data.getNullKeyData() : nullptr;
|
||||
|
||||
if constexpr (use_cache)
|
||||
{
|
||||
if (aggregate_data_cache[row])
|
||||
return &aggregate_data_cache[row];
|
||||
}
|
||||
|
||||
auto key = getKey(row_);
|
||||
|
||||
typename Data::iterator it;
|
||||
if (saved_hash)
|
||||
it = data.find(key, saved_hash[row]);
|
||||
else
|
||||
it = data.find(key);
|
||||
|
||||
found = it != data.end();
|
||||
if constexpr (use_cache)
|
||||
{
|
||||
if (found)
|
||||
aggregate_data_cache[row] = it->second;
|
||||
}
|
||||
|
||||
return typename HashTableTraits<Data>::getMapped(*it);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & pool)
|
||||
{
|
||||
row = getIndexAt(row);
|
||||
if (saved_hash)
|
||||
return saved_hash[row];
|
||||
|
||||
return Base::getHash(data, row, pool);
|
||||
}
|
||||
|
||||
template <typename Value>
|
||||
static void insertKeyIntoColumns(const Value & value, MutableColumns & key_columns_low_cardinality, const Sizes & /*key_sizes*/)
|
||||
{
|
||||
auto ref = Base::getValueRef(value);
|
||||
static_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0].get())->insertData(ref.data, ref.size);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
namespace columns_hashing_impl
|
||||
{
|
||||
|
||||
/// This class is designed to provide the functionality that is required for
|
||||
/// supporting nullable keys in HashMethodKeysFixed. If there are
|
||||
/// no nullable keys, this class is merely implemented as an empty shell.
|
||||
template <typename Key, bool has_nullable_keys>
|
||||
class BaseStateKeysFixed;
|
||||
|
||||
/// Case where nullable keys are supported.
|
||||
template <typename Key>
|
||||
class BaseStateKeysFixed<Key, true>
|
||||
{
|
||||
protected:
|
||||
void init(const ColumnRawPtrs & key_columns)
|
||||
{
|
||||
null_maps.reserve(key_columns.size());
|
||||
actual_columns.reserve(key_columns.size());
|
||||
|
||||
for (const auto & col : key_columns)
|
||||
{
|
||||
if (col->isColumnNullable())
|
||||
{
|
||||
const auto & nullable_col = static_cast<const ColumnNullable &>(*col);
|
||||
actual_columns.push_back(&nullable_col.getNestedColumn());
|
||||
null_maps.push_back(&nullable_col.getNullMapColumn());
|
||||
}
|
||||
else
|
||||
{
|
||||
actual_columns.push_back(col);
|
||||
null_maps.push_back(nullptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the columns which actually contain the values of the keys.
|
||||
/// For a given key column, if it is nullable, we return its nested
|
||||
/// column. Otherwise we return the key column itself.
|
||||
inline const ColumnRawPtrs & getActualColumns() const
|
||||
{
|
||||
return actual_columns;
|
||||
}
|
||||
|
||||
/// Create a bitmap that indicates whether, for a particular row,
|
||||
/// a key column bears a null value or not.
|
||||
KeysNullMap<Key> createBitmap(size_t row) const
|
||||
{
|
||||
KeysNullMap<Key> bitmap{};
|
||||
|
||||
for (size_t k = 0; k < null_maps.size(); ++k)
|
||||
{
|
||||
if (null_maps[k] != nullptr)
|
||||
{
|
||||
const auto & null_map = static_cast<const ColumnUInt8 &>(*null_maps[k]).getData();
|
||||
if (null_map[row] == 1)
|
||||
{
|
||||
size_t bucket = k / 8;
|
||||
size_t offset = k % 8;
|
||||
bitmap[bucket] |= UInt8(1) << offset;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return bitmap;
|
||||
}
|
||||
|
||||
private:
|
||||
ColumnRawPtrs actual_columns;
|
||||
ColumnRawPtrs null_maps;
|
||||
};
|
||||
|
||||
/// Case where nullable keys are not supported.
|
||||
template <typename Key>
|
||||
class BaseStateKeysFixed<Key, false>
|
||||
{
|
||||
protected:
|
||||
void init(const ColumnRawPtrs & columns) { actual_columns = columns; }
|
||||
|
||||
const ColumnRawPtrs & getActualColumns() const { return actual_columns; }
|
||||
|
||||
KeysNullMap<Key> createBitmap(size_t) const
|
||||
{
|
||||
throw Exception{"Internal error: calling createBitmap() for non-nullable keys"
|
||||
" is forbidden", ErrorCodes::LOGICAL_ERROR};
|
||||
}
|
||||
|
||||
private:
|
||||
ColumnRawPtrs actual_columns;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
// Optional mask for low cardinality columns.
|
||||
template <bool has_low_cardinality>
|
||||
struct LowCardinalityKeys
|
||||
{
|
||||
ColumnRawPtrs nested_columns;
|
||||
ColumnRawPtrs positions;
|
||||
Sizes position_sizes;
|
||||
};
|
||||
|
||||
template <>
|
||||
struct LowCardinalityKeys<false> {};
|
||||
|
||||
/// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits.
|
||||
template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false>
|
||||
struct HashMethodKeysFixed : private columns_hashing_impl::BaseStateKeysFixed<typename TData::key_type, has_nullable_keys_>
|
||||
{
|
||||
using Key = typename TData::key_type;
|
||||
|
||||
static constexpr bool has_nullable_keys = has_nullable_keys_;
|
||||
static constexpr bool has_low_cardinality = has_low_cardinality_;
|
||||
|
||||
LowCardinalityKeys<has_low_cardinality> low_cardinality_keys;
|
||||
Sizes key_sizes;
|
||||
size_t keys_size;
|
||||
|
||||
LastElementCache<TData, true> last_elem_cache;
|
||||
|
||||
using Base = columns_hashing_impl::BaseStateKeysFixed<Key, has_nullable_keys>;
|
||||
|
||||
HashMethodKeysFixed(const ColumnRawPtrs & key_columns, const Sizes & key_sizes, const HashMethodContextPtr &)
|
||||
: key_sizes(std::move(key_sizes)), keys_size(key_columns.size())
|
||||
{
|
||||
if constexpr (has_low_cardinality)
|
||||
{
|
||||
low_cardinality_keys.nested_columns.resize(key_columns.size());
|
||||
low_cardinality_keys.positions.assign(key_columns.size(), nullptr);
|
||||
low_cardinality_keys.position_sizes.resize(key_columns.size());
|
||||
for (size_t i = 0; i < key_columns.size(); ++i)
|
||||
{
|
||||
if (auto * low_cardinality_col = typeid_cast<const ColumnLowCardinality *>(key_columns[i]))
|
||||
{
|
||||
low_cardinality_keys.nested_columns[i] = low_cardinality_col->getDictionary().getNestedColumn().get();
|
||||
low_cardinality_keys.positions[i] = &low_cardinality_col->getIndexes();
|
||||
low_cardinality_keys.position_sizes[i] = low_cardinality_col->getSizeOfIndexType();
|
||||
}
|
||||
else
|
||||
low_cardinality_keys.nested_columns[i] = key_columns[i];
|
||||
}
|
||||
}
|
||||
|
||||
Base::init(key_columns);
|
||||
}
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
ALWAYS_INLINE Key getKey(size_t row) const
|
||||
{
|
||||
if (has_nullable_keys)
|
||||
{
|
||||
auto bitmap = Base::createBitmap(row);
|
||||
return packFixed<Key>(row, keys_size, Base::getActualColumns(), key_sizes, bitmap);
|
||||
}
|
||||
else
|
||||
{
|
||||
if constexpr (has_low_cardinality)
|
||||
return packFixed<Key, true>(row, keys_size, low_cardinality_keys.nested_columns, key_sizes,
|
||||
&low_cardinality_keys.positions, &low_cardinality_keys.position_sizes);
|
||||
|
||||
return packFixed<Key>(row, keys_size, Base::getActualColumns(), key_sizes);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped emplaceKey(Data & data, size_t row, bool & inserted, Arena & /*pool*/)
|
||||
{
|
||||
return HashTableTraits<Data>::getMapped(emplaceKeyImpl(getKey(row), data, inserted, last_elem_cache));
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped findKey(Data & data, size_t row, bool & found, Arena & /*pool*/)
|
||||
{
|
||||
return findKeyImpl(getKey(row), data, found, last_elem_cache);
|
||||
}
|
||||
|
||||
template <typename Value>
|
||||
static StringRef getValueRef(const Value & value)
|
||||
{
|
||||
return StringRef(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return data.hash(getKey(row));
|
||||
}
|
||||
|
||||
template <typename Mapped>
|
||||
ALWAYS_INLINE void cacheData(size_t /*row*/, Mapped mapped)
|
||||
{
|
||||
*last_elem_cache.getMapped() = mapped;
|
||||
}
|
||||
};
|
||||
|
||||
/** Hash by concatenating serialized key values.
|
||||
* The serialized value differs in that it uniquely allows to deserialize it, having only the position with which it starts.
|
||||
* That is, for example, for strings, it contains first the serialized length of the string, and then the bytes.
|
||||
* Therefore, when aggregating by several strings, there is no ambiguity.
|
||||
*/
|
||||
template <typename TData>
|
||||
struct HashMethodSerialized
|
||||
{
|
||||
ColumnRawPtrs key_columns;
|
||||
size_t keys_size;
|
||||
LastElementCache<TData, false> last_elem_cache;
|
||||
|
||||
HashMethodSerialized(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
|
||||
: key_columns(key_columns), keys_size(key_columns.size()) {}
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped emplaceKey(Data & data, size_t row, bool & inserted, Arena & pool)
|
||||
{
|
||||
auto key = getKey(row, pool);
|
||||
auto & value = emplaceKeyImpl(key, data, inserted, last_elem_cache);
|
||||
if (!inserted)
|
||||
pool.rollback(key.size);
|
||||
|
||||
return HashTableTraits<Data>::getMapped(value);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename HashTableTraits<Data>::Mapped findKey(Data & data, size_t row, bool & found, Arena & pool)
|
||||
{
|
||||
auto key = getKey(row, pool);
|
||||
auto mapped = findKeyImpl(key, data, found, last_elem_cache);
|
||||
pool.rollback(key.size);
|
||||
|
||||
return mapped;
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & pool)
|
||||
{
|
||||
auto key = getKey(row, pool);
|
||||
auto hash = data.hash(key);
|
||||
pool.rollback(key.size);
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
template <typename Mapped>
|
||||
ALWAYS_INLINE void cacheData(size_t /*row*/, Mapped /*mapped*/) {}
|
||||
|
||||
protected:
|
||||
ALWAYS_INLINE StringRef getKey(size_t row, Arena & pool) const
|
||||
{
|
||||
return serializeKeysToPoolContiguous(row, keys_size, key_columns, pool);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
}
|
@ -6,9 +6,11 @@
|
||||
#include <Common/setThreadName.h>
|
||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Columns/ColumnLowCardinality.h>
|
||||
#include <AggregateFunctions/AggregateFunctionCount.h>
|
||||
#include <DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <DataStreams/NativeBlockOutputStream.h>
|
||||
@ -22,11 +24,9 @@
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <common/demangle.h>
|
||||
|
||||
#if __has_include(<Interpreters/config_compile.h>)
|
||||
#include <Interpreters/config_compile.h>
|
||||
#include <Columns/ColumnLowCardinality.h>
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
@ -188,7 +188,7 @@ Aggregator::Aggregator(const Params & params_)
|
||||
}
|
||||
|
||||
method_chosen = chooseAggregationMethod();
|
||||
AggregationStateCache::Settings cache_settings;
|
||||
HashMethodContext::Settings cache_settings;
|
||||
cache_settings.max_threads = params.max_threads;
|
||||
aggregation_state_cache = AggregatedDataVariants::createCache(method_chosen, cache_settings);
|
||||
}
|
||||
@ -586,11 +586,7 @@ void NO_INLINE Aggregator::executeImpl(
|
||||
bool no_more_keys,
|
||||
AggregateDataPtr overflow_row) const
|
||||
{
|
||||
typename Method::State state;
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
state.init(key_columns, aggregation_state_cache);
|
||||
else
|
||||
state.init(key_columns);
|
||||
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
|
||||
|
||||
if (!no_more_keys)
|
||||
executeImplCase<false>(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row);
|
||||
@ -605,76 +601,35 @@ void NO_INLINE Aggregator::executeImplCase(
|
||||
typename Method::State & state,
|
||||
Arena * aggregates_pool,
|
||||
size_t rows,
|
||||
ColumnRawPtrs & key_columns,
|
||||
ColumnRawPtrs & /*key_columns*/,
|
||||
AggregateFunctionInstruction * aggregate_instructions,
|
||||
StringRefs & keys,
|
||||
StringRefs & /*keys*/,
|
||||
AggregateDataPtr overflow_row) const
|
||||
{
|
||||
/// NOTE When editing this code, also pay attention to SpecializedAggregator.h.
|
||||
|
||||
/// For all rows.
|
||||
typename Method::Key prev_key{};
|
||||
AggregateDataPtr value = nullptr;
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
bool inserted = false; /// Inserted a new key, or was this key already?
|
||||
|
||||
/// Get the key to insert into the hash table.
|
||||
typename Method::Key key;
|
||||
if constexpr (!Method::low_cardinality_optimization)
|
||||
key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
|
||||
|
||||
AggregateDataPtr * aggregate_data = nullptr;
|
||||
typename Method::iterator it; /// Is not used if Method::low_cardinality_optimization
|
||||
|
||||
if (!no_more_keys) /// Insert.
|
||||
{
|
||||
/// Optimization for consecutive identical keys.
|
||||
if (!Method::no_consecutive_keys_optimization)
|
||||
{
|
||||
if (i != 0 && key == prev_key)
|
||||
{
|
||||
/// Add values to the aggregate functions.
|
||||
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
|
||||
(*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
|
||||
|
||||
method.onExistingKey(key, keys, *aggregates_pool);
|
||||
continue;
|
||||
}
|
||||
else
|
||||
prev_key = key;
|
||||
}
|
||||
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
aggregate_data = state.emplaceKeyFromRow(method.data, i, inserted, params.keys_size, keys, *aggregates_pool);
|
||||
else
|
||||
{
|
||||
method.data.emplace(key, it, inserted);
|
||||
aggregate_data = &Method::getAggregateData(it->second);
|
||||
}
|
||||
}
|
||||
if constexpr (!no_more_keys) /// Insert.
|
||||
aggregate_data = state.emplaceKey(method.data, i, inserted, *aggregates_pool);
|
||||
else
|
||||
{
|
||||
/// Add only if the key already exists.
|
||||
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
aggregate_data = state.findFromRow(method.data, i);
|
||||
else
|
||||
{
|
||||
it = method.data.find(key);
|
||||
if (method.data.end() != it)
|
||||
aggregate_data = &Method::getAggregateData(it->second);
|
||||
}
|
||||
bool found = false;
|
||||
aggregate_data = state.findKey(method.data, i, found, *aggregates_pool);
|
||||
}
|
||||
|
||||
/// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys.
|
||||
|
||||
/// If the key does not fit, and the data does not need to be aggregated in a separate row, then there's nothing to do.
|
||||
if (!aggregate_data && !overflow_row)
|
||||
{
|
||||
method.onExistingKey(key, keys, *aggregates_pool);
|
||||
continue;
|
||||
}
|
||||
|
||||
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
|
||||
if (inserted)
|
||||
@ -682,18 +637,12 @@ void NO_INLINE Aggregator::executeImplCase(
|
||||
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
|
||||
*aggregate_data = nullptr;
|
||||
|
||||
if constexpr (!Method::low_cardinality_optimization)
|
||||
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
|
||||
|
||||
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
|
||||
createAggregateStates(place);
|
||||
*aggregate_data = place;
|
||||
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
state.cacheAggregateData(i, place);
|
||||
state.cacheData(i, place);
|
||||
}
|
||||
else
|
||||
method.onExistingKey(key, keys, *aggregates_pool);
|
||||
|
||||
value = aggregate_data ? *aggregate_data : overflow_row;
|
||||
|
||||
@ -1172,7 +1121,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_functions[i]->insertResultInto(
|
||||
Method::getAggregateData(value.second) + offsets_of_aggregate_states[i],
|
||||
value.second + offsets_of_aggregate_states[i],
|
||||
*final_aggregate_columns[i]);
|
||||
}
|
||||
|
||||
@ -1203,9 +1152,9 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
|
||||
|
||||
/// reserved, so push_back does not throw exceptions
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_columns[i]->push_back(Method::getAggregateData(value.second) + offsets_of_aggregate_states[i]);
|
||||
aggregate_columns[i]->push_back(value.second + offsets_of_aggregate_states[i]);
|
||||
|
||||
Method::getAggregateData(value.second) = nullptr;
|
||||
value.second = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1549,20 +1498,20 @@ void NO_INLINE Aggregator::mergeDataImpl(
|
||||
{
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_functions[i]->merge(
|
||||
Method::getAggregateData(res_it->second) + offsets_of_aggregate_states[i],
|
||||
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
|
||||
res_it->second + offsets_of_aggregate_states[i],
|
||||
it->second + offsets_of_aggregate_states[i],
|
||||
arena);
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_functions[i]->destroy(
|
||||
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
|
||||
it->second + offsets_of_aggregate_states[i]);
|
||||
}
|
||||
else
|
||||
{
|
||||
res_it->second = it->second;
|
||||
}
|
||||
|
||||
Method::getAggregateData(it->second) = nullptr;
|
||||
it->second = nullptr;
|
||||
}
|
||||
|
||||
table_src.clearAndShrink();
|
||||
@ -1586,19 +1535,18 @@ void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
|
||||
|
||||
AggregateDataPtr res_data = table_dst.end() == res_it
|
||||
? overflows
|
||||
: Method::getAggregateData(res_it->second);
|
||||
: res_it->second;
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_functions[i]->merge(
|
||||
res_data + offsets_of_aggregate_states[i],
|
||||
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
|
||||
it->second + offsets_of_aggregate_states[i],
|
||||
arena);
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_functions[i]->destroy(
|
||||
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
|
||||
aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]);
|
||||
|
||||
Method::getAggregateData(it->second) = nullptr;
|
||||
it->second = nullptr;
|
||||
}
|
||||
|
||||
table_src.clearAndShrink();
|
||||
@ -1621,19 +1569,18 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
|
||||
if (table_dst.end() == res_it)
|
||||
continue;
|
||||
|
||||
AggregateDataPtr res_data = Method::getAggregateData(res_it->second);
|
||||
AggregateDataPtr res_data = res_it->second;
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_functions[i]->merge(
|
||||
res_data + offsets_of_aggregate_states[i],
|
||||
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
|
||||
it->second + offsets_of_aggregate_states[i],
|
||||
arena);
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_functions[i]->destroy(
|
||||
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
|
||||
aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]);
|
||||
|
||||
Method::getAggregateData(it->second) = nullptr;
|
||||
it->second = nullptr;
|
||||
}
|
||||
|
||||
table_src.clearAndShrink();
|
||||
@ -1984,7 +1931,7 @@ template <bool no_more_keys, typename Method, typename Table>
|
||||
void NO_INLINE Aggregator::mergeStreamsImplCase(
|
||||
Block & block,
|
||||
Arena * aggregates_pool,
|
||||
Method & method,
|
||||
Method & method [[maybe_unused]],
|
||||
Table & data,
|
||||
AggregateDataPtr overflow_row) const
|
||||
{
|
||||
@ -1998,14 +1945,9 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();
|
||||
|
||||
typename Method::State state;
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
state.init(key_columns, aggregation_state_cache);
|
||||
else
|
||||
state.init(key_columns);
|
||||
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
|
||||
|
||||
/// For all rows.
|
||||
StringRefs keys(params.keys_size);
|
||||
size_t rows = block.rows();
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
@ -2014,59 +1956,31 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
|
||||
|
||||
bool inserted = false; /// Inserted a new key, or was this key already?
|
||||
|
||||
/// Get the key to insert into the hash table.
|
||||
typename Method::Key key;
|
||||
if constexpr (!Method::low_cardinality_optimization)
|
||||
key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
|
||||
|
||||
if (!no_more_keys)
|
||||
{
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
aggregate_data = state.emplaceKeyFromRow(data, i, inserted, params.keys_size, keys, *aggregates_pool);
|
||||
aggregate_data = state.emplaceKey(data, i, inserted, *aggregates_pool);
|
||||
else
|
||||
{
|
||||
data.emplace(key, it, inserted);
|
||||
aggregate_data = &Method::getAggregateData(it->second);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
aggregate_data = state.findFromRow(data, i);
|
||||
else
|
||||
{
|
||||
it = data.find(key);
|
||||
if (data.end() != it)
|
||||
aggregate_data = &Method::getAggregateData(it->second);
|
||||
}
|
||||
bool found;
|
||||
aggregate_data = state.findKey(data, i, found, *aggregates_pool);
|
||||
}
|
||||
|
||||
/// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys.
|
||||
|
||||
/// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do.
|
||||
if (!aggregate_data && !overflow_row)
|
||||
{
|
||||
method.onExistingKey(key, keys, *aggregates_pool);
|
||||
continue;
|
||||
}
|
||||
|
||||
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
|
||||
if (inserted)
|
||||
{
|
||||
*aggregate_data = nullptr;
|
||||
|
||||
if constexpr (!Method::low_cardinality_optimization)
|
||||
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
|
||||
|
||||
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
|
||||
createAggregateStates(place);
|
||||
*aggregate_data = place;
|
||||
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
state.cacheAggregateData(i, place);
|
||||
state.cacheData(i, place);
|
||||
}
|
||||
else
|
||||
method.onExistingKey(key, keys, *aggregates_pool);
|
||||
|
||||
AggregateDataPtr value = aggregate_data ? *aggregate_data : overflow_row;
|
||||
|
||||
@ -2163,7 +2077,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
|
||||
* If there is at least one block with a bucket number greater or equal than zero, then there was a two-level aggregation.
|
||||
*/
|
||||
auto max_bucket = bucket_to_blocks.rbegin()->first;
|
||||
size_t has_two_level = max_bucket >= 0;
|
||||
bool has_two_level = max_bucket >= 0;
|
||||
|
||||
if (has_two_level)
|
||||
{
|
||||
@ -2393,15 +2307,11 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
|
||||
Method & method,
|
||||
Arena * pool,
|
||||
ColumnRawPtrs & key_columns,
|
||||
StringRefs & keys,
|
||||
StringRefs & keys [[maybe_unused]],
|
||||
const Block & source,
|
||||
std::vector<Block> & destinations) const
|
||||
{
|
||||
typename Method::State state;
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
state.init(key_columns, aggregation_state_cache);
|
||||
else
|
||||
state.init(key_columns);
|
||||
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
|
||||
|
||||
size_t rows = source.rows();
|
||||
size_t columns = source.columns();
|
||||
@ -2421,16 +2331,11 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
|
||||
}
|
||||
}
|
||||
|
||||
/// Obtain a key. Calculate bucket number from it.
|
||||
typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *pool);
|
||||
|
||||
auto hash = method.data.hash(key);
|
||||
/// Calculate bucket number from row hash.
|
||||
auto hash = state.getHash(method.data, i, *pool);
|
||||
auto bucket = method.data.getBucketFromHash(hash);
|
||||
|
||||
selector[i] = bucket;
|
||||
|
||||
/// We don't need to store this key in pool.
|
||||
method.onExistingKey(key, keys, *pool);
|
||||
}
|
||||
|
||||
size_t num_buckets = destinations.size();
|
||||
@ -2521,7 +2426,7 @@ void NO_INLINE Aggregator::destroyImpl(Table & table) const
|
||||
{
|
||||
for (auto elem : table)
|
||||
{
|
||||
AggregateDataPtr & data = Method::getAggregateData(elem.second);
|
||||
AggregateDataPtr & data = elem.second;
|
||||
|
||||
/** If an exception (usually a lack of memory, the MemoryTracker throws) arose
|
||||
* after inserting the key into a hash table, but before creating all states of aggregate functions,
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <common/ThreadPool.h>
|
||||
#include <Common/UInt128.h>
|
||||
#include <Common/LRUCache.h>
|
||||
#include <Common/ColumnsHashing.h>
|
||||
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <DataStreams/SizeLimits.h>
|
||||
@ -138,18 +139,6 @@ using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTw
|
||||
TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr, DefaultHash<StringRef>,
|
||||
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
|
||||
|
||||
/// Cache which can be used by aggregations method's states. Object is shared in all threads.
|
||||
struct AggregationStateCache
|
||||
{
|
||||
virtual ~AggregationStateCache() = default;
|
||||
|
||||
struct Settings
|
||||
{
|
||||
size_t max_threads;
|
||||
};
|
||||
};
|
||||
|
||||
using AggregationStateCachePtr = std::shared_ptr<AggregationStateCache>;
|
||||
|
||||
/// For the case where there is one numeric key.
|
||||
template <typename FieldType, typename TData> /// UInt8/16/32/64 for any type with corresponding bit width.
|
||||
@ -169,65 +158,16 @@ struct AggregationMethodOneNumber
|
||||
AggregationMethodOneNumber(const Other & other) : data(other.data) {}
|
||||
|
||||
/// To use one `Method` in different threads, use different `State`.
|
||||
struct State
|
||||
{
|
||||
const char * vec;
|
||||
using State = ColumnsHashing::HashMethodOneNumber<Data, FieldType>;
|
||||
|
||||
/** Called at the start of each block processing.
|
||||
* Sets the variables needed for the other methods called in inner loops.
|
||||
*/
|
||||
void init(ColumnRawPtrs & key_columns)
|
||||
{
|
||||
vec = key_columns[0]->getRawData().data;
|
||||
}
|
||||
|
||||
/// Get the key from the key columns for insertion into the hash table.
|
||||
ALWAYS_INLINE Key getKey(
|
||||
const ColumnRawPtrs & /*key_columns*/,
|
||||
size_t /*keys_size*/, /// Number of key columns.
|
||||
size_t i, /// From which row of the block, get the key.
|
||||
const Sizes & /*key_sizes*/, /// If the keys of a fixed length - their lengths. It is not used in aggregation methods for variable length keys.
|
||||
StringRefs & /*keys*/, /// Here references to key data in columns can be written. They can be used in the future.
|
||||
Arena & /*pool*/) const
|
||||
{
|
||||
return unalignedLoad<FieldType>(vec + i * sizeof(FieldType));
|
||||
}
|
||||
};
|
||||
|
||||
/// From the value in the hash table, get AggregateDataPtr.
|
||||
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
||||
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
||||
|
||||
/** Place additional data, if necessary, in case a new key was inserted into the hash table.
|
||||
*/
|
||||
static ALWAYS_INLINE void onNewKey(typename Data::value_type & /*value*/, size_t /*keys_size*/, StringRefs & /*keys*/, Arena & /*pool*/)
|
||||
{
|
||||
}
|
||||
|
||||
/** The action to be taken if the key is not new. For example, roll back the memory allocation in the pool.
|
||||
*/
|
||||
static ALWAYS_INLINE void onExistingKey(const Key & /*key*/, StringRefs & /*keys*/, Arena & /*pool*/) {}
|
||||
|
||||
/** Do not use optimization for consecutive keys.
|
||||
*/
|
||||
static const bool no_consecutive_keys_optimization = false;
|
||||
/// Use optimization for low cardinality.
|
||||
static const bool low_cardinality_optimization = false;
|
||||
|
||||
/** Insert the key from the hash table into columns.
|
||||
*/
|
||||
// Insert the key from the hash table into columns.
|
||||
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
|
||||
{
|
||||
static_cast<ColumnVectorHelper *>(key_columns[0].get())->insertRawData<sizeof(FieldType)>(reinterpret_cast<const char *>(&value.first));
|
||||
}
|
||||
|
||||
/// Get StringRef from value which can be inserted into column.
|
||||
static StringRef getValueRef(const typename Data::value_type & value)
|
||||
{
|
||||
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
|
||||
}
|
||||
|
||||
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
|
||||
};
|
||||
|
||||
|
||||
@ -248,58 +188,14 @@ struct AggregationMethodString
|
||||
template <typename Other>
|
||||
AggregationMethodString(const Other & other) : data(other.data) {}
|
||||
|
||||
struct State
|
||||
{
|
||||
const IColumn::Offset * offsets;
|
||||
const UInt8 * chars;
|
||||
using State = ColumnsHashing::HashMethodString<Data>;
|
||||
|
||||
void init(ColumnRawPtrs & key_columns)
|
||||
{
|
||||
const IColumn & column = *key_columns[0];
|
||||
const ColumnString & column_string = static_cast<const ColumnString &>(column);
|
||||
offsets = column_string.getOffsets().data();
|
||||
chars = column_string.getChars().data();
|
||||
}
|
||||
|
||||
ALWAYS_INLINE Key getKey(
|
||||
const ColumnRawPtrs & /*key_columns*/,
|
||||
size_t /*keys_size*/,
|
||||
ssize_t i,
|
||||
const Sizes & /*key_sizes*/,
|
||||
StringRefs & /*keys*/,
|
||||
Arena & /*pool*/) const
|
||||
{
|
||||
return StringRef(
|
||||
chars + offsets[i - 1],
|
||||
offsets[i] - offsets[i - 1] - 1);
|
||||
}
|
||||
};
|
||||
|
||||
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
||||
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
||||
|
||||
static ALWAYS_INLINE void onNewKey(typename Data::value_type & value, size_t /*keys_size*/, StringRefs & /*keys*/, Arena & pool)
|
||||
{
|
||||
if (value.first.size)
|
||||
value.first.data = pool.insert(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
static ALWAYS_INLINE void onExistingKey(const Key & /*key*/, StringRefs & /*keys*/, Arena & /*pool*/) {}
|
||||
|
||||
static const bool no_consecutive_keys_optimization = false;
|
||||
static const bool low_cardinality_optimization = false;
|
||||
|
||||
static StringRef getValueRef(const typename Data::value_type & value)
|
||||
{
|
||||
return StringRef(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
|
||||
{
|
||||
key_columns[0]->insertData(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
|
||||
};
|
||||
|
||||
|
||||
@ -320,101 +216,14 @@ struct AggregationMethodFixedString
|
||||
template <typename Other>
|
||||
AggregationMethodFixedString(const Other & other) : data(other.data) {}
|
||||
|
||||
struct State
|
||||
{
|
||||
size_t n;
|
||||
const ColumnFixedString::Chars * chars;
|
||||
using State = ColumnsHashing::HashMethodFixedString<Data>;
|
||||
|
||||
void init(ColumnRawPtrs & key_columns)
|
||||
{
|
||||
const IColumn & column = *key_columns[0];
|
||||
const ColumnFixedString & column_string = static_cast<const ColumnFixedString &>(column);
|
||||
n = column_string.getN();
|
||||
chars = &column_string.getChars();
|
||||
}
|
||||
|
||||
ALWAYS_INLINE Key getKey(
|
||||
const ColumnRawPtrs &,
|
||||
size_t,
|
||||
size_t i,
|
||||
const Sizes &,
|
||||
StringRefs &,
|
||||
Arena &) const
|
||||
{
|
||||
return StringRef(&(*chars)[i * n], n);
|
||||
}
|
||||
};
|
||||
|
||||
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
||||
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
||||
|
||||
static ALWAYS_INLINE void onNewKey(typename Data::value_type & value, size_t, StringRefs &, Arena & pool)
|
||||
{
|
||||
value.first.data = pool.insert(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
static ALWAYS_INLINE void onExistingKey(const Key &, StringRefs &, Arena &) {}
|
||||
|
||||
static const bool no_consecutive_keys_optimization = false;
|
||||
static const bool low_cardinality_optimization = false;
|
||||
|
||||
static StringRef getValueRef(const typename Data::value_type & value)
|
||||
{
|
||||
return StringRef(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
|
||||
{
|
||||
key_columns[0]->insertData(value.first.data, value.first.size);
|
||||
}
|
||||
|
||||
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
|
||||
};
|
||||
|
||||
/// Cache stores dictionaries and saved_hash per dictionary key.
|
||||
class LowCardinalityDictionaryCache : public AggregationStateCache
|
||||
{
|
||||
public:
|
||||
/// Will assume that dictionaries with same hash has the same keys.
|
||||
/// Just in case, check that they have also the same size.
|
||||
struct DictionaryKey
|
||||
{
|
||||
UInt128 hash;
|
||||
UInt64 size;
|
||||
|
||||
bool operator== (const DictionaryKey & other) const { return hash == other.hash && size == other.size; }
|
||||
};
|
||||
|
||||
struct DictionaryKeyHash
|
||||
{
|
||||
size_t operator()(const DictionaryKey & key) const
|
||||
{
|
||||
SipHash hash;
|
||||
hash.update(key.hash.low);
|
||||
hash.update(key.hash.high);
|
||||
hash.update(key.size);
|
||||
return hash.get64();
|
||||
}
|
||||
};
|
||||
|
||||
struct CachedValues
|
||||
{
|
||||
/// Store ptr to dictionary to be sure it won't be deleted.
|
||||
ColumnPtr dictionary_holder;
|
||||
/// Hashes for dictionary keys.
|
||||
const UInt64 * saved_hash = nullptr;
|
||||
};
|
||||
|
||||
using CachedValuesPtr = std::shared_ptr<CachedValues>;
|
||||
|
||||
explicit LowCardinalityDictionaryCache(const AggregationStateCache::Settings & settings) : cache(settings.max_threads) {}
|
||||
|
||||
CachedValuesPtr get(const DictionaryKey & key) { return cache.get(key); }
|
||||
void set(const DictionaryKey & key, const CachedValuesPtr & mapped) { cache.set(key, mapped); }
|
||||
|
||||
private:
|
||||
using Cache = LRUCache<DictionaryKey, CachedValues, DictionaryKeyHash>;
|
||||
Cache cache;
|
||||
};
|
||||
|
||||
/// Single low cardinality column.
|
||||
@ -432,342 +241,23 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
|
||||
|
||||
using Base::data;
|
||||
|
||||
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & settings)
|
||||
{
|
||||
return std::make_shared<LowCardinalityDictionaryCache>(settings);
|
||||
}
|
||||
|
||||
AggregationMethodSingleLowCardinalityColumn() = default;
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {}
|
||||
|
||||
struct State : public BaseState
|
||||
{
|
||||
ColumnRawPtrs key_columns;
|
||||
const IColumn * positions = nullptr;
|
||||
size_t size_of_index_type = 0;
|
||||
using State = ColumnsHashing::HashMethodSingleLowCardinalityColumn<BaseState, true>;
|
||||
|
||||
/// saved hash is from current column or from cache.
|
||||
const UInt64 * saved_hash = nullptr;
|
||||
/// Hold dictionary in case saved_hash is from cache to be sure it won't be deleted.
|
||||
ColumnPtr dictionary_holder;
|
||||
|
||||
/// Cache AggregateDataPtr for current column in order to decrease the number of hash table usages.
|
||||
PaddedPODArray<AggregateDataPtr> aggregate_data_cache;
|
||||
|
||||
/// If initialized column is nullable.
|
||||
bool is_nullable = false;
|
||||
|
||||
void init(ColumnRawPtrs &)
|
||||
{
|
||||
throw Exception("Expected cache for AggregationMethodSingleLowCardinalityColumn::init", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
void init(ColumnRawPtrs & key_columns_low_cardinality, const AggregationStateCachePtr & cache_ptr)
|
||||
{
|
||||
auto column = typeid_cast<const ColumnLowCardinality *>(key_columns_low_cardinality[0]);
|
||||
if (!column)
|
||||
throw Exception("Invalid aggregation key type for AggregationMethodSingleLowCardinalityColumn method. "
|
||||
"Excepted LowCardinality, got " + key_columns_low_cardinality[0]->getName(), ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (!cache_ptr)
|
||||
throw Exception("Cache wasn't created for AggregationMethodSingleLowCardinalityColumn", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
auto cache = typeid_cast<LowCardinalityDictionaryCache *>(cache_ptr.get());
|
||||
if (!cache)
|
||||
{
|
||||
const auto & cached_val = *cache_ptr;
|
||||
throw Exception("Invalid type for AggregationMethodSingleLowCardinalityColumn cache: "
|
||||
+ demangle(typeid(cached_val).name()), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
auto * dict = column->getDictionary().getNestedNotNullableColumn().get();
|
||||
is_nullable = column->getDictionary().nestedColumnIsNullable();
|
||||
key_columns = {dict};
|
||||
bool is_shared_dict = column->isSharedDictionary();
|
||||
|
||||
typename LowCardinalityDictionaryCache::DictionaryKey dictionary_key;
|
||||
typename LowCardinalityDictionaryCache::CachedValuesPtr cached_values;
|
||||
|
||||
if (is_shared_dict)
|
||||
{
|
||||
dictionary_key = {column->getDictionary().getHash(), dict->size()};
|
||||
cached_values = cache->get(dictionary_key);
|
||||
}
|
||||
|
||||
if (cached_values)
|
||||
{
|
||||
saved_hash = cached_values->saved_hash;
|
||||
dictionary_holder = cached_values->dictionary_holder;
|
||||
}
|
||||
else
|
||||
{
|
||||
saved_hash = column->getDictionary().tryGetSavedHash();
|
||||
dictionary_holder = column->getDictionaryPtr();
|
||||
|
||||
if (is_shared_dict)
|
||||
{
|
||||
cached_values = std::make_shared<typename LowCardinalityDictionaryCache::CachedValues>();
|
||||
cached_values->saved_hash = saved_hash;
|
||||
cached_values->dictionary_holder = dictionary_holder;
|
||||
|
||||
cache->set(dictionary_key, cached_values);
|
||||
}
|
||||
}
|
||||
|
||||
AggregateDataPtr default_data = nullptr;
|
||||
aggregate_data_cache.assign(key_columns[0]->size(), default_data);
|
||||
|
||||
size_of_index_type = column->getSizeOfIndexType();
|
||||
positions = column->getIndexesPtr().get();
|
||||
|
||||
BaseState::init(key_columns);
|
||||
}
|
||||
|
||||
ALWAYS_INLINE size_t getIndexAt(size_t row) const
|
||||
{
|
||||
switch (size_of_index_type)
|
||||
{
|
||||
case sizeof(UInt8): return static_cast<const ColumnUInt8 *>(positions)->getElement(row);
|
||||
case sizeof(UInt16): return static_cast<const ColumnUInt16 *>(positions)->getElement(row);
|
||||
case sizeof(UInt32): return static_cast<const ColumnUInt32 *>(positions)->getElement(row);
|
||||
case sizeof(UInt64): return static_cast<const ColumnUInt64 *>(positions)->getElement(row);
|
||||
default: throw Exception("Unexpected size of index type for low cardinality column.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the key from the key columns for insertion into the hash table.
|
||||
ALWAYS_INLINE Key getKey(
|
||||
const ColumnRawPtrs & /*key_columns*/,
|
||||
size_t /*keys_size*/,
|
||||
size_t i,
|
||||
const Sizes & key_sizes,
|
||||
StringRefs & keys,
|
||||
Arena & pool) const
|
||||
{
|
||||
size_t row = getIndexAt(i);
|
||||
return BaseState::getKey(key_columns, 1, row, key_sizes, keys, pool);
|
||||
}
|
||||
|
||||
template <typename D>
|
||||
ALWAYS_INLINE AggregateDataPtr * emplaceKeyFromRow(
|
||||
D & data,
|
||||
size_t i,
|
||||
bool & inserted,
|
||||
size_t keys_size,
|
||||
StringRefs & keys,
|
||||
Arena & pool)
|
||||
{
|
||||
size_t row = getIndexAt(i);
|
||||
|
||||
if (is_nullable && row == 0)
|
||||
{
|
||||
inserted = !data.hasNullKeyData();
|
||||
data.hasNullKeyData() = true;
|
||||
return &data.getNullKeyData();
|
||||
}
|
||||
|
||||
if (aggregate_data_cache[row])
|
||||
{
|
||||
inserted = false;
|
||||
return &aggregate_data_cache[row];
|
||||
}
|
||||
else
|
||||
{
|
||||
Sizes key_sizes;
|
||||
auto key = getKey({}, 0, i, key_sizes, keys, pool);
|
||||
|
||||
typename D::iterator it;
|
||||
if (saved_hash)
|
||||
data.emplace(key, it, inserted, saved_hash[row]);
|
||||
else
|
||||
data.emplace(key, it, inserted);
|
||||
|
||||
if (inserted)
|
||||
Base::onNewKey(*it, keys_size, keys, pool);
|
||||
else
|
||||
aggregate_data_cache[row] = Base::getAggregateData(it->second);
|
||||
|
||||
return &Base::getAggregateData(it->second);
|
||||
}
|
||||
}
|
||||
|
||||
ALWAYS_INLINE bool isNullAt(size_t i)
|
||||
{
|
||||
if (!is_nullable)
|
||||
return false;
|
||||
|
||||
return getIndexAt(i) == 0;
|
||||
}
|
||||
|
||||
ALWAYS_INLINE void cacheAggregateData(size_t i, AggregateDataPtr data)
|
||||
{
|
||||
size_t row = getIndexAt(i);
|
||||
aggregate_data_cache[row] = data;
|
||||
}
|
||||
|
||||
template <typename D>
|
||||
ALWAYS_INLINE AggregateDataPtr * findFromRow(D & data, size_t i)
|
||||
{
|
||||
size_t row = getIndexAt(i);
|
||||
|
||||
if (is_nullable && row == 0)
|
||||
return data.hasNullKeyData() ? &data.getNullKeyData() : nullptr;
|
||||
|
||||
if (!aggregate_data_cache[row])
|
||||
{
|
||||
Sizes key_sizes;
|
||||
StringRefs keys;
|
||||
Arena pool;
|
||||
auto key = getKey({}, 0, i, key_sizes, keys, pool);
|
||||
|
||||
typename D::iterator it;
|
||||
if (saved_hash)
|
||||
it = data.find(key, saved_hash[row]);
|
||||
else
|
||||
it = data.find(key);
|
||||
|
||||
if (it != data.end())
|
||||
aggregate_data_cache[row] = Base::getAggregateData(it->second);
|
||||
}
|
||||
return &aggregate_data_cache[row];
|
||||
}
|
||||
};
|
||||
|
||||
static AggregateDataPtr & getAggregateData(Mapped & value) { return Base::getAggregateData(value); }
|
||||
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return Base::getAggregateData(value); }
|
||||
|
||||
static void onNewKey(typename Data::value_type & value, size_t keys_size, StringRefs & keys, Arena & pool)
|
||||
{
|
||||
return Base::onNewKey(value, keys_size, keys, pool);
|
||||
}
|
||||
|
||||
static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool)
|
||||
{
|
||||
return Base::onExistingKey(key, keys, pool);
|
||||
}
|
||||
|
||||
static const bool no_consecutive_keys_optimization = true;
|
||||
static const bool low_cardinality_optimization = true;
|
||||
|
||||
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns_low_cardinality, const Sizes & /*key_sizes*/)
|
||||
{
|
||||
auto ref = Base::getValueRef(value);
|
||||
auto ref = BaseState::getValueRef(value);
|
||||
static_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0].get())->insertData(ref.data, ref.size);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
namespace aggregator_impl
|
||||
{
|
||||
|
||||
/// This class is designed to provide the functionality that is required for
|
||||
/// supporting nullable keys in AggregationMethodKeysFixed. If there are
|
||||
/// no nullable keys, this class is merely implemented as an empty shell.
|
||||
template <typename Key, bool has_nullable_keys>
|
||||
class BaseStateKeysFixed;
|
||||
|
||||
/// Case where nullable keys are supported.
|
||||
template <typename Key>
|
||||
class BaseStateKeysFixed<Key, true>
|
||||
{
|
||||
protected:
|
||||
void init(const ColumnRawPtrs & key_columns)
|
||||
{
|
||||
null_maps.reserve(key_columns.size());
|
||||
actual_columns.reserve(key_columns.size());
|
||||
|
||||
for (const auto & col : key_columns)
|
||||
{
|
||||
if (col->isColumnNullable())
|
||||
{
|
||||
const auto & nullable_col = static_cast<const ColumnNullable &>(*col);
|
||||
actual_columns.push_back(&nullable_col.getNestedColumn());
|
||||
null_maps.push_back(&nullable_col.getNullMapColumn());
|
||||
}
|
||||
else
|
||||
{
|
||||
actual_columns.push_back(col);
|
||||
null_maps.push_back(nullptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the columns which actually contain the values of the keys.
|
||||
/// For a given key column, if it is nullable, we return its nested
|
||||
/// column. Otherwise we return the key column itself.
|
||||
inline const ColumnRawPtrs & getActualColumns() const
|
||||
{
|
||||
return actual_columns;
|
||||
}
|
||||
|
||||
/// Create a bitmap that indicates whether, for a particular row,
|
||||
/// a key column bears a null value or not.
|
||||
KeysNullMap<Key> createBitmap(size_t row) const
|
||||
{
|
||||
KeysNullMap<Key> bitmap{};
|
||||
|
||||
for (size_t k = 0; k < null_maps.size(); ++k)
|
||||
{
|
||||
if (null_maps[k] != nullptr)
|
||||
{
|
||||
const auto & null_map = static_cast<const ColumnUInt8 &>(*null_maps[k]).getData();
|
||||
if (null_map[row] == 1)
|
||||
{
|
||||
size_t bucket = k / 8;
|
||||
size_t offset = k % 8;
|
||||
bitmap[bucket] |= UInt8(1) << offset;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return bitmap;
|
||||
}
|
||||
|
||||
private:
|
||||
ColumnRawPtrs actual_columns;
|
||||
ColumnRawPtrs null_maps;
|
||||
};
|
||||
|
||||
/// Case where nullable keys are not supported.
|
||||
template <typename Key>
|
||||
class BaseStateKeysFixed<Key, false>
|
||||
{
|
||||
protected:
|
||||
void init(const ColumnRawPtrs &)
|
||||
{
|
||||
throw Exception{"Internal error: calling init() for non-nullable"
|
||||
" keys is forbidden", ErrorCodes::LOGICAL_ERROR};
|
||||
}
|
||||
|
||||
const ColumnRawPtrs & getActualColumns() const
|
||||
{
|
||||
throw Exception{"Internal error: calling getActualColumns() for non-nullable"
|
||||
" keys is forbidden", ErrorCodes::LOGICAL_ERROR};
|
||||
}
|
||||
|
||||
KeysNullMap<Key> createBitmap(size_t) const
|
||||
{
|
||||
throw Exception{"Internal error: calling createBitmap() for non-nullable keys"
|
||||
" is forbidden", ErrorCodes::LOGICAL_ERROR};
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
// Oprional mask for low cardinality columns.
|
||||
template <bool has_low_cardinality>
|
||||
struct LowCardinalityKeys
|
||||
{
|
||||
ColumnRawPtrs nested_columns;
|
||||
ColumnRawPtrs positions;
|
||||
Sizes position_sizes;
|
||||
};
|
||||
|
||||
template <>
|
||||
struct LowCardinalityKeys<false> {};
|
||||
|
||||
/// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits.
|
||||
template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false>
|
||||
struct AggregationMethodKeysFixed
|
||||
@ -787,71 +277,8 @@ struct AggregationMethodKeysFixed
|
||||
template <typename Other>
|
||||
AggregationMethodKeysFixed(const Other & other) : data(other.data) {}
|
||||
|
||||
class State final : private aggregator_impl::BaseStateKeysFixed<Key, has_nullable_keys>
|
||||
{
|
||||
LowCardinalityKeys<has_low_cardinality> low_cardinality_keys;
|
||||
using State = ColumnsHashing::HashMethodKeysFixed<Data, has_nullable_keys, has_low_cardinality>;
|
||||
|
||||
public:
|
||||
using Base = aggregator_impl::BaseStateKeysFixed<Key, has_nullable_keys>;
|
||||
|
||||
void init(ColumnRawPtrs & key_columns)
|
||||
{
|
||||
if constexpr (has_low_cardinality)
|
||||
{
|
||||
low_cardinality_keys.nested_columns.resize(key_columns.size());
|
||||
low_cardinality_keys.positions.assign(key_columns.size(), nullptr);
|
||||
low_cardinality_keys.position_sizes.resize(key_columns.size());
|
||||
for (size_t i = 0; i < key_columns.size(); ++i)
|
||||
{
|
||||
if (auto * low_cardinality_col = typeid_cast<const ColumnLowCardinality *>(key_columns[i]))
|
||||
{
|
||||
low_cardinality_keys.nested_columns[i] = low_cardinality_col->getDictionary().getNestedColumn().get();
|
||||
low_cardinality_keys.positions[i] = &low_cardinality_col->getIndexes();
|
||||
low_cardinality_keys.position_sizes[i] = low_cardinality_col->getSizeOfIndexType();
|
||||
}
|
||||
else
|
||||
low_cardinality_keys.nested_columns[i] = key_columns[i];
|
||||
}
|
||||
}
|
||||
|
||||
if (has_nullable_keys)
|
||||
Base::init(key_columns);
|
||||
}
|
||||
|
||||
ALWAYS_INLINE Key getKey(
|
||||
const ColumnRawPtrs & key_columns,
|
||||
size_t keys_size,
|
||||
size_t i,
|
||||
const Sizes & key_sizes,
|
||||
StringRefs &,
|
||||
Arena &) const
|
||||
{
|
||||
if (has_nullable_keys)
|
||||
{
|
||||
auto bitmap = Base::createBitmap(i);
|
||||
return packFixed<Key>(i, keys_size, Base::getActualColumns(), key_sizes, bitmap);
|
||||
}
|
||||
else
|
||||
{
|
||||
if constexpr (has_low_cardinality)
|
||||
return packFixed<Key, true>(i, keys_size, low_cardinality_keys.nested_columns, key_sizes,
|
||||
&low_cardinality_keys.positions, &low_cardinality_keys.position_sizes);
|
||||
|
||||
return packFixed<Key>(i, keys_size, key_columns, key_sizes);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
||||
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
||||
|
||||
static ALWAYS_INLINE void onNewKey(typename Data::value_type &, size_t, StringRefs &, Arena &)
|
||||
{
|
||||
}
|
||||
|
||||
static ALWAYS_INLINE void onExistingKey(const Key &, StringRefs &, Arena &) {}
|
||||
|
||||
static const bool no_consecutive_keys_optimization = false;
|
||||
static const bool low_cardinality_optimization = false;
|
||||
|
||||
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & key_sizes)
|
||||
@ -904,8 +331,6 @@ struct AggregationMethodKeysFixed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
|
||||
};
|
||||
|
||||
|
||||
@ -930,53 +355,24 @@ struct AggregationMethodSerialized
|
||||
template <typename Other>
|
||||
AggregationMethodSerialized(const Other & other) : data(other.data) {}
|
||||
|
||||
struct State
|
||||
{
|
||||
void init(ColumnRawPtrs &)
|
||||
{
|
||||
}
|
||||
using State = ColumnsHashing::HashMethodSerialized<Data>;
|
||||
|
||||
ALWAYS_INLINE Key getKey(
|
||||
const ColumnRawPtrs & key_columns,
|
||||
size_t keys_size,
|
||||
size_t i,
|
||||
const Sizes &,
|
||||
StringRefs &,
|
||||
Arena & pool) const
|
||||
{
|
||||
return serializeKeysToPoolContiguous(i, keys_size, key_columns, pool);
|
||||
}
|
||||
};
|
||||
|
||||
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
||||
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
||||
|
||||
static ALWAYS_INLINE void onNewKey(typename Data::value_type &, size_t, StringRefs &, Arena &)
|
||||
{
|
||||
}
|
||||
|
||||
static ALWAYS_INLINE void onExistingKey(const Key & key, StringRefs &, Arena & pool)
|
||||
{
|
||||
pool.rollback(key.size);
|
||||
}
|
||||
|
||||
/// If the key already was, it is removed from the pool (overwritten), and the next key can not be compared with it.
|
||||
static const bool no_consecutive_keys_optimization = true;
|
||||
static const bool low_cardinality_optimization = false;
|
||||
|
||||
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
|
||||
{
|
||||
auto pos = value.first.data;
|
||||
for (size_t i = 0; i < key_columns.size(); ++i)
|
||||
pos = key_columns[i]->deserializeAndInsertFromArena(pos);
|
||||
for (auto & column : key_columns)
|
||||
pos = column->deserializeAndInsertFromArena(pos);
|
||||
}
|
||||
|
||||
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
|
||||
};
|
||||
|
||||
|
||||
class Aggregator;
|
||||
|
||||
using ColumnsHashing::HashMethodContext;
|
||||
using ColumnsHashing::HashMethodContextPtr;
|
||||
|
||||
struct AggregatedDataVariants : private boost::noncopyable
|
||||
{
|
||||
/** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way:
|
||||
@ -1298,7 +694,7 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
}
|
||||
}
|
||||
|
||||
static AggregationStateCachePtr createCache(Type type, const AggregationStateCache::Settings & settings)
|
||||
static HashMethodContextPtr createCache(Type type, const HashMethodContext::Settings & settings)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
@ -1309,7 +705,7 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
{ \
|
||||
using TPtr ## NAME = decltype(AggregatedDataVariants::NAME); \
|
||||
using T ## NAME = typename TPtr ## NAME ::element_type; \
|
||||
return T ## NAME ::createCache(settings); \
|
||||
return T ## NAME ::State::createContext(settings); \
|
||||
}
|
||||
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
@ -1496,7 +892,7 @@ protected:
|
||||
AggregatedDataVariants::Type method_chosen;
|
||||
Sizes key_sizes;
|
||||
|
||||
AggregationStateCachePtr aggregation_state_cache;
|
||||
HashMethodContextPtr aggregation_state_cache;
|
||||
|
||||
AggregateFunctionsPlainPtrs aggregate_functions;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user