diff --git a/dbms/src/Common/ColumnsHashing.h b/dbms/src/Common/ColumnsHashing.h new file mode 100644 index 00000000000..0a6d5464341 --- /dev/null +++ b/dbms/src/Common/ColumnsHashing.h @@ -0,0 +1,881 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ColumnsHashing +{ + +/// Generic context for HashMethod. Context is shared between multiple threads, all methods must be thread-safe. +/// Is used for caching. +class HashMethodContext +{ +public: + virtual ~HashMethodContext() = default; + + struct Settings + { + size_t max_threads; + }; +}; + +using HashMethodContextPtr = std::shared_ptr; + + +template +struct MappedTraits +{ + using Type = void *; + static Type getMapped(T &) { return nullptr; } + static T & getKey(T & key) { return key; } +}; + +template +struct MappedTraits> +{ + using Type = Second *; + static Type getMapped(PairNoInit & value) { return &value.second; } + static First & getKey(PairNoInit & value) { return value.first; } +}; + +template +struct HashTableTraits +{ + using Value = typename Data::value_type; + using Mapped = typename MappedTraits::Type; + + static Mapped getMapped(Value & value) { return MappedTraits::getMapped(value); } + static auto & getKey(Value & value) { return MappedTraits::getKey(value); } +}; + +template +struct LastElementCache +{ + static constexpr bool consecutive_keys_optimization = consecutive_keys_optimization_; + using Value = typename HashTableTraits::Value; + Value value; + bool empty = true; + bool found = false; + + auto getMapped() { return HashTableTraits::getMapped(value); } + auto & getKey() { return HashTableTraits::getKey(value); } +}; + +template +struct LastElementCache +{ + static constexpr bool consecutive_keys_optimization = false; +}; + +template +inline ALWAYS_INLINE typename HashTableTraits::Value & emplaceKeyImpl( + Key key, Data & data, bool & inserted, Cache & cache [[maybe_unused]]) +{ + if constexpr (Cache::consecutive_keys_optimization) + { + if (!cache.empty && cache.found && cache.getKey() == key) + { + inserted = false; + return cache.value; + } + } + + typename Data::iterator it; + data.emplace(key, it, inserted); + auto & value = *it; + + if constexpr (Cache::consecutive_keys_optimization) + { + cache.value = value; + cache.empty = false; + cache.found = true; + } + + return value; +} + +template +inline ALWAYS_INLINE typename HashTableTraits::Mapped findKeyImpl( + Key key, Data & data, bool & found, Cache & cache [[maybe_unused]]) +{ + if constexpr (Cache::consecutive_keys_optimization) + { + if (!cache.empty && cache.getKey() == key) + { + found = cache.found; + return found ? cache.getMapped() : nullptr; + } + } + + auto it = data.find(key); + + found = it != data.end(); + auto mapped = found ? HashTableTraits::getMapped(*it) + : nullptr; + + if constexpr (Cache::consecutive_keys_optimization) + { + if (found) + cache.value = *it; + else + cache.getKey() = key; + + cache.empty = false; + cache.found = found; + } + + return mapped; +} + + +/// For the case where there is one numeric key. +template /// UInt8/16/32/64 for any type with corresponding bit width. +struct HashMethodOneNumber +{ + const char * vec; + LastElementCache last_elem_cache; + + /// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise. + HashMethodOneNumber(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &) + { + vec = key_columns[0]->getRawData().data; + } + + /// Creates context. Method is called once and result context is used in all threads. + static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; } + + FieldType getKey(size_t row) const { return unalignedLoad(vec + row * sizeof(FieldType)); } + + /// Emplace key into HashTable or HashMap. If Data is HashMap, returns ptr to value, otherwise nullptr. + template + ALWAYS_INLINE typename HashTableTraits::Mapped emplaceKey( + Data & data, /// HashTable + size_t row, /// From which row of the block insert the key + bool & inserted, + Arena & /*pool*/) /// For Serialized method, key may be placed in pool. + { + return HashTableTraits::getMapped(emplaceKeyImpl(getKey(row), data, inserted, last_elem_cache)); + } + + /// Find key into HashTable or HashMap. If Data is HashMap and key was found, returns ptr to value, otherwise nullptr. + template + ALWAYS_INLINE typename HashTableTraits::Mapped findKey(Data & data, size_t row, bool & found, Arena & /*pool*/) + { + return findKeyImpl(getKey(row), data, found, last_elem_cache); + } + + /// Insert the key from the hash table into columns. + template + static void insertKeyIntoColumns(const Value & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/) + { + static_cast(key_columns[0].get())->insertRawData(reinterpret_cast(&value.first)); + } + + /// Get hash value of row. + template + ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/) + { + return data.hash(getKey(row)); + } + + /// Get StringRef from value which can be inserted into column. + template + static StringRef getValueRef(const Value & value) + { + return StringRef(reinterpret_cast(&value.first), sizeof(value.first)); + } + + /// Cache last result if key was inserted. + template + ALWAYS_INLINE void cacheData(size_t /*row*/, Mapped mapped) + { + *last_elem_cache.getMapped() = mapped; + } + +protected: + template + static ALWAYS_INLINE void onNewKey(Value & /*value*/, Arena & /*pool*/) {} +}; + + +/// For the case where there is one string key. +template +struct HashMethodString +{ + const IColumn::Offset * offsets; + const UInt8 * chars; + + LastElementCache last_elem_cache; + + HashMethodString(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &) + { + const IColumn & column = *key_columns[0]; + const ColumnString & column_string = static_cast(column); + offsets = column_string.getOffsets().data(); + chars = column_string.getChars().data(); + } + + static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; } + + StringRef getKey(size_t row) const { return StringRef(chars + offsets[row - 1], offsets[row] - offsets[row - 1] - 1); } + + template + ALWAYS_INLINE typename HashTableTraits::Mapped emplaceKey(Data & data, size_t row, bool & inserted, Arena & pool) + { + auto & value = emplaceKeyImpl(getKey(row), data, inserted, last_elem_cache); + if (inserted) + { + auto & key = HashTableTraits::getKey(value); + if (key.size) + key.data = pool.insert(key.data, key.size); + } + return HashTableTraits::getMapped(value); + } + + template + ALWAYS_INLINE typename HashTableTraits::Mapped findKey(Data & data, size_t row, bool & found, Arena & /*pool*/) + { + return findKeyImpl(getKey(row), data, found, last_elem_cache); + } + + template + static void insertKeyIntoColumns(const Value & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/) + { + key_columns[0]->insertData(value.first.data, value.first.size); + } + + template + ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/) + { + return data.hash(getKey(row)); + } + + template + static StringRef getValueRef(const Value & value) + { + return StringRef(value.first.data, value.first.size); + } + + template + ALWAYS_INLINE void cacheData(size_t /*row*/, Mapped mapped) + { + *last_elem_cache.getMapped() = mapped; + } + +protected: + template + static ALWAYS_INLINE void onNewKey(Value & value, Arena & pool) + { + if (value.first.size) + value.first.data = pool.insert(value.first.data, value.first.size); + } +}; + + +/// For the case where there is one fixed-length string key. +template +struct HashMethodFixedString +{ + size_t n; + const ColumnFixedString::Chars * chars; + + LastElementCache last_elem_cache; + + HashMethodFixedString(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &) + { + const IColumn & column = *key_columns[0]; + const ColumnFixedString & column_string = static_cast(column); + n = column_string.getN(); + chars = &column_string.getChars(); + } + + static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; } + + StringRef getKey(size_t row) const { return StringRef(&(*chars)[row * n], n); } + + template + ALWAYS_INLINE typename HashTableTraits::Mapped emplaceKey(Data & data, size_t row, bool & inserted, Arena & pool) + { + auto & value = emplaceKeyImpl(getKey(row), data, inserted, last_elem_cache); + if (inserted) + { + auto & key = HashTableTraits::getKey(value); + key.data = pool.insert(key.data, key.size); + } + return HashTableTraits::getMapped(value); + } + + template + ALWAYS_INLINE typename HashTableTraits::Mapped findKey(Data & data, size_t row, bool & found, Arena & /*pool*/) + { + return findKeyImpl(getKey(row), data, found, last_elem_cache); + } + + template + static void insertKeyIntoColumns(const Value & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/) + { + key_columns[0]->insertData(value.first.data, value.first.size); + } + + template + ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/) + { + return data.hash(getKey(row)); + } + + template + static StringRef getValueRef(const Value & value) + { + return StringRef(value.first.data, value.first.size); + } + + template + ALWAYS_INLINE void cacheData(size_t /*row*/, Mapped mapped) + { + *last_elem_cache.getMapped() = mapped; + } + +protected: + template + static ALWAYS_INLINE void onNewKey(Value & value, Arena & pool) + { + value.first.data = pool.insert(value.first.data, value.first.size); + } +}; + + +/// Cache stores dictionaries and saved_hash per dictionary key. +class LowCardinalityDictionaryCache : public HashMethodContext +{ +public: + /// Will assume that dictionaries with same hash has the same keys. + /// Just in case, check that they have also the same size. + struct DictionaryKey + { + UInt128 hash; + UInt64 size; + + bool operator== (const DictionaryKey & other) const { return hash == other.hash && size == other.size; } + }; + + struct DictionaryKeyHash + { + size_t operator()(const DictionaryKey & key) const + { + SipHash hash; + hash.update(key.hash.low); + hash.update(key.hash.high); + hash.update(key.size); + return hash.get64(); + } + }; + + struct CachedValues + { + /// Store ptr to dictionary to be sure it won't be deleted. + ColumnPtr dictionary_holder; + /// Hashes for dictionary keys. + const UInt64 * saved_hash = nullptr; + }; + + using CachedValuesPtr = std::shared_ptr; + + explicit LowCardinalityDictionaryCache(const HashMethodContext::Settings & settings) : cache(settings.max_threads) {} + + CachedValuesPtr get(const DictionaryKey & key) { return cache.get(key); } + void set(const DictionaryKey & key, const CachedValuesPtr & mapped) { cache.set(key, mapped); } + +private: + using Cache = LRUCache; + Cache cache; +}; + +/// Single low cardinality column. +template +struct HashMethodSingleLowCardinalityColumn : public SingleColumnMethod +{ + using Base = SingleColumnMethod; + + static HashMethodContextPtr createContext(const HashMethodContext::Settings & settings) + { + return std::make_shared(settings); + } + + ColumnRawPtrs key_columns; + const IColumn * positions = nullptr; + size_t size_of_index_type = 0; + + /// saved hash is from current column or from cache. + const UInt64 * saved_hash = nullptr; + /// Hold dictionary in case saved_hash is from cache to be sure it won't be deleted. + ColumnPtr dictionary_holder; + + /// Cache AggregateDataPtr for current column in order to decrease the number of hash table usages. + PaddedPODArray aggregate_data_cache; + + /// If initialized column is nullable. + bool is_nullable = false; + + static const ColumnLowCardinality & getLowCardinalityColumn(const IColumn * low_cardinality_column) + { + auto column = typeid_cast(low_cardinality_column); + if (!column) + throw Exception("Invalid aggregation key type for HashMethodSingleLowCardinalityColumn method. " + "Excepted LowCardinality, got " + column->getName(), ErrorCodes::LOGICAL_ERROR); + return *column; + } + + HashMethodSingleLowCardinalityColumn( + const ColumnRawPtrs & key_columns_low_cardinality, const Sizes & key_sizes, const HashMethodContextPtr & context) + : Base({getLowCardinalityColumn(key_columns_low_cardinality[0]).getDictionary().getNestedNotNullableColumn().get()}, key_sizes, context) + { + auto column = &getLowCardinalityColumn(key_columns_low_cardinality[0]); + + if (!context) + throw Exception("Cache wasn't created for HashMethodSingleLowCardinalityColumn", + ErrorCodes::LOGICAL_ERROR); + + LowCardinalityDictionaryCache * cache; + if constexpr (use_cache) + { + cache = typeid_cast(context.get()); + if (!cache) + { + const auto & cached_val = *context; + throw Exception("Invalid type for HashMethodSingleLowCardinalityColumn cache: " + + demangle(typeid(cached_val).name()), ErrorCodes::LOGICAL_ERROR); + } + } + + auto * dict = column->getDictionary().getNestedNotNullableColumn().get(); + is_nullable = column->getDictionary().nestedColumnIsNullable(); + key_columns = {dict}; + bool is_shared_dict = column->isSharedDictionary(); + + typename LowCardinalityDictionaryCache::DictionaryKey dictionary_key; + typename LowCardinalityDictionaryCache::CachedValuesPtr cached_values; + + if (is_shared_dict) + { + dictionary_key = {column->getDictionary().getHash(), dict->size()}; + if constexpr (use_cache) + cached_values = cache->get(dictionary_key); + } + + if (cached_values) + { + saved_hash = cached_values->saved_hash; + dictionary_holder = cached_values->dictionary_holder; + } + else + { + saved_hash = column->getDictionary().tryGetSavedHash(); + dictionary_holder = column->getDictionaryPtr(); + + if constexpr (use_cache) + { + if (is_shared_dict) + { + cached_values = std::make_shared(); + cached_values->saved_hash = saved_hash; + cached_values->dictionary_holder = dictionary_holder; + + cache->set(dictionary_key, cached_values); + } + } + } + + AggregateDataPtr default_data = nullptr; + aggregate_data_cache.assign(key_columns[0]->size(), default_data); + + size_of_index_type = column->getSizeOfIndexType(); + positions = column->getIndexesPtr().get(); + } + + ALWAYS_INLINE size_t getIndexAt(size_t row) const + { + switch (size_of_index_type) + { + case sizeof(UInt8): return static_cast(positions)->getElement(row); + case sizeof(UInt16): return static_cast(positions)->getElement(row); + case sizeof(UInt32): return static_cast(positions)->getElement(row); + case sizeof(UInt64): return static_cast(positions)->getElement(row); + default: throw Exception("Unexpected size of index type for low cardinality column.", ErrorCodes::LOGICAL_ERROR); + } + } + + /// Get the key from the key columns for insertion into the hash table. + ALWAYS_INLINE auto getKey(size_t row) const + { + return Base::getKey(getIndexAt(row)); + } + + template + ALWAYS_INLINE typename HashTableTraits::Mapped emplaceKey(Data & data, size_t row_, bool & inserted, Arena & pool) + { + size_t row = getIndexAt(row_); + + if (is_nullable && row == 0) + { + inserted = !data.hasNullKeyData(); + data.hasNullKeyData() = true; + return &data.getNullKeyData(); + } + + if constexpr (use_cache) + { + if (aggregate_data_cache[row]) + { + inserted = false; + return &aggregate_data_cache[row]; + } + } + + Sizes key_sizes; + auto key = getKey(row_); + + typename Data::iterator it; + if (saved_hash) + data.emplace(key, it, inserted, saved_hash[row]); + else + data.emplace(key, it, inserted); + + if (inserted) + Base::onNewKey(*it, pool); + else if constexpr (use_cache) + aggregate_data_cache[row] = it->second; + + return HashTableTraits::getMapped(*it); + } + + ALWAYS_INLINE bool isNullAt(size_t i) + { + if (!is_nullable) + return false; + + return getIndexAt(i) == 0; + } + + template + ALWAYS_INLINE void cacheData(size_t i, Mapped mapped) + { + size_t row = getIndexAt(i); + aggregate_data_cache[row] = mapped; + } + + template + ALWAYS_INLINE typename HashTableTraits::Mapped findFromRow(Data & data, size_t row_, bool & found, Arena &) + { + size_t row = getIndexAt(row_); + + if (is_nullable && row == 0) + return data.hasNullKeyData() ? &data.getNullKeyData() : nullptr; + + if constexpr (use_cache) + { + if (aggregate_data_cache[row]) + return &aggregate_data_cache[row]; + } + + auto key = getKey(row_); + + typename Data::iterator it; + if (saved_hash) + it = data.find(key, saved_hash[row]); + else + it = data.find(key); + + found = it != data.end(); + if constexpr (use_cache) + { + if (found) + aggregate_data_cache[row] = it->second; + } + + return typename HashTableTraits::getMapped(*it); + } + + template + ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & pool) + { + row = getIndexAt(row); + if (saved_hash) + return saved_hash[row]; + + return Base::getHash(data, row, pool); + } + + template + static void insertKeyIntoColumns(const Value & value, MutableColumns & key_columns_low_cardinality, const Sizes & /*key_sizes*/) + { + auto ref = Base::getValueRef(value); + static_cast(key_columns_low_cardinality[0].get())->insertData(ref.data, ref.size); + } +}; + + +namespace columns_hashing_impl +{ + +/// This class is designed to provide the functionality that is required for +/// supporting nullable keys in HashMethodKeysFixed. If there are +/// no nullable keys, this class is merely implemented as an empty shell. +template +class BaseStateKeysFixed; + +/// Case where nullable keys are supported. +template +class BaseStateKeysFixed +{ +protected: + void init(const ColumnRawPtrs & key_columns) + { + null_maps.reserve(key_columns.size()); + actual_columns.reserve(key_columns.size()); + + for (const auto & col : key_columns) + { + if (col->isColumnNullable()) + { + const auto & nullable_col = static_cast(*col); + actual_columns.push_back(&nullable_col.getNestedColumn()); + null_maps.push_back(&nullable_col.getNullMapColumn()); + } + else + { + actual_columns.push_back(col); + null_maps.push_back(nullptr); + } + } + } + + /// Return the columns which actually contain the values of the keys. + /// For a given key column, if it is nullable, we return its nested + /// column. Otherwise we return the key column itself. + inline const ColumnRawPtrs & getActualColumns() const + { + return actual_columns; + } + + /// Create a bitmap that indicates whether, for a particular row, + /// a key column bears a null value or not. + KeysNullMap createBitmap(size_t row) const + { + KeysNullMap bitmap{}; + + for (size_t k = 0; k < null_maps.size(); ++k) + { + if (null_maps[k] != nullptr) + { + const auto & null_map = static_cast(*null_maps[k]).getData(); + if (null_map[row] == 1) + { + size_t bucket = k / 8; + size_t offset = k % 8; + bitmap[bucket] |= UInt8(1) << offset; + } + } + } + + return bitmap; + } + +private: + ColumnRawPtrs actual_columns; + ColumnRawPtrs null_maps; +}; + +/// Case where nullable keys are not supported. +template +class BaseStateKeysFixed +{ +protected: + void init(const ColumnRawPtrs & columns) { actual_columns = columns; } + + const ColumnRawPtrs & getActualColumns() const { return actual_columns; } + + KeysNullMap createBitmap(size_t) const + { + throw Exception{"Internal error: calling createBitmap() for non-nullable keys" + " is forbidden", ErrorCodes::LOGICAL_ERROR}; + } + +private: + ColumnRawPtrs actual_columns; +}; + +} + +// Optional mask for low cardinality columns. +template +struct LowCardinalityKeys +{ + ColumnRawPtrs nested_columns; + ColumnRawPtrs positions; + Sizes position_sizes; +}; + +template <> +struct LowCardinalityKeys {}; + +/// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits. +template +struct HashMethodKeysFixed : private columns_hashing_impl::BaseStateKeysFixed +{ + using Key = typename TData::key_type; + + static constexpr bool has_nullable_keys = has_nullable_keys_; + static constexpr bool has_low_cardinality = has_low_cardinality_; + + LowCardinalityKeys low_cardinality_keys; + Sizes key_sizes; + size_t keys_size; + + LastElementCache last_elem_cache; + + using Base = columns_hashing_impl::BaseStateKeysFixed; + + HashMethodKeysFixed(const ColumnRawPtrs & key_columns, const Sizes & key_sizes, const HashMethodContextPtr &) + : key_sizes(std::move(key_sizes)), keys_size(key_columns.size()) + { + if constexpr (has_low_cardinality) + { + low_cardinality_keys.nested_columns.resize(key_columns.size()); + low_cardinality_keys.positions.assign(key_columns.size(), nullptr); + low_cardinality_keys.position_sizes.resize(key_columns.size()); + for (size_t i = 0; i < key_columns.size(); ++i) + { + if (auto * low_cardinality_col = typeid_cast(key_columns[i])) + { + low_cardinality_keys.nested_columns[i] = low_cardinality_col->getDictionary().getNestedColumn().get(); + low_cardinality_keys.positions[i] = &low_cardinality_col->getIndexes(); + low_cardinality_keys.position_sizes[i] = low_cardinality_col->getSizeOfIndexType(); + } + else + low_cardinality_keys.nested_columns[i] = key_columns[i]; + } + } + + Base::init(key_columns); + } + + static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; } + + ALWAYS_INLINE Key getKey(size_t row) const + { + if (has_nullable_keys) + { + auto bitmap = Base::createBitmap(row); + return packFixed(row, keys_size, Base::getActualColumns(), key_sizes, bitmap); + } + else + { + if constexpr (has_low_cardinality) + return packFixed(row, keys_size, low_cardinality_keys.nested_columns, key_sizes, + &low_cardinality_keys.positions, &low_cardinality_keys.position_sizes); + + return packFixed(row, keys_size, Base::getActualColumns(), key_sizes); + } + } + + template + ALWAYS_INLINE typename HashTableTraits::Mapped emplaceKey(Data & data, size_t row, bool & inserted, Arena & /*pool*/) + { + return HashTableTraits::getMapped(emplaceKeyImpl(getKey(row), data, inserted, last_elem_cache)); + } + + template + ALWAYS_INLINE typename HashTableTraits::Mapped findKey(Data & data, size_t row, bool & found, Arena & /*pool*/) + { + return findKeyImpl(getKey(row), data, found, last_elem_cache); + } + + template + static StringRef getValueRef(const Value & value) + { + return StringRef(value.first.data, value.first.size); + } + + template + ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & /*pool*/) + { + return data.hash(getKey(row)); + } + + template + ALWAYS_INLINE void cacheData(size_t /*row*/, Mapped mapped) + { + *last_elem_cache.getMapped() = mapped; + } +}; + +/** Hash by concatenating serialized key values. + * The serialized value differs in that it uniquely allows to deserialize it, having only the position with which it starts. + * That is, for example, for strings, it contains first the serialized length of the string, and then the bytes. + * Therefore, when aggregating by several strings, there is no ambiguity. + */ +template +struct HashMethodSerialized +{ + ColumnRawPtrs key_columns; + size_t keys_size; + LastElementCache last_elem_cache; + + HashMethodSerialized(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &) + : key_columns(key_columns), keys_size(key_columns.size()) {} + + static HashMethodContextPtr createContext(const HashMethodContext::Settings &) { return nullptr; } + + template + ALWAYS_INLINE typename HashTableTraits::Mapped emplaceKey(Data & data, size_t row, bool & inserted, Arena & pool) + { + auto key = getKey(row, pool); + auto & value = emplaceKeyImpl(key, data, inserted, last_elem_cache); + if (!inserted) + pool.rollback(key.size); + + return HashTableTraits::getMapped(value); + } + + template + ALWAYS_INLINE typename HashTableTraits::Mapped findKey(Data & data, size_t row, bool & found, Arena & pool) + { + auto key = getKey(row, pool); + auto mapped = findKeyImpl(key, data, found, last_elem_cache); + pool.rollback(key.size); + + return mapped; + } + + template + ALWAYS_INLINE size_t getHash(const Data & data, size_t row, Arena & pool) + { + auto key = getKey(row, pool); + auto hash = data.hash(key); + pool.rollback(key.size); + + return hash; + } + + template + ALWAYS_INLINE void cacheData(size_t /*row*/, Mapped /*mapped*/) {} + +protected: + ALWAYS_INLINE StringRef getKey(size_t row, Arena & pool) const + { + return serializeKeysToPoolContiguous(row, keys_size, key_columns, pool); + } +}; + +} +} diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 145ce98dbbc..785345f9400 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -6,9 +6,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -22,11 +24,9 @@ #include #include #include + #if __has_include() #include -#include -#include - #endif @@ -188,7 +188,7 @@ Aggregator::Aggregator(const Params & params_) } method_chosen = chooseAggregationMethod(); - AggregationStateCache::Settings cache_settings; + HashMethodContext::Settings cache_settings; cache_settings.max_threads = params.max_threads; aggregation_state_cache = AggregatedDataVariants::createCache(method_chosen, cache_settings); } @@ -586,11 +586,7 @@ void NO_INLINE Aggregator::executeImpl( bool no_more_keys, AggregateDataPtr overflow_row) const { - typename Method::State state; - if constexpr (Method::low_cardinality_optimization) - state.init(key_columns, aggregation_state_cache); - else - state.init(key_columns); + typename Method::State state(key_columns, key_sizes, aggregation_state_cache); if (!no_more_keys) executeImplCase(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row); @@ -605,76 +601,35 @@ void NO_INLINE Aggregator::executeImplCase( typename Method::State & state, Arena * aggregates_pool, size_t rows, - ColumnRawPtrs & key_columns, + ColumnRawPtrs & /*key_columns*/, AggregateFunctionInstruction * aggregate_instructions, - StringRefs & keys, + StringRefs & /*keys*/, AggregateDataPtr overflow_row) const { /// NOTE When editing this code, also pay attention to SpecializedAggregator.h. /// For all rows. - typename Method::Key prev_key{}; AggregateDataPtr value = nullptr; for (size_t i = 0; i < rows; ++i) { bool inserted = false; /// Inserted a new key, or was this key already? - /// Get the key to insert into the hash table. - typename Method::Key key; - if constexpr (!Method::low_cardinality_optimization) - key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool); - AggregateDataPtr * aggregate_data = nullptr; - typename Method::iterator it; /// Is not used if Method::low_cardinality_optimization - if (!no_more_keys) /// Insert. - { - /// Optimization for consecutive identical keys. - if (!Method::no_consecutive_keys_optimization) - { - if (i != 0 && key == prev_key) - { - /// Add values to the aggregate functions. - for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) - (*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool); - - method.onExistingKey(key, keys, *aggregates_pool); - continue; - } - else - prev_key = key; - } - - if constexpr (Method::low_cardinality_optimization) - aggregate_data = state.emplaceKeyFromRow(method.data, i, inserted, params.keys_size, keys, *aggregates_pool); - else - { - method.data.emplace(key, it, inserted); - aggregate_data = &Method::getAggregateData(it->second); - } - } + if constexpr (!no_more_keys) /// Insert. + aggregate_data = state.emplaceKey(method.data, i, inserted, *aggregates_pool); else { /// Add only if the key already exists. - - if constexpr (Method::low_cardinality_optimization) - aggregate_data = state.findFromRow(method.data, i); - else - { - it = method.data.find(key); - if (method.data.end() != it) - aggregate_data = &Method::getAggregateData(it->second); - } + bool found = false; + aggregate_data = state.findKey(method.data, i, found, *aggregates_pool); } /// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys. /// If the key does not fit, and the data does not need to be aggregated in a separate row, then there's nothing to do. if (!aggregate_data && !overflow_row) - { - method.onExistingKey(key, keys, *aggregates_pool); continue; - } /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. if (inserted) @@ -682,18 +637,12 @@ void NO_INLINE Aggregator::executeImplCase( /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. *aggregate_data = nullptr; - if constexpr (!Method::low_cardinality_optimization) - method.onNewKey(*it, params.keys_size, keys, *aggregates_pool); - AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); createAggregateStates(place); *aggregate_data = place; - if constexpr (Method::low_cardinality_optimization) - state.cacheAggregateData(i, place); + state.cacheData(i, place); } - else - method.onExistingKey(key, keys, *aggregates_pool); value = aggregate_data ? *aggregate_data : overflow_row; @@ -1172,7 +1121,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal( for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->insertResultInto( - Method::getAggregateData(value.second) + offsets_of_aggregate_states[i], + value.second + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); } @@ -1203,9 +1152,9 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal( /// reserved, so push_back does not throw exceptions for (size_t i = 0; i < params.aggregates_size; ++i) - aggregate_columns[i]->push_back(Method::getAggregateData(value.second) + offsets_of_aggregate_states[i]); + aggregate_columns[i]->push_back(value.second + offsets_of_aggregate_states[i]); - Method::getAggregateData(value.second) = nullptr; + value.second = nullptr; } } @@ -1549,20 +1498,20 @@ void NO_INLINE Aggregator::mergeDataImpl( { for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->merge( - Method::getAggregateData(res_it->second) + offsets_of_aggregate_states[i], - Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], + res_it->second + offsets_of_aggregate_states[i], + it->second + offsets_of_aggregate_states[i], arena); for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->destroy( - Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); + it->second + offsets_of_aggregate_states[i]); } else { res_it->second = it->second; } - Method::getAggregateData(it->second) = nullptr; + it->second = nullptr; } table_src.clearAndShrink(); @@ -1586,19 +1535,18 @@ void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl( AggregateDataPtr res_data = table_dst.end() == res_it ? overflows - : Method::getAggregateData(res_it->second); + : res_it->second; for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->merge( res_data + offsets_of_aggregate_states[i], - Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], + it->second + offsets_of_aggregate_states[i], arena); for (size_t i = 0; i < params.aggregates_size; ++i) - aggregate_functions[i]->destroy( - Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); + aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]); - Method::getAggregateData(it->second) = nullptr; + it->second = nullptr; } table_src.clearAndShrink(); @@ -1621,19 +1569,18 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl( if (table_dst.end() == res_it) continue; - AggregateDataPtr res_data = Method::getAggregateData(res_it->second); + AggregateDataPtr res_data = res_it->second; for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_functions[i]->merge( res_data + offsets_of_aggregate_states[i], - Method::getAggregateData(it->second) + offsets_of_aggregate_states[i], + it->second + offsets_of_aggregate_states[i], arena); for (size_t i = 0; i < params.aggregates_size; ++i) - aggregate_functions[i]->destroy( - Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]); + aggregate_functions[i]->destroy(it->second + offsets_of_aggregate_states[i]); - Method::getAggregateData(it->second) = nullptr; + it->second = nullptr; } table_src.clearAndShrink(); @@ -1984,7 +1931,7 @@ template void NO_INLINE Aggregator::mergeStreamsImplCase( Block & block, Arena * aggregates_pool, - Method & method, + Method & method [[maybe_unused]], Table & data, AggregateDataPtr overflow_row) const { @@ -1998,14 +1945,9 @@ void NO_INLINE Aggregator::mergeStreamsImplCase( for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_columns[i] = &typeid_cast(*block.safeGetByPosition(params.keys_size + i).column).getData(); - typename Method::State state; - if constexpr (Method::low_cardinality_optimization) - state.init(key_columns, aggregation_state_cache); - else - state.init(key_columns); + typename Method::State state(key_columns, key_sizes, aggregation_state_cache); /// For all rows. - StringRefs keys(params.keys_size); size_t rows = block.rows(); for (size_t i = 0; i < rows; ++i) { @@ -2014,59 +1956,31 @@ void NO_INLINE Aggregator::mergeStreamsImplCase( bool inserted = false; /// Inserted a new key, or was this key already? - /// Get the key to insert into the hash table. - typename Method::Key key; - if constexpr (!Method::low_cardinality_optimization) - key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool); - if (!no_more_keys) - { - if constexpr (Method::low_cardinality_optimization) - aggregate_data = state.emplaceKeyFromRow(data, i, inserted, params.keys_size, keys, *aggregates_pool); - else - { - data.emplace(key, it, inserted); - aggregate_data = &Method::getAggregateData(it->second); - } - } + aggregate_data = state.emplaceKey(data, i, inserted, *aggregates_pool); else { - if constexpr (Method::low_cardinality_optimization) - aggregate_data = state.findFromRow(data, i); - else - { - it = data.find(key); - if (data.end() != it) - aggregate_data = &Method::getAggregateData(it->second); - } + bool found; + aggregate_data = state.findKey(data, i, found, *aggregates_pool); } /// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys. /// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do. if (!aggregate_data && !overflow_row) - { - method.onExistingKey(key, keys, *aggregates_pool); continue; - } /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. if (inserted) { *aggregate_data = nullptr; - if constexpr (!Method::low_cardinality_optimization) - method.onNewKey(*it, params.keys_size, keys, *aggregates_pool); - AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); createAggregateStates(place); *aggregate_data = place; - if constexpr (Method::low_cardinality_optimization) - state.cacheAggregateData(i, place); + state.cacheData(i, place); } - else - method.onExistingKey(key, keys, *aggregates_pool); AggregateDataPtr value = aggregate_data ? *aggregate_data : overflow_row; @@ -2163,7 +2077,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV * If there is at least one block with a bucket number greater or equal than zero, then there was a two-level aggregation. */ auto max_bucket = bucket_to_blocks.rbegin()->first; - size_t has_two_level = max_bucket >= 0; + bool has_two_level = max_bucket >= 0; if (has_two_level) { @@ -2393,15 +2307,11 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl( Method & method, Arena * pool, ColumnRawPtrs & key_columns, - StringRefs & keys, + StringRefs & keys [[maybe_unused]], const Block & source, std::vector & destinations) const { - typename Method::State state; - if constexpr (Method::low_cardinality_optimization) - state.init(key_columns, aggregation_state_cache); - else - state.init(key_columns); + typename Method::State state(key_columns, key_sizes, aggregation_state_cache); size_t rows = source.rows(); size_t columns = source.columns(); @@ -2421,16 +2331,11 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl( } } - /// Obtain a key. Calculate bucket number from it. - typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *pool); - - auto hash = method.data.hash(key); + /// Calculate bucket number from row hash. + auto hash = state.getHash(method.data, i, *pool); auto bucket = method.data.getBucketFromHash(hash); selector[i] = bucket; - - /// We don't need to store this key in pool. - method.onExistingKey(key, keys, *pool); } size_t num_buckets = destinations.size(); @@ -2521,7 +2426,7 @@ void NO_INLINE Aggregator::destroyImpl(Table & table) const { for (auto elem : table) { - AggregateDataPtr & data = Method::getAggregateData(elem.second); + AggregateDataPtr & data = elem.second; /** If an exception (usually a lack of memory, the MemoryTracker throws) arose * after inserting the key into a hash table, but before creating all states of aggregate functions, diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index f51f620064f..0b40f4e6a25 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -138,18 +139,6 @@ using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTw TwoLevelHashMapWithSavedHash, TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>; -/// Cache which can be used by aggregations method's states. Object is shared in all threads. -struct AggregationStateCache -{ - virtual ~AggregationStateCache() = default; - - struct Settings - { - size_t max_threads; - }; -}; - -using AggregationStateCachePtr = std::shared_ptr; /// For the case where there is one numeric key. template /// UInt8/16/32/64 for any type with corresponding bit width. @@ -169,65 +158,16 @@ struct AggregationMethodOneNumber AggregationMethodOneNumber(const Other & other) : data(other.data) {} /// To use one `Method` in different threads, use different `State`. - struct State - { - const char * vec; + using State = ColumnsHashing::HashMethodOneNumber; - /** Called at the start of each block processing. - * Sets the variables needed for the other methods called in inner loops. - */ - void init(ColumnRawPtrs & key_columns) - { - vec = key_columns[0]->getRawData().data; - } - - /// Get the key from the key columns for insertion into the hash table. - ALWAYS_INLINE Key getKey( - const ColumnRawPtrs & /*key_columns*/, - size_t /*keys_size*/, /// Number of key columns. - size_t i, /// From which row of the block, get the key. - const Sizes & /*key_sizes*/, /// If the keys of a fixed length - their lengths. It is not used in aggregation methods for variable length keys. - StringRefs & /*keys*/, /// Here references to key data in columns can be written. They can be used in the future. - Arena & /*pool*/) const - { - return unalignedLoad(vec + i * sizeof(FieldType)); - } - }; - - /// From the value in the hash table, get AggregateDataPtr. - static AggregateDataPtr & getAggregateData(Mapped & value) { return value; } - static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; } - - /** Place additional data, if necessary, in case a new key was inserted into the hash table. - */ - static ALWAYS_INLINE void onNewKey(typename Data::value_type & /*value*/, size_t /*keys_size*/, StringRefs & /*keys*/, Arena & /*pool*/) - { - } - - /** The action to be taken if the key is not new. For example, roll back the memory allocation in the pool. - */ - static ALWAYS_INLINE void onExistingKey(const Key & /*key*/, StringRefs & /*keys*/, Arena & /*pool*/) {} - - /** Do not use optimization for consecutive keys. - */ - static const bool no_consecutive_keys_optimization = false; /// Use optimization for low cardinality. static const bool low_cardinality_optimization = false; - /** Insert the key from the hash table into columns. - */ + // Insert the key from the hash table into columns. static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & /*key_sizes*/) { static_cast(key_columns[0].get())->insertRawData(reinterpret_cast(&value.first)); } - - /// Get StringRef from value which can be inserted into column. - static StringRef getValueRef(const typename Data::value_type & value) - { - return StringRef(reinterpret_cast(&value.first), sizeof(value.first)); - } - - static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; } }; @@ -248,58 +188,14 @@ struct AggregationMethodString template AggregationMethodString(const Other & other) : data(other.data) {} - struct State - { - const IColumn::Offset * offsets; - const UInt8 * chars; + using State = ColumnsHashing::HashMethodString; - void init(ColumnRawPtrs & key_columns) - { - const IColumn & column = *key_columns[0]; - const ColumnString & column_string = static_cast(column); - offsets = column_string.getOffsets().data(); - chars = column_string.getChars().data(); - } - - ALWAYS_INLINE Key getKey( - const ColumnRawPtrs & /*key_columns*/, - size_t /*keys_size*/, - ssize_t i, - const Sizes & /*key_sizes*/, - StringRefs & /*keys*/, - Arena & /*pool*/) const - { - return StringRef( - chars + offsets[i - 1], - offsets[i] - offsets[i - 1] - 1); - } - }; - - static AggregateDataPtr & getAggregateData(Mapped & value) { return value; } - static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; } - - static ALWAYS_INLINE void onNewKey(typename Data::value_type & value, size_t /*keys_size*/, StringRefs & /*keys*/, Arena & pool) - { - if (value.first.size) - value.first.data = pool.insert(value.first.data, value.first.size); - } - - static ALWAYS_INLINE void onExistingKey(const Key & /*key*/, StringRefs & /*keys*/, Arena & /*pool*/) {} - - static const bool no_consecutive_keys_optimization = false; static const bool low_cardinality_optimization = false; - static StringRef getValueRef(const typename Data::value_type & value) - { - return StringRef(value.first.data, value.first.size); - } - static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &) { key_columns[0]->insertData(value.first.data, value.first.size); } - - static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; } }; @@ -320,101 +216,14 @@ struct AggregationMethodFixedString template AggregationMethodFixedString(const Other & other) : data(other.data) {} - struct State - { - size_t n; - const ColumnFixedString::Chars * chars; + using State = ColumnsHashing::HashMethodFixedString; - void init(ColumnRawPtrs & key_columns) - { - const IColumn & column = *key_columns[0]; - const ColumnFixedString & column_string = static_cast(column); - n = column_string.getN(); - chars = &column_string.getChars(); - } - - ALWAYS_INLINE Key getKey( - const ColumnRawPtrs &, - size_t, - size_t i, - const Sizes &, - StringRefs &, - Arena &) const - { - return StringRef(&(*chars)[i * n], n); - } - }; - - static AggregateDataPtr & getAggregateData(Mapped & value) { return value; } - static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; } - - static ALWAYS_INLINE void onNewKey(typename Data::value_type & value, size_t, StringRefs &, Arena & pool) - { - value.first.data = pool.insert(value.first.data, value.first.size); - } - - static ALWAYS_INLINE void onExistingKey(const Key &, StringRefs &, Arena &) {} - - static const bool no_consecutive_keys_optimization = false; static const bool low_cardinality_optimization = false; - static StringRef getValueRef(const typename Data::value_type & value) - { - return StringRef(value.first.data, value.first.size); - } - static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &) { key_columns[0]->insertData(value.first.data, value.first.size); } - - static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; } -}; - -/// Cache stores dictionaries and saved_hash per dictionary key. -class LowCardinalityDictionaryCache : public AggregationStateCache -{ -public: - /// Will assume that dictionaries with same hash has the same keys. - /// Just in case, check that they have also the same size. - struct DictionaryKey - { - UInt128 hash; - UInt64 size; - - bool operator== (const DictionaryKey & other) const { return hash == other.hash && size == other.size; } - }; - - struct DictionaryKeyHash - { - size_t operator()(const DictionaryKey & key) const - { - SipHash hash; - hash.update(key.hash.low); - hash.update(key.hash.high); - hash.update(key.size); - return hash.get64(); - } - }; - - struct CachedValues - { - /// Store ptr to dictionary to be sure it won't be deleted. - ColumnPtr dictionary_holder; - /// Hashes for dictionary keys. - const UInt64 * saved_hash = nullptr; - }; - - using CachedValuesPtr = std::shared_ptr; - - explicit LowCardinalityDictionaryCache(const AggregationStateCache::Settings & settings) : cache(settings.max_threads) {} - - CachedValuesPtr get(const DictionaryKey & key) { return cache.get(key); } - void set(const DictionaryKey & key, const CachedValuesPtr & mapped) { cache.set(key, mapped); } - -private: - using Cache = LRUCache; - Cache cache; }; /// Single low cardinality column. @@ -432,342 +241,23 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod using Base::data; - static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & settings) - { - return std::make_shared(settings); - } - AggregationMethodSingleLowCardinalityColumn() = default; template explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {} - struct State : public BaseState - { - ColumnRawPtrs key_columns; - const IColumn * positions = nullptr; - size_t size_of_index_type = 0; + using State = ColumnsHashing::HashMethodSingleLowCardinalityColumn; - /// saved hash is from current column or from cache. - const UInt64 * saved_hash = nullptr; - /// Hold dictionary in case saved_hash is from cache to be sure it won't be deleted. - ColumnPtr dictionary_holder; - - /// Cache AggregateDataPtr for current column in order to decrease the number of hash table usages. - PaddedPODArray aggregate_data_cache; - - /// If initialized column is nullable. - bool is_nullable = false; - - void init(ColumnRawPtrs &) - { - throw Exception("Expected cache for AggregationMethodSingleLowCardinalityColumn::init", ErrorCodes::LOGICAL_ERROR); - } - - void init(ColumnRawPtrs & key_columns_low_cardinality, const AggregationStateCachePtr & cache_ptr) - { - auto column = typeid_cast(key_columns_low_cardinality[0]); - if (!column) - throw Exception("Invalid aggregation key type for AggregationMethodSingleLowCardinalityColumn method. " - "Excepted LowCardinality, got " + key_columns_low_cardinality[0]->getName(), ErrorCodes::LOGICAL_ERROR); - - if (!cache_ptr) - throw Exception("Cache wasn't created for AggregationMethodSingleLowCardinalityColumn", ErrorCodes::LOGICAL_ERROR); - - auto cache = typeid_cast(cache_ptr.get()); - if (!cache) - { - const auto & cached_val = *cache_ptr; - throw Exception("Invalid type for AggregationMethodSingleLowCardinalityColumn cache: " - + demangle(typeid(cached_val).name()), ErrorCodes::LOGICAL_ERROR); - } - - auto * dict = column->getDictionary().getNestedNotNullableColumn().get(); - is_nullable = column->getDictionary().nestedColumnIsNullable(); - key_columns = {dict}; - bool is_shared_dict = column->isSharedDictionary(); - - typename LowCardinalityDictionaryCache::DictionaryKey dictionary_key; - typename LowCardinalityDictionaryCache::CachedValuesPtr cached_values; - - if (is_shared_dict) - { - dictionary_key = {column->getDictionary().getHash(), dict->size()}; - cached_values = cache->get(dictionary_key); - } - - if (cached_values) - { - saved_hash = cached_values->saved_hash; - dictionary_holder = cached_values->dictionary_holder; - } - else - { - saved_hash = column->getDictionary().tryGetSavedHash(); - dictionary_holder = column->getDictionaryPtr(); - - if (is_shared_dict) - { - cached_values = std::make_shared(); - cached_values->saved_hash = saved_hash; - cached_values->dictionary_holder = dictionary_holder; - - cache->set(dictionary_key, cached_values); - } - } - - AggregateDataPtr default_data = nullptr; - aggregate_data_cache.assign(key_columns[0]->size(), default_data); - - size_of_index_type = column->getSizeOfIndexType(); - positions = column->getIndexesPtr().get(); - - BaseState::init(key_columns); - } - - ALWAYS_INLINE size_t getIndexAt(size_t row) const - { - switch (size_of_index_type) - { - case sizeof(UInt8): return static_cast(positions)->getElement(row); - case sizeof(UInt16): return static_cast(positions)->getElement(row); - case sizeof(UInt32): return static_cast(positions)->getElement(row); - case sizeof(UInt64): return static_cast(positions)->getElement(row); - default: throw Exception("Unexpected size of index type for low cardinality column.", ErrorCodes::LOGICAL_ERROR); - } - } - - /// Get the key from the key columns for insertion into the hash table. - ALWAYS_INLINE Key getKey( - const ColumnRawPtrs & /*key_columns*/, - size_t /*keys_size*/, - size_t i, - const Sizes & key_sizes, - StringRefs & keys, - Arena & pool) const - { - size_t row = getIndexAt(i); - return BaseState::getKey(key_columns, 1, row, key_sizes, keys, pool); - } - - template - ALWAYS_INLINE AggregateDataPtr * emplaceKeyFromRow( - D & data, - size_t i, - bool & inserted, - size_t keys_size, - StringRefs & keys, - Arena & pool) - { - size_t row = getIndexAt(i); - - if (is_nullable && row == 0) - { - inserted = !data.hasNullKeyData(); - data.hasNullKeyData() = true; - return &data.getNullKeyData(); - } - - if (aggregate_data_cache[row]) - { - inserted = false; - return &aggregate_data_cache[row]; - } - else - { - Sizes key_sizes; - auto key = getKey({}, 0, i, key_sizes, keys, pool); - - typename D::iterator it; - if (saved_hash) - data.emplace(key, it, inserted, saved_hash[row]); - else - data.emplace(key, it, inserted); - - if (inserted) - Base::onNewKey(*it, keys_size, keys, pool); - else - aggregate_data_cache[row] = Base::getAggregateData(it->second); - - return &Base::getAggregateData(it->second); - } - } - - ALWAYS_INLINE bool isNullAt(size_t i) - { - if (!is_nullable) - return false; - - return getIndexAt(i) == 0; - } - - ALWAYS_INLINE void cacheAggregateData(size_t i, AggregateDataPtr data) - { - size_t row = getIndexAt(i); - aggregate_data_cache[row] = data; - } - - template - ALWAYS_INLINE AggregateDataPtr * findFromRow(D & data, size_t i) - { - size_t row = getIndexAt(i); - - if (is_nullable && row == 0) - return data.hasNullKeyData() ? &data.getNullKeyData() : nullptr; - - if (!aggregate_data_cache[row]) - { - Sizes key_sizes; - StringRefs keys; - Arena pool; - auto key = getKey({}, 0, i, key_sizes, keys, pool); - - typename D::iterator it; - if (saved_hash) - it = data.find(key, saved_hash[row]); - else - it = data.find(key); - - if (it != data.end()) - aggregate_data_cache[row] = Base::getAggregateData(it->second); - } - return &aggregate_data_cache[row]; - } - }; - - static AggregateDataPtr & getAggregateData(Mapped & value) { return Base::getAggregateData(value); } - static const AggregateDataPtr & getAggregateData(const Mapped & value) { return Base::getAggregateData(value); } - - static void onNewKey(typename Data::value_type & value, size_t keys_size, StringRefs & keys, Arena & pool) - { - return Base::onNewKey(value, keys_size, keys, pool); - } - - static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool) - { - return Base::onExistingKey(key, keys, pool); - } - - static const bool no_consecutive_keys_optimization = true; static const bool low_cardinality_optimization = true; static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns_low_cardinality, const Sizes & /*key_sizes*/) { - auto ref = Base::getValueRef(value); + auto ref = BaseState::getValueRef(value); static_cast(key_columns_low_cardinality[0].get())->insertData(ref.data, ref.size); } }; -namespace aggregator_impl -{ - -/// This class is designed to provide the functionality that is required for -/// supporting nullable keys in AggregationMethodKeysFixed. If there are -/// no nullable keys, this class is merely implemented as an empty shell. -template -class BaseStateKeysFixed; - -/// Case where nullable keys are supported. -template -class BaseStateKeysFixed -{ -protected: - void init(const ColumnRawPtrs & key_columns) - { - null_maps.reserve(key_columns.size()); - actual_columns.reserve(key_columns.size()); - - for (const auto & col : key_columns) - { - if (col->isColumnNullable()) - { - const auto & nullable_col = static_cast(*col); - actual_columns.push_back(&nullable_col.getNestedColumn()); - null_maps.push_back(&nullable_col.getNullMapColumn()); - } - else - { - actual_columns.push_back(col); - null_maps.push_back(nullptr); - } - } - } - - /// Return the columns which actually contain the values of the keys. - /// For a given key column, if it is nullable, we return its nested - /// column. Otherwise we return the key column itself. - inline const ColumnRawPtrs & getActualColumns() const - { - return actual_columns; - } - - /// Create a bitmap that indicates whether, for a particular row, - /// a key column bears a null value or not. - KeysNullMap createBitmap(size_t row) const - { - KeysNullMap bitmap{}; - - for (size_t k = 0; k < null_maps.size(); ++k) - { - if (null_maps[k] != nullptr) - { - const auto & null_map = static_cast(*null_maps[k]).getData(); - if (null_map[row] == 1) - { - size_t bucket = k / 8; - size_t offset = k % 8; - bitmap[bucket] |= UInt8(1) << offset; - } - } - } - - return bitmap; - } - -private: - ColumnRawPtrs actual_columns; - ColumnRawPtrs null_maps; -}; - -/// Case where nullable keys are not supported. -template -class BaseStateKeysFixed -{ -protected: - void init(const ColumnRawPtrs &) - { - throw Exception{"Internal error: calling init() for non-nullable" - " keys is forbidden", ErrorCodes::LOGICAL_ERROR}; - } - - const ColumnRawPtrs & getActualColumns() const - { - throw Exception{"Internal error: calling getActualColumns() for non-nullable" - " keys is forbidden", ErrorCodes::LOGICAL_ERROR}; - } - - KeysNullMap createBitmap(size_t) const - { - throw Exception{"Internal error: calling createBitmap() for non-nullable keys" - " is forbidden", ErrorCodes::LOGICAL_ERROR}; - } -}; - -} - -// Oprional mask for low cardinality columns. -template -struct LowCardinalityKeys -{ - ColumnRawPtrs nested_columns; - ColumnRawPtrs positions; - Sizes position_sizes; -}; - -template <> -struct LowCardinalityKeys {}; - /// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits. template struct AggregationMethodKeysFixed @@ -787,71 +277,8 @@ struct AggregationMethodKeysFixed template AggregationMethodKeysFixed(const Other & other) : data(other.data) {} - class State final : private aggregator_impl::BaseStateKeysFixed - { - LowCardinalityKeys low_cardinality_keys; + using State = ColumnsHashing::HashMethodKeysFixed; - public: - using Base = aggregator_impl::BaseStateKeysFixed; - - void init(ColumnRawPtrs & key_columns) - { - if constexpr (has_low_cardinality) - { - low_cardinality_keys.nested_columns.resize(key_columns.size()); - low_cardinality_keys.positions.assign(key_columns.size(), nullptr); - low_cardinality_keys.position_sizes.resize(key_columns.size()); - for (size_t i = 0; i < key_columns.size(); ++i) - { - if (auto * low_cardinality_col = typeid_cast(key_columns[i])) - { - low_cardinality_keys.nested_columns[i] = low_cardinality_col->getDictionary().getNestedColumn().get(); - low_cardinality_keys.positions[i] = &low_cardinality_col->getIndexes(); - low_cardinality_keys.position_sizes[i] = low_cardinality_col->getSizeOfIndexType(); - } - else - low_cardinality_keys.nested_columns[i] = key_columns[i]; - } - } - - if (has_nullable_keys) - Base::init(key_columns); - } - - ALWAYS_INLINE Key getKey( - const ColumnRawPtrs & key_columns, - size_t keys_size, - size_t i, - const Sizes & key_sizes, - StringRefs &, - Arena &) const - { - if (has_nullable_keys) - { - auto bitmap = Base::createBitmap(i); - return packFixed(i, keys_size, Base::getActualColumns(), key_sizes, bitmap); - } - else - { - if constexpr (has_low_cardinality) - return packFixed(i, keys_size, low_cardinality_keys.nested_columns, key_sizes, - &low_cardinality_keys.positions, &low_cardinality_keys.position_sizes); - - return packFixed(i, keys_size, key_columns, key_sizes); - } - } - }; - - static AggregateDataPtr & getAggregateData(Mapped & value) { return value; } - static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; } - - static ALWAYS_INLINE void onNewKey(typename Data::value_type &, size_t, StringRefs &, Arena &) - { - } - - static ALWAYS_INLINE void onExistingKey(const Key &, StringRefs &, Arena &) {} - - static const bool no_consecutive_keys_optimization = false; static const bool low_cardinality_optimization = false; static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes & key_sizes) @@ -904,8 +331,6 @@ struct AggregationMethodKeysFixed } } } - - static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; } }; @@ -930,53 +355,24 @@ struct AggregationMethodSerialized template AggregationMethodSerialized(const Other & other) : data(other.data) {} - struct State - { - void init(ColumnRawPtrs &) - { - } + using State = ColumnsHashing::HashMethodSerialized; - ALWAYS_INLINE Key getKey( - const ColumnRawPtrs & key_columns, - size_t keys_size, - size_t i, - const Sizes &, - StringRefs &, - Arena & pool) const - { - return serializeKeysToPoolContiguous(i, keys_size, key_columns, pool); - } - }; - - static AggregateDataPtr & getAggregateData(Mapped & value) { return value; } - static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; } - - static ALWAYS_INLINE void onNewKey(typename Data::value_type &, size_t, StringRefs &, Arena &) - { - } - - static ALWAYS_INLINE void onExistingKey(const Key & key, StringRefs &, Arena & pool) - { - pool.rollback(key.size); - } - - /// If the key already was, it is removed from the pool (overwritten), and the next key can not be compared with it. - static const bool no_consecutive_keys_optimization = true; static const bool low_cardinality_optimization = false; static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, const Sizes &) { auto pos = value.first.data; - for (size_t i = 0; i < key_columns.size(); ++i) - pos = key_columns[i]->deserializeAndInsertFromArena(pos); + for (auto & column : key_columns) + pos = column->deserializeAndInsertFromArena(pos); } - - static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; } }; class Aggregator; +using ColumnsHashing::HashMethodContext; +using ColumnsHashing::HashMethodContextPtr; + struct AggregatedDataVariants : private boost::noncopyable { /** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way: @@ -1298,7 +694,7 @@ struct AggregatedDataVariants : private boost::noncopyable } } - static AggregationStateCachePtr createCache(Type type, const AggregationStateCache::Settings & settings) + static HashMethodContextPtr createCache(Type type, const HashMethodContext::Settings & settings) { switch (type) { @@ -1309,7 +705,7 @@ struct AggregatedDataVariants : private boost::noncopyable { \ using TPtr ## NAME = decltype(AggregatedDataVariants::NAME); \ using T ## NAME = typename TPtr ## NAME ::element_type; \ - return T ## NAME ::createCache(settings); \ + return T ## NAME ::State::createContext(settings); \ } APPLY_FOR_AGGREGATED_VARIANTS(M) @@ -1496,7 +892,7 @@ protected: AggregatedDataVariants::Type method_chosen; Sizes key_sizes; - AggregationStateCachePtr aggregation_state_cache; + HashMethodContextPtr aggregation_state_cache; AggregateFunctionsPlainPtrs aggregate_functions;