too big translation unit in Aggregator

This commit is contained in:
lgbo-ustc 2024-03-12 15:53:28 +08:00
parent d30792b196
commit 5f1991fbef
9 changed files with 1541 additions and 1262 deletions

View File

@ -109,6 +109,9 @@ public:
using Base::Base;
FixedHashMap() = default;
FixedHashMap(size_t ) {} /// NOLINT
template <typename Func, bool>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{

View File

@ -38,6 +38,7 @@ public:
Impl impls[NUM_BUCKETS];
TwoLevelStringHashTable() = default;
TwoLevelStringHashTable(size_t ) {} /// NOLINT
template <typename Source>
explicit TwoLevelStringHashTable(const Source & src)

View File

@ -0,0 +1,142 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Common/HashTable/FixedHashMap.h>
#include <Common/HashTable/StringHashMap.h>
#include <Common/HashTable/TwoLevelHashMap.h>
#include <Common/HashTable/TwoLevelStringHashMap.h>
namespace DB
{
/** Different data structures that can be used for aggregation
* For efficiency, the aggregation data itself is put into the pool.
* Data and pool ownership (states of aggregate functions)
* is acquired later - in `convertToBlocks` function, by the ColumnAggregateFunction object.
*
* Most data structures exist in two versions: normal and two-level (TwoLevel).
* A two-level hash table works a little slower with a small number of different keys,
* but with a large number of different keys scales better, because it allows
* parallelize some operations (merging, post-processing) in a natural way.
*
* To ensure efficient work over a wide range of conditions,
* first single-level hash tables are used,
* and when the number of different keys is large enough,
* they are converted to two-level ones.
*
* PS. There are many different approaches to the effective implementation of parallel and distributed aggregation,
* best suited for different cases, and this approach is just one of them, chosen for a combination of reasons.
*/
using AggregatedDataWithoutKey = AggregateDataPtr;
using AggregatedDataWithUInt8Key = FixedImplicitZeroHashMapWithCalculatedSize<UInt8, AggregateDataPtr>;
using AggregatedDataWithUInt16Key = FixedImplicitZeroHashMap<UInt16, AggregateDataPtr>;
using AggregatedDataWithUInt32Key = HashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>;
using AggregatedDataWithUInt64Key = HashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
using AggregatedDataWithShortStringKey = StringHashMap<AggregateDataPtr>;
using AggregatedDataWithStringKey = HashMapWithSavedHash<StringRef, AggregateDataPtr>;
using AggregatedDataWithKeys128 = HashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
using AggregatedDataWithKeys256 = HashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;
using AggregatedDataWithUInt32KeyTwoLevel = TwoLevelHashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>;
using AggregatedDataWithUInt64KeyTwoLevel = TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
using AggregatedDataWithShortStringKeyTwoLevel = TwoLevelStringHashMap<AggregateDataPtr>;
using AggregatedDataWithStringKeyTwoLevel = TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr>;
using AggregatedDataWithKeys128TwoLevel = TwoLevelHashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
using AggregatedDataWithKeys256TwoLevel = TwoLevelHashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;
/** Variants with better hash function, using more than 32 bits for hash.
* Using for merging phase of external aggregation, where number of keys may be far greater than 4 billion,
* but we keep in memory and merge only sub-partition of them simultaneously.
* TODO We need to switch for better hash function not only for external aggregation,
* but also for huge aggregation results on machines with terabytes of RAM.
*/
using AggregatedDataWithUInt64KeyHash64 = HashMap<UInt64, AggregateDataPtr, DefaultHash<UInt64>>;
using AggregatedDataWithStringKeyHash64 = HashMapWithSavedHash<StringRef, AggregateDataPtr, StringRefHash64>;
using AggregatedDataWithKeys128Hash64 = HashMap<UInt128, AggregateDataPtr, UInt128Hash>;
using AggregatedDataWithKeys256Hash64 = HashMap<UInt256, AggregateDataPtr, UInt256Hash>;
template <typename Base>
struct AggregationDataWithNullKey : public Base
{
using Base::Base;
bool & hasNullKeyData() { return has_null_key_data; }
AggregateDataPtr & getNullKeyData() { return null_key_data; }
bool hasNullKeyData() const { return has_null_key_data; }
const AggregateDataPtr & getNullKeyData() const { return null_key_data; }
size_t size() const { return Base::size() + (has_null_key_data ? 1 : 0); }
bool empty() const { return Base::empty() && !has_null_key_data; }
void clear()
{
Base::clear();
has_null_key_data = false;
}
void clearAndShrink()
{
Base::clearAndShrink();
has_null_key_data = false;
}
private:
bool has_null_key_data = false;
AggregateDataPtr null_key_data = nullptr;
};
template <typename Base>
struct AggregationDataWithNullKeyTwoLevel : public Base
{
using Base::Base;
using Base::impls;
AggregationDataWithNullKeyTwoLevel() = default;
template <typename Other>
explicit AggregationDataWithNullKeyTwoLevel(const Other & other) : Base(other)
{
impls[0].hasNullKeyData() = other.hasNullKeyData();
impls[0].getNullKeyData() = other.getNullKeyData();
}
bool & hasNullKeyData() { return impls[0].hasNullKeyData(); }
AggregateDataPtr & getNullKeyData() { return impls[0].getNullKeyData(); }
bool hasNullKeyData() const { return impls[0].hasNullKeyData(); }
const AggregateDataPtr & getNullKeyData() const { return impls[0].getNullKeyData(); }
};
template <typename ... Types>
using HashTableWithNullKey = AggregationDataWithNullKey<HashMapTable<Types ...>>;
template <typename ... Types>
using StringHashTableWithNullKey = AggregationDataWithNullKey<StringHashMap<Types ...>>;
using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey<AggregatedDataWithUInt8Key>;
using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey<AggregatedDataWithUInt16Key>;
using AggregatedDataWithNullableUInt32Key = AggregationDataWithNullKey<AggregatedDataWithUInt32Key>;
using AggregatedDataWithNullableUInt64Key = AggregationDataWithNullKey<AggregatedDataWithUInt64Key>;
using AggregatedDataWithNullableStringKey = AggregationDataWithNullKey<AggregatedDataWithStringKey>;
using AggregatedDataWithNullableShortStringKey = AggregationDataWithNullKey<AggregatedDataWithShortStringKey>;
using AggregatedDataWithNullableUInt32KeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
TwoLevelHashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>,
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
using AggregatedDataWithNullableUInt64KeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>,
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
using AggregatedDataWithNullableShortStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
TwoLevelStringHashMap<AggregateDataPtr, HashTableAllocator, StringHashTableWithNullKey>>;
using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr, DefaultHash<StringRef>,
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
}

View File

@ -0,0 +1,255 @@
#include <Interpreters/AggregatedDataVariants.h>
#include <Interpreters/Aggregator.h>
namespace ProfileEvents
{
extern const Event AggregationPreallocatedElementsInHashTables;
}
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
extern const int LOGICAL_ERROR;
}
using ColumnsHashing::HashMethodContext;
using ColumnsHashing::HashMethodContextPtr;
using ColumnsHashing::LastElementCacheStats;
AggregatedDataVariants::AggregatedDataVariants() : aggregates_pools(1, std::make_shared<Arena>()), aggregates_pool(aggregates_pools.back().get()) {}
AggregatedDataVariants::~AggregatedDataVariants()
{
if (aggregator && !aggregator->all_aggregates_has_trivial_destructor)
{
try
{
aggregator->destroyAllAggregateStates(*this);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
// The std::is_constructible trait isn't suitable here because some classes have template constructors with semantics different from providing size hints.
// Also string hash table variants are not supported due to the fact that both local perf tests and tests in CI showed slowdowns for them.
template <typename...>
struct HasConstructorOfNumberOfElements : std::false_type
{
};
template <typename... Ts>
struct HasConstructorOfNumberOfElements<HashMapTable<Ts...>> : std::true_type
{
};
template <typename Key, typename Cell, typename Hash, typename Grower, typename Allocator, template <typename...> typename ImplTable>
struct HasConstructorOfNumberOfElements<TwoLevelHashMapTable<Key, Cell, Hash, Grower, Allocator, ImplTable>> : std::true_type
{
};
template <typename... Ts>
struct HasConstructorOfNumberOfElements<HashTable<Ts...>> : std::true_type
{
};
template <typename... Ts>
struct HasConstructorOfNumberOfElements<TwoLevelHashTable<Ts...>> : std::true_type
{
};
template <template <typename> typename Method, typename Base>
struct HasConstructorOfNumberOfElements<Method<Base>> : HasConstructorOfNumberOfElements<Base>
{
};
template <typename Method>
auto constructWithReserveIfPossible(size_t size_hint)
{
if constexpr (HasConstructorOfNumberOfElements<typename Method::Data>::value)
{
ProfileEvents::increment(ProfileEvents::AggregationPreallocatedElementsInHashTables, size_hint);
return std::make_unique<Method>(size_hint);
}
else
return std::make_unique<Method>();
}
void AggregatedDataVariants::init(Type type_, std::optional<size_t> size_hint)
{
switch (type_)
{
case Type::EMPTY:
case Type::without_key:
break;
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: \
if (size_hint) \
(NAME) = constructWithReserveIfPossible<decltype(NAME)::element_type>(*size_hint); \
else \
(NAME) = std::make_unique<decltype(NAME)::element_type>(); \
break;
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
type = type_;
}
size_t AggregatedDataVariants::size() const
{
switch (type)
{
case Type::EMPTY:
return 0;
case Type::without_key:
return 1;
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: \
return (NAME)->data.size() + (without_key != nullptr);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
size_t AggregatedDataVariants::sizeWithoutOverflowRow() const
{
switch (type)
{
case Type::EMPTY:
return 0;
case Type::without_key:
return 1;
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: \
return (NAME)->data.size();
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
const char * AggregatedDataVariants::getMethodName() const
{
switch (type)
{
case Type::EMPTY:
return "EMPTY";
case Type::without_key:
return "without_key";
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: \
return #NAME;
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
bool AggregatedDataVariants::isTwoLevel() const
{
switch (type)
{
case Type::EMPTY:
return false;
case Type::without_key:
return false;
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: \
return IS_TWO_LEVEL;
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
bool AggregatedDataVariants::isConvertibleToTwoLevel() const
{
switch (type)
{
#define M(NAME) \
case Type::NAME: \
return true;
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
default:
return false;
}
}
void AggregatedDataVariants::convertToTwoLevel()
{
if (aggregator)
LOG_TRACE(aggregator->log, "Converting aggregation data to two-level.");
switch (type)
{
#define M(NAME) \
case Type::NAME: \
NAME ## _two_level = std::make_unique<decltype(NAME ## _two_level)::element_type>(*(NAME)); \
(NAME).reset(); \
type = Type::NAME ## _two_level; \
break;
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong data variant passed.");
}
}
bool AggregatedDataVariants::isLowCardinality() const
{
switch (type)
{
#define M(NAME) \
case Type::NAME: \
return true;
APPLY_FOR_LOW_CARDINALITY_VARIANTS(M)
#undef M
default:
return false;
}
}
HashMethodContextPtr AggregatedDataVariants::createCache(Type type, const HashMethodContext::Settings & settings)
{
switch (type)
{
case Type::without_key:
return nullptr;
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: { \
using TPtr##NAME = decltype(AggregatedDataVariants::NAME); \
using T##NAME = typename TPtr##NAME ::element_type; \
return T##NAME ::State::createContext(settings); \
}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
default:
throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant.");
}
}
}

View File

@ -0,0 +1,320 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <memory.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Common/ColumnsHashing.h>
#include <Interpreters/AggregatedData.h>
#include <Interpreters/AggregationMethod.h>
namespace DB
{
class Arena;
class Aggregator;
struct AggregatedDataVariants : private boost::noncopyable
{
/** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way:
* - when aggregating, states are created in the pool using IAggregateFunction::create (inside - `placement new` of arbitrary structure);
* - they must then be destroyed using IAggregateFunction::destroy (inside - calling the destructor of arbitrary structure);
* - if aggregation is complete, then, in the Aggregator::convertToBlocks function, pointers to the states of aggregate functions
* are written to ColumnAggregateFunction; ColumnAggregateFunction "acquires ownership" of them, that is - calls `destroy` in its destructor.
* - if during the aggregation, before call to Aggregator::convertToBlocks, an exception was thrown,
* then the states of aggregate functions must still be destroyed,
* otherwise, for complex states (eg, AggregateFunctionUniq), there will be memory leaks;
* - in this case, to destroy states, the destructor calls Aggregator::destroyAggregateStates method,
* but only if the variable aggregator (see below) is not nullptr;
* - that is, until you transfer ownership of the aggregate function states in the ColumnAggregateFunction, set the variable `aggregator`,
* so that when an exception occurs, the states are correctly destroyed.
*
* PS. This can be corrected by making a pool that knows about which states of aggregate functions and in which order are put in it, and knows how to destroy them.
* But this can hardly be done simply because it is planned to put variable-length strings into the same pool.
* In this case, the pool will not be able to know with what offsets objects are stored.
*/
const Aggregator * aggregator = nullptr;
size_t keys_size{}; /// Number of keys. NOTE do we need this field?
Sizes key_sizes; /// Dimensions of keys, if keys of fixed length
/// Pools for states of aggregate functions. Ownership will be later transferred to ColumnAggregateFunction.
using ArenaPtr = std::shared_ptr<Arena>;
using Arenas = std::vector<ArenaPtr>;
Arenas aggregates_pools;
Arena * aggregates_pool{}; /// The pool that is currently used for allocation.
/** Specialization for the case when there are no keys, and for keys not fitted into max_rows_to_group_by.
*/
AggregatedDataWithoutKey without_key = nullptr;
/// Stats of a cache for consecutive keys optimization.
/// Stats can be used to disable the cache in case of a lot of misses.
ColumnsHashing::LastElementCacheStats consecutive_keys_cache_stats;
// Disable consecutive key optimization for Uint8/16, because they use a FixedHashMap
// and the lookup there is almost free, so we don't need to cache the last lookup result
std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>> key8;
std::unique_ptr<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>> key16;
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>> key32;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>> key64;
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithShortStringKey>> key_string;
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKey>> key_fixed_string;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt16Key, false, false, false>> keys16;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt32Key>> keys32;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key>> keys64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>> keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>> keys256;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKey>> serialized;
std::unique_ptr<AggregationMethodNullableSerialized<AggregatedDataWithStringKey>> nullable_serialized;
std::unique_ptr<AggregationMethodPreallocSerialized<AggregatedDataWithStringKey>> prealloc_serialized;
std::unique_ptr<AggregationMethodNullablePreallocSerialized<AggregatedDataWithStringKey>> nullable_prealloc_serialized;
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>> key32_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>> key64_two_level;
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>> key_string_two_level;
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>> key_fixed_string_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt32KeyTwoLevel>> keys32_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyTwoLevel>> keys64_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>> keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>> keys256_two_level;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>> serialized_two_level;
std::unique_ptr<AggregationMethodNullableSerialized<AggregatedDataWithStringKeyTwoLevel>> nullable_serialized_two_level;
std::unique_ptr<AggregationMethodPreallocSerialized<AggregatedDataWithStringKeyTwoLevel>> prealloc_serialized_two_level;
std::unique_ptr<AggregationMethodNullablePreallocSerialized<AggregatedDataWithStringKeyTwoLevel>> nullable_prealloc_serialized_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyHash64>> key64_hash64;
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKeyHash64>> key_string_hash64;
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKeyHash64>> key_fixed_string_hash64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128Hash64>> keys128_hash64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256Hash64>> keys256_hash64;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyHash64>> serialized_hash64;
std::unique_ptr<AggregationMethodNullableSerialized<AggregatedDataWithStringKeyHash64>> nullable_serialized_hash64;
std::unique_ptr<AggregationMethodPreallocSerialized<AggregatedDataWithStringKeyHash64>> prealloc_serialized_hash64;
std::unique_ptr<AggregationMethodNullablePreallocSerialized<AggregatedDataWithStringKeyHash64>> nullable_prealloc_serialized_hash64;
/// Support for nullable keys.
std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false, true>> nullable_key8;
std::unique_ptr<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false, true>> nullable_key16;
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key, true, true>> nullable_key32;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key, true, true>> nullable_key64;
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32KeyTwoLevel, true, true>> nullable_key32_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel, true, true>> nullable_key64_two_level;
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithNullableShortStringKey, true>> nullable_key_string;
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithNullableShortStringKey, true>> nullable_key_fixed_string;
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithNullableShortStringKeyTwoLevel, true>> nullable_key_string_two_level;
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithNullableShortStringKeyTwoLevel, true>> nullable_key_fixed_string_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, true>> nullable_keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, true>> nullable_keys256;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, true>> nullable_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>> nullable_keys256_two_level;
/// Support for low cardinality.
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>> low_cardinality_key8;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>> low_cardinality_key16;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key32;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key64;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_fixed_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key32_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key64_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_string_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_fixed_string_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, false, true>> low_cardinality_keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, false, true>> low_cardinality_keys256;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, false, true>> low_cardinality_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, false, true>> low_cardinality_keys256_two_level;
/// In this and similar macros, the option without_key is not considered.
#define APPLY_FOR_AGGREGATED_VARIANTS(M) \
M(key8, false) \
M(key16, false) \
M(key32, false) \
M(key64, false) \
M(key_string, false) \
M(key_fixed_string, false) \
M(keys16, false) \
M(keys32, false) \
M(keys64, false) \
M(keys128, false) \
M(keys256, false) \
M(serialized, false) \
M(nullable_serialized, false) \
M(prealloc_serialized, false) \
M(nullable_prealloc_serialized, false) \
M(key32_two_level, true) \
M(key64_two_level, true) \
M(key_string_two_level, true) \
M(key_fixed_string_two_level, true) \
M(keys32_two_level, true) \
M(keys64_two_level, true) \
M(keys128_two_level, true) \
M(keys256_two_level, true) \
M(serialized_two_level, true) \
M(nullable_serialized_two_level, true) \
M(prealloc_serialized_two_level, true) \
M(nullable_prealloc_serialized_two_level, true) \
M(key64_hash64, false) \
M(key_string_hash64, false) \
M(key_fixed_string_hash64, false) \
M(keys128_hash64, false) \
M(keys256_hash64, false) \
M(serialized_hash64, false) \
M(nullable_serialized_hash64, false) \
M(prealloc_serialized_hash64, false) \
M(nullable_prealloc_serialized_hash64, false) \
M(nullable_key8, false) \
M(nullable_key16, false) \
M(nullable_key32, false) \
M(nullable_key64, false) \
M(nullable_key32_two_level, true) \
M(nullable_key64_two_level, true) \
M(nullable_key_string, false) \
M(nullable_key_fixed_string, false) \
M(nullable_key_string_two_level, true) \
M(nullable_key_fixed_string_two_level, true) \
M(nullable_keys128, false) \
M(nullable_keys256, false) \
M(nullable_keys128_two_level, true) \
M(nullable_keys256_two_level, true) \
M(low_cardinality_key8, false) \
M(low_cardinality_key16, false) \
M(low_cardinality_key32, false) \
M(low_cardinality_key64, false) \
M(low_cardinality_keys128, false) \
M(low_cardinality_keys256, false) \
M(low_cardinality_key_string, false) \
M(low_cardinality_key_fixed_string, false) \
M(low_cardinality_key32_two_level, true) \
M(low_cardinality_key64_two_level, true) \
M(low_cardinality_keys128_two_level, true) \
M(low_cardinality_keys256_two_level, true) \
M(low_cardinality_key_string_two_level, true) \
M(low_cardinality_key_fixed_string_two_level, true) \
#define APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
M(key32) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys32) \
M(keys64) \
M(keys128) \
M(keys256) \
M(serialized) \
M(nullable_serialized) \
M(prealloc_serialized) \
M(nullable_prealloc_serialized) \
M(nullable_key32) \
M(nullable_key64) \
M(nullable_key_string) \
M(nullable_key_fixed_string) \
M(nullable_keys128) \
M(nullable_keys256) \
M(low_cardinality_key32) \
M(low_cardinality_key64) \
M(low_cardinality_keys128) \
M(low_cardinality_keys256) \
M(low_cardinality_key_string) \
M(low_cardinality_key_fixed_string) \
/// NOLINTNEXTLINE
#define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
M(key8) \
M(key16) \
M(nullable_key8) \
M(nullable_key16) \
M(keys16) \
M(key64_hash64) \
M(key_string_hash64)\
M(key_fixed_string_hash64) \
M(keys128_hash64) \
M(keys256_hash64) \
M(serialized_hash64) \
M(nullable_serialized_hash64) \
M(prealloc_serialized_hash64) \
M(nullable_prealloc_serialized_hash64) \
M(low_cardinality_key8) \
M(low_cardinality_key16) \
/// NOLINTNEXTLINE
#define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
/// NOLINTNEXTLINE
#define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \
M(key32_two_level) \
M(key64_two_level) \
M(key_string_two_level) \
M(key_fixed_string_two_level) \
M(keys32_two_level) \
M(keys64_two_level) \
M(keys128_two_level) \
M(keys256_two_level) \
M(serialized_two_level) \
M(nullable_serialized_two_level) \
M(prealloc_serialized_two_level) \
M(nullable_prealloc_serialized_two_level) \
M(nullable_key32_two_level) \
M(nullable_key64_two_level) \
M(nullable_key_string_two_level) \
M(nullable_key_fixed_string_two_level) \
M(nullable_keys128_two_level) \
M(nullable_keys256_two_level) \
M(low_cardinality_key32_two_level) \
M(low_cardinality_key64_two_level) \
M(low_cardinality_keys128_two_level) \
M(low_cardinality_keys256_two_level) \
M(low_cardinality_key_string_two_level) \
M(low_cardinality_key_fixed_string_two_level) \
#define APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) \
M(low_cardinality_key8) \
M(low_cardinality_key16) \
M(low_cardinality_key32) \
M(low_cardinality_key64) \
M(low_cardinality_keys128) \
M(low_cardinality_keys256) \
M(low_cardinality_key_string) \
M(low_cardinality_key_fixed_string) \
M(low_cardinality_key32_two_level) \
M(low_cardinality_key64_two_level) \
M(low_cardinality_keys128_two_level) \
M(low_cardinality_keys256_two_level) \
M(low_cardinality_key_string_two_level) \
M(low_cardinality_key_fixed_string_two_level)
enum class Type
{
EMPTY = 0,
without_key,
#define M(NAME, IS_TWO_LEVEL) NAME,
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
};
Type type = Type::EMPTY;
AggregatedDataVariants();
~AggregatedDataVariants();
bool empty() const { return type == Type::EMPTY; }
void invalidate() { type = Type::EMPTY; }
void init(Type type_, std::optional<size_t> size_hint = std::nullopt);
/// Number of rows (different keys).
size_t size() const;
size_t sizeWithoutOverflowRow() const;
const char * getMethodName() const;
bool isTwoLevel() const;
bool isConvertibleToTwoLevel() const;
void convertToTwoLevel();
bool isLowCardinality() const;
static ColumnsHashing::HashMethodContextPtr createCache(Type type, const ColumnsHashing::HashMethodContext::Settings & settings);
};
using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants>;
}

View File

@ -0,0 +1,215 @@
#include <Interpreters/AggregationMethod.h>
namespace DB
{
template <typename FieldType, typename TData, bool consecutive_keys_optimization, bool nullable>
void AggregationMethodOneNumber<FieldType, TData, consecutive_keys_optimization, nullable>::insertKeyIntoColumns(
const AggregationMethodOneNumber::Key & key, std::vector<IColumn *> & key_columns, const Sizes & /*key_sizes*/)
{
ColumnFixedSizeHelper * column;
if constexpr (nullable)
{
ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*key_columns[0]);
ColumnUInt8 * null_map = assert_cast<ColumnUInt8 *>(&nullable_col.getNullMapColumn());
null_map->insertDefault();
column = static_cast<ColumnFixedSizeHelper *>(&nullable_col.getNestedColumn());
}
else
{
column = static_cast<ColumnFixedSizeHelper *>(key_columns[0]);
}
static_assert(sizeof(FieldType) <= sizeof(Key));
const auto * key_holder = reinterpret_cast<const char *>(&key);
if constexpr (sizeof(FieldType) < sizeof(Key) && std::endian::native == std::endian::big)
column->insertRawData<sizeof(FieldType)>(key_holder + (sizeof(Key) - sizeof(FieldType)));
else
column->insertRawData<sizeof(FieldType)>(key_holder);
}
template struct AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>;
template struct AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>;
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>;
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>;
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>;
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>;
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyHash64>;
template struct AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false, true>;
template struct AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false, true>;
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key, true, true>;
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key, true, true>;
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32KeyTwoLevel, true, true>;
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel, true, true>;
template struct AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>;
template struct AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>;
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64Key>;
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>;
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64KeyTwoLevel>;
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel>;
template <typename TData, bool nullable>
void AggregationMethodStringNoCache<TData, nullable>::insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &)
{
if constexpr (nullable)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*key_columns[0]);
assert_cast<ColumnString &>(column_nullable.getNestedColumn()).insertData(key.data, key.size);
column_nullable.getNullMapData().push_back(0);
}
else
{
assert_cast<ColumnString &>(*key_columns[0]).insertData(key.data, key.size);
}
}
template struct AggregationMethodStringNoCache<AggregatedDataWithShortStringKey>;
template struct AggregationMethodStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>;
template struct AggregationMethodStringNoCache<AggregatedDataWithNullableShortStringKey, true>;
template struct AggregationMethodStringNoCache<AggregatedDataWithNullableShortStringKeyTwoLevel, true>;
template <typename TData>
void AggregationMethodFixedString<TData>::insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &)
{
assert_cast<ColumnFixedString &>(*key_columns[0]).insertData(key.data, key.size);
}
template struct AggregationMethodFixedString<AggregatedDataWithStringKeyHash64>;
template struct AggregationMethodFixedString<AggregatedDataWithNullableStringKey>;
template struct AggregationMethodFixedString<AggregatedDataWithNullableStringKeyTwoLevel>;
template <typename TData, bool nullable>
void AggregationMethodFixedStringNoCache<TData, nullable>::insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &)
{
if constexpr (nullable)
assert_cast<ColumnNullable &>(*key_columns[0]).insertData(key.data, key.size);
else
assert_cast<ColumnFixedString &>(*key_columns[0]).insertData(key.data, key.size);
}
template struct AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKey>;
template struct AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>;
template struct AggregationMethodFixedStringNoCache<AggregatedDataWithNullableShortStringKey, true>;
template struct AggregationMethodFixedStringNoCache<AggregatedDataWithNullableShortStringKeyTwoLevel, true>;
template <typename SingleColumnMethod>
void AggregationMethodSingleLowCardinalityColumn<SingleColumnMethod>::insertKeyIntoColumns(
const Key & key, std::vector<IColumn *> & key_columns_low_cardinality, const Sizes & /*key_sizes*/)
{
auto * col = assert_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0]);
if constexpr (std::is_same_v<Key, StringRef>)
col->insertData(key.data, key.size);
else
col->insertData(reinterpret_cast<const char *>(&key), sizeof(key));
}
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>;
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>;
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64Key>>;
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>;
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKey>>;
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKey>>;
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64KeyTwoLevel>>;
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel>>;
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKeyTwoLevel>>;
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKeyTwoLevel>>;
template <typename TData, bool has_nullable_keys, bool has_low_cardinality, bool consecutive_keys_optimization>
void AggregationMethodKeysFixed<TData, has_nullable_keys, has_low_cardinality,consecutive_keys_optimization>::insertKeyIntoColumns(const Key & key, std::vector<IColumn *> & key_columns, const Sizes & key_sizes)
{
size_t keys_size = key_columns.size();
static constexpr auto bitmap_size = has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;
/// In any hash key value, column values to be read start just after the bitmap, if it exists.
size_t pos = bitmap_size;
for (size_t i = 0; i < keys_size; ++i)
{
IColumn * observed_column;
ColumnUInt8 * null_map;
bool column_nullable = false;
if constexpr (has_nullable_keys)
column_nullable = isColumnNullable(*key_columns[i]);
/// If we have a nullable column, get its nested column and its null map.
if (column_nullable)
{
ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*key_columns[i]);
observed_column = &nullable_col.getNestedColumn();
null_map = assert_cast<ColumnUInt8 *>(&nullable_col.getNullMapColumn());
}
else
{
observed_column = key_columns[i];
null_map = nullptr;
}
bool is_null = false;
if (column_nullable)
{
/// The current column is nullable. Check if the value of the
/// corresponding key is nullable. Update the null map accordingly.
size_t bucket = i / 8;
size_t offset = i % 8;
UInt8 val = (reinterpret_cast<const UInt8 *>(&key)[bucket] >> offset) & 1;
null_map->insertValue(val);
is_null = val == 1;
}
if (has_nullable_keys && is_null)
observed_column->insertDefault();
else
{
size_t size = key_sizes[i];
size_t offset_to = pos;
if constexpr (std::endian::native == std::endian::big)
offset_to = sizeof(Key) - size - pos;
observed_column->insertData(reinterpret_cast<const char *>(&key) + offset_to, size);
pos += size;
}
}
}
template struct AggregationMethodKeysFixed<AggregatedDataWithUInt16Key, false, false, false>;
template struct AggregationMethodKeysFixed<AggregatedDataWithUInt32Key>;
template struct AggregationMethodKeysFixed<AggregatedDataWithUInt64Key>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256>;
template struct AggregationMethodKeysFixed<AggregatedDataWithUInt32KeyTwoLevel>;
template struct AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyTwoLevel>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128Hash64>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256Hash64>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128, true>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256, true>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, true>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128, false, true>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256, false, true>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, false, true>;
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, false, true>;
template <typename TData, bool nullable, bool prealloc>
void AggregationMethodSerialized<TData, nullable, prealloc>::insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &)
{
const auto * pos = key.data;
for (auto & column : key_columns)
pos = column->deserializeAndInsertFromArena(pos);
}
template struct AggregationMethodSerialized<AggregatedDataWithStringKey>;
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>;
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyHash64>;
// AggregationMethodNullableSerialized
template struct AggregationMethodSerialized<AggregatedDataWithStringKey, true, false>;
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel, true, false>;
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyHash64, true, false>;
// AggregationMethodPreallocSerialized
template struct AggregationMethodSerialized<AggregatedDataWithStringKey, false, true>;
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel, false, true>;
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyHash64, false, true>;
// AggregationMethodNullablePreallocSerialized
template struct AggregationMethodSerialized<AggregatedDataWithStringKey, true, true>;
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel, true, true>;
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyHash64, true, true>;
}

