Merge pull request #4128 from yandex/unified-sets

Unified sets
This commit is contained in:
alexey-milovidov 2019-02-06 20:40:11 +03:00 committed by GitHub
commit 2686f64233
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1437 additions and 1485 deletions

View File

@ -0,0 +1,557 @@
#pragma once
#include <Common/ColumnsHashingImpl.h>
#include <Common/Arena.h>
#include <Common/LRUCache.h>
#include <common/unaligned.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
#include <Core/Defines.h>
#include <memory>
namespace DB
{
namespace ColumnsHashing
{
/// For the case when there is one numeric key.
/// UInt8/16/32/64 for any type with corresponding bit width.
template <typename Value, typename Mapped, typename FieldType, bool use_cache = true>
struct HashMethodOneNumber
: public columns_hashing_impl::HashMethodBase<HashMethodOneNumber<Value, Mapped, FieldType, use_cache>, Value, Mapped, use_cache>
{
using Self = HashMethodOneNumber<Value, Mapped, FieldType, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
const char * vec;
/// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise.
HashMethodOneNumber(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
{
vec = key_columns[0]->getRawData().data;
}
/// Creates context. Method is called once and result context is used in all threads.
using Base::createContext; /// (const HashMethodContext::Settings &) -> HashMethodContextPtr
/// Emplace key into HashTable or HashMap. If Data is HashMap, returns ptr to value, otherwise nullptr.
/// Data is a HashTable where to insert key from column's row.
/// For Serialized method, key may be placed in pool.
using Base::emplaceKey; /// (Data & data, size_t row, Arena & pool) -> EmplaceResult
/// Find key into HashTable or HashMap. If Data is HashMap and key was found, returns ptr to value, otherwise nullptr.
using Base::findKey; /// (Data & data, size_t row, Arena & pool) -> FindResult
/// Get hash value of row.
using Base::getHash; /// (const Data & data, size_t row, Arena & pool) -> size_t
/// Is used for default implementation in HashMethodBase.
FieldType getKey(size_t row, Arena &) const { return unalignedLoad<FieldType>(vec + row * sizeof(FieldType)); }
/// Get StringRef from value which can be inserted into column.
static StringRef getValueRef(const Value & value)
{
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
};
/// For the case when there is one string key.
template <typename Value, typename Mapped, bool place_string_to_arena = true, bool use_cache = true>
struct HashMethodString
: public columns_hashing_impl::HashMethodBase<HashMethodString<Value, Mapped, place_string_to_arena, use_cache>, Value, Mapped, use_cache>
{
using Self = HashMethodString<Value, Mapped, place_string_to_arena, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
const IColumn::Offset * offsets;
const UInt8 * chars;
HashMethodString(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
{
const IColumn & column = *key_columns[0];
const ColumnString & column_string = static_cast<const ColumnString &>(column);
offsets = column_string.getOffsets().data();
chars = column_string.getChars().data();
}
auto getKey(ssize_t row, Arena &) const
{
return StringRef(chars + offsets[row - 1], offsets[row] - offsets[row - 1] - 1);
}
static StringRef getValueRef(const Value & value) { return StringRef(value.first.data, value.first.size); }
protected:
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
static ALWAYS_INLINE void onNewKey([[maybe_unused]] StringRef & key, [[maybe_unused]] Arena & pool)
{
if constexpr (place_string_to_arena)
{
if (key.size)
key.data = pool.insert(key.data, key.size);
}
}
};
/// For the case when there is one fixed-length string key.
template <typename Value, typename Mapped, bool place_string_to_arena = true, bool use_cache = true>
struct HashMethodFixedString
: public columns_hashing_impl::HashMethodBase<HashMethodFixedString<Value, Mapped, place_string_to_arena, use_cache>, Value, Mapped, use_cache>
{
using Self = HashMethodFixedString<Value, Mapped, place_string_to_arena, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
size_t n;
const ColumnFixedString::Chars * chars;
HashMethodFixedString(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
{
const IColumn & column = *key_columns[0];
const ColumnFixedString & column_string = static_cast<const ColumnFixedString &>(column);
n = column_string.getN();
chars = &column_string.getChars();
}
StringRef getKey(size_t row, Arena &) const { return StringRef(&(*chars)[row * n], n); }
static StringRef getValueRef(const Value & value) { return StringRef(value.first.data, value.first.size); }
protected:
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
static ALWAYS_INLINE void onNewKey([[maybe_unused]] StringRef & key, [[maybe_unused]] Arena & pool)
{
if constexpr (place_string_to_arena)
key.data = pool.insert(key.data, key.size);
}
};
/// Cache stores dictionaries and saved_hash per dictionary key.
class LowCardinalityDictionaryCache : public HashMethodContext
{
public:
/// Will assume that dictionaries with same hash has the same keys.
/// Just in case, check that they have also the same size.
struct DictionaryKey
{
UInt128 hash;
UInt64 size;
bool operator== (const DictionaryKey & other) const { return hash == other.hash && size == other.size; }
};
struct DictionaryKeyHash
{
size_t operator()(const DictionaryKey & key) const
{
SipHash hash;
hash.update(key.hash.low);
hash.update(key.hash.high);
hash.update(key.size);
return hash.get64();
}
};
struct CachedValues
{
/// Store ptr to dictionary to be sure it won't be deleted.
ColumnPtr dictionary_holder;
/// Hashes for dictionary keys.
const UInt64 * saved_hash = nullptr;
};
using CachedValuesPtr = std::shared_ptr<CachedValues>;
explicit LowCardinalityDictionaryCache(const HashMethodContext::Settings & settings) : cache(settings.max_threads) {}
CachedValuesPtr get(const DictionaryKey & key) { return cache.get(key); }
void set(const DictionaryKey & key, const CachedValuesPtr & mapped) { cache.set(key, mapped); }
private:
using Cache = LRUCache<DictionaryKey, CachedValues, DictionaryKeyHash>;
Cache cache;
};
/// Single low cardinality column.
template <typename SingleColumnMethod, typename Mapped, bool use_cache>
struct HashMethodSingleLowCardinalityColumn : public SingleColumnMethod
{
using Base = SingleColumnMethod;
enum class VisitValue
{
Empty = 0,
Found = 1,
NotFound = 2,
};
static constexpr bool has_mapped = !std::is_same<Mapped, void>::value;
using EmplaceResult = columns_hashing_impl::EmplaceResultImpl<Mapped>;
using FindResult = columns_hashing_impl::FindResultImpl<Mapped>;
static HashMethodContextPtr createContext(const HashMethodContext::Settings & settings)
{
return std::make_shared<LowCardinalityDictionaryCache>(settings);
}
ColumnRawPtrs key_columns;
const IColumn * positions = nullptr;
size_t size_of_index_type = 0;
/// saved hash is from current column or from cache.
const UInt64 * saved_hash = nullptr;
/// Hold dictionary in case saved_hash is from cache to be sure it won't be deleted.
ColumnPtr dictionary_holder;
/// Cache AggregateDataPtr for current column in order to decrease the number of hash table usages.
columns_hashing_impl::MappedCache<Mapped> mapped_cache;
PaddedPODArray<VisitValue> visit_cache;
/// If initialized column is nullable.
bool is_nullable = false;
static const ColumnLowCardinality & getLowCardinalityColumn(const IColumn * low_cardinality_column)
{
auto column = typeid_cast<const ColumnLowCardinality *>(low_cardinality_column);
if (!column)
throw Exception("Invalid aggregation key type for HashMethodSingleLowCardinalityColumn method. "
"Excepted LowCardinality, got " + column->getName(), ErrorCodes::LOGICAL_ERROR);
return *column;
}
HashMethodSingleLowCardinalityColumn(
const ColumnRawPtrs & key_columns_low_cardinality, const Sizes & key_sizes, const HashMethodContextPtr & context)
: Base({getLowCardinalityColumn(key_columns_low_cardinality[0]).getDictionary().getNestedNotNullableColumn().get()}, key_sizes, context)
{
auto column = &getLowCardinalityColumn(key_columns_low_cardinality[0]);
if (!context)
throw Exception("Cache wasn't created for HashMethodSingleLowCardinalityColumn",
ErrorCodes::LOGICAL_ERROR);
LowCardinalityDictionaryCache * cache;
if constexpr (use_cache)
{
cache = typeid_cast<LowCardinalityDictionaryCache *>(context.get());
if (!cache)
{
const auto & cached_val = *context;
throw Exception("Invalid type for HashMethodSingleLowCardinalityColumn cache: "
+ demangle(typeid(cached_val).name()), ErrorCodes::LOGICAL_ERROR);
}
}
auto * dict = column->getDictionary().getNestedNotNullableColumn().get();
is_nullable = column->getDictionary().nestedColumnIsNullable();
key_columns = {dict};
bool is_shared_dict = column->isSharedDictionary();
typename LowCardinalityDictionaryCache::DictionaryKey dictionary_key;
typename LowCardinalityDictionaryCache::CachedValuesPtr cached_values;
if (is_shared_dict)
{
dictionary_key = {column->getDictionary().getHash(), dict->size()};
if constexpr (use_cache)
cached_values = cache->get(dictionary_key);
}
if (cached_values)
{
saved_hash = cached_values->saved_hash;
dictionary_holder = cached_values->dictionary_holder;
}
else
{
saved_hash = column->getDictionary().tryGetSavedHash();
dictionary_holder = column->getDictionaryPtr();
if constexpr (use_cache)
{
if (is_shared_dict)
{
cached_values = std::make_shared<typename LowCardinalityDictionaryCache::CachedValues>();
cached_values->saved_hash = saved_hash;
cached_values->dictionary_holder = dictionary_holder;
cache->set(dictionary_key, cached_values);
}
}
}
if constexpr (has_mapped)
mapped_cache.resize(key_columns[0]->size());
VisitValue empty(VisitValue::Empty);
visit_cache.assign(key_columns[0]->size(), empty);
size_of_index_type = column->getSizeOfIndexType();
positions = column->getIndexesPtr().get();
}
ALWAYS_INLINE size_t getIndexAt(size_t row) const
{
switch (size_of_index_type)
{
case sizeof(UInt8): return static_cast<const ColumnUInt8 *>(positions)->getElement(row);
case sizeof(UInt16): return static_cast<const ColumnUInt16 *>(positions)->getElement(row);
case sizeof(UInt32): return static_cast<const ColumnUInt32 *>(positions)->getElement(row);
case sizeof(UInt64): return static_cast<const ColumnUInt64 *>(positions)->getElement(row);
default: throw Exception("Unexpected size of index type for low cardinality column.", ErrorCodes::LOGICAL_ERROR);
}
}
/// Get the key from the key columns for insertion into the hash table.
ALWAYS_INLINE auto getKey(size_t row, Arena & pool) const
{
return Base::getKey(getIndexAt(row), pool);
}
template <typename Data>
ALWAYS_INLINE EmplaceResult emplaceKey(Data & data, size_t row_, Arena & pool)
{
size_t row = getIndexAt(row_);
if (is_nullable && row == 0)
{
visit_cache[row] = VisitValue::Found;
bool has_null_key = data.hasNullKeyData();
data.hasNullKeyData() = true;
if constexpr (has_mapped)
return EmplaceResult(data.getNullKeyData(), mapped_cache[0], !has_null_key);
else
return EmplaceResult(!has_null_key);
}
if (visit_cache[row] == VisitValue::Found)
{
if constexpr (has_mapped)
return EmplaceResult(mapped_cache[row], mapped_cache[row], false);
else
return EmplaceResult(false);
}
auto key = getKey(row_, pool);
bool inserted = false;
typename Data::iterator it;
if (saved_hash)
data.emplace(key, it, inserted, saved_hash[row]);
else
data.emplace(key, it, inserted);
visit_cache[row] = VisitValue::Found;
if (inserted)
{
if constexpr (has_mapped)
{
new(&it->second) Mapped();
Base::onNewKey(it->first, pool);
}
else
Base::onNewKey(*it, pool);
}
if constexpr (has_mapped)
return EmplaceResult(it->second, mapped_cache[row], inserted);
else
return EmplaceResult(inserted);
}
ALWAYS_INLINE bool isNullAt(size_t i)
{
if (!is_nullable)
return false;
return getIndexAt(i) == 0;
}
template <typename Data>
ALWAYS_INLINE FindResult findFromRow(Data & data, size_t row_, Arena & pool)
{
size_t row = getIndexAt(row_);
if (is_nullable && row == 0)
{
if constexpr (has_mapped)
return FindResult(data.hasNullKeyData() ? &data.getNullKeyData() : nullptr, data.hasNullKeyData());
else
return FindResult(data.hasNullKeyData());
}
if (visit_cache[row] != VisitValue::Empty)
{
if constexpr (has_mapped)
return FindResult(&mapped_cache[row], visit_cache[row] == VisitValue::Found);
else
return FindResult(visit_cache[row] == VisitValue::Found);
}
auto key = getKey(row_, pool);
typename Data::iterator it;
if (saved_hash)
it = data.find(key, saved_hash[row]);
else
it = data.find(key);
bool found = it != data.end();
visit_cache[row] = found ? VisitValue::Found : VisitValue::NotFound;
if constexpr (has_mapped)
{
if (found)
mapped_cache[row] = it->second;
}
if constexpr (has_mapped)
return FindResult(&mapped_cache[row], found);
else
return FindResult(found);
}
template <typename Data>
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & pool)
{
row = getIndexAt(row);
if (saved_hash)
return saved_hash[row];
return Base::getHash(data, row, pool);
}
};
// Optional mask for low cardinality columns.
template <bool has_low_cardinality>
struct LowCardinalityKeys
{
ColumnRawPtrs nested_columns;
ColumnRawPtrs positions;
Sizes position_sizes;
};
template <>
struct LowCardinalityKeys<false> {};
/// For the case when all keys are of fixed length, and they fit in N (for example, 128) bits.
template <typename Value, typename Key, typename Mapped, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false, bool use_cache = true>
struct HashMethodKeysFixed
: private columns_hashing_impl::BaseStateKeysFixed<Key, has_nullable_keys_>
, public columns_hashing_impl::HashMethodBase<HashMethodKeysFixed<Value, Key, Mapped, has_nullable_keys_, has_low_cardinality_, use_cache>, Value, Mapped, use_cache>
{
using Self = HashMethodKeysFixed<Value, Key, Mapped, has_nullable_keys_, has_low_cardinality_, use_cache>;
using BaseHashed = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
using Base = columns_hashing_impl::BaseStateKeysFixed<Key, has_nullable_keys_>;
static constexpr bool has_nullable_keys = has_nullable_keys_;
static constexpr bool has_low_cardinality = has_low_cardinality_;
LowCardinalityKeys<has_low_cardinality> low_cardinality_keys;
Sizes key_sizes;
size_t keys_size;
HashMethodKeysFixed(const ColumnRawPtrs & key_columns, const Sizes & key_sizes, const HashMethodContextPtr &)
: Base(key_columns), key_sizes(std::move(key_sizes)), keys_size(key_columns.size())
{
if constexpr (has_low_cardinality)
{
low_cardinality_keys.nested_columns.resize(key_columns.size());
low_cardinality_keys.positions.assign(key_columns.size(), nullptr);
low_cardinality_keys.position_sizes.resize(key_columns.size());
for (size_t i = 0; i < key_columns.size(); ++i)
{
if (auto * low_cardinality_col = typeid_cast<const ColumnLowCardinality *>(key_columns[i]))
{
low_cardinality_keys.nested_columns[i] = low_cardinality_col->getDictionary().getNestedColumn().get();
low_cardinality_keys.positions[i] = &low_cardinality_col->getIndexes();
low_cardinality_keys.position_sizes[i] = low_cardinality_col->getSizeOfIndexType();
}
else
low_cardinality_keys.nested_columns[i] = key_columns[i];
}
}
}
ALWAYS_INLINE Key getKey(size_t row, Arena &) const
{
if constexpr (has_nullable_keys)
{
auto bitmap = Base::createBitmap(row);
return packFixed<Key>(row, keys_size, Base::getActualColumns(), key_sizes, bitmap);
}
else
{
if constexpr (has_low_cardinality)
return packFixed<Key, true>(row, keys_size, low_cardinality_keys.nested_columns, key_sizes,
&low_cardinality_keys.positions, &low_cardinality_keys.position_sizes);
return packFixed<Key>(row, keys_size, Base::getActualColumns(), key_sizes);
}
}
};
/** Hash by concatenating serialized key values.
* The serialized value differs in that it uniquely allows to deserialize it, having only the position with which it starts.
* That is, for example, for strings, it contains first the serialized length of the string, and then the bytes.
* Therefore, when aggregating by several strings, there is no ambiguity.
*/
template <typename Value, typename Mapped>
struct HashMethodSerialized
: public columns_hashing_impl::HashMethodBase<HashMethodSerialized<Value, Mapped>, Value, Mapped, false>
{
using Self = HashMethodSerialized<Value, Mapped>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
ColumnRawPtrs key_columns;
size_t keys_size;
HashMethodSerialized(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
: key_columns(key_columns), keys_size(key_columns.size()) {}
protected:
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
ALWAYS_INLINE StringRef getKey(size_t row, Arena & pool) const
{
return serializeKeysToPoolContiguous(row, keys_size, key_columns, pool);
}
static ALWAYS_INLINE void onExistingKey(StringRef & key, Arena & pool) { pool.rollback(key.size); }
};
/// For the case when there is one string key.
template <typename Value, typename Mapped, bool use_cache = true>
struct HashMethodHashed
: public columns_hashing_impl::HashMethodBase<HashMethodHashed<Value, Mapped, use_cache>, Value, Mapped, use_cache>
{
using Key = UInt128;
using Self = HashMethodHashed<Value, Mapped, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
ColumnRawPtrs key_columns;
HashMethodHashed(ColumnRawPtrs key_columns, const Sizes &, const HashMethodContextPtr &)
: key_columns(std::move(key_columns)) {}
ALWAYS_INLINE Key getKey(size_t row, Arena &) const { return hash128(row, key_columns.size(), key_columns); }
static ALWAYS_INLINE StringRef getValueRef(const Value & value)
{
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
};
}
}

View File

@ -0,0 +1,356 @@
#pragma once
#include <Columns/IColumn.h>
#include <Interpreters/AggregationCommon.h>
namespace DB
{
namespace ColumnsHashing
{
/// Generic context for HashMethod. Context is shared between multiple threads, all methods must be thread-safe.
/// Is used for caching.
class HashMethodContext
{
public:
virtual ~HashMethodContext() = default;
struct Settings
{
size_t max_threads;
};
};
using HashMethodContextPtr = std::shared_ptr<HashMethodContext>;
namespace columns_hashing_impl
{
template <typename Value, bool consecutive_keys_optimization_>
struct LastElementCache
{
static constexpr bool consecutive_keys_optimization = consecutive_keys_optimization_;
Value value;
bool empty = true;
bool found = false;
bool check(const Value & value_) { return !empty && value == value_; }
template <typename Key>
bool check(const Key & key) { return !empty && value.first == key; }
};
template <typename Data>
struct LastElementCache<Data, false>
{
static constexpr bool consecutive_keys_optimization = false;
};
template <typename Mapped>
class EmplaceResultImpl
{
Mapped & value;
Mapped & cached_value;
bool inserted;
public:
EmplaceResultImpl(Mapped & value, Mapped & cached_value, bool inserted)
: value(value), cached_value(cached_value), inserted(inserted) {}
bool isInserted() const { return inserted; }
auto & getMapped() const { return value; }
void setMapped(const Mapped & mapped)
{
cached_value = mapped;
value = mapped;
}
};
template <>
class EmplaceResultImpl<void>
{
bool inserted;
public:
explicit EmplaceResultImpl(bool inserted) : inserted(inserted) {}
bool isInserted() const { return inserted; }
};
template <typename Mapped>
class FindResultImpl
{
Mapped * value;
bool found;
public:
FindResultImpl(Mapped * value, bool found) : value(value), found(found) {}
bool isFound() const { return found; }
Mapped & getMapped() const { return *value; }
};
template <>
class FindResultImpl<void>
{
bool found;
public:
explicit FindResultImpl(bool found) : found(found) {}
bool isFound() const { return found; }
};
template <typename Derived, typename Value, typename Mapped, bool consecutive_keys_optimization>
class HashMethodBase
{
public:
using EmplaceResult = EmplaceResultImpl<Mapped>;
using FindResult = FindResultImpl<Mapped>;
static constexpr bool has_mapped = !std::is_same<Mapped, void>::value;
using Cache = LastElementCache<Value, consecutive_keys_optimization>;
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
template <typename Data>
ALWAYS_INLINE EmplaceResult emplaceKey(Data & data, size_t row, Arena & pool)
{
auto key = static_cast<Derived &>(*this).getKey(row, pool);
return emplaceKeyImpl(key, data, pool);
}
template <typename Data>
ALWAYS_INLINE FindResult findKey(Data & data, size_t row, Arena & pool)
{
auto key = static_cast<Derived &>(*this).getKey(row, pool);
auto res = findKeyImpl(key, data);
static_cast<Derived &>(*this).onExistingKey(key, pool);
return res;
}
template <typename Data>
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & pool)
{
auto key = static_cast<Derived &>(*this).getKey(row, pool);
auto res = data.hash(key);
static_cast<Derived &>(*this).onExistingKey(key, pool);
return res;
}
protected:
Cache cache;
HashMethodBase()
{
if constexpr (consecutive_keys_optimization)
{
if constexpr (has_mapped)
{
/// Init PairNoInit elements.
cache.value.second = Mapped();
using Key = decltype(cache.value.first);
cache.value.first = Key();
}
else
cache.value = Value();
}
}
template <typename Key>
static ALWAYS_INLINE void onNewKey(Key & /*key*/, Arena & /*pool*/) {}
template <typename Key>
static ALWAYS_INLINE void onExistingKey(Key & /*key*/, Arena & /*pool*/) {}
template <typename Data, typename Key>
ALWAYS_INLINE EmplaceResult emplaceKeyImpl(Key key, Data & data, Arena & pool)
{
if constexpr (Cache::consecutive_keys_optimization)
{
if (cache.found && cache.check(key))
{
static_cast<Derived &>(*this).onExistingKey(key, pool);
if constexpr (has_mapped)
return EmplaceResult(cache.value.second, cache.value.second, false);
else
return EmplaceResult(false);
}
}
typename Data::iterator it;
bool inserted = false;
data.emplace(key, it, inserted);
[[maybe_unused]] Mapped * cached = nullptr;
if constexpr (has_mapped)
cached = &it->second;
if (inserted)
{
if constexpr (has_mapped)
{
new(&it->second) Mapped();
static_cast<Derived &>(*this).onNewKey(it->first, pool);
}
else
static_cast<Derived &>(*this).onNewKey(*it, pool);
}
else
static_cast<Derived &>(*this).onExistingKey(key, pool);
if constexpr (consecutive_keys_optimization)
{
cache.value = *it;
cache.found = true;
cache.empty = false;
if constexpr (has_mapped)
cached = &cache.value.second;
}
if constexpr (has_mapped)
return EmplaceResult(it->second, *cached, inserted);
else
return EmplaceResult(inserted);
}
template <typename Data, typename Key>
ALWAYS_INLINE FindResult findKeyImpl(Key key, Data & data)
{
if constexpr (Cache::consecutive_keys_optimization)
{
if (cache.check(key))
{
if constexpr (has_mapped)
return FindResult(&cache.value.second, cache.found);
else
return FindResult(cache.found);
}
}
auto it = data.find(key);
bool found = it != data.end();
if constexpr (consecutive_keys_optimization)
{
cache.found = found;
cache.empty = false;
if (found)
cache.value = *it;
else
{
if constexpr (has_mapped)
cache.value.first = key;
else
cache.value = key;
}
}
if constexpr (has_mapped)
return FindResult(found ? &it->second : nullptr, found);
else
return FindResult(found);
}
};
template <typename T>
struct MappedCache : public PaddedPODArray<T> {};
template <>
struct MappedCache<void> {};
/// This class is designed to provide the functionality that is required for
/// supporting nullable keys in HashMethodKeysFixed. If there are
/// no nullable keys, this class is merely implemented as an empty shell.
template <typename Key, bool has_nullable_keys>
class BaseStateKeysFixed;
/// Case where nullable keys are supported.
template <typename Key>
class BaseStateKeysFixed<Key, true>
{
protected:
BaseStateKeysFixed(const ColumnRawPtrs & key_columns)
{
null_maps.reserve(key_columns.size());
actual_columns.reserve(key_columns.size());
for (const auto & col : key_columns)
{
if (col->isColumnNullable())
{
const auto & nullable_col = static_cast<const ColumnNullable &>(*col);
actual_columns.push_back(&nullable_col.getNestedColumn());
null_maps.push_back(&nullable_col.getNullMapColumn());
}
else
{
actual_columns.push_back(col);
null_maps.push_back(nullptr);
}
}
}
/// Return the columns which actually contain the values of the keys.
/// For a given key column, if it is nullable, we return its nested
/// column. Otherwise we return the key column itself.
inline const ColumnRawPtrs & getActualColumns() const
{
return actual_columns;
}
/// Create a bitmap that indicates whether, for a particular row,
/// a key column bears a null value or not.
KeysNullMap<Key> createBitmap(size_t row) const
{
KeysNullMap<Key> bitmap{};
for (size_t k = 0; k < null_maps.size(); ++k)
{
if (null_maps[k] != nullptr)
{
const auto & null_map = static_cast<const ColumnUInt8 &>(*null_maps[k]).getData();
if (null_map[row] == 1)
{
size_t bucket = k / 8;
size_t offset = k % 8;
bitmap[bucket] |= UInt8(1) << offset;
}
}
}
return bitmap;
}
private:
ColumnRawPtrs actual_columns;
ColumnRawPtrs null_maps;
};
/// Case where nullable keys are not supported.
template <typename Key>
class BaseStateKeysFixed<Key, false>
{
protected:
BaseStateKeysFixed(const ColumnRawPtrs & columns) : actual_columns(columns) {}
const ColumnRawPtrs & getActualColumns() const { return actual_columns; }
KeysNullMap<Key> createBitmap(size_t) const
{
throw Exception{"Internal error: calling createBitmap() for non-nullable keys"
" is forbidden", ErrorCodes::LOGICAL_ERROR};
}
private:
ColumnRawPtrs actual_columns;
};
}
}
}

View File

@ -17,6 +17,7 @@
#include <common/unaligned.h> #include <common/unaligned.h>
#include <string> #include <string>
#include <type_traits> #include <type_traits>
#include <Core/Defines.h>
#define ROTL(x, b) static_cast<UInt64>(((x) << (b)) | ((x) >> (64 - (b)))) #define ROTL(x, b) static_cast<UInt64>(((x) << (b)) | ((x) >> (64 - (b))))
@ -49,7 +50,7 @@ private:
UInt8 current_bytes[8]; UInt8 current_bytes[8];
}; };
void finalize() ALWAYS_INLINE void finalize()
{ {
/// In the last free byte, we write the remainder of the division by 256. /// In the last free byte, we write the remainder of the division by 256.
current_bytes[7] = cnt; current_bytes[7] = cnt;
@ -156,7 +157,7 @@ public:
/// template for avoiding 'unsigned long long' vs 'unsigned long' problem on old poco in macos /// template for avoiding 'unsigned long long' vs 'unsigned long' problem on old poco in macos
template <typename T> template <typename T>
void get128(T & lo, T & hi) ALWAYS_INLINE void get128(T & lo, T & hi)
{ {
static_assert(sizeof(T) == 8); static_assert(sizeof(T) == 8);
finalize(); finalize();

View File

@ -85,24 +85,15 @@ void DistinctBlockInputStream::buildFilter(
size_t rows, size_t rows,
SetVariants & variants) const SetVariants & variants) const
{ {
typename Method::State state; typename Method::State state(columns, key_sizes, nullptr);
state.init(columns);
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
{ {
/// Make a key. auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool);
typename Method::Key key = state.getKey(columns, columns.size(), i, key_sizes);
typename Method::Data::iterator it;
bool inserted;
method.data.emplace(key, it, inserted);
if (inserted)
method.onNewKey(*it, columns.size(), variants.string_pool);
/// Emit the record if there is no such key in the current set yet. /// Emit the record if there is no such key in the current set yet.
/// Skip it otherwise. /// Skip it otherwise.
filter[i] = inserted; filter[i] = emplace_result.isInserted();
} }
} }

View File

@ -85,8 +85,7 @@ bool DistinctSortedBlockInputStream::buildFilter(
size_t rows, size_t rows,
ClearableSetVariants & variants) const ClearableSetVariants & variants) const
{ {
typename Method::State state; typename Method::State state(columns, key_sizes, nullptr);
state.init(columns);
/// Compare last row of previous block and first row of current block, /// Compare last row of previous block and first row of current block,
/// If rows not equal, we can clear HashSet, /// If rows not equal, we can clear HashSet,
@ -106,21 +105,14 @@ bool DistinctSortedBlockInputStream::buildFilter(
if (i > 0 && !clearing_hint_columns.empty() && !rowsEqual(clearing_hint_columns, i, clearing_hint_columns, i - 1)) if (i > 0 && !clearing_hint_columns.empty() && !rowsEqual(clearing_hint_columns, i, clearing_hint_columns, i - 1))
method.data.clear(); method.data.clear();
/// Make a key. auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool);
typename Method::Key key = state.getKey(columns, columns.size(), i, key_sizes);
typename Method::Data::iterator it = method.data.find(key);
bool inserted;
method.data.emplace(key, it, inserted);
if (inserted) if (emplace_result.isInserted())
{
method.onNewKey(*it, columns.size(), variants.string_pool);
has_new_data = true; has_new_data = true;
}
/// Emit the record if there is no such key in the current set yet. /// Emit the record if there is no such key in the current set yet.
/// Skip it otherwise. /// Skip it otherwise.
filter[i] = inserted; filter[i] = emplace_result.isInserted();
} }
return has_new_data; return has_new_data;
} }

View File

@ -8,6 +8,7 @@
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Interpreters/AggregationCommon.h> #include <Interpreters/AggregationCommon.h>
#include <Common/HashTable/ClearableHashMap.h> #include <Common/HashTable/ClearableHashMap.h>
#include <Common/ColumnsHashing.h>
namespace DB namespace DB
@ -60,11 +61,56 @@ private:
/// Initially allocate a piece of memory for 512 elements. NOTE: This is just a guess. /// Initially allocate a piece of memory for 512 elements. NOTE: This is just a guess.
static constexpr size_t INITIAL_SIZE_DEGREE = 9; static constexpr size_t INITIAL_SIZE_DEGREE = 9;
template <typename T>
struct MethodOneNumber
{
using Set = ClearableHashMap<T, UInt32, DefaultHash<T>, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
using Method = ColumnsHashing::HashMethodOneNumber<typename Set::value_type, UInt32, T, false>;
};
struct MethodString
{
using Set = ClearableHashMap<StringRef, UInt32, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
using Method = ColumnsHashing::HashMethodString<typename Set::value_type, UInt32, false, false>;
};
struct MethodFixedString
{
using Set = ClearableHashMap<StringRef, UInt32, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
using Method = ColumnsHashing::HashMethodFixedString<typename Set::value_type, UInt32, false, false>;
};
struct MethodFixed
{
using Set = ClearableHashMap<UInt128, UInt32, UInt128HashCRC32, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
using Method = ColumnsHashing::HashMethodKeysFixed<typename Set::value_type, UInt128, UInt32, false, false, false>;
};
struct MethodHashed
{
using Set = ClearableHashMap<UInt128, UInt32, UInt128TrivialHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
using Method = ColumnsHashing::HashMethodHashed<typename Set::value_type, UInt32, false>;
};
template <typename Method>
void executeMethod(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, const Sizes & key_sizes,
const NullMap * null_map, ColumnUInt32::Container & res_values);
template <typename Method, bool has_null_map>
void executeMethodImpl(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, const Sizes & key_sizes,
const NullMap * null_map, ColumnUInt32::Container & res_values);
template <typename T> template <typename T>
bool executeNumber(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values); bool executeNumber(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values);
bool executeString(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values); bool executeString(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values);
bool executeFixedString(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values);
bool execute128bit(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, ColumnUInt32::Container & res_values); bool execute128bit(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, ColumnUInt32::Container & res_values);
bool executeHashed(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, ColumnUInt32::Container & res_values); void executeHashed(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, ColumnUInt32::Container & res_values);
}; };
@ -131,7 +177,7 @@ void FunctionArrayEnumerateExtended<Derived>::executeImpl(Block & block, const C
if (num_arguments == 1) if (num_arguments == 1)
{ {
executeNumber<UInt8>(*offsets, *data_columns[0], null_map, res_values) if (!(executeNumber<UInt8>(*offsets, *data_columns[0], null_map, res_values)
|| executeNumber<UInt16>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<UInt16>(*offsets, *data_columns[0], null_map, res_values)
|| executeNumber<UInt32>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<UInt32>(*offsets, *data_columns[0], null_map, res_values)
|| executeNumber<UInt64>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<UInt64>(*offsets, *data_columns[0], null_map, res_values)
@ -142,47 +188,56 @@ void FunctionArrayEnumerateExtended<Derived>::executeImpl(Block & block, const C
|| executeNumber<Float32>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<Float32>(*offsets, *data_columns[0], null_map, res_values)
|| executeNumber<Float64>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<Float64>(*offsets, *data_columns[0], null_map, res_values)
|| executeString(*offsets, *data_columns[0], null_map, res_values) || executeString(*offsets, *data_columns[0], null_map, res_values)
|| executeHashed(*offsets, data_columns, res_values); || executeFixedString(*offsets, *data_columns[0], null_map, res_values)))
executeHashed(*offsets, data_columns, res_values);
} }
else else
{ {
execute128bit(*offsets, data_columns, res_values) if (!execute128bit(*offsets, data_columns, res_values))
|| executeHashed(*offsets, data_columns, res_values); executeHashed(*offsets, data_columns, res_values);
} }
block.getByPosition(result).column = ColumnArray::create(std::move(res_nested), offsets_column); block.getByPosition(result).column = ColumnArray::create(std::move(res_nested), offsets_column);
} }
template <typename Derived> template <typename Derived>
template <typename T> template <typename Method, bool has_null_map>
bool FunctionArrayEnumerateExtended<Derived>::executeNumber( void FunctionArrayEnumerateExtended<Derived>::executeMethodImpl(
const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values) const ColumnArray::Offsets & offsets,
const ColumnRawPtrs & columns,
const Sizes & key_sizes,
[[maybe_unused]] const NullMap * null_map,
ColumnUInt32::Container & res_values)
{ {
const ColumnVector<T> * data_concrete = checkAndGetColumn<ColumnVector<T>>(&data); typename Method::Set indices;
if (!data_concrete) typename Method::Method method(columns, key_sizes, nullptr);
return false; Arena pool; /// Won't use it;
const auto & values = data_concrete->getData();
using ValuesToIndices = ClearableHashMap<T, UInt32, DefaultHash<T>, HashTableGrower<INITIAL_SIZE_DEGREE>, ColumnArray::Offset prev_off = 0;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
ValuesToIndices indices;
size_t prev_off = 0;
if constexpr (std::is_same_v<Derived, FunctionArrayEnumerateUniq>) if constexpr (std::is_same_v<Derived, FunctionArrayEnumerateUniq>)
{ {
// Unique // Unique
for (size_t i = 0; i < offsets.size(); ++i) for (size_t off : offsets)
{ {
indices.clear(); indices.clear();
UInt32 null_count = 0; UInt32 null_count = 0;
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j) for (size_t j = prev_off; j < off; ++j)
{ {
if (null_map && (*null_map)[j]) if constexpr (has_null_map)
{
if ((*null_map)[j])
{
res_values[j] = ++null_count; res_values[j] = ++null_count;
else continue;
res_values[j] = ++indices[values[j]]; }
}
auto emplace_result = method.emplaceKey(indices, j, pool);
auto idx = emplace_result.getMapped() + 1;
emplace_result.setMapped(idx);
res_values[j] = idx;
} }
prev_off = off; prev_off = off;
} }
@ -190,31 +245,67 @@ bool FunctionArrayEnumerateExtended<Derived>::executeNumber(
else else
{ {
// Dense // Dense
for (size_t i = 0; i < offsets.size(); ++i) for (size_t off : offsets)
{ {
indices.clear(); indices.clear();
size_t rank = 0; UInt32 rank = 0;
UInt32 null_index = 0; [[maybe_unused]] UInt32 null_index = 0;
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j) for (size_t j = prev_off; j < off; ++j)
{ {
if (null_map && (*null_map)[j]) if constexpr (has_null_map)
{
if ((*null_map)[j])
{ {
if (!null_index) if (!null_index)
null_index = ++rank; null_index = ++rank;
res_values[j] = null_index; res_values[j] = null_index;
continue;
} }
else }
{
auto & idx = indices[values[j]]; auto emplace_result = method.emplaceKey(indices, j, pool);
auto idx = emplace_result.getMapped();
if (!idx) if (!idx)
{
idx = ++rank; idx = ++rank;
res_values[j] = idx; emplace_result.setMapped(idx);
} }
res_values[j] = idx;
} }
prev_off = off; prev_off = off;
} }
} }
}
template <typename Derived>
template <typename Method>
void FunctionArrayEnumerateExtended<Derived>::executeMethod(
const ColumnArray::Offsets & offsets,
const ColumnRawPtrs & columns,
const Sizes & key_sizes,
const NullMap * null_map,
ColumnUInt32::Container & res_values)
{
if (null_map)
executeMethodImpl<Method, true>(offsets, columns, key_sizes, null_map, res_values);
else
executeMethodImpl<Method, false>(offsets, columns, key_sizes, null_map, res_values);
}
template <typename Derived>
template <typename T>
bool FunctionArrayEnumerateExtended<Derived>::executeNumber(
const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values)
{
const auto * nested = checkAndGetColumn<ColumnVector<T>>(&data);
if (!nested)
return false;
executeMethod<MethodOneNumber<T>>(offsets, {nested}, {}, null_map, res_values);
return true; return true;
} }
@ -222,62 +313,22 @@ template <typename Derived>
bool FunctionArrayEnumerateExtended<Derived>::executeString( bool FunctionArrayEnumerateExtended<Derived>::executeString(
const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values) const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values)
{ {
const ColumnString * values = checkAndGetColumn<ColumnString>(&data); const auto * nested = checkAndGetColumn<ColumnString>(&data);
if (!values) if (nested)
return false; executeMethod<MethodString>(offsets, {nested}, {}, null_map, res_values);
size_t prev_off = 0; return nested;
using ValuesToIndices = ClearableHashMap<StringRef, UInt32, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>, }
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
ValuesToIndices indices; template <typename Derived>
if constexpr (std::is_same_v<Derived, FunctionArrayEnumerateUniq>) bool FunctionArrayEnumerateExtended<Derived>::executeFixedString(
const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values)
{ {
// Unique const auto * nested = checkAndGetColumn<ColumnString>(&data);
for (size_t i = 0; i < offsets.size(); ++i) if (nested)
{ executeMethod<MethodFixedString>(offsets, {nested}, {}, null_map, res_values);
indices.clear();
UInt32 null_count = 0; return nested;
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
{
if (null_map && (*null_map)[j])
res_values[j] = ++null_count;
else
res_values[j] = ++indices[values->getDataAt(j)];
}
prev_off = off;
}
}
else
{
// Dense
for (size_t i = 0; i < offsets.size(); ++i)
{
indices.clear();
size_t rank = 0;
UInt32 null_index = 0;
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
{
if (null_map && (*null_map)[j])
{
if (!null_index)
null_index = ++rank;
res_values[j] = null_index;
}
else
{
auto & idx = indices[values->getDataAt(j)];
if (!idx)
idx = ++rank;
res_values[j] = idx;
}
}
prev_off = off;
}
}
return true;
} }
template <typename Derived> template <typename Derived>
@ -298,95 +349,17 @@ bool FunctionArrayEnumerateExtended<Derived>::execute128bit(
keys_bytes += key_sizes[j]; keys_bytes += key_sizes[j];
} }
if (keys_bytes > 16) executeMethod<MethodFixed>(offsets, columns, key_sizes, nullptr, res_values);
return false;
using ValuesToIndices = ClearableHashMap<UInt128, UInt32, UInt128HashCRC32, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
ValuesToIndices indices;
size_t prev_off = 0;
if constexpr (std::is_same_v<Derived, FunctionArrayEnumerateUniq>)
{
// Unique
for (size_t i = 0; i < offsets.size(); ++i)
{
indices.clear();
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
res_values[j] = ++indices[packFixed<UInt128>(j, count, columns, key_sizes)];
prev_off = off;
}
}
else
{
// Dense
for (size_t i = 0; i < offsets.size(); ++i)
{
indices.clear();
size_t off = offsets[i];
size_t rank = 0;
for (size_t j = prev_off; j < off; ++j)
{
auto & idx = indices[packFixed<UInt128>(j, count, columns, key_sizes)];
if (!idx)
idx = ++rank;
res_values[j] = idx;
}
prev_off = off;
}
}
return true; return true;
} }
template <typename Derived> template <typename Derived>
bool FunctionArrayEnumerateExtended<Derived>::executeHashed( void FunctionArrayEnumerateExtended<Derived>::executeHashed(
const ColumnArray::Offsets & offsets, const ColumnArray::Offsets & offsets,
const ColumnRawPtrs & columns, const ColumnRawPtrs & columns,
ColumnUInt32::Container & res_values) ColumnUInt32::Container & res_values)
{ {
size_t count = columns.size(); executeMethod<MethodHashed>(offsets, columns, {}, nullptr, res_values);
using ValuesToIndices = ClearableHashMap<UInt128, UInt32, UInt128TrivialHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
ValuesToIndices indices;
size_t prev_off = 0;
if constexpr (std::is_same_v<Derived, FunctionArrayEnumerateUniq>)
{
// Unique
for (size_t i = 0; i < offsets.size(); ++i)
{
indices.clear();
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
{
res_values[j] = ++indices[hash128(j, count, columns)];
}
prev_off = off;
}
}
else
{
// Dense
for (size_t i = 0; i < offsets.size(); ++i)
{
indices.clear();
size_t off = offsets[i];
size_t rank = 0;
for (size_t j = prev_off; j < off; ++j)
{
auto & idx = indices[hash128(j, count, columns)];
if (!idx)
idx = ++rank;
res_values[j] = idx;
}
prev_off = off;
}
}
return true;
} }
} }

