mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 21:24:28 +00:00
Refactor ColumnsHashing
This commit is contained in:
parent
2c2932e185
commit
d143a78419
@ -19,27 +19,15 @@ namespace DB
|
||||
namespace ColumnsHashing
|
||||
{
|
||||
|
||||
/// Generic context for HashMethod. The context is shared between multiple threads, so all of its methods must be thread-safe.
|
||||
/// It is used for caching.
|
||||
class HashMethodContext
|
||||
{
|
||||
public:
|
||||
/// Virtual, so that a derived context can be destroyed through a pointer to this base class.
virtual ~HashMethodContext() = default;
|
||||
|
||||
/// Parameters accepted by the static createContext() functions of the hash methods.
struct Settings
|
||||
{
|
||||
/// NOTE(review): presumably the number of threads that will share the context - confirm against the callers.
size_t max_threads;
|
||||
};
|
||||
};
|
||||
|
||||
using HashMethodContextPtr = std::shared_ptr<HashMethodContext>;
|
||||
|
||||
|
||||
/// For the case where there is one numeric key.
|
||||
template <typename Value, typename Mapped, typename FieldType, bool use_cache = true> /// UInt8/16/32/64 for any type with corresponding bit width.
|
||||
struct HashMethodOneNumber : public columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>
|
||||
/// UInt8/16/32/64 for any type with corresponding bit width.
|
||||
template <typename Value, typename Mapped, typename FieldType, bool use_cache = true>
|
||||
struct HashMethodOneNumber
|
||||
: public columns_hashing_impl::HashMethodBase<HashMethodOneNumber<Value, Mapped, FieldType, use_cache>, Value, Mapped, use_cache>
|
||||
{
|
||||
using Base = columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>;
|
||||
using Self = HashMethodOneNumber<Value, Mapped, FieldType, use_cache>;
|
||||
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
|
||||
|
||||
const char * vec;
|
||||
|
||||
/// If the keys are of a fixed length, then key_sizes contains their lengths; otherwise it is empty.
|
||||
@ -49,51 +37,38 @@ struct HashMethodOneNumber : public columns_hashing_impl::HashMethodBase<Value,
|
||||
}
|
||||
|
||||
/// Creates context. Method is called once and result context is used in all threads.
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
FieldType getKey(size_t row) const { return unalignedLoad<FieldType>(vec + row * sizeof(FieldType)); }
|
||||
using Base::createContext; /// (const HashMethodContext::Settings &) -> HashMethodContextPtr
|
||||
|
||||
/// Emplace key into HashTable or HashMap. If Data is HashMap, returns ptr to value, otherwise nullptr.
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename Base::EmplaceResult emplaceKey(
|
||||
Data & data, /// HashTable
|
||||
size_t row, /// From which row of the block insert the key
|
||||
Arena & /*pool*/) /// For Serialized method, key may be placed in pool.
|
||||
{
|
||||
typename Data::iterator it;
|
||||
return Base::emplaceKeyImpl(getKey(row), data, it);
|
||||
}
|
||||
/// Data is the HashTable into which the key from the column's row is inserted.
|
||||
/// For Serialized method, key may be placed in pool.
|
||||
using Base::emplaceKey; /// (Data & data, size_t row, Arena & pool) -> EmplaceResult
|
||||
|
||||
/// Find key in HashTable or HashMap. If Data is HashMap and key was found, returns ptr to value, otherwise nullptr.
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename Base::FindResult findKey(Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return Base::findKeyImpl(getKey(row), data);
|
||||
}
|
||||
using Base::findKey; /// (Data & data, size_t row, Arena & pool) -> FindResult
|
||||
|
||||
/// Get hash value of row.
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return data.hash(getKey(row));
|
||||
}
|
||||
using Base::getHash; /// (const Data & data, size_t row, Arena & pool) -> size_t
|
||||
|
||||
/// Is used for default implementation in HashMethodBase.
|
||||
FieldType getKey(size_t row, Arena &) const { return unalignedLoad<FieldType>(vec + row * sizeof(FieldType)); }
|
||||
|
||||
/// Get StringRef from value which can be inserted into column.
|
||||
static StringRef getValueRef(const Value & value)
|
||||
{
|
||||
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
|
||||
}
|
||||
|
||||
protected:
|
||||
static ALWAYS_INLINE void onNewKey(Value & /*value*/, Arena & /*pool*/) {}
|
||||
};
|
||||
|
||||
|
||||
/// For the case where there is one string key.
|
||||
template <typename Value, typename Mapped, bool use_cache = true>
|
||||
struct HashMethodString : public columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>
|
||||
struct HashMethodString
|
||||
: public columns_hashing_impl::HashMethodBase<HashMethodString<Value, Mapped, use_cache>, Value, Mapped, use_cache>
|
||||
{
|
||||
using Base = columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>;
|
||||
using Self = HashMethodString<Value, Mapped, use_cache>;
|
||||
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
|
||||
|
||||
const IColumn::Offset * offsets;
|
||||
const UInt8 * chars;
|
||||
|
||||
@ -105,55 +80,32 @@ struct HashMethodString : public columns_hashing_impl::HashMethodBase<Value, Map
|
||||
chars = column_string.getChars().data();
|
||||
}
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
StringRef getKey(ssize_t row) const { return StringRef(chars + offsets[row - 1], offsets[row] - offsets[row - 1] - 1); }
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename Base::EmplaceResult emplaceKey(Data & data, size_t row, Arena & pool)
|
||||
auto getKey(ssize_t row, Arena &) const
|
||||
{
|
||||
auto key = getKey(row);
|
||||
typename Data::iterator it;
|
||||
auto result = Base::emplaceKeyImpl(key, data, it);
|
||||
if (result.isInserted())
|
||||
{
|
||||
if (key.size)
|
||||
it->first.data = pool.insert(key.data, key.size);
|
||||
}
|
||||
return result;
|
||||
return StringRef(chars + offsets[row - 1], offsets[row] - offsets[row - 1] - 1);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename Base::FindResult findKey(Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return Base::findKeyImpl(getKey(row), data);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return data.hash(getKey(row));
|
||||
}
|
||||
|
||||
static StringRef getValueRef(const Value & value)
|
||||
{
|
||||
return StringRef(value.first.data, value.first.size);
|
||||
}
|
||||
static StringRef getValueRef(const Value & value) { return StringRef(value.first.data, value.first.size); }
|
||||
|
||||
protected:
|
||||
static ALWAYS_INLINE void onNewKey(Value & value, Arena & pool)
|
||||
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
|
||||
|
||||
static ALWAYS_INLINE void onNewKey(StringRef & key, Arena & pool)
|
||||
{
|
||||
if (value.first.size)
|
||||
value.first.data = pool.insert(value.first.data, value.first.size);
|
||||
if (key.size)
|
||||
key.data = pool.insert(key.data, key.size);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// For the case where there is one fixed-length string key.
|
||||
template <typename Value, typename Mapped, bool use_cache = true>
|
||||
struct HashMethodFixedString : public columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>
|
||||
struct HashMethodFixedString
|
||||
: public columns_hashing_impl::HashMethodBase<HashMethodFixedString<Value, Mapped, use_cache>, Value, Mapped, use_cache>
|
||||
{
|
||||
using Base = columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>;
|
||||
using Self = HashMethodFixedString<Value, Mapped, use_cache>;
|
||||
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
|
||||
|
||||
size_t n;
|
||||
const ColumnFixedString::Chars * chars;
|
||||
|
||||
@ -165,44 +117,13 @@ struct HashMethodFixedString : public columns_hashing_impl::HashMethodBase<Value
|
||||
chars = &column_string.getChars();
|
||||
}
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
StringRef getKey(size_t row, Arena &) const { return StringRef(&(*chars)[row * n], n); }
|
||||
|
||||
StringRef getKey(size_t row) const { return StringRef(&(*chars)[row * n], n); }
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename Base::EmplaceResult emplaceKey(Data & data, size_t row, Arena & pool)
|
||||
{
|
||||
auto key = getKey(row);
|
||||
typename Data::iterator it;
|
||||
auto res = Base::emplaceKeyImpl(key, data, it);
|
||||
if (res.isInserted())
|
||||
it->first.data = pool.insert(key.data, key.size);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename Base::FindResult findKey(Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return Base::findKeyImpl(getKey(row), data);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return data.hash(getKey(row));
|
||||
}
|
||||
|
||||
static StringRef getValueRef(const Value & value)
|
||||
{
|
||||
return StringRef(value.first.data, value.first.size);
|
||||
}
|
||||
static StringRef getValueRef(const Value & value) { return StringRef(value.first.data, value.first.size); }
|
||||
|
||||
protected:
|
||||
static ALWAYS_INLINE void onNewKey(Value & value, Arena & pool)
|
||||
{
|
||||
value.first.data = pool.insert(value.first.data, value.first.size);
|
||||
}
|
||||
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
|
||||
static ALWAYS_INLINE void onNewKey(StringRef & key, Arena & pool) { key.data = pool.insert(key.data, key.size); }
|
||||
};
|
||||
|
||||
|
||||
@ -383,9 +304,9 @@ struct HashMethodSingleLowCardinalityColumn : public SingleColumnMethod
|
||||
}
|
||||
|
||||
/// Get the key from the key columns for insertion into the hash table.
|
||||
ALWAYS_INLINE auto getKey(size_t row) const
|
||||
ALWAYS_INLINE auto getKey(size_t row, Arena & pool) const
|
||||
{
|
||||
return Base::getKey(getIndexAt(row));
|
||||
return Base::getKey(getIndexAt(row), pool);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
@ -413,7 +334,7 @@ struct HashMethodSingleLowCardinalityColumn : public SingleColumnMethod
|
||||
return EmplaceResult(false);
|
||||
}
|
||||
|
||||
auto key = getKey(row_);
|
||||
auto key = getKey(row_, pool);
|
||||
|
||||
bool inserted = false;
|
||||
typename Data::iterator it;
|
||||
@ -425,7 +346,12 @@ struct HashMethodSingleLowCardinalityColumn : public SingleColumnMethod
|
||||
visit_cache[row] = VisitValue::Found;
|
||||
|
||||
if (inserted)
|
||||
Base::onNewKey(*it, pool);
|
||||
{
|
||||
if constexpr (has_mapped)
|
||||
Base::onNewKey(it->first, pool);
|
||||
else
|
||||
Base::onNewKey(*it, pool);
|
||||
}
|
||||
|
||||
if constexpr (has_mapped)
|
||||
return EmplaceResult(it->second, mapped_cache[row], inserted);
|
||||
@ -442,7 +368,7 @@ struct HashMethodSingleLowCardinalityColumn : public SingleColumnMethod
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE FindResult findFromRow(Data & data, size_t row_, Arena &)
|
||||
ALWAYS_INLINE FindResult findFromRow(Data & data, size_t row_, Arena & pool)
|
||||
{
|
||||
size_t row = getIndexAt(row_);
|
||||
|
||||
@ -462,7 +388,7 @@ struct HashMethodSingleLowCardinalityColumn : public SingleColumnMethod
|
||||
return FindResult(visit_cache[row] == VisitValue::Found);
|
||||
}
|
||||
|
||||
auto key = getKey(row_);
|
||||
auto key = getKey(row_, pool);
|
||||
|
||||
typename Data::iterator it;
|
||||
if (saved_hash)
|
||||
@ -513,8 +439,12 @@ struct LowCardinalityKeys<false> {};
|
||||
template <typename Value, typename Key, typename Mapped, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false, bool use_cache = true>
|
||||
struct HashMethodKeysFixed
|
||||
: private columns_hashing_impl::BaseStateKeysFixed<Key, has_nullable_keys_>
|
||||
, public columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>
|
||||
, public columns_hashing_impl::HashMethodBase<HashMethodKeysFixed<Value, Key, Mapped, has_nullable_keys_, has_low_cardinality_, use_cache>, Value, Mapped, use_cache>
|
||||
{
|
||||
using Self = HashMethodKeysFixed<Value, Key, Mapped, has_nullable_keys_, has_low_cardinality_, use_cache>;
|
||||
using BaseHashed = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
|
||||
using Base = columns_hashing_impl::BaseStateKeysFixed<Key, has_nullable_keys_>;
|
||||
|
||||
static constexpr bool has_nullable_keys = has_nullable_keys_;
|
||||
static constexpr bool has_low_cardinality = has_low_cardinality_;
|
||||
|
||||
@ -522,9 +452,6 @@ struct HashMethodKeysFixed
|
||||
Sizes key_sizes;
|
||||
size_t keys_size;
|
||||
|
||||
using Base = columns_hashing_impl::BaseStateKeysFixed<Key, has_nullable_keys>;
|
||||
using BaseHashed = columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>;
|
||||
|
||||
HashMethodKeysFixed(const ColumnRawPtrs & key_columns, const Sizes & key_sizes, const HashMethodContextPtr &)
|
||||
: key_sizes(std::move(key_sizes)), keys_size(key_columns.size())
|
||||
{
|
||||
@ -549,9 +476,7 @@ struct HashMethodKeysFixed
|
||||
Base::init(key_columns);
|
||||
}
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
ALWAYS_INLINE Key getKey(size_t row) const
|
||||
ALWAYS_INLINE Key getKey(size_t row, Arena &) const
|
||||
{
|
||||
if (has_nullable_keys)
|
||||
{
|
||||
@ -567,25 +492,6 @@ struct HashMethodKeysFixed
|
||||
return packFixed<Key>(row, keys_size, Base::getActualColumns(), key_sizes);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename BaseHashed::EmplaceResult emplaceKey(Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
typename Data::iterator it;
|
||||
return BaseHashed::emplaceKeyImpl(getKey(row), data, it);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename BaseHashed::FindResult findKey(Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return BaseHashed::findKeyImpl(getKey(row), data);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return data.hash(getKey(row));
|
||||
}
|
||||
};
|
||||
|
||||
/** Hash by concatenating serialized key values.
|
||||
@ -593,91 +499,45 @@ struct HashMethodKeysFixed
|
||||
* That is, for example, for strings, it contains first the serialized length of the string, and then the bytes.
|
||||
* Therefore, when aggregating by several strings, there is no ambiguity.
|
||||
*/
|
||||
template <typename Value, typename Mapped, bool use_cache = true>
|
||||
struct HashMethodSerialized : public columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>
|
||||
template <typename Value, typename Mapped>
|
||||
struct HashMethodSerialized
|
||||
: public columns_hashing_impl::HashMethodBase<HashMethodSerialized<Value, Mapped>, Value, Mapped, false>
|
||||
{
|
||||
using Base = columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>;
|
||||
using Self = HashMethodSerialized<Value, Mapped>;
|
||||
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
|
||||
|
||||
ColumnRawPtrs key_columns;
|
||||
size_t keys_size;
|
||||
|
||||
HashMethodSerialized(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
|
||||
: key_columns(key_columns), keys_size(key_columns.size()) {}
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename Base::EmplaceResult emplaceKey(Data & data, size_t row, Arena & pool)
|
||||
{
|
||||
auto key = getKey(row, pool);
|
||||
typename Data::iterator it;
|
||||
auto res = Base::emplaceKeyImpl(key, data, it);
|
||||
if (!res.isInserted())
|
||||
pool.rollback(key.size);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename Base::FindResult findKey(Data & data, size_t row, Arena & pool)
|
||||
{
|
||||
auto key = getKey(row, pool);
|
||||
auto res = Base::findKeyImpl(key, data);
|
||||
pool.rollback(key.size);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & pool)
|
||||
{
|
||||
auto key = getKey(row, pool);
|
||||
auto hash = data.hash(key);
|
||||
pool.rollback(key.size);
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
protected:
|
||||
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
|
||||
|
||||
ALWAYS_INLINE StringRef getKey(size_t row, Arena & pool) const
|
||||
{
|
||||
return serializeKeysToPoolContiguous(row, keys_size, key_columns, pool);
|
||||
}
|
||||
|
||||
static ALWAYS_INLINE void onExistingKey(StringRef & key, Arena & pool) { pool.rollback(key.size); }
|
||||
};
|
||||
|
||||
/// For the case where the key is a 128-bit hash (UInt128) computed from all key columns.
|
||||
template <typename Value, typename Mapped, bool use_cache = true>
|
||||
struct HashMethodHashed : public columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>
|
||||
struct HashMethodHashed
|
||||
: public columns_hashing_impl::HashMethodBase<HashMethodHashed<Value, Mapped, use_cache>, Value, Mapped, use_cache>
|
||||
{
|
||||
using Base = columns_hashing_impl::HashMethodBase<Value, Mapped, use_cache>;
|
||||
using Key = UInt128;
|
||||
using Self = HashMethodHashed<Value, Mapped, use_cache>;
|
||||
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
|
||||
|
||||
ColumnRawPtrs key_columns;
|
||||
|
||||
HashMethodHashed(ColumnRawPtrs key_columns, const Sizes &, const HashMethodContextPtr &)
|
||||
: key_columns(std::move(key_columns)) {}
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
UInt128 getKey(size_t row) const { return hash128(row, key_columns.size(), key_columns); }
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename Base::EmplaceResult emplaceKey(Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
typename Data::iterator it;
|
||||
return Base::emplaceKeyImpl(getKey(row), data, it);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE typename Base::FindResult findKey(Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return Base::findKeyImpl(getKey(row), data);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/)
|
||||
{
|
||||
return data.hash(getKey(row));
|
||||
}
|
||||
UInt128 getKey(size_t row, Arena &) const { return hash128(row, key_columns.size(), key_columns); }
|
||||
|
||||
static StringRef getValueRef(const Value & value)
|
||||
{
|
||||
|
@ -9,6 +9,22 @@ namespace DB
|
||||
namespace ColumnsHashing
|
||||
{
|
||||
|
||||
/// Generic context for HashMethod. The context is shared between multiple threads, so all of its methods must be thread-safe.
|
||||
/// It is used for caching.
|
||||
class HashMethodContext
|
||||
{
|
||||
public:
|
||||
/// Virtual, so that a derived context can be destroyed through a pointer to this base class.
virtual ~HashMethodContext() = default;
|
||||
|
||||
/// Parameters accepted by the static createContext() functions of the hash methods.
struct Settings
|
||||
{
|
||||
/// NOTE(review): presumably the number of threads that will share the context - confirm against the callers.
size_t max_threads;
|
||||
};
|
||||
};
|
||||
|
||||
using HashMethodContextPtr = std::shared_ptr<HashMethodContext>;
|
||||
|
||||
|
||||
namespace columns_hashing_impl
|
||||
{
|
||||
|
||||
@ -80,14 +96,41 @@ public:
|
||||
bool isFound() const { return found; }
|
||||
};
|
||||
|
||||
template <typename Value, typename Mapped, bool consecutive_keys_optimization>
|
||||
struct HashMethodBase
|
||||
template <typename Derived, typename Value, typename Mapped, bool consecutive_keys_optimization>
|
||||
class HashMethodBase
|
||||
{
|
||||
public:
|
||||
using EmplaceResult = EmplaceResultImpl<Mapped>;
|
||||
using FindResult = FindResultImpl<Mapped>;
|
||||
static constexpr bool has_mapped = !std::is_same<Mapped, void>::value;
|
||||
using Cache = LastElementCache<Value, consecutive_keys_optimization>;
|
||||
|
||||
static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; }
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE EmplaceResult emplaceKey(Data & data, size_t row, Arena & pool)
|
||||
{
|
||||
return emplaceKeyImpl(static_cast<Derived &>(*this).getKey(row, pool), data, pool);
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE FindResult findKey(Data & data, size_t row, Arena & pool)
|
||||
{
|
||||
auto key = static_cast<Derived &>(*this).getKey(row, pool);
|
||||
auto res = findKeyImpl(key, data);
|
||||
static_cast<Derived &>(*this).onExistingKey(key, pool);
|
||||
return res;
|
||||
}
|
||||
|
||||
template <typename Data>
|
||||
ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & pool)
|
||||
{
|
||||
auto key = static_cast<Derived &>(*this).getKey(row, pool);
|
||||
auto res = data.hash(key);
|
||||
static_cast<Derived &>(*this).onExistingKey(key, pool);
|
||||
return res;
|
||||
}
|
||||
|
||||
protected:
|
||||
Cache cache;
|
||||
|
||||
@ -102,13 +145,20 @@ protected:
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Key>
|
||||
static ALWAYS_INLINE void onNewKey(Key & /*key*/, Arena & /*pool*/) {}
|
||||
template <typename Key>
|
||||
static ALWAYS_INLINE void onExistingKey(Key & /*key*/, Arena & /*pool*/) {}
|
||||
|
||||
template <typename Data, typename Key>
|
||||
ALWAYS_INLINE EmplaceResult emplaceKeyImpl(Key key, Data & data, typename Data::iterator & it)
|
||||
ALWAYS_INLINE EmplaceResult emplaceKeyImpl(Key key, Data & data, Arena & pool)
|
||||
{
|
||||
if constexpr (Cache::consecutive_keys_optimization)
|
||||
{
|
||||
if (cache.found && cache.check(key))
|
||||
{
|
||||
static_cast<Derived &>(*this).onExistingKey(key, pool);
|
||||
|
||||
if constexpr (has_mapped)
|
||||
return EmplaceResult(cache.value.second, cache.value.second, false);
|
||||
else
|
||||
@ -116,9 +166,23 @@ protected:
|
||||
}
|
||||
}
|
||||
|
||||
typename Data::iterator it;
|
||||
bool inserted = false;
|
||||
data.emplace(key, it, inserted);
|
||||
Mapped * cached = &it->second;
|
||||
|
||||
Mapped * cached = nullptr;
|
||||
if (has_mapped)
|
||||
cached = &it->second;
|
||||
|
||||
if (inserted)
|
||||
{
|
||||
if constexpr (has_mapped)
|
||||
static_cast<Derived &>(*this).onNewKey(it->first, pool);
|
||||
else
|
||||
static_cast<Derived &>(*this).onNewKey(*it, pool);
|
||||
}
|
||||
else
|
||||
static_cast<Derived &>(*this).onExistingKey(key, pool);
|
||||
|
||||
if constexpr (consecutive_keys_optimization)
|
||||
{
|
||||
|
@ -107,11 +107,7 @@ void NO_INLINE Aggregator::executeSpecialized(
|
||||
bool no_more_keys,
|
||||
AggregateDataPtr overflow_row) const
|
||||
{
|
||||
typename Method::State state;
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
state.init(key_columns, aggregation_state_cache);
|
||||
else
|
||||
state.init(key_columns);
|
||||
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
|
||||
|
||||
if (!no_more_keys)
|
||||
executeSpecializedCase<false, Method, AggregateFunctionsList>(
|
||||
@ -130,94 +126,48 @@ void NO_INLINE Aggregator::executeSpecializedCase(
|
||||
typename Method::State & state,
|
||||
Arena * aggregates_pool,
|
||||
size_t rows,
|
||||
ColumnRawPtrs & key_columns,
|
||||
ColumnRawPtrs & /*key_columns*/,
|
||||
AggregateColumns & aggregate_columns,
|
||||
StringRefs & keys,
|
||||
StringRefs & /*keys*/,
|
||||
AggregateDataPtr overflow_row) const
|
||||
{
|
||||
/// For all rows.
|
||||
typename Method::Key prev_key{};
|
||||
AggregateDataPtr value = nullptr;
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
bool inserted = false; /// Inserted a new key, or was this key already?
|
||||
|
||||
/// Get the key to insert into the hash table.
|
||||
typename Method::Key key;
|
||||
if constexpr (!Method::low_cardinality_optimization)
|
||||
key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
|
||||
|
||||
AggregateDataPtr * aggregate_data = nullptr;
|
||||
typename Method::iterator it; /// Is not used if Method::low_cardinality_optimization
|
||||
AggregateDataPtr aggregate_data = nullptr;
|
||||
|
||||
if (!no_more_keys) /// Insert.
|
||||
{
|
||||
/// Optimization for frequently repeating keys.
|
||||
if (!Method::no_consecutive_keys_optimization)
|
||||
auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
|
||||
|
||||
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
|
||||
if (emplace_result.isInserted())
|
||||
{
|
||||
if (i != 0 && key == prev_key)
|
||||
{
|
||||
/// Add values into aggregate functions.
|
||||
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
|
||||
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, value, i, aggregates_pool));
|
||||
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
|
||||
emplace_result.setMapped(nullptr);
|
||||
|
||||
method.onExistingKey(key, keys, *aggregates_pool);
|
||||
continue;
|
||||
}
|
||||
else
|
||||
prev_key = key;
|
||||
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
|
||||
AggregateFunctionsList::forEach(AggregateFunctionsCreator(
|
||||
aggregate_functions, offsets_of_aggregate_states, aggregate_data));
|
||||
|
||||
emplace_result.setMapped(aggregate_data);
|
||||
}
|
||||
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
aggregate_data = state.emplaceKeyFromRow(method.data, i, inserted, params.keys_size, keys, *aggregates_pool);
|
||||
else
|
||||
{
|
||||
method.data.emplace(key, it, inserted);
|
||||
aggregate_data = &Method::getAggregateData(it->second);
|
||||
}
|
||||
aggregate_data = emplace_result.getMapped();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Add only if the key already exists.
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
aggregate_data = state.findFromRow(method.data, i);
|
||||
else
|
||||
{
|
||||
it = method.data.find(key);
|
||||
if (method.data.end() != it)
|
||||
aggregate_data = &Method::getAggregateData(it->second);
|
||||
}
|
||||
auto find_result = state.findKey(method.data, i, *aggregates_pool);
|
||||
if (find_result.isFound())
|
||||
aggregate_data = find_result.getMapped();
|
||||
}
|
||||
|
||||
/// If the key does not fit, and the data does not need to be aggregated in a separate row, then there's nothing to do.
|
||||
if (!aggregate_data && !overflow_row)
|
||||
{
|
||||
method.onExistingKey(key, keys, *aggregates_pool);
|
||||
continue;
|
||||
}
|
||||
|
||||
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly some stuff related to the key.
|
||||
if (inserted)
|
||||
{
|
||||
*aggregate_data = nullptr;
|
||||
|
||||
if constexpr (!Method::low_cardinality_optimization)
|
||||
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
|
||||
|
||||
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
|
||||
|
||||
AggregateFunctionsList::forEach(AggregateFunctionsCreator(
|
||||
aggregate_functions, offsets_of_aggregate_states, place));
|
||||
|
||||
*aggregate_data = place;
|
||||
|
||||
if constexpr (Method::low_cardinality_optimization)
|
||||
state.cacheAggregateData(i, place);
|
||||
}
|
||||
else
|
||||
method.onExistingKey(key, keys, *aggregates_pool);
|
||||
|
||||
value = aggregate_data ? *aggregate_data : overflow_row;
|
||||
auto value = aggregate_data ? aggregate_data : overflow_row;
|
||||
|
||||
/// Add values into the aggregate functions.
|
||||
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
|
||||
|
Loading…
Reference in New Issue
Block a user