View File

@ -0,0 +1,320 @@
#pragma once
#include <vector>
#include <Common/ColumnsHashing.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/AggregatedData.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnLowCardinality.h>
namespace DB
{
class IColumn;
/// For the case where there is one numeric key.
/// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
template <typename FieldType, typename TData,
bool consecutive_keys_optimization = true, bool nullable = false>
struct AggregationMethodOneNumber
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
AggregationMethodOneNumber() = default;
explicit AggregationMethodOneNumber(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodOneNumber(const Other & other) : data(other.data)
{
}
/// To use one `Method` in different threads, use different `State`.
template <bool use_cache>
using StateImpl = ColumnsHashing::HashMethodOneNumber<
typename Data::value_type,
Mapped,
FieldType,
use_cache && consecutive_keys_optimization,
/*need_offset=*/ false,
nullable>;
using State = StateImpl<true>;
using StateNoCache = StateImpl<false>;
/// Use optimization for low cardinality.
static const bool low_cardinality_optimization = false;
static const bool one_key_nullable_optimization = nullable;
/// Shuffle key columns before `insertKeyIntoColumns` call if needed.
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
// Insert the key from the hash table into columns.
static void insertKeyIntoColumns(const Key & key, std::vector<IColumn *> & key_columns, const Sizes & /*key_sizes*/);
};
/// For the case where there is one string key.
template <typename TData>
struct AggregationMethodString
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
AggregationMethodString() = default;
template <typename Other>
explicit AggregationMethodString(const Other & other) : data(other.data)
{
}
explicit AggregationMethodString(size_t size_hint) : data(size_hint) { }
template <bool use_cache>
using StateImpl = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped, /*place_string_to_arena=*/ true, use_cache>;
using State = StateImpl<true>;
using StateNoCache = StateImpl<false>;
static const bool low_cardinality_optimization = false;
static const bool one_key_nullable_optimization = false;
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &)
{
static_cast<ColumnString *>(key_columns[0])->insertData(key.data, key.size);
}
};
/// Same as above but without cache
template <typename TData, bool nullable = false>
struct AggregationMethodStringNoCache
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
AggregationMethodStringNoCache() = default;
explicit AggregationMethodStringNoCache(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodStringNoCache(const Other & other) : data(other.data)
{
}
template <bool use_cache>
using StateImpl = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped, true, false, false, nullable>;
using State = StateImpl<true>;
using StateNoCache = StateImpl<false>;
static const bool low_cardinality_optimization = false;
static const bool one_key_nullable_optimization = nullable;
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &);
};
/// For the case where there is one fixed-length string key.
template <typename TData>
struct AggregationMethodFixedString
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
AggregationMethodFixedString() = default;
explicit AggregationMethodFixedString(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodFixedString(const Other & other) : data(other.data)
{
}
template <bool use_cache>
using StateImpl = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped, /*place_string_to_arena=*/ true, use_cache>;
using State = StateImpl<true>;
using StateNoCache = StateImpl<false>;
static const bool low_cardinality_optimization = false;
static const bool one_key_nullable_optimization = false;
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &);
};
/// Same as above but without cache
template <typename TData, bool nullable = false>
struct AggregationMethodFixedStringNoCache
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
AggregationMethodFixedStringNoCache() = default;
explicit AggregationMethodFixedStringNoCache(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodFixedStringNoCache(const Other & other) : data(other.data)
{
}
template <bool use_cache>
using StateImpl = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped, true, false, false, nullable>;
using State = StateImpl<true>;
using StateNoCache = StateImpl<false>;
static const bool low_cardinality_optimization = false;
static const bool one_key_nullable_optimization = nullable;
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &);
};
/// Single low cardinality column.
template <typename SingleColumnMethod>
struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
{
using Base = SingleColumnMethod;
using Data = typename Base::Data;
using Key = typename Base::Key;
using Mapped = typename Base::Mapped;
using Base::data;
template <bool use_cache>
using BaseStateImpl = typename Base::template StateImpl<use_cache>;
AggregationMethodSingleLowCardinalityColumn() = default;
template <typename Other>
explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {}
template <bool use_cache>
using StateImpl = ColumnsHashing::HashMethodSingleLowCardinalityColumn<BaseStateImpl<use_cache>, Mapped, use_cache>;
using State = StateImpl<true>;
using StateNoCache = StateImpl<false>;
static const bool low_cardinality_optimization = true;
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(const Key & key,
std::vector<IColumn *> & key_columns_low_cardinality, const Sizes & /*key_sizes*/);
};
/// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits.
template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false, bool consecutive_keys_optimization = false>
struct AggregationMethodKeysFixed
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
static constexpr bool has_nullable_keys = has_nullable_keys_;
static constexpr bool has_low_cardinality = has_low_cardinality_;
Data data;
AggregationMethodKeysFixed() = default;
explicit AggregationMethodKeysFixed(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodKeysFixed(const Other & other) : data(other.data)
{
}
template <bool use_cache>
using StateImpl = ColumnsHashing::HashMethodKeysFixed<
typename Data::value_type,
Key,
Mapped,
has_nullable_keys,
has_low_cardinality,
use_cache && consecutive_keys_optimization>;
using State = StateImpl<true>;
using StateNoCache = StateImpl<false>;
static const bool low_cardinality_optimization = false;
static const bool one_key_nullable_optimization = false;
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> & key_columns, const Sizes & key_sizes)
{
return State::shuffleKeyColumns(key_columns, key_sizes);
}
static void insertKeyIntoColumns(const Key & key, std::vector<IColumn *> & key_columns, const Sizes & key_sizes);
};
/** Aggregates by concatenating serialized key values.
* The serialized value differs in that it uniquely allows to deserialize it, having only the position with which it starts.
* That is, for example, for strings, it contains first the serialized length of the string, and then the bytes.
* Therefore, when aggregating by several strings, there is no ambiguity.
*/
template <typename TData, bool nullable = false, bool prealloc = false>
struct AggregationMethodSerialized
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
AggregationMethodSerialized() = default;
explicit AggregationMethodSerialized(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodSerialized(const Other & other) : data(other.data)
{
}
template <bool use_cache>
using StateImpl = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, nullable, prealloc>;
using State = StateImpl<true>;
using StateNoCache = StateImpl<false>;
static const bool low_cardinality_optimization = false;
static const bool one_key_nullable_optimization = false;
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &);
};
template <typename TData>
using AggregationMethodNullableSerialized = AggregationMethodSerialized<TData, true>;
template <typename TData>
using AggregationMethodPreallocSerialized = AggregationMethodSerialized<TData, false, true>;
template <typename TData>
using AggregationMethodNullablePreallocSerialized = AggregationMethodSerialized<TData, true, true>;
}