View File

@ -10,6 +10,7 @@
#include <Common/HashTable/ClearableHashSet.h> #include <Common/HashTable/ClearableHashSet.h>
#include <Interpreters/AggregationCommon.h> #include <Interpreters/AggregationCommon.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Common/ColumnsHashing.h>
namespace DB namespace DB
@ -62,11 +63,56 @@ private:
/// Initially allocate a piece of memory for 512 elements. NOTE: This is just a guess. /// Initially allocate a piece of memory for 512 elements. NOTE: This is just a guess.
static constexpr size_t INITIAL_SIZE_DEGREE = 9; static constexpr size_t INITIAL_SIZE_DEGREE = 9;
template <typename T>
struct MethodOneNumber
{
using Set = ClearableHashSet<T, DefaultHash<T>, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
using Method = ColumnsHashing::HashMethodOneNumber<typename Set::value_type, void, T, false>;
};
struct MethodString
{
using Set = ClearableHashSet<StringRef, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
using Method = ColumnsHashing::HashMethodString<typename Set::value_type, void, false, false>;
};
struct MethodFixedString
{
using Set = ClearableHashSet<StringRef, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
using Method = ColumnsHashing::HashMethodFixedString<typename Set::value_type, void, false, false>;
};
struct MethodFixed
{
using Set = ClearableHashSet<UInt128, UInt128HashCRC32, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
using Method = ColumnsHashing::HashMethodKeysFixed<typename Set::value_type, UInt128, void, false, false, false>;
};
struct MethodHashed
{
using Set = ClearableHashSet<UInt128, UInt128TrivialHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
using Method = ColumnsHashing::HashMethodHashed<typename Set::value_type, void, false>;
};
template <typename Method>
void executeMethod(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, const Sizes & key_sizes,
const NullMap * null_map, ColumnUInt32::Container & res_values);
template <typename Method, bool has_null_map>
void executeMethodImpl(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, const Sizes & key_sizes,
const NullMap * null_map, ColumnUInt32::Container & res_values);
template <typename T> template <typename T>
bool executeNumber(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values); bool executeNumber(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values);
bool executeString(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values); bool executeString(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values);
bool executeFixedString(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values);
bool execute128bit(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, ColumnUInt32::Container & res_values); bool execute128bit(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, ColumnUInt32::Container & res_values);
bool executeHashed(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, ColumnUInt32::Container & res_values); void executeHashed(const ColumnArray::Offsets & offsets, const ColumnRawPtrs & columns, ColumnUInt32::Container & res_values);
}; };
@ -126,7 +172,7 @@ void FunctionArrayUniq::executeImpl(Block & block, const ColumnNumbers & argumen
if (num_arguments == 1) if (num_arguments == 1)
{ {
executeNumber<UInt8>(*offsets, *data_columns[0], null_map, res_values) if (!(executeNumber<UInt8>(*offsets, *data_columns[0], null_map, res_values)
|| executeNumber<UInt16>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<UInt16>(*offsets, *data_columns[0], null_map, res_values)
|| executeNumber<UInt32>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<UInt32>(*offsets, *data_columns[0], null_map, res_values)
|| executeNumber<UInt64>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<UInt64>(*offsets, *data_columns[0], null_map, res_values)
@ -136,30 +182,31 @@ void FunctionArrayUniq::executeImpl(Block & block, const ColumnNumbers & argumen
|| executeNumber<Int64>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<Int64>(*offsets, *data_columns[0], null_map, res_values)
|| executeNumber<Float32>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<Float32>(*offsets, *data_columns[0], null_map, res_values)
|| executeNumber<Float64>(*offsets, *data_columns[0], null_map, res_values) || executeNumber<Float64>(*offsets, *data_columns[0], null_map, res_values)
|| executeString(*offsets, *data_columns[0], null_map, res_values) || executeFixedString(*offsets, *data_columns[0], null_map, res_values)
|| executeHashed(*offsets, data_columns, res_values); || executeString(*offsets, *data_columns[0], null_map, res_values)))
executeHashed(*offsets, data_columns, res_values);
} }
else else
{ {
execute128bit(*offsets, data_columns, res_values) if (!execute128bit(*offsets, data_columns, res_values))
|| executeHashed(*offsets, data_columns, res_values); executeHashed(*offsets, data_columns, res_values);
} }
block.getByPosition(result).column = std::move(res); block.getByPosition(result).column = std::move(res);
} }
template <typename T> template <typename Method, bool has_null_map>
bool FunctionArrayUniq::executeNumber(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values) void FunctionArrayUniq::executeMethodImpl(
const ColumnArray::Offsets & offsets,
const ColumnRawPtrs & columns,
const Sizes & key_sizes,
[[maybe_unused]] const NullMap * null_map,
ColumnUInt32::Container & res_values)
{ {
const ColumnVector<T> * nested = checkAndGetColumn<ColumnVector<T>>(&data); typename Method::Set set;
if (!nested) typename Method::Method method(columns, key_sizes, nullptr);
return false; Arena pool; /// Won't use it;
const auto & values = nested->getData();
using Set = ClearableHashSet<T, DefaultHash<T>, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
Set set;
ColumnArray::Offset prev_off = 0; ColumnArray::Offset prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i) for (size_t i = 0; i < offsets.size(); ++i)
{ {
@ -168,48 +215,66 @@ bool FunctionArrayUniq::executeNumber(const ColumnArray::Offsets & offsets, cons
ColumnArray::Offset off = offsets[i]; ColumnArray::Offset off = offsets[i];
for (ColumnArray::Offset j = prev_off; j < off; ++j) for (ColumnArray::Offset j = prev_off; j < off; ++j)
{ {
if (null_map && (*null_map)[j]) if constexpr (has_null_map)
{
if ((*null_map)[j])
{
found_null = true; found_null = true;
else continue;
set.insert(values[j]); }
}
method.emplaceKey(set, j, pool);
} }
res_values[i] = set.size() + found_null; res_values[i] = set.size() + found_null;
prev_off = off; prev_off = off;
} }
}
template <typename Method>
void FunctionArrayUniq::executeMethod(
const ColumnArray::Offsets & offsets,
const ColumnRawPtrs & columns,
const Sizes & key_sizes,
const NullMap * null_map,
ColumnUInt32::Container & res_values)
{
if (null_map)
executeMethodImpl<Method, true>(offsets, columns, key_sizes, null_map, res_values);
else
executeMethodImpl<Method, false>(offsets, columns, key_sizes, null_map, res_values);
}
template <typename T>
bool FunctionArrayUniq::executeNumber(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values)
{
const auto * nested = checkAndGetColumn<ColumnVector<T>>(&data);
if (!nested)
return false;
executeMethod<MethodOneNumber<T>>(offsets, {nested}, {}, null_map, res_values);
return true; return true;
} }
bool FunctionArrayUniq::executeString(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values) bool FunctionArrayUniq::executeString(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values)
{ {
const ColumnString * nested = checkAndGetColumn<ColumnString>(&data); const auto * nested = checkAndGetColumn<ColumnString>(&data);
if (!nested) if (nested)
return false; executeMethod<MethodString>(offsets, {nested}, {}, null_map, res_values);
using Set = ClearableHashSet<StringRef, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>, return nested;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>; }
Set set; bool FunctionArrayUniq::executeFixedString(const ColumnArray::Offsets & offsets, const IColumn & data, const NullMap * null_map, ColumnUInt32::Container & res_values)
ColumnArray::Offset prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{ {
set.clear(); const auto * nested = checkAndGetColumn<ColumnFixedString>(&data);
bool found_null = false; if (nested)
ColumnArray::Offset off = offsets[i]; executeMethod<MethodFixedString>(offsets, {nested}, {}, null_map, res_values);
for (ColumnArray::Offset j = prev_off; j < off; ++j)
{
if (null_map && (*null_map)[j])
found_null = true;
else
set.insert(nested->getDataAt(j));
}
res_values[i] = set.size() + found_null; return nested;
prev_off = off;
} }
return true;
}
bool FunctionArrayUniq::execute128bit( bool FunctionArrayUniq::execute128bit(
const ColumnArray::Offsets & offsets, const ColumnArray::Offsets & offsets,
@ -231,49 +296,16 @@ bool FunctionArrayUniq::execute128bit(
if (keys_bytes > 16) if (keys_bytes > 16)
return false; return false;
using Set = ClearableHashSet<UInt128, UInt128HashCRC32, HashTableGrower<INITIAL_SIZE_DEGREE>, executeMethod<MethodFixed>(offsets, columns, key_sizes, nullptr, res_values);
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
Set set;
ColumnArray::Offset prev_off = 0;
for (ColumnArray::Offset i = 0; i < offsets.size(); ++i)
{
set.clear();
ColumnArray::Offset off = offsets[i];
for (ColumnArray::Offset j = prev_off; j < off; ++j)
set.insert(packFixed<UInt128>(j, count, columns, key_sizes));
res_values[i] = set.size();
prev_off = off;
}
return true; return true;
} }
bool FunctionArrayUniq::executeHashed( void FunctionArrayUniq::executeHashed(
const ColumnArray::Offsets & offsets, const ColumnArray::Offsets & offsets,
const ColumnRawPtrs & columns, const ColumnRawPtrs & columns,
ColumnUInt32::Container & res_values) ColumnUInt32::Container & res_values)
{ {
size_t count = columns.size(); executeMethod<MethodHashed>(offsets, columns, {}, nullptr, res_values);
using Set = ClearableHashSet<UInt128, UInt128TrivialHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
Set set;
ColumnArray::Offset prev_off = 0;
for (ColumnArray::Offset i = 0; i < offsets.size(); ++i)
{
set.clear();
ColumnArray::Offset off = offsets[i];
for (ColumnArray::Offset j = prev_off; j < off; ++j)
set.insert(hash128(j, count, columns));
res_values[i] = set.size();
prev_off = off;
}
return true;
} }

View File

@ -6,9 +6,11 @@
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <DataTypes/DataTypeAggregateFunction.h> #include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h> #include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <AggregateFunctions/AggregateFunctionCount.h> #include <AggregateFunctions/AggregateFunctionCount.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h> #include <DataStreams/NativeBlockOutputStream.h>
@ -22,11 +24,9 @@
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <common/demangle.h> #include <common/demangle.h>
#if __has_include(<Interpreters/config_compile.h>) #if __has_include(<Interpreters/config_compile.h>)
#include <Interpreters/config_compile.h> #include <Interpreters/config_compile.h>
#include <Columns/ColumnLowCardinality.h>
#include <DataTypes/DataTypeLowCardinality.h>
#endif #endif
@ -188,7 +188,7 @@ Aggregator::Aggregator(const Params & params_)
} }
method_chosen = chooseAggregationMethod(); method_chosen = chooseAggregationMethod();
AggregationStateCache::Settings cache_settings; HashMethodContext::Settings cache_settings;
cache_settings.max_threads = params.max_threads; cache_settings.max_threads = params.max_threads;
aggregation_state_cache = AggregatedDataVariants::createCache(method_chosen, cache_settings); aggregation_state_cache = AggregatedDataVariants::createCache(method_chosen, cache_settings);
} }
@ -586,11 +586,7 @@ void NO_INLINE Aggregator::executeImpl(
bool no_more_keys, bool no_more_keys,
AggregateDataPtr overflow_row) const AggregateDataPtr overflow_row) const
{ {
typename Method::State state; typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
if constexpr (Method::low_cardinality_optimization)
state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
if (!no_more_keys) if (!no_more_keys)
executeImplCase<false>(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row); executeImplCase<false>(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row);
@ -605,97 +601,51 @@ void NO_INLINE Aggregator::executeImplCase(
typename Method::State & state, typename Method::State & state,
Arena * aggregates_pool, Arena * aggregates_pool,
size_t rows, size_t rows,
ColumnRawPtrs & key_columns, ColumnRawPtrs & /*key_columns*/,
AggregateFunctionInstruction * aggregate_instructions, AggregateFunctionInstruction * aggregate_instructions,
StringRefs & keys, StringRefs & /*keys*/,
AggregateDataPtr overflow_row) const AggregateDataPtr overflow_row) const
{ {
/// NOTE When editing this code, also pay attention to SpecializedAggregator.h. /// NOTE When editing this code, also pay attention to SpecializedAggregator.h.
/// For all rows. /// For all rows.
typename Method::Key prev_key{};
AggregateDataPtr value = nullptr;
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
{ {
bool inserted = false; /// Inserted a new key, or was this key already? AggregateDataPtr aggregate_data = nullptr;
/// Get the key to insert into the hash table. if constexpr (!no_more_keys) /// Insert.
typename Method::Key key;
if constexpr (!Method::low_cardinality_optimization)
key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
AggregateDataPtr * aggregate_data = nullptr;
typename Method::iterator it; /// Is not used if Method::low_cardinality_optimization
if (!no_more_keys) /// Insert.
{ {
/// Optimization for consecutive identical keys. auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
if (!Method::no_consecutive_keys_optimization)
{
if (i != 0 && key == prev_key)
{
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
(*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
method.onExistingKey(key, keys, *aggregates_pool); /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
continue; if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(aggregate_data);
emplace_result.setMapped(aggregate_data);
} }
else else
prev_key = key; aggregate_data = emplace_result.getMapped();
}
if constexpr (Method::low_cardinality_optimization)
aggregate_data = state.emplaceKeyFromRow(method.data, i, inserted, params.keys_size, keys, *aggregates_pool);
else
{
method.data.emplace(key, it, inserted);
aggregate_data = &Method::getAggregateData(it->second);
}
} }
else else
{ {
/// Add only if the key already exists. /// Add only if the key already exists.
auto find_result = state.findKey(method.data, i, *aggregates_pool);
if constexpr (Method::low_cardinality_optimization) if (find_result.isFound())
aggregate_data = state.findFromRow(method.data, i); aggregate_data = find_result.getMapped();
else
{
it = method.data.find(key);
if (method.data.end() != it)
aggregate_data = &Method::getAggregateData(it->second);
}
} }
/// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys. /// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys.
/// If the key does not fit, and the data does not need to be aggregated in a separate row, then there's nothing to do. /// If the key does not fit, and the data does not need to be aggregated in a separate row, then there's nothing to do.
if (!aggregate_data && !overflow_row) if (!aggregate_data && !overflow_row)
{
method.onExistingKey(key, keys, *aggregates_pool);
continue; continue;
}
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. AggregateDataPtr value = aggregate_data ? aggregate_data : overflow_row;
if (inserted)
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
*aggregate_data = nullptr;
if constexpr (!Method::low_cardinality_optimization)
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
*aggregate_data = place;
if constexpr (Method::low_cardinality_optimization)
state.cacheAggregateData(i, place);
}
else
method.onExistingKey(key, keys, *aggregates_pool);
value = aggregate_data ? *aggregate_data : overflow_row;
/// Add values to the aggregate functions. /// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
@ -1174,7 +1124,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->insertResultInto( aggregate_functions[i]->insertResultInto(
Method::getAggregateData(value.second) + offsets_of_aggregate_states[i], value.second + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]); *final_aggregate_columns[i]);
} }
@ -1205,9 +1155,9 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
/// reserved, so push_back does not throw exceptions /// reserved, so push_back does not throw exceptions
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(Method::getAggregateData(value.second) + offsets_of_aggregate_states[i]); aggregate_columns[i]->push_back(value.second + offsets_of_aggregate_states[i]);
Method::getAggregateData(value.second) = nullptr; value.second = nullptr;
} }
} }
@ -1551,20 +1501,20 @@ void NO_INLINE Aggregator::mergeDataImpl(
{ {
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge( aggregate_functions[i]->merge(
Method::getAggregateData(res_it->second) + offsets_of_aggregate_states[i], res_it->second + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], it->second + offsets_of_aggregate_states[i],
arena); arena);
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy( aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); it->second + offsets_of_aggregate_states[i]);
} }
else else
{ {
res_it->second = it->second; res_it->second = it->second;
} }
Method::getAggregateData(it->second) = nullptr; it->second = nullptr;
} }
table_src.clearAndShrink(); table_src.clearAndShrink();
@ -1588,19 +1538,18 @@ void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
AggregateDataPtr res_data = table_dst.end() == res_it AggregateDataPtr res_data = table_dst.end() == res_it
? overflows ? overflows
: Method::getAggregateData(res_it->second); : res_it->second;
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge( aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i], res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], it->second + offsets_of_aggregate_states[i],
arena); arena);
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy( aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) = nullptr; it->second = nullptr;
} }
table_src.clearAndShrink(); table_src.clearAndShrink();
@ -1623,19 +1572,18 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
if (table_dst.end() == res_it) if (table_dst.end() == res_it)
continue; continue;
AggregateDataPtr res_data = Method::getAggregateData(res_it->second); AggregateDataPtr res_data = res_it->second;
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge( aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i], res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], it->second + offsets_of_aggregate_states[i],
arena); arena);
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy( aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) = nullptr; it->second = nullptr;
} }
table_src.clearAndShrink(); table_src.clearAndShrink();
@ -1986,7 +1934,7 @@ template <bool no_more_keys, typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImplCase( void NO_INLINE Aggregator::mergeStreamsImplCase(
Block & block, Block & block,
Arena * aggregates_pool, Arena * aggregates_pool,
Method & method, Method & method [[maybe_unused]],
Table & data, Table & data,
AggregateDataPtr overflow_row) const AggregateDataPtr overflow_row) const
{ {
@ -2000,77 +1948,43 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData(); aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();
typename Method::State state; typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
if constexpr (Method::low_cardinality_optimization)
state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
/// For all rows. /// For all rows.
StringRefs keys(params.keys_size);
size_t rows = block.rows(); size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
{ {
typename Table::iterator it; AggregateDataPtr aggregate_data = nullptr;
AggregateDataPtr * aggregate_data = nullptr;
bool inserted = false; /// Inserted a new key, or was this key already?
/// Get the key to insert into the hash table.
typename Method::Key key;
if constexpr (!Method::low_cardinality_optimization)
key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
if (!no_more_keys) if (!no_more_keys)
{ {
if constexpr (Method::low_cardinality_optimization) auto emplace_result = state.emplaceKey(data, i, *aggregates_pool);
aggregate_data = state.emplaceKeyFromRow(data, i, inserted, params.keys_size, keys, *aggregates_pool); if (emplace_result.isInserted())
else
{ {
data.emplace(key, it, inserted); emplace_result.setMapped(nullptr);
aggregate_data = &Method::getAggregateData(it->second);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(aggregate_data);
emplace_result.setMapped(aggregate_data);
} }
else
aggregate_data = emplace_result.getMapped();
} }
else else
{ {
if constexpr (Method::low_cardinality_optimization) auto find_result = state.findKey(data, i, *aggregates_pool);
aggregate_data = state.findFromRow(data, i); if (find_result.isFound())
else aggregate_data = find_result.getMapped();
{
it = data.find(key);
if (data.end() != it)
aggregate_data = &Method::getAggregateData(it->second);
}
} }
/// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys. /// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys.
/// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do. /// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do.
if (!aggregate_data && !overflow_row) if (!aggregate_data && !overflow_row)
{
method.onExistingKey(key, keys, *aggregates_pool);
continue; continue;
}
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. AggregateDataPtr value = aggregate_data ? aggregate_data : overflow_row;
if (inserted)
{
*aggregate_data = nullptr;
if constexpr (!Method::low_cardinality_optimization)
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
*aggregate_data = place;
if constexpr (Method::low_cardinality_optimization)
state.cacheAggregateData(i, place);
}
else
method.onExistingKey(key, keys, *aggregates_pool);
AggregateDataPtr value = aggregate_data ? *aggregate_data : overflow_row;
/// Merge state of aggregate functions. /// Merge state of aggregate functions.
for (size_t j = 0; j < params.aggregates_size; ++j) for (size_t j = 0; j < params.aggregates_size; ++j)
@ -2165,7 +2079,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
* If there is at least one block with a bucket number greater or equal than zero, then there was a two-level aggregation. * If there is at least one block with a bucket number greater or equal than zero, then there was a two-level aggregation.
*/ */
auto max_bucket = bucket_to_blocks.rbegin()->first; auto max_bucket = bucket_to_blocks.rbegin()->first;
size_t has_two_level = max_bucket >= 0; bool has_two_level = max_bucket >= 0;
if (has_two_level) if (has_two_level)
{ {
@ -2395,15 +2309,11 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
Method & method, Method & method,
Arena * pool, Arena * pool,
ColumnRawPtrs & key_columns, ColumnRawPtrs & key_columns,
StringRefs & keys, StringRefs & keys [[maybe_unused]],
const Block & source, const Block & source,
std::vector<Block> & destinations) const std::vector<Block> & destinations) const
{ {
typename Method::State state; typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
if constexpr (Method::low_cardinality_optimization)
state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
size_t rows = source.rows(); size_t rows = source.rows();
size_t columns = source.columns(); size_t columns = source.columns();
@ -2423,16 +2333,11 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
} }
} }
/// Obtain a key. Calculate bucket number from it. /// Calculate bucket number from row hash.
typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *pool); auto hash = state.getHash(method.data, i, *pool);
auto hash = method.data.hash(key);
auto bucket = method.data.getBucketFromHash(hash); auto bucket = method.data.getBucketFromHash(hash);
selector[i] = bucket; selector[i] = bucket;
/// We don't need to store this key in pool.
method.onExistingKey(key, keys, *pool);
} }
size_t num_buckets = destinations.size(); size_t num_buckets = destinations.size();
@ -2523,7 +2428,7 @@ void NO_INLINE Aggregator::destroyImpl(Table & table) const
{ {
for (auto elem : table) for (auto elem : table)
{ {
AggregateDataPtr & data = Method::getAggregateData(elem.second); AggregateDataPtr & data = elem.second;
/** If an exception (usually a lack of memory, the MemoryTracker throws) arose /** If an exception (usually a lack of memory, the MemoryTracker throws) arose
* after inserting the key into a hash table, but before creating all states of aggregate functions, * after inserting the key into a hash table, but before creating all states of aggregate functions,

View File

@ -15,6 +15,7 @@
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/UInt128.h> #include <Common/UInt128.h>
#include <Common/LRUCache.h> #include <Common/LRUCache.h>
#include <Common/ColumnsHashing.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <DataStreams/SizeLimits.h> #include <DataStreams/SizeLimits.h>
@ -138,18 +139,6 @@ using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTw
TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr, DefaultHash<StringRef>, TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr, DefaultHash<StringRef>,
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>; TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
/// Cache which can be used by aggregations method's states. Object is shared in all threads.
struct AggregationStateCache
{
virtual ~AggregationStateCache() = default;
struct Settings
{
size_t max_threads;
};
};
using AggregationStateCachePtr = std::shared_ptr<AggregationStateCache>;
/// For the case where there is one numeric key. /// For the case where there is one numeric key.
template <typename FieldType, typename TData> /// UInt8/16/32/64 for any type with corresponding bit width. template <typename FieldType, typename TData> /// UInt8/16/32/64 for any type with corresponding bit width.
@ -169,65 +158,16 @@ struct AggregationMethodOneNumber
AggregationMethodOneNumber(const Other & other) : data(other.data) {} AggregationMethodOneNumber(const Other & other) : data(other.data) {}
/// To use one `Method` in different threads, use different `State`. /// To use one `Method` in different threads, use different `State`.
struct State using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type, Mapped, FieldType>;
{
const char * vec;
/** Called at the start of each block processing.
* Sets the variables needed for the other methods called in inner loops.
*/
void init(ColumnRawPtrs & key_columns)
{
vec = key_columns[0]->getRawData().data;
}
/// Get the key from the key columns for insertion into the hash table.
ALWAYS_INLINE Key getKey(
const ColumnRawPtrs & /*key_columns*/,
size_t /*keys_size*/, /// Number of key columns.
size_t i, /// From which row of the block, get the key.
const Sizes & /*key_sizes*/, /// If the keys of a fixed length - their lengths. It is not used in aggregation methods for variable length keys.
StringRefs & /*keys*/, /// Here references to key data in columns can be written. They can be used in the future.
Arena & /*pool*/) const
{
return unalignedLoad<FieldType>(vec + i * sizeof(FieldType));
}
};
/// From the value in the hash table, get AggregateDataPtr.
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
/** Place additional data, if necessary, in case a new key was inserted into the hash table.
*/
static ALWAYS_INLINE void onNewKey(typename Data::value_type & /*value*/, size_t /*keys_size*/, StringRefs & /*keys*/, Arena & /*pool*/)
{
}
/** The action to be taken if the key is not new. For example, roll back the memory allocation in the pool.
*/
static ALWAYS_INLINE void onExistingKey(const Key & /*key*/, StringRefs & /*keys*/, Arena & /*pool*/) {}
/** Do not use optimization for consecutive keys.
*/
static const bool no_consecutive_keys_optimization = false;
/// Use optimization for low cardinality. /// Use optimization for low cardinality.
static const bool low_cardinality_optimization = false; static const bool low_cardinality_optimization = false;
/** Insert the key from the hash table into columns. // Insert the key from the hash table into columns.
*/
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/) static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
{ {
static_cast<ColumnVectorHelper *>(key_columns[0].get())->insertRawData<sizeof(FieldType)>(reinterpret_cast<const char *>(&value.first)); static_cast<ColumnVectorHelper *>(key_columns[0].get())->insertRawData<sizeof(FieldType)>(reinterpret_cast<const char *>(&value.first));
} }
/// Get StringRef from value which can be inserted into column.
static StringRef getValueRef(const typename Data::value_type & value)
{
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
}; };
@ -248,58 +188,14 @@ struct AggregationMethodString
template <typename Other> template <typename Other>
AggregationMethodString(const Other & other) : data(other.data) {} AggregationMethodString(const Other & other) : data(other.data) {}
struct State using State = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped>;
{
const IColumn::Offset * offsets;
const UInt8 * chars;
void init(ColumnRawPtrs & key_columns)
{
const IColumn & column = *key_columns[0];
const ColumnString & column_string = static_cast<const ColumnString &>(column);
offsets = column_string.getOffsets().data();
chars = column_string.getChars().data();
}
ALWAYS_INLINE Key getKey(
const ColumnRawPtrs & /*key_columns*/,
size_t /*keys_size*/,
ssize_t i,
const Sizes & /*key_sizes*/,
StringRefs & /*keys*/,
Arena & /*pool*/) const
{
return StringRef(
chars + offsets[i - 1],
offsets[i] - offsets[i - 1] - 1);
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static ALWAYS_INLINE void onNewKey(typename Data::value_type & value, size_t /*keys_size*/, StringRefs & /*keys*/, Arena & pool)
{
if (value.first.size)
value.first.data = pool.insert(value.first.data, value.first.size);
}
static ALWAYS_INLINE void onExistingKey(const Key & /*key*/, StringRefs & /*keys*/, Arena & /*pool*/) {}
static const bool no_consecutive_keys_optimization = false;
static const bool low_cardinality_optimization = false; static const bool low_cardinality_optimization = false;
static StringRef getValueRef(const typename Data::value_type & value)
{
return StringRef(value.first.data, value.first.size);
}
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &) static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
{ {
key_columns[0]->insertData(value.first.data, value.first.size); key_columns[0]->insertData(value.first.data, value.first.size);
} }
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
}; };
@ -320,101 +216,14 @@ struct AggregationMethodFixedString
template <typename Other> template <typename Other>
AggregationMethodFixedString(const Other & other) : data(other.data) {} AggregationMethodFixedString(const Other & other) : data(other.data) {}
struct State using State = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped>;
{
size_t n;
const ColumnFixedString::Chars * chars;
void init(ColumnRawPtrs & key_columns)
{
const IColumn & column = *key_columns[0];
const ColumnFixedString & column_string = static_cast<const ColumnFixedString &>(column);
n = column_string.getN();
chars = &column_string.getChars();
}
ALWAYS_INLINE Key getKey(
const ColumnRawPtrs &,
size_t,
size_t i,
const Sizes &,
StringRefs &,
Arena &) const
{
return StringRef(&(*chars)[i * n], n);
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static ALWAYS_INLINE void onNewKey(typename Data::value_type & value, size_t, StringRefs &, Arena & pool)
{
value.first.data = pool.insert(value.first.data, value.first.size);
}
static ALWAYS_INLINE void onExistingKey(const Key &, StringRefs &, Arena &) {}
static const bool no_consecutive_keys_optimization = false;
static const bool low_cardinality_optimization = false; static const bool low_cardinality_optimization = false;
static StringRef getValueRef(const typename Data::value_type & value)
{
return StringRef(value.first.data, value.first.size);
}
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &) static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
{ {
key_columns[0]->insertData(value.first.data, value.first.size); key_columns[0]->insertData(value.first.data, value.first.size);
} }
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
};
/// Cache stores dictionaries and saved_hash per dictionary key.
class LowCardinalityDictionaryCache : public AggregationStateCache
{
public:
/// Will assume that dictionaries with same hash has the same keys.
/// Just in case, check that they have also the same size.
struct DictionaryKey
{
UInt128 hash;
UInt64 size;
bool operator== (const DictionaryKey & other) const { return hash == other.hash && size == other.size; }
};
struct DictionaryKeyHash
{
size_t operator()(const DictionaryKey & key) const
{
SipHash hash;
hash.update(key.hash.low);
hash.update(key.hash.high);
hash.update(key.size);
return hash.get64();
}
};
struct CachedValues
{
/// Store ptr to dictionary to be sure it won't be deleted.
ColumnPtr dictionary_holder;
/// Hashes for dictionary keys.
const UInt64 * saved_hash = nullptr;
};
using CachedValuesPtr = std::shared_ptr<CachedValues>;
explicit LowCardinalityDictionaryCache(const AggregationStateCache::Settings & settings) : cache(settings.max_threads) {}
CachedValuesPtr get(const DictionaryKey & key) { return cache.get(key); }
void set(const DictionaryKey & key, const CachedValuesPtr & mapped) { cache.set(key, mapped); }
private:
using Cache = LRUCache<DictionaryKey, CachedValues, DictionaryKeyHash>;
Cache cache;
}; };
/// Single low cardinality column. /// Single low cardinality column.
@ -432,342 +241,23 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
using Base::data; using Base::data;
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & settings)
{
return std::make_shared<LowCardinalityDictionaryCache>(settings);
}
AggregationMethodSingleLowCardinalityColumn() = default; AggregationMethodSingleLowCardinalityColumn() = default;
template <typename Other> template <typename Other>
explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {} explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {}
struct State : public BaseState using State = ColumnsHashing::HashMethodSingleLowCardinalityColumn<BaseState, Mapped, true>;
{
ColumnRawPtrs key_columns;
const IColumn * positions = nullptr;
size_t size_of_index_type = 0;
/// saved hash is from current column or from cache.
const UInt64 * saved_hash = nullptr;
/// Hold dictionary in case saved_hash is from cache to be sure it won't be deleted.
ColumnPtr dictionary_holder;
/// Cache AggregateDataPtr for current column in order to decrease the number of hash table usages.
PaddedPODArray<AggregateDataPtr> aggregate_data_cache;
/// If initialized column is nullable.
bool is_nullable = false;
void init(ColumnRawPtrs &)
{
throw Exception("Expected cache for AggregationMethodSingleLowCardinalityColumn::init", ErrorCodes::LOGICAL_ERROR);
}
void init(ColumnRawPtrs & key_columns_low_cardinality, const AggregationStateCachePtr & cache_ptr)
{
auto column = typeid_cast<const ColumnLowCardinality *>(key_columns_low_cardinality[0]);
if (!column)
throw Exception("Invalid aggregation key type for AggregationMethodSingleLowCardinalityColumn method. "
"Excepted LowCardinality, got " + key_columns_low_cardinality[0]->getName(), ErrorCodes::LOGICAL_ERROR);
if (!cache_ptr)
throw Exception("Cache wasn't created for AggregationMethodSingleLowCardinalityColumn", ErrorCodes::LOGICAL_ERROR);
auto cache = typeid_cast<LowCardinalityDictionaryCache *>(cache_ptr.get());
if (!cache)
{
const auto & cached_val = *cache_ptr;
throw Exception("Invalid type for AggregationMethodSingleLowCardinalityColumn cache: "
+ demangle(typeid(cached_val).name()), ErrorCodes::LOGICAL_ERROR);
}
auto * dict = column->getDictionary().getNestedNotNullableColumn().get();
is_nullable = column->getDictionary().nestedColumnIsNullable();
key_columns = {dict};
bool is_shared_dict = column->isSharedDictionary();
typename LowCardinalityDictionaryCache::DictionaryKey dictionary_key;
typename LowCardinalityDictionaryCache::CachedValuesPtr cached_values;
if (is_shared_dict)
{
dictionary_key = {column->getDictionary().getHash(), dict->size()};
cached_values = cache->get(dictionary_key);
}
if (cached_values)
{
saved_hash = cached_values->saved_hash;
dictionary_holder = cached_values->dictionary_holder;
}
else
{
saved_hash = column->getDictionary().tryGetSavedHash();
dictionary_holder = column->getDictionaryPtr();
if (is_shared_dict)
{
cached_values = std::make_shared<typename LowCardinalityDictionaryCache::CachedValues>();
cached_values->saved_hash = saved_hash;
cached_values->dictionary_holder = dictionary_holder;
cache->set(dictionary_key, cached_values);
}
}
AggregateDataPtr default_data = nullptr;
aggregate_data_cache.assign(key_columns[0]->size(), default_data);
size_of_index_type = column->getSizeOfIndexType();
positions = column->getIndexesPtr().get();
BaseState::init(key_columns);
}
ALWAYS_INLINE size_t getIndexAt(size_t row) const
{
switch (size_of_index_type)
{
case sizeof(UInt8): return static_cast<const ColumnUInt8 *>(positions)->getElement(row);
case sizeof(UInt16): return static_cast<const ColumnUInt16 *>(positions)->getElement(row);
case sizeof(UInt32): return static_cast<const ColumnUInt32 *>(positions)->getElement(row);
case sizeof(UInt64): return static_cast<const ColumnUInt64 *>(positions)->getElement(row);
default: throw Exception("Unexpected size of index type for low cardinality column.", ErrorCodes::LOGICAL_ERROR);
}
}
/// Get the key from the key columns for insertion into the hash table.
ALWAYS_INLINE Key getKey(
const ColumnRawPtrs & /*key_columns*/,
size_t /*keys_size*/,
size_t i,
const Sizes & key_sizes,
StringRefs & keys,
Arena & pool) const
{
size_t row = getIndexAt(i);
return BaseState::getKey(key_columns, 1, row, key_sizes, keys, pool);
}
template <typename D>
ALWAYS_INLINE AggregateDataPtr * emplaceKeyFromRow(
D & data,
size_t i,
bool & inserted,
size_t keys_size,
StringRefs & keys,
Arena & pool)
{
size_t row = getIndexAt(i);
if (is_nullable && row == 0)
{
inserted = !data.hasNullKeyData();
data.hasNullKeyData() = true;
return &data.getNullKeyData();
}
if (aggregate_data_cache[row])
{
inserted = false;
return &aggregate_data_cache[row];
}
else
{
Sizes key_sizes;
auto key = getKey({}, 0, i, key_sizes, keys, pool);
typename D::iterator it;
if (saved_hash)
data.emplace(key, it, inserted, saved_hash[row]);
else
data.emplace(key, it, inserted);
if (inserted)
Base::onNewKey(*it, keys_size, keys, pool);
else
aggregate_data_cache[row] = Base::getAggregateData(it->second);
return &Base::getAggregateData(it->second);
}
}
ALWAYS_INLINE bool isNullAt(size_t i)
{
if (!is_nullable)
return false;
return getIndexAt(i) == 0;
}
ALWAYS_INLINE void cacheAggregateData(size_t i, AggregateDataPtr data)
{
size_t row = getIndexAt(i);
aggregate_data_cache[row] = data;
}
template <typename D>
ALWAYS_INLINE AggregateDataPtr * findFromRow(D & data, size_t i)
{
size_t row = getIndexAt(i);
if (is_nullable && row == 0)
return data.hasNullKeyData() ? &data.getNullKeyData() : nullptr;
if (!aggregate_data_cache[row])
{
Sizes key_sizes;
StringRefs keys;
Arena pool;
auto key = getKey({}, 0, i, key_sizes, keys, pool);
typename D::iterator it;
if (saved_hash)
it = data.find(key, saved_hash[row]);
else
it = data.find(key);
if (it != data.end())
aggregate_data_cache[row] = Base::getAggregateData(it->second);
}
return &aggregate_data_cache[row];
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return Base::getAggregateData(value); }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return Base::getAggregateData(value); }
static void onNewKey(typename Data::value_type & value, size_t keys_size, StringRefs & keys, Arena & pool)
{
return Base::onNewKey(value, keys_size, keys, pool);
}
static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool)
{
return Base::onExistingKey(key, keys, pool);
}
static const bool no_consecutive_keys_optimization = true;
static const bool low_cardinality_optimization = true; static const bool low_cardinality_optimization = true;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns_low_cardinality, const Sizes & /*key_sizes*/) static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns_low_cardinality, const Sizes & /*key_sizes*/)
{ {
auto ref = Base::getValueRef(value); auto ref = BaseState::getValueRef(value);
static_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0].get())->insertData(ref.data, ref.size); static_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0].get())->insertData(ref.data, ref.size);
} }
}; };
namespace aggregator_impl
{
/// This class is designed to provide the functionality that is required for
/// supporting nullable keys in AggregationMethodKeysFixed. If there are
/// no nullable keys, this class is merely implemented as an empty shell.
template <typename Key, bool has_nullable_keys>
class BaseStateKeysFixed;
/// Case where nullable keys are supported.
template <typename Key>
class BaseStateKeysFixed<Key, true>
{
protected:
void init(const ColumnRawPtrs & key_columns)
{
null_maps.reserve(key_columns.size());
actual_columns.reserve(key_columns.size());
for (const auto & col : key_columns)
{
if (col->isColumnNullable())
{
const auto & nullable_col = static_cast<const ColumnNullable &>(*col);
actual_columns.push_back(&nullable_col.getNestedColumn());
null_maps.push_back(&nullable_col.getNullMapColumn());
}
else
{
actual_columns.push_back(col);
null_maps.push_back(nullptr);
}
}
}
/// Return the columns which actually contain the values of the keys.
/// For a given key column, if it is nullable, we return its nested
/// column. Otherwise we return the key column itself.
inline const ColumnRawPtrs & getActualColumns() const
{
return actual_columns;
}
/// Create a bitmap that indicates whether, for a particular row,
/// a key column bears a null value or not.
KeysNullMap<Key> createBitmap(size_t row) const
{
KeysNullMap<Key> bitmap{};
for (size_t k = 0; k < null_maps.size(); ++k)
{
if (null_maps[k] != nullptr)
{
const auto & null_map = static_cast<const ColumnUInt8 &>(*null_maps[k]).getData();
if (null_map[row] == 1)
{
size_t bucket = k / 8;
size_t offset = k % 8;
bitmap[bucket] |= UInt8(1) << offset;
}
}
}
return bitmap;
}
private:
ColumnRawPtrs actual_columns;
ColumnRawPtrs null_maps;
};
/// Case where nullable keys are not supported.
template <typename Key>
class BaseStateKeysFixed<Key, false>
{
protected:
void init(const ColumnRawPtrs &)
{
throw Exception{"Internal error: calling init() for non-nullable"
" keys is forbidden", ErrorCodes::LOGICAL_ERROR};
}
const ColumnRawPtrs & getActualColumns() const
{
throw Exception{"Internal error: calling getActualColumns() for non-nullable"
" keys is forbidden", ErrorCodes::LOGICAL_ERROR};
}
KeysNullMap<Key> createBitmap(size_t) const
{
throw Exception{"Internal error: calling createBitmap() for non-nullable keys"
" is forbidden", ErrorCodes::LOGICAL_ERROR};
}
};
}
// Oprional mask for low cardinality columns.
template <bool has_low_cardinality>
struct LowCardinalityKeys
{
ColumnRawPtrs nested_columns;
ColumnRawPtrs positions;
Sizes position_sizes;
};
template <>
struct LowCardinalityKeys<false> {};
/// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits. /// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits.
template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false> template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false>
struct AggregationMethodKeysFixed struct AggregationMethodKeysFixed
@ -787,71 +277,8 @@ struct AggregationMethodKeysFixed
template <typename Other> template <typename Other>
AggregationMethodKeysFixed(const Other & other) : data(other.data) {} AggregationMethodKeysFixed(const Other & other) : data(other.data) {}
class State final : private aggregator_impl::BaseStateKeysFixed<Key, has_nullable_keys> using State = ColumnsHashing::HashMethodKeysFixed<typename Data::value_type, Key, Mapped, has_nullable_keys, has_low_cardinality>;
{
LowCardinalityKeys<has_low_cardinality> low_cardinality_keys;
public:
using Base = aggregator_impl::BaseStateKeysFixed<Key, has_nullable_keys>;
void init(ColumnRawPtrs & key_columns)
{
if constexpr (has_low_cardinality)
{
low_cardinality_keys.nested_columns.resize(key_columns.size());
low_cardinality_keys.positions.assign(key_columns.size(), nullptr);
low_cardinality_keys.position_sizes.resize(key_columns.size());
for (size_t i = 0; i < key_columns.size(); ++i)
{
if (auto * low_cardinality_col = typeid_cast<const ColumnLowCardinality *>(key_columns[i]))
{
low_cardinality_keys.nested_columns[i] = low_cardinality_col->getDictionary().getNestedColumn().get();
low_cardinality_keys.positions[i] = &low_cardinality_col->getIndexes();
low_cardinality_keys.position_sizes[i] = low_cardinality_col->getSizeOfIndexType();
}
else
low_cardinality_keys.nested_columns[i] = key_columns[i];
}
}
if (has_nullable_keys)
Base::init(key_columns);
}
ALWAYS_INLINE Key getKey(
const ColumnRawPtrs & key_columns,
size_t keys_size,
size_t i,
const Sizes & key_sizes,
StringRefs &,
Arena &) const
{
if (has_nullable_keys)
{
auto bitmap = Base::createBitmap(i);
return packFixed<Key>(i, keys_size, Base::getActualColumns(), key_sizes, bitmap);
}
else
{
if constexpr (has_low_cardinality)
return packFixed<Key, true>(i, keys_size, low_cardinality_keys.nested_columns, key_sizes,
&low_cardinality_keys.positions, &low_cardinality_keys.position_sizes);
return packFixed<Key>(i, keys_size, key_columns, key_sizes);
}
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static ALWAYS_INLINE void onNewKey(typename Data::value_type &, size_t, StringRefs &, Arena &)
{
}
static ALWAYS_INLINE void onExistingKey(const Key &, StringRefs &, Arena &) {}
static const bool no_consecutive_keys_optimization = false;
static const bool low_cardinality_optimization = false; static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & key_sizes) static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & key_sizes)
@ -904,8 +331,6 @@ struct AggregationMethodKeysFixed
} }
} }
} }
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
}; };
@ -930,53 +355,24 @@ struct AggregationMethodSerialized
template <typename Other> template <typename Other>
AggregationMethodSerialized(const Other & other) : data(other.data) {} AggregationMethodSerialized(const Other & other) : data(other.data) {}
struct State using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped>;
{
void init(ColumnRawPtrs &)
{
}
ALWAYS_INLINE Key getKey(
const ColumnRawPtrs & key_columns,
size_t keys_size,
size_t i,
const Sizes &,
StringRefs &,
Arena & pool) const
{
return serializeKeysToPoolContiguous(i, keys_size, key_columns, pool);
}
};
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
static ALWAYS_INLINE void onNewKey(typename Data::value_type &, size_t, StringRefs &, Arena &)
{
}
static ALWAYS_INLINE void onExistingKey(const Key & key, StringRefs &, Arena & pool)
{
pool.rollback(key.size);
}
/// If the key already was, it is removed from the pool (overwritten), and the next key can not be compared with it.
static const bool no_consecutive_keys_optimization = true;
static const bool low_cardinality_optimization = false; static const bool low_cardinality_optimization = false;
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &) static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &)
{ {
auto pos = value.first.data; auto pos = value.first.data;
for (size_t i = 0; i < key_columns.size(); ++i) for (auto & column : key_columns)
pos = key_columns[i]->deserializeAndInsertFromArena(pos); pos = column->deserializeAndInsertFromArena(pos);
} }
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
}; };
class Aggregator; class Aggregator;
using ColumnsHashing::HashMethodContext;
using ColumnsHashing::HashMethodContextPtr;
struct AggregatedDataVariants : private boost::noncopyable struct AggregatedDataVariants : private boost::noncopyable
{ {
/** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way: /** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way:
@ -1298,7 +694,7 @@ struct AggregatedDataVariants : private boost::noncopyable
} }
} }
static AggregationStateCachePtr createCache(Type type, const AggregationStateCache::Settings & settings) static HashMethodContextPtr createCache(Type type, const HashMethodContext::Settings & settings)
{ {
switch (type) switch (type)
{ {
@ -1309,7 +705,7 @@ struct AggregatedDataVariants : private boost::noncopyable
{ \ { \
using TPtr ## NAME = decltype(AggregatedDataVariants::NAME); \ using TPtr ## NAME = decltype(AggregatedDataVariants::NAME); \
using T ## NAME = typename TPtr ## NAME ::element_type; \ using T ## NAME = typename TPtr ## NAME ::element_type; \
return T ## NAME ::createCache(settings); \ return T ## NAME ::State::createContext(settings); \
} }
APPLY_FOR_AGGREGATED_VARIANTS(M) APPLY_FOR_AGGREGATED_VARIANTS(M)
@ -1496,7 +892,7 @@ protected:
AggregatedDataVariants::Type method_chosen; AggregatedDataVariants::Type method_chosen;
Sizes key_sizes; Sizes key_sizes;
AggregationStateCachePtr aggregation_state_cache; HashMethodContextPtr aggregation_state_cache;
AggregateFunctionsPlainPtrs aggregate_functions; AggregateFunctionsPlainPtrs aggregate_functions;

View File

@ -170,18 +170,54 @@ static size_t getTotalByteCountImpl(const Maps & maps, Join::Type type)
} }
template <Join::Type type> template <Join::Type type, typename Value, typename Mapped>
struct KeyGetterForType; struct KeyGetterForTypeImpl;
template <> struct KeyGetterForType<Join::Type::key8> { using Type = JoinKeyGetterOneNumber<UInt8>; }; template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key8, Value, Mapped>
template <> struct KeyGetterForType<Join::Type::key16> { using Type = JoinKeyGetterOneNumber<UInt16>; }; {
template <> struct KeyGetterForType<Join::Type::key32> { using Type = JoinKeyGetterOneNumber<UInt32>; }; using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt8, false>;
template <> struct KeyGetterForType<Join::Type::key64> { using Type = JoinKeyGetterOneNumber<UInt64>; }; };
template <> struct KeyGetterForType<Join::Type::key_string> { using Type = JoinKeyGetterString; }; template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key16, Value, Mapped>
template <> struct KeyGetterForType<Join::Type::key_fixed_string> { using Type = JoinKeyGetterFixedString; }; {
template <> struct KeyGetterForType<Join::Type::keys128> { using Type = JoinKeyGetterFixed<UInt128>; }; using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt16, false>;
template <> struct KeyGetterForType<Join::Type::keys256> { using Type = JoinKeyGetterFixed<UInt256>; }; };
template <> struct KeyGetterForType<Join::Type::hashed> { using Type = JoinKeyGetterHashed; }; template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key32, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt32, false>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key64, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt64, false>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodString<Value, Mapped, true, false>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::key_fixed_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodFixedString<Value, Mapped, true, false>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::keys128, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt128, Mapped, false, false, false>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::keys256, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt256, Mapped, false, false, false>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<Join::Type::hashed, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodHashed<Value, Mapped, false>;
};
template <Join::Type type, typename Data>
struct KeyGetterForType
{
using Value = typename Data::value_type;
using Mapped_t = typename Data::mapped_type;
using Mapped = std::conditional_t<std::is_const_v<Data>, const Mapped_t, Mapped_t>;
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};
/// Do I need to use the hash table maps_*_full, in which we remember whether the row was joined. /// Do I need to use the hash table maps_*_full, in which we remember whether the row was joined.
@ -316,42 +352,30 @@ namespace
template <ASTTableJoin::Strictness STRICTNESS, typename Map, typename KeyGetter> template <ASTTableJoin::Strictness STRICTNESS, typename Map, typename KeyGetter>
struct Inserter struct Inserter
{ {
static void insert(Map & map, const typename Map::key_type & key, Block * stored_block, size_t i, Arena & pool); static void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool);
}; };
template <typename Map, typename KeyGetter> template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Any, Map, KeyGetter> struct Inserter<ASTTableJoin::Strictness::Any, Map, KeyGetter>
{ {
static void insert(Map & map, const typename Map::key_type & key, Block * stored_block, size_t i, Arena & pool) static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
{ {
typename Map::iterator it; auto emplace_result = key_getter.emplaceKey(map, i, pool);
bool inserted;
map.emplace(key, it, inserted);
if (inserted) if (emplace_result.isInserted() || emplace_result.getMapped().overwrite)
{ new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i);
KeyGetter::onNewKey(it->first, pool);
new (&it->second) typename Map::mapped_type(stored_block, i);
}
else if (it->second.overwrite)
new (&it->second) typename Map::mapped_type(stored_block, i);
} }
}; };
template <typename Map, typename KeyGetter> template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::All, Map, KeyGetter> struct Inserter<ASTTableJoin::Strictness::All, Map, KeyGetter>
{ {
static void insert(Map & map, const typename Map::key_type & key, Block * stored_block, size_t i, Arena & pool) static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
{ {
typename Map::iterator it; auto emplace_result = key_getter.emplaceKey(map, i, pool);
bool inserted;
map.emplace(key, it, inserted);
if (inserted) if (emplace_result.isInserted())
{ new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i);
KeyGetter::onNewKey(it->first, pool);
new (&it->second) typename Map::mapped_type(stored_block, i);
}
else else
{ {
/** The first element of the list is stored in the value of the hash table, the rest in the pool. /** The first element of the list is stored in the value of the hash table, the rest in the pool.
@ -359,9 +383,10 @@ namespace
* That is, the former second element, if it was, will be the third, and so on. * That is, the former second element, if it was, will be the third, and so on.
*/ */
auto elem = pool.alloc<typename Map::mapped_type>(); auto elem = pool.alloc<typename Map::mapped_type>();
auto & mapped = emplace_result.getMapped();
elem->next = it->second.next; elem->next = mapped.next;
it->second.next = elem; mapped.next = elem;
elem->block = stored_block; elem->block = stored_block;
elem->row_num = i; elem->row_num = i;
} }
@ -372,17 +397,16 @@ namespace
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map> template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE insertFromBlockImplTypeCase( void NO_INLINE insertFromBlockImplTypeCase(
Map & map, size_t rows, const ColumnRawPtrs & key_columns, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
size_t keys_size, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{ {
KeyGetter key_getter(key_columns); KeyGetter key_getter(key_columns, key_sizes, nullptr);
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
{ {
if (has_null_map && (*null_map)[i]) if (has_null_map && (*null_map)[i])
continue; continue;
auto key = key_getter.getKey(key_columns, keys_size, i, key_sizes); Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, stored_block, i, pool);
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key, stored_block, i, pool);
} }
} }
@ -390,19 +414,19 @@ namespace
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map> template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void insertFromBlockImplType( void insertFromBlockImplType(
Map & map, size_t rows, const ColumnRawPtrs & key_columns, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
size_t keys_size, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{ {
if (null_map) if (null_map)
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(map, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool); insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(map, rows, key_columns, key_sizes, stored_block, null_map, pool);
else else
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(map, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool); insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(map, rows, key_columns, key_sizes, stored_block, null_map, pool);
} }
template <ASTTableJoin::Strictness STRICTNESS, typename Maps> template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
void insertFromBlockImpl( void insertFromBlockImpl(
Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
size_t keys_size, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{ {
switch (type) switch (type)
{ {
@ -411,8 +435,8 @@ namespace
#define M(TYPE) \ #define M(TYPE) \
case Join::Type::TYPE: \ case Join::Type::TYPE: \
insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<Join::Type::TYPE>::Type>(\ insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\
*maps.TYPE, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool); \ *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \
break; break;
APPLY_FOR_JOIN_VARIANTS(M) APPLY_FOR_JOIN_VARIANTS(M)
#undef M #undef M
@ -499,7 +523,7 @@ bool Join::insertFromBlock(const Block & block)
{ {
dispatch([&](auto, auto strictness_, auto & map) dispatch([&](auto, auto strictness_, auto & map)
{ {
insertFromBlockImpl<strictness_>(type, map, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool); insertFromBlockImpl<strictness_>(type, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
}); });
} }
@ -515,14 +539,14 @@ namespace
template <typename Map> template <typename Map>
struct Adder<true, ASTTableJoin::Strictness::Any, Map> struct Adder<true, ASTTableJoin::Strictness::Any, Map>
{ {
static void addFound(const typename Map::const_iterator & it, size_t num_columns_to_add, MutableColumns & added_columns, static void addFound(const typename Map::mapped_type & mapped, size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/, size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/,
const std::vector<size_t> & right_indexes) const std::vector<size_t> & right_indexes)
{ {
filter[i] = 1; filter[i] = 1;
for (size_t j = 0; j < num_columns_to_add; ++j) for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->getByPosition(right_indexes[j]).column.get(), it->second.row_num); added_columns[j]->insertFrom(*mapped.block->getByPosition(right_indexes[j]).column, mapped.row_num);
} }
static void addNotFound(size_t num_columns_to_add, MutableColumns & added_columns, static void addNotFound(size_t num_columns_to_add, MutableColumns & added_columns,
@ -538,14 +562,14 @@ namespace
template <typename Map> template <typename Map>
struct Adder<false, ASTTableJoin::Strictness::Any, Map> struct Adder<false, ASTTableJoin::Strictness::Any, Map>
{ {
static void addFound(const typename Map::const_iterator & it, size_t num_columns_to_add, MutableColumns & added_columns, static void addFound(const typename Map::mapped_type & mapped, size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/, size_t i, IColumn::Filter & filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/,
const std::vector<size_t> & right_indexes) const std::vector<size_t> & right_indexes)
{ {
filter[i] = 1; filter[i] = 1;
for (size_t j = 0; j < num_columns_to_add; ++j) for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->getByPosition(right_indexes[j]).column.get(), it->second.row_num); added_columns[j]->insertFrom(*mapped.block->getByPosition(right_indexes[j]).column, mapped.row_num);
} }
static void addNotFound(size_t /*num_columns_to_add*/, MutableColumns & /*added_columns*/, static void addNotFound(size_t /*num_columns_to_add*/, MutableColumns & /*added_columns*/,
@ -558,14 +582,14 @@ namespace
template <bool fill_left, typename Map> template <bool fill_left, typename Map>
struct Adder<fill_left, ASTTableJoin::Strictness::All, Map> struct Adder<fill_left, ASTTableJoin::Strictness::All, Map>
{ {
static void addFound(const typename Map::const_iterator & it, size_t num_columns_to_add, MutableColumns & added_columns, static void addFound(const typename Map::mapped_type & mapped, size_t num_columns_to_add, MutableColumns & added_columns,
size_t i, IColumn::Filter & filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets, size_t i, IColumn::Filter & filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets,
const std::vector<size_t> & right_indexes) const std::vector<size_t> & right_indexes)
{ {
filter[i] = 1; filter[i] = 1;
size_t rows_joined = 0; size_t rows_joined = 0;
for (auto current = &static_cast<const typename Map::mapped_type::Base_t &>(it->second); current != nullptr; current = current->next) for (auto current = &static_cast<const typename Map::mapped_type::Base_t &>(mapped); current != nullptr; current = current->next)
{ {
for (size_t j = 0; j < num_columns_to_add; ++j) for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*current->block->getByPosition(right_indexes[j]).column.get(), current->row_num); added_columns[j]->insertFrom(*current->block->getByPosition(right_indexes[j]).column.get(), current->row_num);
@ -605,10 +629,10 @@ namespace
const std::vector<size_t> & right_indexes) const std::vector<size_t> & right_indexes)
{ {
IColumn::Offset current_offset = 0; IColumn::Offset current_offset = 0;
size_t keys_size = key_columns.size();
size_t num_columns_to_add = right_indexes.size(); size_t num_columns_to_add = right_indexes.size();
KeyGetter key_getter(key_columns); Arena pool;
KeyGetter key_getter(key_columns, key_sizes, nullptr);
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
{ {
@ -619,14 +643,14 @@ namespace
} }
else else
{ {
auto key = key_getter.getKey(key_columns, keys_size, i, key_sizes); auto find_result = key_getter.findKey(map, i, pool);
typename Map::const_iterator it = map.find(key);
if (it != map.end()) if (find_result.isFound())
{ {
it->second.setUsed(); auto & mapped = find_result.getMapped();
mapped.setUsed();
Adder<Join::KindTrait<KIND>::fill_left, STRICTNESS, Map>::addFound( Adder<Join::KindTrait<KIND>::fill_left, STRICTNESS, Map>::addFound(
it, num_columns_to_add, added_columns, i, filter, current_offset, offsets_to_replicate.get(), right_indexes); mapped, num_columns_to_add, added_columns, i, filter, current_offset, offsets_to_replicate.get(), right_indexes);
} }
else else
Adder<Join::KindTrait<KIND>::fill_left, STRICTNESS, Map>::addNotFound( Adder<Join::KindTrait<KIND>::fill_left, STRICTNESS, Map>::addNotFound(
@ -753,7 +777,7 @@ void Join::joinBlockImpl(
#define M(TYPE) \ #define M(TYPE) \
case Join::Type::TYPE: \ case Join::Type::TYPE: \
std::tie(filter, offsets_to_replicate) = \ std::tie(filter, offsets_to_replicate) = \
joinBlockImplType<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE>::Type>(\ joinBlockImplType<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\
*maps_.TYPE, block.rows(), key_columns, key_sizes, added_columns, null_map, right_indexes); \ *maps_.TYPE, block.rows(), key_columns, key_sizes, added_columns, null_map, right_indexes); \
break; break;
APPLY_FOR_JOIN_VARIANTS(M) APPLY_FOR_JOIN_VARIANTS(M)

View File

@ -8,6 +8,7 @@
#include <Interpreters/SettingsCommon.h> #include <Interpreters/SettingsCommon.h>
#include <Common/Arena.h> #include <Common/Arena.h>
#include <Common/ColumnsHashing.h>
#include <Common/HashTable/HashMap.h> #include <Common/HashTable/HashMap.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
@ -21,148 +22,6 @@
namespace DB namespace DB
{ {
/// Helpers to obtain keys (to use in a hash table or similar data structure) for various equi-JOINs.
/// UInt8/16/32/64 or another types with same number of bits.
template <typename FieldType>
struct JoinKeyGetterOneNumber
{
using Key = FieldType;
const char * vec;
/** Created before processing of each block.
* Initialize some members, used in another methods, called in inner loops.
*/
JoinKeyGetterOneNumber(const ColumnRawPtrs & key_columns)
{
vec = key_columns[0]->getRawData().data;
}
Key getKey(
const ColumnRawPtrs & /*key_columns*/,
size_t /*keys_size*/, /// number of key columns.
size_t i, /// row number to get key from.
const Sizes & /*key_sizes*/) const /// If keys are of fixed size - their sizes. Not used for methods with variable-length keys.
{
return unalignedLoad<FieldType>(vec + i * sizeof(FieldType));
}
/// Place additional data into memory pool, if needed, when new key was inserted into hash table.
static void onNewKey(Key & /*key*/, Arena & /*pool*/) {}
};
/// For single String key.
struct JoinKeyGetterString
{
using Key = StringRef;
const IColumn::Offset * offsets;
const UInt8 * chars;
JoinKeyGetterString(const ColumnRawPtrs & key_columns)
{
const IColumn & column = *key_columns[0];
const ColumnString & column_string = static_cast<const ColumnString &>(column);
offsets = column_string.getOffsets().data();
chars = column_string.getChars().data();
}
Key getKey(
const ColumnRawPtrs &,
size_t,
ssize_t i,
const Sizes &) const
{
return StringRef(
chars + offsets[i - 1],
offsets[i] - offsets[i - 1] - 1);
}
static void onNewKey(Key & key, Arena & pool)
{
if (key.size)
key.data = pool.insert(key.data, key.size);
}
};
/// For single FixedString key.
struct JoinKeyGetterFixedString
{
using Key = StringRef;
size_t n;
const ColumnFixedString::Chars * chars;
JoinKeyGetterFixedString(const ColumnRawPtrs & key_columns)
{
const IColumn & column = *key_columns[0];
const ColumnFixedString & column_string = static_cast<const ColumnFixedString &>(column);
n = column_string.getN();
chars = &column_string.getChars();
}
Key getKey(
const ColumnRawPtrs &,
size_t,
size_t i,
const Sizes &) const
{
return StringRef(&(*chars)[i * n], n);
}
static void onNewKey(Key & key, Arena & pool)
{
key.data = pool.insert(key.data, key.size);
}
};
/// For keys of fixed size, that could be packed in sizeof TKey width.
template <typename TKey>
struct JoinKeyGetterFixed
{
using Key = TKey;
JoinKeyGetterFixed(const ColumnRawPtrs &)
{
}
Key getKey(
const ColumnRawPtrs & key_columns,
size_t keys_size,
size_t i,
const Sizes & key_sizes) const
{
return packFixed<Key>(i, keys_size, key_columns, key_sizes);
}
static void onNewKey(Key &, Arena &) {}
};
/// Generic method, use crypto hash function.
struct JoinKeyGetterHashed
{
using Key = UInt128;
JoinKeyGetterHashed(const ColumnRawPtrs &)
{
}
Key getKey(
const ColumnRawPtrs & key_columns,
size_t keys_size,
size_t i,
const Sizes &) const
{
return hash128(i, keys_size, key_columns);
}
static void onNewKey(Key &, Arena &) {}
};
/** Data structure for implementation of JOIN. /** Data structure for implementation of JOIN.
* It is just a hash table: keys -> rows of joined ("right") table. * It is just a hash table: keys -> rows of joined ("right") table.
* Additionally, CROSS JOIN is supported: instead of hash table, it use just set of blocks without keys. * Additionally, CROSS JOIN is supported: instead of hash table, it use just set of blocks without keys.

View File

@ -75,30 +75,22 @@ void NO_INLINE Set::insertFromBlockImplCase(
const ColumnRawPtrs & key_columns, const ColumnRawPtrs & key_columns,
size_t rows, size_t rows,
SetVariants & variants, SetVariants & variants,
ConstNullMapPtr null_map, [[maybe_unused]] ConstNullMapPtr null_map,
ColumnUInt8::Container * out_filter) [[maybe_unused]] ColumnUInt8::Container * out_filter)
{ {
typename Method::State state; typename Method::State state(key_columns, key_sizes, nullptr);
state.init(key_columns);
/// For all rows /// For all rows
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
{ {
if (has_null_map && (*null_map)[i]) if constexpr (has_null_map)
if ((*null_map)[i])
continue; continue;
/// Obtain a key to insert to the set [[maybe_unused]] auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool);
typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes);
typename Method::Data::iterator it; if constexpr (build_filter)
bool inserted; (*out_filter)[i] = emplace_result.isInserted();
method.data.emplace(key, it, inserted);
if (inserted)
method.onNewKey(*it, keys_size, variants.string_pool);
if (build_filter)
(*out_filter)[i] = inserted;
} }
} }
@ -392,10 +384,10 @@ void NO_INLINE Set::executeImplCase(
size_t rows, size_t rows,
ConstNullMapPtr null_map) const ConstNullMapPtr null_map) const
{ {
typename Method::State state; Arena pool;
state.init(key_columns); typename Method::State state(key_columns, key_sizes, nullptr);
/// NOTE Optimization is not used for consecutive identical values. /// NOTE Optimization is not used for consecutive identical strings.
/// For all rows /// For all rows
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
@ -404,9 +396,8 @@ void NO_INLINE Set::executeImplCase(
vec_res[i] = negative; vec_res[i] = negative;
else else
{ {
/// Build the key auto find_result = state.findKey(method.data, i, pool);
typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes); vec_res[i] = negative ^ find_result.isFound();
vec_res[i] = negative ^ method.data.has(key);
} }
} }
} }

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnNullable.h> #include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Interpreters/AggregationCommon.h> #include <Interpreters/AggregationCommon.h>
#include <Common/ColumnsHashing.h>
#include <Common/Arena.h> #include <Common/Arena.h>
#include <Common/HashTable/HashSet.h> #include <Common/HashTable/HashSet.h>
@ -27,33 +28,7 @@ struct SetMethodOneNumber
Data data; Data data;
/// To use one `Method` in different threads, use different `State`. using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type, void, FieldType>;
struct State
{
const char * vec;
/** Called at the start of each block processing.
* Sets the variables required for the other methods called in inner loops.
*/
void init(const ColumnRawPtrs & key_columns)
{
vec = key_columns[0]->getRawData().data;
}
/// Get key from key columns for insertion into hash table.
Key getKey(
const ColumnRawPtrs & /*key_columns*/,
size_t /*keys_size*/, /// Number of key columns.
size_t i, /// From what row of the block I get the key.
const Sizes & /*key_sizes*/) const /// If keys of a fixed length - their lengths. Not used in methods for variable length keys.
{
return unalignedLoad<FieldType>(vec + i * sizeof(FieldType));
}
};
/** Place additional data, if necessary, in case a new key was inserted into the hash table.
*/
static void onNewKey(typename Data::value_type & /*value*/, size_t /*keys_size*/, Arena & /*pool*/) {}
}; };
/// For the case where there is one string key. /// For the case where there is one string key.
@ -65,36 +40,7 @@ struct SetMethodString
Data data; Data data;
struct State using State = ColumnsHashing::HashMethodString<typename Data::value_type, void, true, false>;
{
const IColumn::Offset * offsets;
const UInt8 * chars;
void init(const ColumnRawPtrs & key_columns)
{
const IColumn & column = *key_columns[0];
const ColumnString & column_string = static_cast<const ColumnString &>(column);
offsets = column_string.getOffsets().data();
chars = column_string.getChars().data();
}
Key getKey(
const ColumnRawPtrs &,
size_t,
ssize_t i,
const Sizes &) const
{
return StringRef(
chars + offsets[i - 1],
offsets[i] - offsets[i - 1] - 1);
}
};
static void onNewKey(typename Data::value_type & value, size_t, Arena & pool)
{
if (value.size)
value.data = pool.insert(value.data, value.size);
}
}; };
/// For the case when there is one fixed-length string key. /// For the case when there is one fixed-length string key.
@ -106,33 +52,7 @@ struct SetMethodFixedString
Data data; Data data;
struct State using State = ColumnsHashing::HashMethodFixedString<typename Data::value_type, void, true, false>;
{
size_t n;
const ColumnFixedString::Chars * chars;
void init(const ColumnRawPtrs & key_columns)
{
const IColumn & column = *key_columns[0];
const ColumnFixedString & column_string = static_cast<const ColumnFixedString &>(column);
n = column_string.getN();
chars = &column_string.getChars();
}
Key getKey(
const ColumnRawPtrs &,
size_t,
size_t i,
const Sizes &) const
{
return StringRef(&(*chars)[i * n], n);
}
};
static void onNewKey(typename Data::value_type & value, size_t, Arena & pool)
{
value.data = pool.insert(value.data, value.size);
}
}; };
namespace set_impl namespace set_impl
@ -242,34 +162,7 @@ struct SetMethodKeysFixed
Data data; Data data;
class State : private set_impl::BaseStateKeysFixed<Key, has_nullable_keys> using State = ColumnsHashing::HashMethodKeysFixed<typename Data::value_type, Key, void, has_nullable_keys, false>;
{
public:
using Base = set_impl::BaseStateKeysFixed<Key, has_nullable_keys>;
void init(const ColumnRawPtrs & key_columns)
{
if (has_nullable_keys)
Base::init(key_columns);
}
Key getKey(
const ColumnRawPtrs & key_columns,
size_t keys_size,
size_t i,
const Sizes & key_sizes) const
{
if (has_nullable_keys)
{
auto bitmap = Base::createBitmap(i);
return packFixed<Key>(i, keys_size, Base::getActualColumns(), key_sizes, bitmap);
}
else
return packFixed<Key>(i, keys_size, key_columns, key_sizes);
}
};
static void onNewKey(typename Data::value_type &, size_t, Arena &) {}
}; };
/// For other cases. 128 bit hash from the key. /// For other cases. 128 bit hash from the key.
@ -281,23 +174,7 @@ struct SetMethodHashed
Data data; Data data;
struct State using State = ColumnsHashing::HashMethodHashed<typename Data::value_type, void>;
{
void init(const ColumnRawPtrs &)
{
}
Key getKey(
const ColumnRawPtrs & key_columns,
size_t keys_size,
size_t i,
const Sizes &) const
{
return hash128(i, keys_size, key_columns);
}
};
static void onNewKey(typename Data::value_type &, size_t, Arena &) {}
}; };

View File

@ -107,11 +107,7 @@ void NO_INLINE Aggregator::executeSpecialized(
bool no_more_keys, bool no_more_keys,
AggregateDataPtr overflow_row) const AggregateDataPtr overflow_row) const
{ {
typename Method::State state; typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
if constexpr (Method::low_cardinality_optimization)
state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
if (!no_more_keys) if (!no_more_keys)
executeSpecializedCase<false, Method, AggregateFunctionsList>( executeSpecializedCase<false, Method, AggregateFunctionsList>(
@ -130,94 +126,48 @@ void NO_INLINE Aggregator::executeSpecializedCase(
typename Method::State & state, typename Method::State & state,
Arena * aggregates_pool, Arena * aggregates_pool,
size_t rows, size_t rows,
ColumnRawPtrs & key_columns, ColumnRawPtrs & /*key_columns*/,
AggregateColumns & aggregate_columns, AggregateColumns & aggregate_columns,
StringRefs & keys, StringRefs & /*keys*/,
AggregateDataPtr overflow_row) const AggregateDataPtr overflow_row) const
{ {
/// For all rows. /// For all rows.
typename Method::Key prev_key{};
AggregateDataPtr value = nullptr;
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
{ {
bool inserted = false; /// Inserted a new key, or was this key already? AggregateDataPtr aggregate_data = nullptr;
/// Get the key to insert into the hash table.
typename Method::Key key;
if constexpr (!Method::low_cardinality_optimization)
key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
AggregateDataPtr * aggregate_data = nullptr;
typename Method::iterator it; /// Is not used if Method::low_cardinality_optimization
if (!no_more_keys) /// Insert. if (!no_more_keys) /// Insert.
{ {
/// Optimization for frequently repeating keys. auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
if (!Method::no_consecutive_keys_optimization)
{
if (i != 0 && key == prev_key)
{
/// Add values into aggregate functions.
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i, aggregates_pool));
method.onExistingKey(key, keys, *aggregates_pool); /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
continue; if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
AggregateFunctionsList::forEach(AggregateFunctionsCreator(
aggregate_functions, offsets_of_aggregate_states, aggregate_data));
emplace_result.setMapped(aggregate_data);
} }
else else
prev_key = key; aggregate_data = emplace_result.getMapped();
}
if constexpr (Method::low_cardinality_optimization)
aggregate_data = state.emplaceKeyFromRow(method.data, i, inserted, params.keys_size, keys, *aggregates_pool);
else
{
method.data.emplace(key, it, inserted);
aggregate_data = &Method::getAggregateData(it->second);
}
} }
else else
{ {
/// Add only if the key already exists. /// Add only if the key already exists.
if constexpr (Method::low_cardinality_optimization) auto find_result = state.findKey(method.data, i, *aggregates_pool);
aggregate_data = state.findFromRow(method.data, i); if (find_result.isFound())
else aggregate_data = find_result.getMapped();
{
it = method.data.find(key);
if (method.data.end() != it)
aggregate_data = &Method::getAggregateData(it->second);
}
} }
/// If the key does not fit, and the data does not need to be aggregated in a separate row, then there's nothing to do. /// If the key does not fit, and the data does not need to be aggregated in a separate row, then there's nothing to do.
if (!aggregate_data && !overflow_row) if (!aggregate_data && !overflow_row)
{
method.onExistingKey(key, keys, *aggregates_pool);
continue; continue;
}
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly some stuff related to the key. auto value = aggregate_data ? aggregate_data : overflow_row;
if (inserted)
{
*aggregate_data = nullptr;
if constexpr (!Method::low_cardinality_optimization)
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
AggregateFunctionsList::forEach(AggregateFunctionsCreator(
aggregate_functions, offsets_of_aggregate_states, place));
*aggregate_data = place;
if constexpr (Method::low_cardinality_optimization)
state.cacheAggregateData(i, place);
}
else
method.onExistingKey(key, keys, *aggregates_pool);
value = aggregate_data ? *aggregate_data : overflow_row;
/// Add values into the aggregate functions. /// Add values into the aggregate functions.
AggregateFunctionsList::forEach(AggregateFunctionsUpdater( AggregateFunctionsList::forEach(AggregateFunctionsUpdater(

View File

@ -0,0 +1,48 @@
<test>
<name>Benchmark</name>
<tags>
<tag>columns_hashing</tag>
</tags>
<preconditions>
<table_exists>hits_100m_single</table_exists>
<table_exists>hits_1000m_single</table_exists>
</preconditions>
<type>loop</type>
<stop_conditions>
<all_of>
<iterations>5</iterations>
<min_time_not_changing_for_ms>60000</min_time_not_changing_for_ms>
</all_of>
<any_of>
<iterations>10</iterations>
<total_time_ms>150000</total_time_ms>
</any_of>
</stop_conditions>
<!--
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using (UserID, RegionID)]]></query>
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using (UserID)]]></query>
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using URL where URL != '']]></query>
<query><![CDATA[select count() from hits_1000m_single any left join hits_1000m_single using MobilePhoneModel where MobilePhoneModel != '']]></query>
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using (MobilePhoneModel, UserID) where MobilePhoneModel != '']]></query>
<query><![CDATA[select count() from (select count() from hits_1000m_single group by (UserID))]]></query>
<query><![CDATA[select count() from (select count() from hits_100m_single group by (UserID, RegionID))]]></query>
<query><![CDATA[select count() from (select count() from hits_100m_single where URL != '' group by URL)]]></query>
<query><![CDATA[select count() from (select count() from hits_1000m_single where MobilePhoneModel != '' group by MobilePhoneModel)]]></query>
<query><![CDATA[select count() from (select count() from hits_1000m_single where MobilePhoneModel != '' group by (MobilePhoneModel, UserID))]]></query>
-->
<query><![CDATA[select sum(UserID + 1 in (select UserID from hits_100m_single)) from hits_100m_single]]></query>
<query><![CDATA[select sum((UserID + 1, RegionID) in (select UserID, RegionID from hits_100m_single)) from hits_100m_single]]></query>
<query><![CDATA[select sum(URL in (select URL from hits_100m where URL != '')) from hits_100m_single]]></query>
<query><![CDATA[select sum(MobilePhoneModel in (select MobilePhoneModel from hits_1000m where MobilePhoneModel != '')) from hits_1000m_single]]></query>
<query><![CDATA[select sum((MobilePhoneModel, UserID + 1) in (select MobilePhoneModel, UserID from hits_1000m where MobilePhoneModel != '')) from hits_1000m_single]]></query>
<main_metric>
<min_time/>
</main_metric>
</test>