View File

@ -48,7 +48,6 @@ namespace ProfileEvents
extern const Event ExternalAggregationUncompressedBytes;
extern const Event ExternalProcessingCompressedBytesTotal;
extern const Event ExternalProcessingUncompressedBytesTotal;
extern const Event AggregationPreallocatedElementsInHashTables;
extern const Event AggregationHashTablesInitializedAsTwoLevel;
extern const Event OverflowThrow;
extern const Event OverflowBreak;
@ -269,50 +268,6 @@ void updateStatistics(const DB::ManyAggregatedDataVariants & data_variants, cons
getHashTablesStatistics().update(sum_of_sizes, *median_size, params);
}
// The std::is_constructible trait isn't suitable here because some classes have template constructors with semantics different from providing size hints.
// Also string hash table variants are not supported due to the fact that both local perf tests and tests in CI showed slowdowns for them.
template <typename...>
struct HasConstructorOfNumberOfElements : std::false_type
{
};
template <typename... Ts>
struct HasConstructorOfNumberOfElements<HashMapTable<Ts...>> : std::true_type
{
};
template <typename Key, typename Cell, typename Hash, typename Grower, typename Allocator, template <typename...> typename ImplTable>
struct HasConstructorOfNumberOfElements<TwoLevelHashMapTable<Key, Cell, Hash, Grower, Allocator, ImplTable>> : std::true_type
{
};
template <typename... Ts>
struct HasConstructorOfNumberOfElements<HashTable<Ts...>> : std::true_type
{
};
template <typename... Ts>
struct HasConstructorOfNumberOfElements<TwoLevelHashTable<Ts...>> : std::true_type
{
};
template <template <typename> typename Method, typename Base>
struct HasConstructorOfNumberOfElements<Method<Base>> : HasConstructorOfNumberOfElements<Base>
{
};
template <typename Method>
auto constructWithReserveIfPossible(size_t size_hint)
{
if constexpr (HasConstructorOfNumberOfElements<typename Method::Data>::value)
{
ProfileEvents::increment(ProfileEvents::AggregationPreallocatedElementsInHashTables, size_hint);
return std::make_unique<Method>(size_hint);
}
else
return std::make_unique<Method>();
}
DB::ColumnNumbers calculateKeysPositions(const DB::Block & header, const DB::Aggregator::Params & params)
{
DB::ColumnNumbers keys_positions(params.keys_size);
@ -345,71 +300,11 @@ size_t getMinBytesForPrefetch()
namespace DB
{
AggregatedDataVariants::~AggregatedDataVariants()
{
if (aggregator && !aggregator->all_aggregates_has_trivial_destructor)
{
try
{
aggregator->destroyAllAggregateStates(*this);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics()
{
return getHashTablesStatistics().getCacheStats();
}
void AggregatedDataVariants::convertToTwoLevel()
{
if (aggregator)
LOG_TRACE(aggregator->log, "Converting aggregation data to two-level.");
switch (type)
{
#define M(NAME) \
case Type::NAME: \
NAME ## _two_level = std::make_unique<decltype(NAME ## _two_level)::element_type>(*(NAME)); \
(NAME).reset(); \
type = Type::NAME ## _two_level; \
break;
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong data variant passed.");
}
}
void AggregatedDataVariants::init(Type type_, std::optional<size_t> size_hint)
{
switch (type_)
{
case Type::EMPTY:
case Type::without_key:
break;
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: \
if (size_hint) \
(NAME) = constructWithReserveIfPossible<decltype(NAME)::element_type>(*size_hint); \
else \
(NAME) = std::make_unique<decltype(NAME)::element_type>(); \
break;
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
type = type_;
}
Aggregator::Params::StatsCollectingParams::StatsCollectingParams() = default;
Aggregator::Params::StatsCollectingParams::StatsCollectingParams(
@ -1121,30 +1016,30 @@ void NO_INLINE Aggregator::executeImpl(
if (compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions))
{
if (prefetch)
executeImplBatch<false, true, true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
executeImplBatch<true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, false, all_keys_are_const, true, overflow_row);
else
executeImplBatch<false, true, false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
executeImplBatch<false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, false, all_keys_are_const, true, overflow_row);
}
else
#endif
{
if (prefetch)
executeImplBatch<false, false, true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
executeImplBatch<true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, false, all_keys_are_const, false, overflow_row);
else
executeImplBatch<false, false, false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
executeImplBatch<false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, false, all_keys_are_const, false, overflow_row);
}
}
else
{
executeImplBatch<true, false, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
executeImplBatch<false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, true, all_keys_are_const, false, overflow_row);
}
}
template <bool no_more_keys, bool use_compiled_functions, bool prefetch, typename Method, typename State>
template <bool prefetch, typename Method, typename State>
void NO_INLINE Aggregator::executeImplBatch(
Method & method,
State & state,
@ -1152,7 +1047,9 @@ void NO_INLINE Aggregator::executeImplBatch(
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
bool all_keys_are_const,
bool use_compiled_functions,
AggregateDataPtr overflow_row) const
{
using KeyHolder = decltype(state.getKeyHolder(0, std::declval<Arena &>()));
@ -1164,7 +1061,7 @@ void NO_INLINE Aggregator::executeImplBatch(
/// Optimization for special case when there are no aggregate functions.
if (params.aggregates_size == 0)
{
if constexpr (no_more_keys)
if (no_more_keys)
return;
/// This pointer is unused, but the logic will compare it for nullptr to check if the cell is set.
@ -1197,39 +1094,42 @@ void NO_INLINE Aggregator::executeImplBatch(
}
/// Optimization for special case when aggregating by 8bit key.
if constexpr (!no_more_keys && std::is_same_v<Method, typename decltype(AggregatedDataVariants::key8)::element_type>)
if (!no_more_keys)
{
/// We use another method if there are aggregate functions with -Array combinator.
bool has_arrays = false;
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
if (inst->offsets)
{
has_arrays = true;
break;
}
}
if (!has_arrays && !hasSparseArguments(aggregate_instructions) && !all_keys_are_const)
if constexpr (std::is_same_v<Method, typename decltype(AggregatedDataVariants::key8)::element_type>)
{
/// We use another method if there are aggregate functions with -Array combinator.
bool has_arrays = false;
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
inst->batch_that->addBatchLookupTable8(
row_begin,
row_end,
reinterpret_cast<AggregateDataPtr *>(method.data.data()),
inst->state_offset,
[&](AggregateDataPtr & aggregate_data)
{
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
aggregate_data = place;
},
state.getKeyData(),
inst->batch_arguments,
aggregates_pool);
if (inst->offsets)
{
has_arrays = true;
break;
}
}
if (!has_arrays && !hasSparseArguments(aggregate_instructions) && !all_keys_are_const)
{
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
inst->batch_that->addBatchLookupTable8(
row_begin,
row_end,
reinterpret_cast<AggregateDataPtr *>(method.data.data()),
inst->state_offset,
[&](AggregateDataPtr & aggregate_data)
{
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
aggregate_data = place;
},
state.getKeyData(),
inst->batch_arguments,
aggregates_pool);
}
return;
}
return;
}
}
@ -1255,12 +1155,12 @@ void NO_INLINE Aggregator::executeImplBatch(
state.resetCache();
/// For all rows.
for (size_t i = key_start; i < key_end; ++i)
if (!no_more_keys)
{
AggregateDataPtr aggregate_data = nullptr;
if constexpr (!no_more_keys)
for (size_t i = key_start; i < key_end; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
{
if (i == key_start + prefetching.iterationsToMeasure())
@ -1284,7 +1184,7 @@ void NO_INLINE Aggregator::executeImplBatch(
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
{
const auto & compiled_aggregate_functions = compiled_aggregate_functions_holder->compiled_aggregate_functions;
compiled_aggregate_functions.create_aggregate_states_function(aggregate_data);
@ -1297,7 +1197,8 @@ void NO_INLINE Aggregator::executeImplBatch(
#if defined(MEMORY_SANITIZER)
/// We compile only functions that do not allocate some data in Arena. Only store necessary state in AggregateData place.
for (size_t aggregate_function_index = 0; aggregate_function_index < aggregate_functions.size(); ++aggregate_function_index)
for (size_t aggregate_function_index = 0; aggregate_function_index < aggregate_functions.size();
++aggregate_function_index)
{
if (!is_aggregate_function_compiled[aggregate_function_index])
continue;
@ -1320,26 +1221,51 @@ void NO_INLINE Aggregator::executeImplBatch(
aggregate_data = emplace_result.getMapped();
assert(aggregate_data != nullptr);
places[i] = aggregate_data;
}
else
}
else
{
for (size_t i = key_start; i < key_end; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
/// Add only if the key already exists.
auto find_result = state.findKey(method.data, i, *aggregates_pool);
if (find_result.isFound())
{
aggregate_data = find_result.getMapped();
}
else
{
aggregate_data = overflow_row;
}
places[i] = aggregate_data;
}
places[i] = aggregate_data;
}
executeAggregateInstructions(
aggregates_pool,
row_begin,
row_end,
aggregate_instructions,
places,
key_start,
state.hasOnlyOneValueSinceLastReset(),
no_more_keys,
all_keys_are_const,
use_compiled_functions);
}
void Aggregator::executeAggregateInstructions(
Arena * aggregates_pool,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
const std::unique_ptr<AggregateDataPtr[]> &places,
size_t key_start,
bool has_only_one_value_since_last_reset,
bool no_more_keys,
bool all_keys_are_const,
bool use_compiled_functions) const
{
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
{
std::vector<ColumnData> columns_data;
@ -1355,7 +1281,7 @@ void NO_INLINE Aggregator::executeImplBatch(
columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
}
if (all_keys_are_const || (!no_more_keys && state.hasOnlyOneValueSinceLastReset()))
if (all_keys_are_const || (!no_more_keys && has_only_one_value_since_last_reset))
{
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), places[key_start]);
@ -1372,33 +1298,34 @@ void NO_INLINE Aggregator::executeImplBatch(
for (size_t i = 0; i < aggregate_functions.size(); ++i)
{
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
if (is_aggregate_function_compiled[i])
continue;
#endif
AggregateFunctionInstruction * inst = aggregate_instructions + i;
if (all_keys_are_const || (!no_more_keys && state.hasOnlyOneValueSinceLastReset()))
if (all_keys_are_const || (!no_more_keys && has_only_one_value_since_last_reset))
addBatchSinglePlace(row_begin, row_end, inst, places[key_start] + inst->state_offset, aggregates_pool);
else
addBatch(row_begin, row_end, inst, places.get(), aggregates_pool);
}
}
template <bool use_compiled_functions>
void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t row_begin, size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const
Arena * arena,
bool use_compiled_functions) const
{
if (row_begin == row_end)
return;
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
{
std::vector<ColumnData> columns_data;
@ -1441,7 +1368,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregateFunctionInstruction * inst = aggregate_instructions + i;
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
if (is_aggregate_function_compiled[i])
continue;
#endif
@ -1707,12 +1634,12 @@ bool Aggregator::executeOnBlock(Columns columns,
// #if USE_EMBEDDED_COMPILER
// if (compiled_aggregate_functions_holder)
// {
// executeWithoutKeyImpl<true>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
// executeWithoutKeyImpl(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool,true);
// }
// else
// #endif
{
executeWithoutKeyImpl<false>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
executeWithoutKeyImpl(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool, false);
}
}
else
@ -1832,8 +1759,14 @@ Block Aggregator::convertOneBucketToBlock(
{
// Used in ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform (expects single chunk for each bucket_id).
constexpr bool return_single_block = true;
Block block = convertToBlockImpl<return_single_block>(
method, method.data.impls[bucket], arena, data_variants.aggregates_pools, final, method.data.impls[bucket].size());
Block block = std::get<Block>(convertToBlockImpl(
method,
method.data.impls[bucket],
arena,
data_variants.aggregates_pools,
final,
method.data.impls[bucket].size(),
return_single_block));
block.info.bucket_num = static_cast<int>(bucket);
return block;
@ -1953,35 +1886,33 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
}
template <bool return_single_block, typename Method, typename Table>
Aggregator::ConvertToBlockRes<return_single_block>
Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const
template <typename Method, typename Table>
Aggregator::ConvertToBlockResVariant
Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final,size_t rows, bool return_single_block) const
{
if (data.empty())
{
auto && out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, rows);
return {finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows)};
}
ConvertToBlockRes<return_single_block> res;
ConvertToBlockResVariant res;
if (final)
{
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder)
{
static constexpr bool use_compiled_functions = !Method::low_cardinality_optimization;
res = convertToBlockImplFinal<Method, use_compiled_functions, return_single_block>(method, data, arena, aggregates_pools, rows);
res = convertToBlockImplFinal<Method>(method, data, arena, aggregates_pools, use_compiled_functions, true);
}
else
#endif
{
res = convertToBlockImplFinal<Method, false, return_single_block>(method, data, arena, aggregates_pools, rows);
res = convertToBlockImplFinal<Method>(method, data, arena, aggregates_pools, false, return_single_block);
}
}
else
{
res = convertToBlockImplNotFinal<return_single_block>(method, data, aggregates_pools, rows);
res = convertToBlockImplNotFinal(method, data, aggregates_pools, rows, return_single_block);
}
/// In order to release memory early.
@ -2146,19 +2077,27 @@ Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & pl
return finalizeBlock(params, getHeader(/* final */ true), std::move(out_cols), /* final */ true, places.size());
}
template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t) const
// template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
// Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
// Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t) const
template <typename Method, typename Table>
Aggregator::ConvertToBlockResVariant Aggregator::convertToBlockImplFinal(
Method & method,
Table & data,
Arena * arena,
Arenas & aggregates_pools,
bool use_compiled_functions,
bool return_single_block) const
{
/// +1 for nullKeyData, if `data` doesn't have it - not a problem, just some memory for one excessive row will be preallocated
const size_t max_block_size = (return_single_block ? data.size() : std::min(params.max_block_size, data.size())) + 1;
const bool final = true;
ConvertToBlockRes<return_single_block> res;
std::optional<OutputBlockColumns> out_cols;
std::optional<Sizes> shuffled_key_sizes;
PaddedPODArray<AggregateDataPtr> places;
bool has_null_key_data = false;
BlocksList blocks;
auto init_out_cols = [&]()
{
@ -2187,51 +2126,78 @@ Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena
// should be invoked at least once, because null data might be the only content of the `data`
init_out_cols();
data.forEachValue(
[&](const auto & key, auto & mapped)
{
if (!out_cols.has_value())
init_out_cols();
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
places.emplace_back(mapped);
/// Mark the cell as destroyed so it will not be destroyed in destructor.
mapped = nullptr;
if constexpr (!return_single_block)
if (return_single_block)
{
data.forEachValue(
[&](const auto & key, auto & mapped)
{
if (!out_cols.has_value())
init_out_cols();
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
places.emplace_back(mapped);
/// Mark the cell as destroyed so it will not be destroyed in destructor.
mapped = nullptr;
});
}
else
{
data.forEachValue(
[&](const auto & key, auto & mapped)
{
if (!out_cols.has_value())
init_out_cols();
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
places.emplace_back(mapped);
/// Mark the cell as destroyed so it will not be destroyed in destructor.
mapped = nullptr;
if (places.size() >= max_block_size)
{
res.emplace_back(insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena, has_null_key_data));
if (use_compiled_functions)
blocks.emplace_back(insertResultsIntoColumns<true>(places, std::move(out_cols.value()), arena, has_null_key_data));
else
blocks.emplace_back(insertResultsIntoColumns<false>(places, std::move(out_cols.value()), arena, has_null_key_data));
places.clear();
out_cols.reset();
has_null_key_data = false;
}
}
});
});
}
if constexpr (return_single_block)
if (return_single_block)
{
return insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena, has_null_key_data);
if (use_compiled_functions)
return insertResultsIntoColumns<true>(places, std::move(out_cols.value()), arena, has_null_key_data);
else
return insertResultsIntoColumns<false>(places, std::move(out_cols.value()), arena, has_null_key_data);
}
else
{
if (out_cols.has_value())
res.emplace_back(insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena, has_null_key_data));
return res;
{
if (use_compiled_functions)
blocks.emplace_back(insertResultsIntoColumns<true>(places, std::move(out_cols.value()), arena, has_null_key_data));
else
blocks.emplace_back(insertResultsIntoColumns<false>(places, std::move(out_cols.value()), arena, has_null_key_data));
}
return blocks;
}
}
template <bool return_single_block, typename Method, typename Table>
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t) const
template <typename Method, typename Table>
Aggregator::ConvertToBlockResVariant NO_INLINE
Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t, bool return_single_block) const
{
/// +1 for nullKeyData, if `data` doesn't have it - not a problem, just some memory for one excessive row will be preallocated
const size_t max_block_size = (return_single_block ? data.size() : std::min(params.max_block_size, data.size())) + 1;
const bool final = false;
ConvertToBlockRes<return_single_block> res;
BlocksList res_blocks;
std::optional<OutputBlockColumns> out_cols;
std::optional<Sizes> shuffled_key_sizes;
@ -2261,46 +2227,65 @@ Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & a
// should be invoked at least once, because null data might be the only content of the `data`
init_out_cols();
data.forEachValue(
[&](const auto & key, auto & mapped)
{
if (!out_cols.has_value())
init_out_cols();
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
/// reserved, so push_back does not throw exceptions
for (size_t i = 0; i < params.aggregates_size; ++i)
out_cols->aggregate_columns_data[i]->push_back(mapped + offsets_of_aggregate_states[i]);
mapped = nullptr;
++rows_in_current_block;
if constexpr (!return_single_block)
if (return_single_block)
{
data.forEachValue(
[&](const auto & key, auto & mapped)
{
if (!out_cols.has_value())
init_out_cols();
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
/// reserved, so push_back does not throw exceptions
for (size_t i = 0; i < params.aggregates_size; ++i)
out_cols->aggregate_columns_data[i]->push_back(mapped + offsets_of_aggregate_states[i]);
mapped = nullptr;
++rows_in_current_block;
});
}
else
{
data.forEachValue(
[&](const auto & key, auto & mapped)
{
if (!out_cols.has_value())
init_out_cols();
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
method.insertKeyIntoColumns(key, out_cols->raw_key_columns, key_sizes_ref);
/// reserved, so push_back does not throw exceptions
for (size_t i = 0; i < params.aggregates_size; ++i)
out_cols->aggregate_columns_data[i]->push_back(mapped + offsets_of_aggregate_states[i]);
mapped = nullptr;
++rows_in_current_block;
if (rows_in_current_block >= max_block_size)
{
res.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols.value()), final, rows_in_current_block));
res_blocks.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols.value()), final, rows_in_current_block));
out_cols.reset();
rows_in_current_block = 0;
}
}
});
});
}
if constexpr (return_single_block)
if (return_single_block)
{
return finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block);
}
else
{
if (rows_in_current_block)
res.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block));
return res;
res_blocks.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block));
return res_blocks;
}
return res;
return res_blocks;
}
void Aggregator::addSingleKeyToAggregateColumns(
@ -2406,18 +2391,23 @@ template <bool return_single_block>
Aggregator::ConvertToBlockRes<return_single_block>
Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
{
ConvertToBlockResVariant res_variant;
const size_t rows = data_variants.sizeWithoutOverflowRow();
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
{ \
return convertToBlockImpl<return_single_block>( \
*data_variants.NAME, data_variants.NAME->data, data_variants.aggregates_pool, data_variants.aggregates_pools, final, rows); \
res_variant = convertToBlockImpl( \
*data_variants.NAME, data_variants.NAME->data, data_variants.aggregates_pool, data_variants.aggregates_pools, final, rows, return_single_block); \
}
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
else throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant.");
if constexpr (return_single_block)
return std::get<Block>(res_variant);
else
return std::get<BlocksList>(res_variant);
}
@ -2543,7 +2533,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
if (data_variants.type != AggregatedDataVariants::Type::without_key)
{
if (!data_variants.isTwoLevel())
blocks.splice(blocks.end(), prepareBlockAndFillSingleLevel</* return_single_block */ false>(data_variants, final));
blocks.splice(blocks.end(), prepareBlockAndFillSingleLevel<false>(data_variants, final));
else
blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()));
}
@ -2609,8 +2599,8 @@ void NO_INLINE Aggregator::mergeDataNullKey(
}
}
template <typename Method, bool use_compiled_functions, bool prefetch, typename Table>
void NO_INLINE Aggregator::mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena) const
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena, bool use_compiled_functions, bool prefetch) const
{
if constexpr (Method::low_cardinality_optimization || Method::one_key_nullable_optimization)
mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
@ -2633,11 +2623,14 @@ void NO_INLINE Aggregator::mergeDataImpl(Table & table_dst, Table & table_src, A
src = nullptr;
};
table_src.template mergeToViaEmplace<decltype(merge), prefetch>(table_dst, std::move(merge));
if (prefetch)
table_src.template mergeToViaEmplace<decltype(merge), true>(table_dst, std::move(merge));
else
table_src.template mergeToViaEmplace<decltype(merge), false>(table_dst, std::move(merge));
table_src.clearAndShrink();
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
{
const auto & compiled_functions = compiled_aggregate_functions_holder->compiled_aggregate_functions;
compiled_functions.merge_aggregate_states_function(dst_places.data(), src_places.data(), dst_places.size());
@ -2790,22 +2783,14 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder)
{
if (prefetch)
mergeDataImpl<Method, true, true>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
else
mergeDataImpl<Method, true, false>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, true, prefetch);
}
else
#endif
{
if (prefetch)
mergeDataImpl<Method, false, true>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
else
mergeDataImpl<Method, false, false>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, false, prefetch);
}
}
else if (res->without_key)
@ -2854,22 +2839,18 @@ void NO_INLINE Aggregator::mergeBucketImpl(
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder)
{
if (prefetch)
mergeDataImpl<Method, true, true>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
else
mergeDataImpl<Method, true, false>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena, true, prefetch);
}
else
#endif
{
if (prefetch)
mergeDataImpl<Method, false, true>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
else
mergeDataImpl<Method, false, false>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket],
arena,
false,
prefetch);
}
}
}
@ -3589,7 +3570,4 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) cons
throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant.");
}
template Aggregator::ConvertToBlockRes<false>
Aggregator::prepareBlockAndFillSingleLevel<false>(AggregatedDataVariants & data_variants, bool final) const;
}

File diff suppressed because it is too large Load Diff