mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge pull request #61211 from bigo-sg/split_aggregator
Too big translation unit in `Aggregator`
This commit is contained in:
commit
018316c78a
@ -109,6 +109,9 @@ public:
|
||||
|
||||
using Base::Base;
|
||||
|
||||
FixedHashMap() = default;
|
||||
FixedHashMap(size_t ) {} /// NOLINT
|
||||
|
||||
template <typename Func, bool>
|
||||
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
|
||||
{
|
||||
|
@ -38,6 +38,7 @@ public:
|
||||
Impl impls[NUM_BUCKETS];
|
||||
|
||||
TwoLevelStringHashTable() = default;
|
||||
TwoLevelStringHashTable(size_t ) {} /// NOLINT
|
||||
|
||||
template <typename Source>
|
||||
explicit TwoLevelStringHashTable(const Source & src)
|
||||
|
142
src/Interpreters/AggregatedData.h
Normal file
142
src/Interpreters/AggregatedData.h
Normal file
@ -0,0 +1,142 @@
|
||||
#pragma once
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
|
||||
#include <Common/HashTable/FixedHashMap.h>
|
||||
#include <Common/HashTable/StringHashMap.h>
|
||||
#include <Common/HashTable/TwoLevelHashMap.h>
|
||||
#include <Common/HashTable/TwoLevelStringHashMap.h>
|
||||
namespace DB
|
||||
{
|
||||
/** Different data structures that can be used for aggregation
|
||||
* For efficiency, the aggregation data itself is put into the pool.
|
||||
* Data and pool ownership (states of aggregate functions)
|
||||
* is acquired later - in `convertToBlocks` function, by the ColumnAggregateFunction object.
|
||||
*
|
||||
* Most data structures exist in two versions: normal and two-level (TwoLevel).
|
||||
* A two-level hash table works a little slower with a small number of different keys,
|
||||
* but with a large number of different keys scales better, because it allows
|
||||
* parallelizing some operations (merging, post-processing) in a natural way.
|
||||
*
|
||||
* To ensure efficient work over a wide range of conditions,
|
||||
* first single-level hash tables are used,
|
||||
* and when the number of different keys is large enough,
|
||||
* they are converted to two-level ones.
|
||||
*
|
||||
* PS. There are many different approaches to the effective implementation of parallel and distributed aggregation,
|
||||
* best suited for different cases, and this approach is just one of them, chosen for a combination of reasons.
|
||||
*/
|
||||
|
||||
using AggregatedDataWithoutKey = AggregateDataPtr;
|
||||
|
||||
using AggregatedDataWithUInt8Key = FixedImplicitZeroHashMapWithCalculatedSize<UInt8, AggregateDataPtr>;
|
||||
using AggregatedDataWithUInt16Key = FixedImplicitZeroHashMap<UInt16, AggregateDataPtr>;
|
||||
|
||||
using AggregatedDataWithUInt32Key = HashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>;
|
||||
using AggregatedDataWithUInt64Key = HashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
|
||||
|
||||
using AggregatedDataWithShortStringKey = StringHashMap<AggregateDataPtr>;
|
||||
|
||||
using AggregatedDataWithStringKey = HashMapWithSavedHash<StringRef, AggregateDataPtr>;
|
||||
|
||||
using AggregatedDataWithKeys128 = HashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
|
||||
using AggregatedDataWithKeys256 = HashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;
|
||||
|
||||
using AggregatedDataWithUInt32KeyTwoLevel = TwoLevelHashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>;
|
||||
using AggregatedDataWithUInt64KeyTwoLevel = TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
|
||||
|
||||
using AggregatedDataWithShortStringKeyTwoLevel = TwoLevelStringHashMap<AggregateDataPtr>;
|
||||
|
||||
using AggregatedDataWithStringKeyTwoLevel = TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr>;
|
||||
|
||||
using AggregatedDataWithKeys128TwoLevel = TwoLevelHashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
|
||||
using AggregatedDataWithKeys256TwoLevel = TwoLevelHashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;
|
||||
|
||||
/** Variants with better hash function, using more than 32 bits for hash.
|
||||
* Used for the merging phase of external aggregation, where the number of keys may be far greater than 4 billion,
|
||||
* but we keep in memory and merge only sub-partition of them simultaneously.
|
||||
* TODO We need to switch to a better hash function not only for external aggregation,
|
||||
* but also for huge aggregation results on machines with terabytes of RAM.
|
||||
*/
|
||||
|
||||
using AggregatedDataWithUInt64KeyHash64 = HashMap<UInt64, AggregateDataPtr, DefaultHash<UInt64>>;
|
||||
using AggregatedDataWithStringKeyHash64 = HashMapWithSavedHash<StringRef, AggregateDataPtr, StringRefHash64>;
|
||||
using AggregatedDataWithKeys128Hash64 = HashMap<UInt128, AggregateDataPtr, UInt128Hash>;
|
||||
using AggregatedDataWithKeys256Hash64 = HashMap<UInt256, AggregateDataPtr, UInt256Hash>;
|
||||
|
||||
template <typename Base>
|
||||
struct AggregationDataWithNullKey : public Base
|
||||
{
|
||||
using Base::Base;
|
||||
|
||||
bool & hasNullKeyData() { return has_null_key_data; }
|
||||
AggregateDataPtr & getNullKeyData() { return null_key_data; }
|
||||
bool hasNullKeyData() const { return has_null_key_data; }
|
||||
const AggregateDataPtr & getNullKeyData() const { return null_key_data; }
|
||||
size_t size() const { return Base::size() + (has_null_key_data ? 1 : 0); }
|
||||
bool empty() const { return Base::empty() && !has_null_key_data; }
|
||||
void clear()
|
||||
{
|
||||
Base::clear();
|
||||
has_null_key_data = false;
|
||||
}
|
||||
void clearAndShrink()
|
||||
{
|
||||
Base::clearAndShrink();
|
||||
has_null_key_data = false;
|
||||
}
|
||||
|
||||
private:
|
||||
bool has_null_key_data = false;
|
||||
AggregateDataPtr null_key_data = nullptr;
|
||||
};
|
||||
|
||||
template <typename Base>
|
||||
struct AggregationDataWithNullKeyTwoLevel : public Base
|
||||
{
|
||||
using Base::Base;
|
||||
using Base::impls;
|
||||
|
||||
AggregationDataWithNullKeyTwoLevel() = default;
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationDataWithNullKeyTwoLevel(const Other & other) : Base(other)
|
||||
{
|
||||
impls[0].hasNullKeyData() = other.hasNullKeyData();
|
||||
impls[0].getNullKeyData() = other.getNullKeyData();
|
||||
}
|
||||
|
||||
bool & hasNullKeyData() { return impls[0].hasNullKeyData(); }
|
||||
AggregateDataPtr & getNullKeyData() { return impls[0].getNullKeyData(); }
|
||||
bool hasNullKeyData() const { return impls[0].hasNullKeyData(); }
|
||||
const AggregateDataPtr & getNullKeyData() const { return impls[0].getNullKeyData(); }
|
||||
};
|
||||
|
||||
template <typename ... Types>
|
||||
using HashTableWithNullKey = AggregationDataWithNullKey<HashMapTable<Types ...>>;
|
||||
template <typename ... Types>
|
||||
using StringHashTableWithNullKey = AggregationDataWithNullKey<StringHashMap<Types ...>>;
|
||||
|
||||
using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey<AggregatedDataWithUInt8Key>;
|
||||
using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey<AggregatedDataWithUInt16Key>;
|
||||
using AggregatedDataWithNullableUInt32Key = AggregationDataWithNullKey<AggregatedDataWithUInt32Key>;
|
||||
|
||||
|
||||
using AggregatedDataWithNullableUInt64Key = AggregationDataWithNullKey<AggregatedDataWithUInt64Key>;
|
||||
using AggregatedDataWithNullableStringKey = AggregationDataWithNullKey<AggregatedDataWithStringKey>;
|
||||
using AggregatedDataWithNullableShortStringKey = AggregationDataWithNullKey<AggregatedDataWithShortStringKey>;
|
||||
|
||||
|
||||
using AggregatedDataWithNullableUInt32KeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
|
||||
TwoLevelHashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>,
|
||||
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
|
||||
using AggregatedDataWithNullableUInt64KeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
|
||||
TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>,
|
||||
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
|
||||
|
||||
using AggregatedDataWithNullableShortStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
|
||||
TwoLevelStringHashMap<AggregateDataPtr, HashTableAllocator, StringHashTableWithNullKey>>;
|
||||
|
||||
using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
|
||||
TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr, DefaultHash<StringRef>,
|
||||
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
|
||||
}
|
256
src/Interpreters/AggregatedDataVariants.cpp
Normal file
256
src/Interpreters/AggregatedDataVariants.cpp
Normal file
@ -0,0 +1,256 @@
|
||||
#include <Interpreters/AggregatedDataVariants.h>
|
||||
#include <Interpreters/Aggregator.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event AggregationPreallocatedElementsInHashTables;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
|
||||
extern const int LOGICAL_ERROR;
|
||||
|
||||
}
|
||||
using ColumnsHashing::HashMethodContext;
|
||||
using ColumnsHashing::HashMethodContextPtr;
|
||||
|
||||
/// Creates a single arena and makes it the pool currently used for allocation.
AggregatedDataVariants::AggregatedDataVariants()
    : aggregates_pools(1, std::make_shared<Arena>())
    , aggregates_pool(aggregates_pools.back().get())
{
}
|
||||
|
||||
/// Destroys the remaining aggregate function states through the attached aggregator, if there is one.
AggregatedDataVariants::~AggregatedDataVariants()
{
    /// Nothing to do when no aggregator is attached or it reports `all_aggregates_has_trivial_destructor`.
    if (!aggregator || aggregator->all_aggregates_has_trivial_destructor)
        return;

    try
    {
        aggregator->destroyAllAggregateStates(*this);
    }
    catch (...)
    {
        /// Exceptions must not escape the destructor: log and continue.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
|
||||
|
||||
// The std::is_constructible trait isn't suitable here because some classes have template constructors with semantics different from providing size hints.
|
||||
// Also string hash table variants are not supported due to the fact that both local perf tests and tests in CI showed slowdowns for them.
|
||||
template <typename...>
|
||||
struct HasConstructorOfNumberOfElements : std::false_type
|
||||
{
|
||||
};
|
||||
|
||||
template <typename... Ts>
|
||||
struct HasConstructorOfNumberOfElements<HashMapTable<Ts...>> : std::true_type
|
||||
{
|
||||
};
|
||||
|
||||
template <typename Key, typename Cell, typename Hash, typename Grower, typename Allocator, template <typename...> typename ImplTable>
|
||||
struct HasConstructorOfNumberOfElements<TwoLevelHashMapTable<Key, Cell, Hash, Grower, Allocator, ImplTable>> : std::true_type
|
||||
{
|
||||
};
|
||||
|
||||
template <typename... Ts>
|
||||
struct HasConstructorOfNumberOfElements<HashTable<Ts...>> : std::true_type
|
||||
{
|
||||
};
|
||||
|
||||
template <typename... Ts>
|
||||
struct HasConstructorOfNumberOfElements<TwoLevelHashTable<Ts...>> : std::true_type
|
||||
{
|
||||
};
|
||||
|
||||
template <template <typename> typename Method, typename Base>
|
||||
struct HasConstructorOfNumberOfElements<Method<Base>> : HasConstructorOfNumberOfElements<Base>
|
||||
{
|
||||
};
|
||||
|
||||
template <typename Method>
|
||||
auto constructWithReserveIfPossible(size_t size_hint)
|
||||
{
|
||||
if constexpr (HasConstructorOfNumberOfElements<typename Method::Data>::value)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::AggregationPreallocatedElementsInHashTables, size_hint);
|
||||
return std::make_unique<Method>(size_hint);
|
||||
}
|
||||
else
|
||||
return std::make_unique<Method>();
|
||||
}
|
||||
|
||||
/// Allocates the method object that corresponds to `type_` and remembers the type.
/// When `size_hint` is set, it is forwarded to constructWithReserveIfPossible().
/// EMPTY and without_key need no method object.
void AggregatedDataVariants::init(Type type_, std::optional<size_t> size_hint)
{
    switch (type_)
    {
        case Type::EMPTY:
        case Type::without_key:
            break;

    #define M(NAME, IS_TWO_LEVEL) \
        case Type::NAME: \
            (NAME) = size_hint.has_value() \
                ? constructWithReserveIfPossible<decltype(NAME)::element_type>(*size_hint) \
                : std::make_unique<decltype(NAME)::element_type>(); \
            break;

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M
    }

    type = type_;
}
|
||||
|
||||
size_t AggregatedDataVariants::size() const
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case Type::EMPTY:
|
||||
return 0;
|
||||
case Type::without_key:
|
||||
return 1;
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
case Type::NAME: \
|
||||
return (NAME)->data.size() + (without_key != nullptr);
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
size_t AggregatedDataVariants::sizeWithoutOverflowRow() const
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case Type::EMPTY:
|
||||
return 0;
|
||||
case Type::without_key:
|
||||
return 1;
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
case Type::NAME: \
|
||||
return (NAME)->data.size();
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
const char * AggregatedDataVariants::getMethodName() const
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case Type::EMPTY:
|
||||
return "EMPTY";
|
||||
case Type::without_key:
|
||||
return "without_key";
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
case Type::NAME: \
|
||||
return #NAME;
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
bool AggregatedDataVariants::isTwoLevel() const
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case Type::EMPTY:
|
||||
return false;
|
||||
case Type::without_key:
|
||||
return false;
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
case Type::NAME: \
|
||||
return IS_TWO_LEVEL;
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
bool AggregatedDataVariants::isConvertibleToTwoLevel() const
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
#define M(NAME) \
|
||||
case Type::NAME: \
|
||||
return true;
|
||||
|
||||
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
|
||||
|
||||
#undef M
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void AggregatedDataVariants::convertToTwoLevel()
|
||||
{
|
||||
if (aggregator)
|
||||
LOG_TRACE(aggregator->log, "Converting aggregation data to two-level.");
|
||||
|
||||
switch (type)
|
||||
{
|
||||
#define M(NAME) \
|
||||
case Type::NAME: \
|
||||
NAME ## _two_level = std::make_unique<decltype(NAME ## _two_level)::element_type>(*(NAME)); \
|
||||
(NAME).reset(); \
|
||||
type = Type::NAME ## _two_level; \
|
||||
break;
|
||||
|
||||
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
|
||||
|
||||
#undef M
|
||||
|
||||
default:
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong data variant passed.");
|
||||
}
|
||||
}
|
||||
|
||||
bool AggregatedDataVariants::isLowCardinality() const
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
#define M(NAME) \
|
||||
case Type::NAME: \
|
||||
return true;
|
||||
|
||||
APPLY_FOR_LOW_CARDINALITY_VARIANTS(M)
|
||||
#undef M
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates the hashing context for the method that corresponds to `type` by calling
/// its State::createContext(settings). Returns nullptr for without_key;
/// throws UNKNOWN_AGGREGATED_DATA_VARIANT for any type that has no method (e.g. EMPTY).
HashMethodContextPtr AggregatedDataVariants::createCache(Type type, const HashMethodContext::Settings & settings)
{
    switch (type)
    {
        case Type::without_key:
            return nullptr;

    #define M(NAME, IS_TWO_LEVEL) \
        case Type::NAME: \
        { \
            using Method = typename decltype(AggregatedDataVariants::NAME)::element_type; \
            return Method::State::createContext(settings); \
        }

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M

        default:
            throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant.");
    }
}
|
||||
}
|
320
src/Interpreters/AggregatedDataVariants.h
Normal file
320
src/Interpreters/AggregatedDataVariants.h
Normal file
@ -0,0 +1,320 @@
|
||||
#pragma once
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <memory.h>
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
#include <Common/ColumnsHashing.h>
|
||||
#include <Interpreters/AggregatedData.h>
|
||||
#include <Interpreters/AggregationMethod.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class Arena;
|
||||
class Aggregator;
|
||||
|
||||
struct AggregatedDataVariants : private boost::noncopyable
|
||||
{
|
||||
/** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way:
|
||||
* - when aggregating, states are created in the pool using IAggregateFunction::create (inside - `placement new` of arbitrary structure);
|
||||
* - they must then be destroyed using IAggregateFunction::destroy (inside - calling the destructor of arbitrary structure);
|
||||
* - if aggregation is complete, then, in the Aggregator::convertToBlocks function, pointers to the states of aggregate functions
|
||||
* are written to ColumnAggregateFunction; ColumnAggregateFunction "acquires ownership" of them, that is - calls `destroy` in its destructor.
|
||||
* - if during the aggregation, before call to Aggregator::convertToBlocks, an exception was thrown,
|
||||
* then the states of aggregate functions must still be destroyed,
|
||||
* otherwise, for complex states (eg, AggregateFunctionUniq), there will be memory leaks;
|
||||
* - in this case, to destroy states, the destructor calls the Aggregator::destroyAllAggregateStates method,
|
||||
* but only if the variable aggregator (see below) is not nullptr;
|
||||
* - that is, until you transfer ownership of the aggregate function states in the ColumnAggregateFunction, set the variable `aggregator`,
|
||||
* so that when an exception occurs, the states are correctly destroyed.
|
||||
*
|
||||
* PS. This can be corrected by making a pool that knows about which states of aggregate functions and in which order are put in it, and knows how to destroy them.
|
||||
* But this can hardly be done simply because it is planned to put variable-length strings into the same pool.
|
||||
* In this case, the pool will not be able to know with what offsets objects are stored.
|
||||
*/
|
||||
const Aggregator * aggregator = nullptr;
|
||||
|
||||
size_t keys_size{}; /// Number of keys. NOTE do we need this field?
|
||||
Sizes key_sizes; /// Dimensions of keys, if keys of fixed length
|
||||
|
||||
/// Pools for states of aggregate functions. Ownership will be later transferred to ColumnAggregateFunction.
|
||||
using ArenaPtr = std::shared_ptr<Arena>;
|
||||
using Arenas = std::vector<ArenaPtr>;
|
||||
Arenas aggregates_pools;
|
||||
Arena * aggregates_pool{}; /// The pool that is currently used for allocation.
|
||||
|
||||
/** Specialization for the case when there are no keys, and for keys not fitted into max_rows_to_group_by.
|
||||
*/
|
||||
AggregatedDataWithoutKey without_key = nullptr;
|
||||
|
||||
/// Stats of a cache for consecutive keys optimization.
|
||||
/// Stats can be used to disable the cache in case of a lot of misses.
|
||||
ColumnsHashing::LastElementCacheStats consecutive_keys_cache_stats;
|
||||
|
||||
// Disable consecutive key optimization for UInt8/16, because they use a FixedHashMap
|
||||
// and the lookup there is almost free, so we don't need to cache the last lookup result
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>> key8;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>> key16;
|
||||
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>> key32;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>> key64;
|
||||
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithShortStringKey>> key_string;
|
||||
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKey>> key_fixed_string;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt16Key, false, false, false>> keys16;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt32Key>> keys32;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key>> keys64;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>> keys128;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>> keys256;
|
||||
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKey>> serialized;
|
||||
std::unique_ptr<AggregationMethodNullableSerialized<AggregatedDataWithStringKey>> nullable_serialized;
|
||||
std::unique_ptr<AggregationMethodPreallocSerialized<AggregatedDataWithStringKey>> prealloc_serialized;
|
||||
std::unique_ptr<AggregationMethodNullablePreallocSerialized<AggregatedDataWithStringKey>> nullable_prealloc_serialized;
|
||||
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>> key32_two_level;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>> key64_two_level;
|
||||
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>> key_string_two_level;
|
||||
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>> key_fixed_string_two_level;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt32KeyTwoLevel>> keys32_two_level;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyTwoLevel>> keys64_two_level;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>> keys128_two_level;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>> keys256_two_level;
|
||||
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>> serialized_two_level;
|
||||
std::unique_ptr<AggregationMethodNullableSerialized<AggregatedDataWithStringKeyTwoLevel>> nullable_serialized_two_level;
|
||||
std::unique_ptr<AggregationMethodPreallocSerialized<AggregatedDataWithStringKeyTwoLevel>> prealloc_serialized_two_level;
|
||||
std::unique_ptr<AggregationMethodNullablePreallocSerialized<AggregatedDataWithStringKeyTwoLevel>> nullable_prealloc_serialized_two_level;
|
||||
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyHash64>> key64_hash64;
|
||||
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKeyHash64>> key_string_hash64;
|
||||
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKeyHash64>> key_fixed_string_hash64;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128Hash64>> keys128_hash64;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256Hash64>> keys256_hash64;
|
||||
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyHash64>> serialized_hash64;
|
||||
std::unique_ptr<AggregationMethodNullableSerialized<AggregatedDataWithStringKeyHash64>> nullable_serialized_hash64;
|
||||
std::unique_ptr<AggregationMethodPreallocSerialized<AggregatedDataWithStringKeyHash64>> prealloc_serialized_hash64;
|
||||
std::unique_ptr<AggregationMethodNullablePreallocSerialized<AggregatedDataWithStringKeyHash64>> nullable_prealloc_serialized_hash64;
|
||||
|
||||
/// Support for nullable keys.
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false, true>> nullable_key8;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false, true>> nullable_key16;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key, true, true>> nullable_key32;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key, true, true>> nullable_key64;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32KeyTwoLevel, true, true>> nullable_key32_two_level;
|
||||
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel, true, true>> nullable_key64_two_level;
|
||||
|
||||
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithNullableShortStringKey, true>> nullable_key_string;
|
||||
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithNullableShortStringKey, true>> nullable_key_fixed_string;
|
||||
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithNullableShortStringKeyTwoLevel, true>> nullable_key_string_two_level;
|
||||
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithNullableShortStringKeyTwoLevel, true>> nullable_key_fixed_string_two_level;
|
||||
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, true>> nullable_keys128;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, true>> nullable_keys256;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, true>> nullable_keys128_two_level;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>> nullable_keys256_two_level;
|
||||
|
||||
/// Support for low cardinality.
|
||||
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>> low_cardinality_key8;
|
||||
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>> low_cardinality_key16;
|
||||
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key32;
|
||||
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key64;
|
||||
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_string;
|
||||
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_fixed_string;
|
||||
|
||||
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key32_two_level;
|
||||
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key64_two_level;
|
||||
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_string_two_level;
|
||||
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_fixed_string_two_level;
|
||||
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, false, true>> low_cardinality_keys128;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, false, true>> low_cardinality_keys256;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, false, true>> low_cardinality_keys128_two_level;
|
||||
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, false, true>> low_cardinality_keys256_two_level;
|
||||
|
||||
/// In this and similar macros, the option without_key is not considered.
|
||||
#define APPLY_FOR_AGGREGATED_VARIANTS(M) \
|
||||
M(key8, false) \
|
||||
M(key16, false) \
|
||||
M(key32, false) \
|
||||
M(key64, false) \
|
||||
M(key_string, false) \
|
||||
M(key_fixed_string, false) \
|
||||
M(keys16, false) \
|
||||
M(keys32, false) \
|
||||
M(keys64, false) \
|
||||
M(keys128, false) \
|
||||
M(keys256, false) \
|
||||
M(serialized, false) \
|
||||
M(nullable_serialized, false) \
|
||||
M(prealloc_serialized, false) \
|
||||
M(nullable_prealloc_serialized, false) \
|
||||
M(key32_two_level, true) \
|
||||
M(key64_two_level, true) \
|
||||
M(key_string_two_level, true) \
|
||||
M(key_fixed_string_two_level, true) \
|
||||
M(keys32_two_level, true) \
|
||||
M(keys64_two_level, true) \
|
||||
M(keys128_two_level, true) \
|
||||
M(keys256_two_level, true) \
|
||||
M(serialized_two_level, true) \
|
||||
M(nullable_serialized_two_level, true) \
|
||||
M(prealloc_serialized_two_level, true) \
|
||||
M(nullable_prealloc_serialized_two_level, true) \
|
||||
M(key64_hash64, false) \
|
||||
M(key_string_hash64, false) \
|
||||
M(key_fixed_string_hash64, false) \
|
||||
M(keys128_hash64, false) \
|
||||
M(keys256_hash64, false) \
|
||||
M(serialized_hash64, false) \
|
||||
M(nullable_serialized_hash64, false) \
|
||||
M(prealloc_serialized_hash64, false) \
|
||||
M(nullable_prealloc_serialized_hash64, false) \
|
||||
M(nullable_key8, false) \
|
||||
M(nullable_key16, false) \
|
||||
M(nullable_key32, false) \
|
||||
M(nullable_key64, false) \
|
||||
M(nullable_key32_two_level, true) \
|
||||
M(nullable_key64_two_level, true) \
|
||||
M(nullable_key_string, false) \
|
||||
M(nullable_key_fixed_string, false) \
|
||||
M(nullable_key_string_two_level, true) \
|
||||
M(nullable_key_fixed_string_two_level, true) \
|
||||
M(nullable_keys128, false) \
|
||||
M(nullable_keys256, false) \
|
||||
M(nullable_keys128_two_level, true) \
|
||||
M(nullable_keys256_two_level, true) \
|
||||
M(low_cardinality_key8, false) \
|
||||
M(low_cardinality_key16, false) \
|
||||
M(low_cardinality_key32, false) \
|
||||
M(low_cardinality_key64, false) \
|
||||
M(low_cardinality_keys128, false) \
|
||||
M(low_cardinality_keys256, false) \
|
||||
M(low_cardinality_key_string, false) \
|
||||
M(low_cardinality_key_fixed_string, false) \
|
||||
M(low_cardinality_key32_two_level, true) \
|
||||
M(low_cardinality_key64_two_level, true) \
|
||||
M(low_cardinality_keys128_two_level, true) \
|
||||
M(low_cardinality_keys256_two_level, true) \
|
||||
M(low_cardinality_key_string_two_level, true) \
|
||||
M(low_cardinality_key_fixed_string_two_level, true) \
|
||||
|
||||
#define APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
|
||||
M(key32) \
|
||||
M(key64) \
|
||||
M(key_string) \
|
||||
M(key_fixed_string) \
|
||||
M(keys32) \
|
||||
M(keys64) \
|
||||
M(keys128) \
|
||||
M(keys256) \
|
||||
M(serialized) \
|
||||
M(nullable_serialized) \
|
||||
M(prealloc_serialized) \
|
||||
M(nullable_prealloc_serialized) \
|
||||
M(nullable_key32) \
|
||||
M(nullable_key64) \
|
||||
M(nullable_key_string) \
|
||||
M(nullable_key_fixed_string) \
|
||||
M(nullable_keys128) \
|
||||
M(nullable_keys256) \
|
||||
M(low_cardinality_key32) \
|
||||
M(low_cardinality_key64) \
|
||||
M(low_cardinality_keys128) \
|
||||
M(low_cardinality_keys256) \
|
||||
M(low_cardinality_key_string) \
|
||||
M(low_cardinality_key_fixed_string) \
|
||||
|
||||
/// NOLINTNEXTLINE
|
||||
#define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
|
||||
M(key8) \
|
||||
M(key16) \
|
||||
M(nullable_key8) \
|
||||
M(nullable_key16) \
|
||||
M(keys16) \
|
||||
M(key64_hash64) \
|
||||
M(key_string_hash64)\
|
||||
M(key_fixed_string_hash64) \
|
||||
M(keys128_hash64) \
|
||||
M(keys256_hash64) \
|
||||
M(serialized_hash64) \
|
||||
M(nullable_serialized_hash64) \
|
||||
M(prealloc_serialized_hash64) \
|
||||
M(nullable_prealloc_serialized_hash64) \
|
||||
M(low_cardinality_key8) \
|
||||
M(low_cardinality_key16) \
|
||||
|
||||
/// NOLINTNEXTLINE
|
||||
#define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
|
||||
APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
|
||||
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
|
||||
|
||||
/// NOLINTNEXTLINE
|
||||
#define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \
|
||||
M(key32_two_level) \
|
||||
M(key64_two_level) \
|
||||
M(key_string_two_level) \
|
||||
M(key_fixed_string_two_level) \
|
||||
M(keys32_two_level) \
|
||||
M(keys64_two_level) \
|
||||
M(keys128_two_level) \
|
||||
M(keys256_two_level) \
|
||||
M(serialized_two_level) \
|
||||
M(nullable_serialized_two_level) \
|
||||
M(prealloc_serialized_two_level) \
|
||||
M(nullable_prealloc_serialized_two_level) \
|
||||
M(nullable_key32_two_level) \
|
||||
M(nullable_key64_two_level) \
|
||||
M(nullable_key_string_two_level) \
|
||||
M(nullable_key_fixed_string_two_level) \
|
||||
M(nullable_keys128_two_level) \
|
||||
M(nullable_keys256_two_level) \
|
||||
M(low_cardinality_key32_two_level) \
|
||||
M(low_cardinality_key64_two_level) \
|
||||
M(low_cardinality_keys128_two_level) \
|
||||
M(low_cardinality_keys256_two_level) \
|
||||
M(low_cardinality_key_string_two_level) \
|
||||
M(low_cardinality_key_fixed_string_two_level) \
|
||||
|
||||
/// All low-cardinality variants, single-level first and then the two-level ones.
/// `M` is applied to every variant name in order.
#define APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) \
    M(low_cardinality_key8) \
    M(low_cardinality_key16) \
    M(low_cardinality_key32) \
    M(low_cardinality_key64) \
    M(low_cardinality_keys128) \
    M(low_cardinality_keys256) \
    M(low_cardinality_key_string) \
    M(low_cardinality_key_fixed_string) \
    M(low_cardinality_key32_two_level) \
    M(low_cardinality_key64_two_level) \
    M(low_cardinality_keys128_two_level) \
    M(low_cardinality_keys256_two_level) \
    M(low_cardinality_key_string_two_level) \
    M(low_cardinality_key_fixed_string_two_level)
|
||||
|
||||
/// Discriminator for the aggregation variant currently in use.
/// EMPTY is the initial value of `type` and the value restored by invalidate().
enum class Type
|
||||
{
|
||||
EMPTY = 0,
|
||||
without_key,
|
||||
|
||||
/// One enumerator per entry of APPLY_FOR_AGGREGATED_VARIANTS; the IS_TWO_LEVEL argument is not used here.
#define M(NAME, IS_TWO_LEVEL) NAME,
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
};
|
||||
/// Variant currently in use; EMPTY until a variant is selected.
Type type = Type::EMPTY;
|
||||
AggregatedDataVariants();
|
||||
~AggregatedDataVariants();
|
||||
/// True while no variant is selected.
bool empty() const { return type == Type::EMPTY; }
|
||||
/// Marks the object as holding no variant. Only the discriminator is reset here.
void invalidate() { type = Type::EMPTY; }
|
||||
/// Selects the variant `type_`. `size_hint` is optional and defaults to "no hint".
void init(Type type_, std::optional<size_t> size_hint = std::nullopt);
|
||||
/// Number of rows (different keys).
|
||||
size_t size() const;
|
||||
size_t sizeWithoutOverflowRow() const;
|
||||
const char * getMethodName() const;
|
||||
bool isTwoLevel() const;
|
||||
bool isConvertibleToTwoLevel() const;
|
||||
void convertToTwoLevel();
|
||||
bool isLowCardinality() const;
|
||||
/// Static factory: does not depend on the state of a particular AggregatedDataVariants object.
static ColumnsHashing::HashMethodContextPtr createCache(Type type, const ColumnsHashing::HashMethodContext::Settings & settings);
|
||||
|
||||
};
|
||||
|
||||
/// Shared-ownership handle to one AggregatedDataVariants.
using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
|
||||
/// A list of such handles.
using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
|
||||
/// Shared-ownership handle to the list itself.
using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants>;
|
||||
}
|
215
src/Interpreters/AggregationMethod.cpp
Normal file
215
src/Interpreters/AggregationMethod.cpp
Normal file
@ -0,0 +1,215 @@
|
||||
#include <Interpreters/AggregationMethod.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
template <typename FieldType, typename TData, bool consecutive_keys_optimization, bool nullable>
|
||||
void AggregationMethodOneNumber<FieldType, TData, consecutive_keys_optimization, nullable>::insertKeyIntoColumns(
|
||||
const AggregationMethodOneNumber::Key & key, std::vector<IColumn *> & key_columns, const Sizes & /*key_sizes*/)
|
||||
{
|
||||
ColumnFixedSizeHelper * column;
|
||||
if constexpr (nullable)
|
||||
{
|
||||
ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*key_columns[0]);
|
||||
ColumnUInt8 * null_map = assert_cast<ColumnUInt8 *>(&nullable_col.getNullMapColumn());
|
||||
null_map->insertDefault();
|
||||
column = static_cast<ColumnFixedSizeHelper *>(&nullable_col.getNestedColumn());
|
||||
}
|
||||
else
|
||||
{
|
||||
column = static_cast<ColumnFixedSizeHelper *>(key_columns[0]);
|
||||
}
|
||||
static_assert(sizeof(FieldType) <= sizeof(Key));
|
||||
const auto * key_holder = reinterpret_cast<const char *>(&key);
|
||||
if constexpr (sizeof(FieldType) < sizeof(Key) && std::endian::native == std::endian::big)
|
||||
column->insertRawData<sizeof(FieldType)>(key_holder + (sizeof(Key) - sizeof(FieldType)));
|
||||
else
|
||||
column->insertRawData<sizeof(FieldType)>(key_holder);
|
||||
}
|
||||
|
||||
template struct AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>;
|
||||
template struct AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>;
|
||||
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>;
|
||||
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>;
|
||||
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>;
|
||||
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>;
|
||||
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyHash64>;
|
||||
template struct AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false, true>;
|
||||
template struct AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false, true>;
|
||||
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key, true, true>;
|
||||
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key, true, true>;
|
||||
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32KeyTwoLevel, true, true>;
|
||||
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel, true, true>;
|
||||
template struct AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>;
|
||||
template struct AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>;
|
||||
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64Key>;
|
||||
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>;
|
||||
template struct AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64KeyTwoLevel>;
|
||||
template struct AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel>;
|
||||
|
||||
template <typename TData, bool nullable>
|
||||
void AggregationMethodStringNoCache<TData, nullable>::insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &)
|
||||
{
|
||||
if constexpr (nullable)
|
||||
{
|
||||
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*key_columns[0]);
|
||||
assert_cast<ColumnString &>(column_nullable.getNestedColumn()).insertData(key.data, key.size);
|
||||
column_nullable.getNullMapData().push_back(0);
|
||||
}
|
||||
else
|
||||
{
|
||||
assert_cast<ColumnString &>(*key_columns[0]).insertData(key.data, key.size);
|
||||
}
|
||||
}
|
||||
template struct AggregationMethodStringNoCache<AggregatedDataWithShortStringKey>;
|
||||
template struct AggregationMethodStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>;
|
||||
template struct AggregationMethodStringNoCache<AggregatedDataWithNullableShortStringKey, true>;
|
||||
template struct AggregationMethodStringNoCache<AggregatedDataWithNullableShortStringKeyTwoLevel, true>;
|
||||
|
||||
template <typename TData>
|
||||
void AggregationMethodFixedString<TData>::insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &)
|
||||
{
|
||||
assert_cast<ColumnFixedString &>(*key_columns[0]).insertData(key.data, key.size);
|
||||
}
|
||||
template struct AggregationMethodFixedString<AggregatedDataWithStringKeyHash64>;
|
||||
template struct AggregationMethodFixedString<AggregatedDataWithNullableStringKey>;
|
||||
template struct AggregationMethodFixedString<AggregatedDataWithNullableStringKeyTwoLevel>;
|
||||
|
||||
|
||||
template <typename TData, bool nullable>
|
||||
void AggregationMethodFixedStringNoCache<TData, nullable>::insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &)
|
||||
{
|
||||
if constexpr (nullable)
|
||||
assert_cast<ColumnNullable &>(*key_columns[0]).insertData(key.data, key.size);
|
||||
else
|
||||
assert_cast<ColumnFixedString &>(*key_columns[0]).insertData(key.data, key.size);
|
||||
}
|
||||
template struct AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKey>;
|
||||
template struct AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>;
|
||||
template struct AggregationMethodFixedStringNoCache<AggregatedDataWithNullableShortStringKey, true>;
|
||||
template struct AggregationMethodFixedStringNoCache<AggregatedDataWithNullableShortStringKeyTwoLevel, true>;
|
||||
|
||||
|
||||
template <typename SingleColumnMethod>
|
||||
void AggregationMethodSingleLowCardinalityColumn<SingleColumnMethod>::insertKeyIntoColumns(
|
||||
const Key & key, std::vector<IColumn *> & key_columns_low_cardinality, const Sizes & /*key_sizes*/)
|
||||
{
|
||||
auto * col = assert_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0]);
|
||||
|
||||
if constexpr (std::is_same_v<Key, StringRef>)
|
||||
col->insertData(key.data, key.size);
|
||||
else
|
||||
col->insertData(reinterpret_cast<const char *>(&key), sizeof(key));
|
||||
}
|
||||
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>;
|
||||
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>;
|
||||
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64Key>>;
|
||||
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>;
|
||||
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKey>>;
|
||||
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKey>>;
|
||||
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64KeyTwoLevel>>;
|
||||
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel>>;
|
||||
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKeyTwoLevel>>;
|
||||
template struct AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKeyTwoLevel>>;
|
||||
|
||||
|
||||
template <typename TData, bool has_nullable_keys, bool has_low_cardinality, bool consecutive_keys_optimization>
|
||||
void AggregationMethodKeysFixed<TData, has_nullable_keys, has_low_cardinality,consecutive_keys_optimization>::insertKeyIntoColumns(const Key & key, std::vector<IColumn *> & key_columns, const Sizes & key_sizes)
|
||||
{
|
||||
size_t keys_size = key_columns.size();
|
||||
|
||||
static constexpr auto bitmap_size = has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;
|
||||
/// In any hash key value, column values to be read start just after the bitmap, if it exists.
|
||||
size_t pos = bitmap_size;
|
||||
|
||||
for (size_t i = 0; i < keys_size; ++i)
|
||||
{
|
||||
IColumn * observed_column;
|
||||
ColumnUInt8 * null_map;
|
||||
|
||||
bool column_nullable = false;
|
||||
if constexpr (has_nullable_keys)
|
||||
column_nullable = isColumnNullable(*key_columns[i]);
|
||||
|
||||
/// If we have a nullable column, get its nested column and its null map.
|
||||
if (column_nullable)
|
||||
{
|
||||
ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*key_columns[i]);
|
||||
observed_column = &nullable_col.getNestedColumn();
|
||||
null_map = assert_cast<ColumnUInt8 *>(&nullable_col.getNullMapColumn());
|
||||
}
|
||||
else
|
||||
{
|
||||
observed_column = key_columns[i];
|
||||
null_map = nullptr;
|
||||
}
|
||||
|
||||
bool is_null = false;
|
||||
if (column_nullable)
|
||||
{
|
||||
/// The current column is nullable. Check if the value of the
|
||||
/// corresponding key is nullable. Update the null map accordingly.
|
||||
size_t bucket = i / 8;
|
||||
size_t offset = i % 8;
|
||||
UInt8 val = (reinterpret_cast<const UInt8 *>(&key)[bucket] >> offset) & 1;
|
||||
null_map->insertValue(val);
|
||||
is_null = val == 1;
|
||||
}
|
||||
|
||||
if (has_nullable_keys && is_null)
|
||||
observed_column->insertDefault();
|
||||
else
|
||||
{
|
||||
size_t size = key_sizes[i];
|
||||
size_t offset_to = pos;
|
||||
if constexpr (std::endian::native == std::endian::big)
|
||||
offset_to = sizeof(Key) - size - pos;
|
||||
observed_column->insertData(reinterpret_cast<const char *>(&key) + offset_to, size);
|
||||
pos += size;
|
||||
}
|
||||
}
|
||||
}
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithUInt16Key, false, false, false>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithUInt32Key>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithUInt64Key>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithUInt32KeyTwoLevel>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyTwoLevel>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128Hash64>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256Hash64>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128, true>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256, true>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, true>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128, false, true>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256, false, true>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, false, true>;
|
||||
template struct AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, false, true>;
|
||||
|
||||
|
||||
template <typename TData, bool nullable, bool prealloc>
|
||||
void AggregationMethodSerialized<TData, nullable, prealloc>::insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &)
|
||||
{
|
||||
const auto * pos = key.data;
|
||||
for (auto & column : key_columns)
|
||||
pos = column->deserializeAndInsertFromArena(pos);
|
||||
}
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKey>;
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>;
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyHash64>;
|
||||
// AggregationMethodNullableSerialized
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKey, true, false>;
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel, true, false>;
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyHash64, true, false>;
|
||||
// AggregationMethodPreallocSerialized
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKey, false, true>;
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel, false, true>;
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyHash64, false, true>;
|
||||
// AggregationMethodNullablePreallocSerialized
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKey, true, true>;
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel, true, true>;
|
||||
template struct AggregationMethodSerialized<AggregatedDataWithStringKeyHash64, true, true>;
|
||||
|
||||
}
|
320
src/Interpreters/AggregationMethod.h
Normal file
320
src/Interpreters/AggregationMethod.h
Normal file
@ -0,0 +1,320 @@
|
||||
#pragma once
|
||||
#include <vector>
|
||||
#include <Common/ColumnsHashing.h>
|
||||
#include <Interpreters/AggregationCommon.h>
|
||||
#include <Interpreters/AggregatedData.h>
|
||||
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnFixedString.h>
|
||||
#include <Columns/ColumnAggregateFunction.h>
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Columns/ColumnLowCardinality.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class IColumn;
|
||||
/// For the case where there is one numeric key.
|
||||
/// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
|
||||
template <typename FieldType, typename TData,
|
||||
bool consecutive_keys_optimization = true, bool nullable = false>
|
||||
struct AggregationMethodOneNumber
|
||||
{
|
||||
using Data = TData;
|
||||
using Key = typename Data::key_type;
|
||||
using Mapped = typename Data::mapped_type;
|
||||
|
||||
Data data;
|
||||
|
||||
AggregationMethodOneNumber() = default;
|
||||
|
||||
explicit AggregationMethodOneNumber(size_t size_hint) : data(size_hint) { }
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationMethodOneNumber(const Other & other) : data(other.data)
|
||||
{
|
||||
}
|
||||
|
||||
/// To use one `Method` in different threads, use different `State`.
|
||||
template <bool use_cache>
|
||||
using StateImpl = ColumnsHashing::HashMethodOneNumber<
|
||||
typename Data::value_type,
|
||||
Mapped,
|
||||
FieldType,
|
||||
use_cache && consecutive_keys_optimization,
|
||||
/*need_offset=*/ false,
|
||||
nullable>;
|
||||
|
||||
using State = StateImpl<true>;
|
||||
using StateNoCache = StateImpl<false>;
|
||||
|
||||
/// Use optimization for low cardinality.
|
||||
static const bool low_cardinality_optimization = false;
|
||||
static const bool one_key_nullable_optimization = nullable;
|
||||
|
||||
/// Shuffle key columns before `insertKeyIntoColumns` call if needed.
|
||||
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
|
||||
|
||||
// Insert the key from the hash table into columns.
|
||||
static void insertKeyIntoColumns(const Key & key, std::vector<IColumn *> & key_columns, const Sizes & /*key_sizes*/);
|
||||
};
|
||||
|
||||
/// For the case where there is one string key.
|
||||
template <typename TData>
|
||||
struct AggregationMethodString
|
||||
{
|
||||
using Data = TData;
|
||||
using Key = typename Data::key_type;
|
||||
using Mapped = typename Data::mapped_type;
|
||||
|
||||
Data data;
|
||||
|
||||
AggregationMethodString() = default;
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationMethodString(const Other & other) : data(other.data)
|
||||
{
|
||||
}
|
||||
|
||||
explicit AggregationMethodString(size_t size_hint) : data(size_hint) { }
|
||||
|
||||
template <bool use_cache>
|
||||
using StateImpl = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped, /*place_string_to_arena=*/ true, use_cache>;
|
||||
|
||||
using State = StateImpl<true>;
|
||||
using StateNoCache = StateImpl<false>;
|
||||
|
||||
static const bool low_cardinality_optimization = false;
|
||||
static const bool one_key_nullable_optimization = false;
|
||||
|
||||
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
|
||||
|
||||
static void insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &)
|
||||
{
|
||||
static_cast<ColumnString *>(key_columns[0])->insertData(key.data, key.size);
|
||||
}
|
||||
};
|
||||
|
||||
/// Same as above but without cache
|
||||
template <typename TData, bool nullable = false>
|
||||
struct AggregationMethodStringNoCache
|
||||
{
|
||||
using Data = TData;
|
||||
using Key = typename Data::key_type;
|
||||
using Mapped = typename Data::mapped_type;
|
||||
|
||||
Data data;
|
||||
|
||||
AggregationMethodStringNoCache() = default;
|
||||
|
||||
explicit AggregationMethodStringNoCache(size_t size_hint) : data(size_hint) { }
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationMethodStringNoCache(const Other & other) : data(other.data)
|
||||
{
|
||||
}
|
||||
|
||||
template <bool use_cache>
|
||||
using StateImpl = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped, true, false, false, nullable>;
|
||||
|
||||
using State = StateImpl<true>;
|
||||
using StateNoCache = StateImpl<false>;
|
||||
|
||||
static const bool low_cardinality_optimization = false;
|
||||
static const bool one_key_nullable_optimization = nullable;
|
||||
|
||||
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
|
||||
|
||||
static void insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &);
|
||||
};
|
||||
|
||||
/// For the case where there is one fixed-length string key.
|
||||
template <typename TData>
|
||||
struct AggregationMethodFixedString
|
||||
{
|
||||
using Data = TData;
|
||||
using Key = typename Data::key_type;
|
||||
using Mapped = typename Data::mapped_type;
|
||||
|
||||
Data data;
|
||||
|
||||
AggregationMethodFixedString() = default;
|
||||
|
||||
explicit AggregationMethodFixedString(size_t size_hint) : data(size_hint) { }
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationMethodFixedString(const Other & other) : data(other.data)
|
||||
{
|
||||
}
|
||||
|
||||
template <bool use_cache>
|
||||
using StateImpl = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped, /*place_string_to_arena=*/ true, use_cache>;
|
||||
|
||||
using State = StateImpl<true>;
|
||||
using StateNoCache = StateImpl<false>;
|
||||
|
||||
static const bool low_cardinality_optimization = false;
|
||||
static const bool one_key_nullable_optimization = false;
|
||||
|
||||
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
|
||||
|
||||
static void insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &);
|
||||
};
|
||||
|
||||
/// Same as above but without cache
|
||||
template <typename TData, bool nullable = false>
|
||||
struct AggregationMethodFixedStringNoCache
|
||||
{
|
||||
using Data = TData;
|
||||
using Key = typename Data::key_type;
|
||||
using Mapped = typename Data::mapped_type;
|
||||
|
||||
Data data;
|
||||
|
||||
AggregationMethodFixedStringNoCache() = default;
|
||||
|
||||
explicit AggregationMethodFixedStringNoCache(size_t size_hint) : data(size_hint) { }
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationMethodFixedStringNoCache(const Other & other) : data(other.data)
|
||||
{
|
||||
}
|
||||
|
||||
template <bool use_cache>
|
||||
using StateImpl = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped, true, false, false, nullable>;
|
||||
|
||||
using State = StateImpl<true>;
|
||||
using StateNoCache = StateImpl<false>;
|
||||
|
||||
static const bool low_cardinality_optimization = false;
|
||||
static const bool one_key_nullable_optimization = nullable;
|
||||
|
||||
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
|
||||
|
||||
static void insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &);
|
||||
};
|
||||
|
||||
/// Single low cardinality column.
|
||||
template <typename SingleColumnMethod>
|
||||
struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
|
||||
{
|
||||
using Base = SingleColumnMethod;
|
||||
using Data = typename Base::Data;
|
||||
using Key = typename Base::Key;
|
||||
using Mapped = typename Base::Mapped;
|
||||
using Base::data;
|
||||
|
||||
template <bool use_cache>
|
||||
using BaseStateImpl = typename Base::template StateImpl<use_cache>;
|
||||
|
||||
AggregationMethodSingleLowCardinalityColumn() = default;
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {}
|
||||
|
||||
template <bool use_cache>
|
||||
using StateImpl = ColumnsHashing::HashMethodSingleLowCardinalityColumn<BaseStateImpl<use_cache>, Mapped, use_cache>;
|
||||
|
||||
using State = StateImpl<true>;
|
||||
using StateNoCache = StateImpl<false>;
|
||||
|
||||
static const bool low_cardinality_optimization = true;
|
||||
|
||||
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
|
||||
|
||||
static void insertKeyIntoColumns(const Key & key,
|
||||
std::vector<IColumn *> & key_columns_low_cardinality, const Sizes & /*key_sizes*/);
|
||||
};
|
||||
|
||||
/// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits.
|
||||
template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false, bool consecutive_keys_optimization = false>
|
||||
struct AggregationMethodKeysFixed
|
||||
{
|
||||
using Data = TData;
|
||||
using Key = typename Data::key_type;
|
||||
using Mapped = typename Data::mapped_type;
|
||||
static constexpr bool has_nullable_keys = has_nullable_keys_;
|
||||
static constexpr bool has_low_cardinality = has_low_cardinality_;
|
||||
|
||||
Data data;
|
||||
|
||||
AggregationMethodKeysFixed() = default;
|
||||
|
||||
explicit AggregationMethodKeysFixed(size_t size_hint) : data(size_hint) { }
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationMethodKeysFixed(const Other & other) : data(other.data)
|
||||
{
|
||||
}
|
||||
|
||||
template <bool use_cache>
|
||||
using StateImpl = ColumnsHashing::HashMethodKeysFixed<
|
||||
typename Data::value_type,
|
||||
Key,
|
||||
Mapped,
|
||||
has_nullable_keys,
|
||||
has_low_cardinality,
|
||||
use_cache && consecutive_keys_optimization>;
|
||||
|
||||
using State = StateImpl<true>;
|
||||
using StateNoCache = StateImpl<false>;
|
||||
|
||||
static const bool low_cardinality_optimization = false;
|
||||
static const bool one_key_nullable_optimization = false;
|
||||
|
||||
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> & key_columns, const Sizes & key_sizes)
|
||||
{
|
||||
return State::shuffleKeyColumns(key_columns, key_sizes);
|
||||
}
|
||||
|
||||
static void insertKeyIntoColumns(const Key & key, std::vector<IColumn *> & key_columns, const Sizes & key_sizes);
|
||||
};
|
||||
|
||||
/** Aggregates by concatenating serialized key values.
|
||||
* The serialized value differs in that it uniquely allows to deserialize it, having only the position with which it starts.
|
||||
* That is, for example, for strings, it contains first the serialized length of the string, and then the bytes.
|
||||
* Therefore, when aggregating by several strings, there is no ambiguity.
|
||||
*/
|
||||
template <typename TData, bool nullable = false, bool prealloc = false>
|
||||
struct AggregationMethodSerialized
|
||||
{
|
||||
using Data = TData;
|
||||
using Key = typename Data::key_type;
|
||||
using Mapped = typename Data::mapped_type;
|
||||
|
||||
Data data;
|
||||
|
||||
AggregationMethodSerialized() = default;
|
||||
|
||||
explicit AggregationMethodSerialized(size_t size_hint) : data(size_hint) { }
|
||||
|
||||
template <typename Other>
|
||||
explicit AggregationMethodSerialized(const Other & other) : data(other.data)
|
||||
{
|
||||
}
|
||||
|
||||
template <bool use_cache>
|
||||
using StateImpl = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, nullable, prealloc>;
|
||||
|
||||
using State = StateImpl<true>;
|
||||
using StateNoCache = StateImpl<false>;
|
||||
|
||||
static const bool low_cardinality_optimization = false;
|
||||
static const bool one_key_nullable_optimization = false;
|
||||
|
||||
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
|
||||
|
||||
static void insertKeyIntoColumns(StringRef key, std::vector<IColumn *> & key_columns, const Sizes &);
|
||||
};
|
||||
|
||||
/// AggregationMethodSerialized with nullable = true (prealloc keeps its default).
template <typename TData>
|
||||
using AggregationMethodNullableSerialized = AggregationMethodSerialized<TData, true>;
|
||||
|
||||
/// AggregationMethodSerialized with nullable = false, prealloc = true.
template <typename TData>
|
||||
using AggregationMethodPreallocSerialized = AggregationMethodSerialized<TData, false, true>;
|
||||
|
||||
/// AggregationMethodSerialized with nullable = true, prealloc = true.
template <typename TData>
|
||||
using AggregationMethodNullablePreallocSerialized = AggregationMethodSerialized<TData, true, true>;
|
||||
|
||||
|
||||
}
|
@ -48,7 +48,6 @@ namespace ProfileEvents
|
||||
extern const Event ExternalAggregationUncompressedBytes;
|
||||
extern const Event ExternalProcessingCompressedBytesTotal;
|
||||
extern const Event ExternalProcessingUncompressedBytesTotal;
|
||||
extern const Event AggregationPreallocatedElementsInHashTables;
|
||||
extern const Event AggregationHashTablesInitializedAsTwoLevel;
|
||||
extern const Event OverflowThrow;
|
||||
extern const Event OverflowBreak;
|
||||
@ -270,50 +269,6 @@ void updateStatistics(const DB::ManyAggregatedDataVariants & data_variants, cons
|
||||
getHashTablesStatistics().update(sum_of_sizes, *median_size, params);
|
||||
}
|
||||
|
||||
// The std::is_constructible trait isn't suitable here because some classes have template constructors with semantics different from providing size hints.
|
||||
// Also string hash table variants are not supported due to the fact that both local perf tests and tests in CI showed slowdowns for them.
|
||||
template <typename...>
|
||||
struct HasConstructorOfNumberOfElements : std::false_type
|
||||
{
|
||||
};
|
||||
|
||||
template <typename... Ts>
|
||||
struct HasConstructorOfNumberOfElements<HashMapTable<Ts...>> : std::true_type
|
||||
{
|
||||
};
|
||||
|
||||
template <typename Key, typename Cell, typename Hash, typename Grower, typename Allocator, template <typename...> typename ImplTable>
|
||||
struct HasConstructorOfNumberOfElements<TwoLevelHashMapTable<Key, Cell, Hash, Grower, Allocator, ImplTable>> : std::true_type
|
||||
{
|
||||
};
|
||||
|
||||
template <typename... Ts>
|
||||
struct HasConstructorOfNumberOfElements<HashTable<Ts...>> : std::true_type
|
||||
{
|
||||
};
|
||||
|
||||
template <typename... Ts>
|
||||
struct HasConstructorOfNumberOfElements<TwoLevelHashTable<Ts...>> : std::true_type
|
||||
{
|
||||
};
|
||||
|
||||
template <template <typename> typename Method, typename Base>
|
||||
struct HasConstructorOfNumberOfElements<Method<Base>> : HasConstructorOfNumberOfElements<Base>
|
||||
{
|
||||
};
|
||||
|
||||
template <typename Method>
|
||||
auto constructWithReserveIfPossible(size_t size_hint)
|
||||
{
|
||||
if constexpr (HasConstructorOfNumberOfElements<typename Method::Data>::value)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::AggregationPreallocatedElementsInHashTables, size_hint);
|
||||
return std::make_unique<Method>(size_hint);
|
||||
}
|
||||
else
|
||||
return std::make_unique<Method>();
|
||||
}
|
||||
|
||||
DB::ColumnNumbers calculateKeysPositions(const DB::Block & header, const DB::Aggregator::Params & params)
|
||||
{
|
||||
DB::ColumnNumbers keys_positions(params.keys_size);
|
||||
@ -346,71 +301,11 @@ size_t getMinBytesForPrefetch()
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Destroys the aggregate states through the owning aggregator, if there is one and
/// at least one aggregate has a non-trivial destructor. Exceptions are logged, never propagated.
AggregatedDataVariants::~AggregatedDataVariants()
{
    if (!aggregator || aggregator->all_aggregates_has_trivial_destructor)
        return;

    try
    {
        aggregator->destroyAllAggregateStates(*this);
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
|
||||
|
||||
std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics()
|
||||
{
|
||||
return getHashTablesStatistics().getCacheStats();
|
||||
}
|
||||
|
||||
void AggregatedDataVariants::convertToTwoLevel()
|
||||
{
|
||||
if (aggregator)
|
||||
LOG_TRACE(aggregator->log, "Converting aggregation data to two-level.");
|
||||
|
||||
switch (type)
|
||||
{
|
||||
#define M(NAME) \
|
||||
case Type::NAME: \
|
||||
NAME ## _two_level = std::make_unique<decltype(NAME ## _two_level)::element_type>(*(NAME)); \
|
||||
(NAME).reset(); \
|
||||
type = Type::NAME ## _two_level; \
|
||||
break;
|
||||
|
||||
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
|
||||
|
||||
#undef M
|
||||
|
||||
default:
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong data variant passed.");
|
||||
}
|
||||
}
|
||||
|
||||
/// Allocates the method object for `type_` and records `type_` as the active variant.
/// Nothing is allocated for EMPTY and without_key. When `size_hint` is set, it is offered to the
/// method through constructWithReserveIfPossible; otherwise the method is default-constructed.
void AggregatedDataVariants::init(Type type_, std::optional<size_t> size_hint)
{
    switch (type_)
    {
        case Type::EMPTY:
        case Type::without_key:
            break;

    #define M(NAME, IS_TWO_LEVEL) \
        case Type::NAME: \
        { \
            using MethodOfVariant = decltype(NAME)::element_type; \
            (NAME) = size_hint ? constructWithReserveIfPossible<MethodOfVariant>(*size_hint) \
                               : std::make_unique<MethodOfVariant>(); \
            break; \
        }

        APPLY_FOR_AGGREGATED_VARIANTS(M)

    #undef M
    }

    type = type_;
}
|
||||
|
||||
/// Default constructor: compiler-generated, defined out of line here.
Aggregator::Params::StatsCollectingParams::StatsCollectingParams() = default;
|
||||
|
||||
Aggregator::Params::StatsCollectingParams::StatsCollectingParams(
|
||||
@ -1112,7 +1007,6 @@ void NO_INLINE Aggregator::executeImpl(
|
||||
bool all_keys_are_const,
|
||||
AggregateDataPtr overflow_row) const
|
||||
{
|
||||
bool use_compiled_functions = false;
|
||||
if (!no_more_keys)
|
||||
{
|
||||
/// Prefetching doesn't make sense for small hash tables, because they fit in caches entirely.
|
||||
@ -1120,47 +1014,33 @@ void NO_INLINE Aggregator::executeImpl(
|
||||
&& (method.data.getBufferSizeInBytes() > min_bytes_for_prefetch);
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
use_compiled_functions = compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions);
|
||||
#endif
|
||||
if (prefetch)
|
||||
executeImplBatch<false, true>(
|
||||
method,
|
||||
state,
|
||||
aggregates_pool,
|
||||
row_begin,
|
||||
row_end,
|
||||
aggregate_instructions,
|
||||
all_keys_are_const,
|
||||
use_compiled_functions,
|
||||
overflow_row);
|
||||
if (compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions))
|
||||
{
|
||||
if (prefetch)
|
||||
executeImplBatch<true>(
|
||||
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, false, all_keys_are_const, true, overflow_row);
|
||||
else
|
||||
executeImplBatch<false>(
|
||||
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, false, all_keys_are_const, true, overflow_row);
|
||||
}
|
||||
else
|
||||
executeImplBatch<false, false>(
|
||||
method,
|
||||
state,
|
||||
aggregates_pool,
|
||||
row_begin,
|
||||
row_end,
|
||||
aggregate_instructions,
|
||||
all_keys_are_const,
|
||||
use_compiled_functions,
|
||||
overflow_row);
|
||||
#endif
|
||||
{
|
||||
if (prefetch)
|
||||
executeImplBatch<true>(
|
||||
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, false, all_keys_are_const, false, overflow_row);
|
||||
else
|
||||
executeImplBatch<false>(
|
||||
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, false, all_keys_are_const, false, overflow_row);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
executeImplBatch<true, false>(
|
||||
method,
|
||||
state,
|
||||
aggregates_pool,
|
||||
row_begin,
|
||||
row_end,
|
||||
aggregate_instructions,
|
||||
all_keys_are_const,
|
||||
use_compiled_functions,
|
||||
overflow_row);
|
||||
executeImplBatch<false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, true, all_keys_are_const, false, overflow_row);
|
||||
}
|
||||
}
|
||||
|
||||
template <bool no_more_keys, bool prefetch, typename Method, typename State>
|
||||
template <bool prefetch, typename Method, typename State>
|
||||
void NO_INLINE Aggregator::executeImplBatch(
|
||||
Method & method,
|
||||
State & state,
|
||||
@ -1168,6 +1048,7 @@ void NO_INLINE Aggregator::executeImplBatch(
|
||||
size_t row_begin,
|
||||
size_t row_end,
|
||||
AggregateFunctionInstruction * aggregate_instructions,
|
||||
bool no_more_keys,
|
||||
bool all_keys_are_const,
|
||||
bool use_compiled_functions [[maybe_unused]],
|
||||
AggregateDataPtr overflow_row) const
|
||||
@ -1181,7 +1062,7 @@ void NO_INLINE Aggregator::executeImplBatch(
|
||||
/// Optimization for special case when there are no aggregate functions.
|
||||
if (params.aggregates_size == 0)
|
||||
{
|
||||
if constexpr (no_more_keys)
|
||||
if (no_more_keys)
|
||||
return;
|
||||
|
||||
/// This pointer is unused, but the logic will compare it for nullptr to check if the cell is set.
|
||||
@ -1214,39 +1095,42 @@ void NO_INLINE Aggregator::executeImplBatch(
|
||||
}
|
||||
|
||||
/// Optimization for special case when aggregating by 8bit key.
|
||||
if constexpr (!no_more_keys && std::is_same_v<Method, typename decltype(AggregatedDataVariants::key8)::element_type>)
|
||||
if (!no_more_keys)
|
||||
{
|
||||
/// We use another method if there are aggregate functions with -Array combinator.
|
||||
bool has_arrays = false;
|
||||
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
|
||||
{
|
||||
if (inst->offsets)
|
||||
{
|
||||
has_arrays = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!has_arrays && !hasSparseArguments(aggregate_instructions) && !all_keys_are_const)
|
||||
if constexpr (std::is_same_v<Method, typename decltype(AggregatedDataVariants::key8)::element_type>)
|
||||
{
|
||||
/// We use another method if there are aggregate functions with -Array combinator.
|
||||
bool has_arrays = false;
|
||||
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
|
||||
{
|
||||
inst->batch_that->addBatchLookupTable8(
|
||||
row_begin,
|
||||
row_end,
|
||||
reinterpret_cast<AggregateDataPtr *>(method.data.data()),
|
||||
inst->state_offset,
|
||||
[&](AggregateDataPtr & aggregate_data)
|
||||
{
|
||||
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
|
||||
createAggregateStates(place);
|
||||
aggregate_data = place;
|
||||
},
|
||||
state.getKeyData(),
|
||||
inst->batch_arguments,
|
||||
aggregates_pool);
|
||||
if (inst->offsets)
|
||||
{
|
||||
has_arrays = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!has_arrays && !hasSparseArguments(aggregate_instructions) && !all_keys_are_const)
|
||||
{
|
||||
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
|
||||
{
|
||||
inst->batch_that->addBatchLookupTable8(
|
||||
row_begin,
|
||||
row_end,
|
||||
reinterpret_cast<AggregateDataPtr *>(method.data.data()),
|
||||
inst->state_offset,
|
||||
[&](AggregateDataPtr & aggregate_data)
|
||||
{
|
||||
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
|
||||
createAggregateStates(place);
|
||||
aggregate_data = place;
|
||||
},
|
||||
state.getKeyData(),
|
||||
inst->batch_arguments,
|
||||
aggregates_pool);
|
||||
}
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1272,12 +1156,12 @@ void NO_INLINE Aggregator::executeImplBatch(
|
||||
state.resetCache();
|
||||
|
||||
/// For all rows.
|
||||
for (size_t i = key_start; i < key_end; ++i)
|
||||
if (!no_more_keys)
|
||||
{
|
||||
AggregateDataPtr aggregate_data = nullptr;
|
||||
|
||||
if constexpr (!no_more_keys)
|
||||
for (size_t i = key_start; i < key_end; ++i)
|
||||
{
|
||||
AggregateDataPtr aggregate_data = nullptr;
|
||||
|
||||
if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
|
||||
{
|
||||
if (i == key_start + prefetching.iterationsToMeasure())
|
||||
@ -1323,24 +1207,47 @@ void NO_INLINE Aggregator::executeImplBatch(
|
||||
aggregate_data = emplace_result.getMapped();
|
||||
|
||||
assert(aggregate_data != nullptr);
|
||||
places[i] = aggregate_data;
|
||||
}
|
||||
else
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t i = key_start; i < key_end; ++i)
|
||||
{
|
||||
AggregateDataPtr aggregate_data = nullptr;
|
||||
/// Add only if the key already exists.
|
||||
auto find_result = state.findKey(method.data, i, *aggregates_pool);
|
||||
if (find_result.isFound())
|
||||
{
|
||||
aggregate_data = find_result.getMapped();
|
||||
}
|
||||
else
|
||||
{
|
||||
aggregate_data = overflow_row;
|
||||
}
|
||||
places[i] = aggregate_data;
|
||||
}
|
||||
|
||||
places[i] = aggregate_data;
|
||||
}
|
||||
|
||||
executeAggregateInstructions(
|
||||
aggregates_pool,
|
||||
row_begin,
|
||||
row_end,
|
||||
aggregate_instructions,
|
||||
places,
|
||||
key_start,
|
||||
state.hasOnlyOneValueSinceLastReset(),
|
||||
all_keys_are_const,
|
||||
use_compiled_functions);
|
||||
}
|
||||
|
||||
void Aggregator::executeAggregateInstructions(
|
||||
Arena * aggregates_pool,
|
||||
size_t row_begin,
|
||||
size_t row_end,
|
||||
AggregateFunctionInstruction * aggregate_instructions,
|
||||
const std::unique_ptr<AggregateDataPtr[]> &places,
|
||||
size_t key_start,
|
||||
bool has_only_one_value_since_last_reset,
|
||||
bool all_keys_are_const,
|
||||
bool use_compiled_functions [[maybe_unused]]) const
|
||||
{
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
if (use_compiled_functions)
|
||||
{
|
||||
@ -1360,7 +1267,7 @@ void NO_INLINE Aggregator::executeImplBatch(
|
||||
columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
|
||||
}
|
||||
|
||||
if (all_keys_are_const || (can_optimize_equal_keys_ranges && state.hasOnlyOneValueSinceLastReset()))
|
||||
if (all_keys_are_const || (can_optimize_equal_keys_ranges && has_only_one_value_since_last_reset))
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::AggregationOptimizedEqualRangesOfKeys);
|
||||
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
|
||||
@ -1384,7 +1291,7 @@ void NO_INLINE Aggregator::executeImplBatch(
|
||||
|
||||
AggregateFunctionInstruction * inst = aggregate_instructions + i;
|
||||
|
||||
if (all_keys_are_const || (inst->can_optimize_equal_keys_ranges && state.hasOnlyOneValueSinceLastReset()))
|
||||
if (all_keys_are_const || (inst->can_optimize_equal_keys_ranges && has_only_one_value_since_last_reset))
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::AggregationOptimizedEqualRangesOfKeys);
|
||||
addBatchSinglePlace(row_begin, row_end, inst, places[key_start] + inst->state_offset, aggregates_pool);
|
||||
@ -1394,6 +1301,7 @@ void NO_INLINE Aggregator::executeImplBatch(
|
||||
addBatch(row_begin, row_end, inst, places.get(), aggregates_pool);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -1825,8 +1733,14 @@ Block Aggregator::convertOneBucketToBlock(
|
||||
{
|
||||
// Used in ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform (expects single chunk for each bucket_id).
|
||||
constexpr bool return_single_block = true;
|
||||
Block block = convertToBlockImpl<return_single_block>(
|
||||
method, method.data.impls[bucket], arena, data_variants.aggregates_pools, final, method.data.impls[bucket].size());
|
||||
Block block = std::get<Block>(convertToBlockImpl(
|
||||
method,
|
||||
method.data.impls[bucket],
|
||||
arena,
|
||||
data_variants.aggregates_pools,
|
||||
final,
|
||||
method.data.impls[bucket].size(),
|
||||
return_single_block));
|
||||
|
||||
block.info.bucket_num = static_cast<int>(bucket);
|
||||
return block;
|
||||
@ -1946,29 +1860,27 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
|
||||
}
|
||||
|
||||
|
||||
template <bool return_single_block, typename Method, typename Table>
|
||||
Aggregator::ConvertToBlockRes<return_single_block>
|
||||
Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const
|
||||
template <typename Method, typename Table>
|
||||
Aggregator::ConvertToBlockResVariant
|
||||
Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final,size_t rows, bool return_single_block) const
|
||||
{
|
||||
if (data.empty())
|
||||
{
|
||||
auto && out_cols = prepareOutputBlockColumns(params, aggregate_functions, getHeader(final), aggregates_pools, final, rows);
|
||||
return {finalizeBlock(params, getHeader(final), std::move(out_cols), final, rows)};
|
||||
}
|
||||
|
||||
ConvertToBlockRes<return_single_block> res;
|
||||
|
||||
ConvertToBlockResVariant res;
|
||||
bool use_compiled_functions = false;
|
||||
if (final)
|
||||
{
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
use_compiled_functions = compiled_aggregate_functions_holder != nullptr && !Method::low_cardinality_optimization;
|
||||
#endif
|
||||
res = convertToBlockImplFinal<Method, return_single_block>(method, data, arena, aggregates_pools, use_compiled_functions, rows);
|
||||
res = convertToBlockImplFinal<Method>(method, data, arena, aggregates_pools, use_compiled_functions, return_single_block);
|
||||
}
|
||||
else
|
||||
{
|
||||
res = convertToBlockImplNotFinal<return_single_block>(method, data, aggregates_pools, rows);
|
||||
res = convertToBlockImplNotFinal(method, data, aggregates_pools, rows, return_single_block);
|
||||
}
|
||||
|
||||
/// In order to release memory early.
|
||||
@ -2135,19 +2047,24 @@ Block Aggregator::insertResultsIntoColumns(
|
||||
return finalizeBlock(params, getHeader(/* final */ true), std::move(out_cols), /* final */ true, places.size());
|
||||
}
|
||||
|
||||
template <typename Method, bool return_single_block, typename Table>
|
||||
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE Aggregator::convertToBlockImplFinal(
|
||||
Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool use_compiled_functions [[maybe_unused]], size_t) const
|
||||
template <typename Method, typename Table>
|
||||
Aggregator::ConvertToBlockResVariant Aggregator::convertToBlockImplFinal(
|
||||
Method & method,
|
||||
Table & data,
|
||||
Arena * arena,
|
||||
Arenas & aggregates_pools,
|
||||
bool use_compiled_functions [[maybe_unused]],
|
||||
bool return_single_block) const
|
||||
{
|
||||
/// +1 for nullKeyData, if `data` doesn't have it - not a problem, just some memory for one excessive row will be preallocated
|
||||
const size_t max_block_size = (return_single_block ? data.size() : std::min(params.max_block_size, data.size())) + 1;
|
||||
const bool final = true;
|
||||
ConvertToBlockRes<return_single_block> res;
|
||||
|
||||
std::optional<OutputBlockColumns> out_cols;
|
||||
std::optional<Sizes> shuffled_key_sizes;
|
||||
PaddedPODArray<AggregateDataPtr> places;
|
||||
bool has_null_key_data = false;
|
||||
BlocksList blocks;
|
||||
|
||||
auto init_out_cols = [&]()
|
||||
{
|
||||
@ -2189,40 +2106,38 @@ Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE Aggregator::convert
|
||||
/// Mark the cell as destroyed so it will not be destroyed in destructor.
|
||||
mapped = nullptr;
|
||||
|
||||
if constexpr (!return_single_block)
|
||||
if (!return_single_block && places.size() >= max_block_size)
|
||||
{
|
||||
if (places.size() >= max_block_size)
|
||||
{
|
||||
res.emplace_back(
|
||||
insertResultsIntoColumns(places, std::move(out_cols.value()), arena, has_null_key_data, use_compiled_functions));
|
||||
places.clear();
|
||||
out_cols.reset();
|
||||
has_null_key_data = false;
|
||||
}
|
||||
blocks.emplace_back(
|
||||
insertResultsIntoColumns(places, std::move(out_cols.value()), arena, has_null_key_data, use_compiled_functions));
|
||||
places.clear();
|
||||
out_cols.reset();
|
||||
has_null_key_data = false;
|
||||
}
|
||||
});
|
||||
|
||||
if constexpr (return_single_block)
|
||||
if (return_single_block)
|
||||
{
|
||||
return insertResultsIntoColumns(places, std::move(out_cols.value()), arena, has_null_key_data, use_compiled_functions);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (out_cols.has_value())
|
||||
res.emplace_back(
|
||||
insertResultsIntoColumns(places, std::move(out_cols.value()), arena, has_null_key_data, use_compiled_functions));
|
||||
return res;
|
||||
{
|
||||
blocks.emplace_back(insertResultsIntoColumns(places, std::move(out_cols.value()), arena, has_null_key_data, use_compiled_functions));
|
||||
}
|
||||
return blocks;
|
||||
}
|
||||
}
|
||||
|
||||
template <bool return_single_block, typename Method, typename Table>
|
||||
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
|
||||
Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t) const
|
||||
template <typename Method, typename Table>
|
||||
Aggregator::ConvertToBlockResVariant NO_INLINE
|
||||
Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t, bool return_single_block) const
|
||||
{
|
||||
/// +1 for nullKeyData, if `data` doesn't have it - not a problem, just some memory for one excessive row will be preallocated
|
||||
const size_t max_block_size = (return_single_block ? data.size() : std::min(params.max_block_size, data.size())) + 1;
|
||||
const bool final = false;
|
||||
ConvertToBlockRes<return_single_block> res;
|
||||
BlocksList res_blocks;
|
||||
|
||||
std::optional<OutputBlockColumns> out_cols;
|
||||
std::optional<Sizes> shuffled_key_sizes;
|
||||
@ -2252,7 +2167,6 @@ Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & a
|
||||
|
||||
// should be invoked at least once, because null data might be the only content of the `data`
|
||||
init_out_cols();
|
||||
|
||||
data.forEachValue(
|
||||
[&](const auto & key, auto & mapped)
|
||||
{
|
||||
@ -2269,29 +2183,24 @@ Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & a
|
||||
mapped = nullptr;
|
||||
|
||||
++rows_in_current_block;
|
||||
|
||||
if constexpr (!return_single_block)
|
||||
if (!return_single_block && rows_in_current_block >= max_block_size)
|
||||
{
|
||||
if (rows_in_current_block >= max_block_size)
|
||||
{
|
||||
res.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols.value()), final, rows_in_current_block));
|
||||
out_cols.reset();
|
||||
rows_in_current_block = 0;
|
||||
}
|
||||
res_blocks.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols.value()), final, rows_in_current_block));
|
||||
out_cols.reset();
|
||||
rows_in_current_block = 0;
|
||||
}
|
||||
});
|
||||
|
||||
if constexpr (return_single_block)
|
||||
if (return_single_block)
|
||||
{
|
||||
return finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (rows_in_current_block)
|
||||
res.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block));
|
||||
return res;
|
||||
res_blocks.emplace_back(finalizeBlock(params, getHeader(final), std::move(out_cols).value(), final, rows_in_current_block));
|
||||
return res_blocks;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
void Aggregator::addSingleKeyToAggregateColumns(
|
||||
@ -2397,18 +2306,23 @@ template <bool return_single_block>
|
||||
Aggregator::ConvertToBlockRes<return_single_block>
|
||||
Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
|
||||
{
|
||||
ConvertToBlockResVariant res_variant;
|
||||
const size_t rows = data_variants.sizeWithoutOverflowRow();
|
||||
#define M(NAME) \
|
||||
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
|
||||
{ \
|
||||
return convertToBlockImpl<return_single_block>( \
|
||||
*data_variants.NAME, data_variants.NAME->data, data_variants.aggregates_pool, data_variants.aggregates_pools, final, rows); \
|
||||
res_variant = convertToBlockImpl( \
|
||||
*data_variants.NAME, data_variants.NAME->data, data_variants.aggregates_pool, data_variants.aggregates_pools, final, rows, return_single_block); \
|
||||
}
|
||||
|
||||
if (false) {} // NOLINT
|
||||
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
|
||||
#undef M
|
||||
else throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant.");
|
||||
if constexpr (return_single_block)
|
||||
return std::get<Block>(res_variant);
|
||||
else
|
||||
return std::get<BlocksList>(res_variant);
|
||||
}
|
||||
|
||||
|
||||
@ -2534,7 +2448,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
|
||||
if (data_variants.type != AggregatedDataVariants::Type::without_key)
|
||||
{
|
||||
if (!data_variants.isTwoLevel())
|
||||
blocks.splice(blocks.end(), prepareBlockAndFillSingleLevel</* return_single_block */ false>(data_variants, final));
|
||||
blocks.splice(blocks.end(), prepareBlockAndFillSingleLevel<false>(data_variants, final));
|
||||
else
|
||||
blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()));
|
||||
}
|
||||
@ -2600,9 +2514,9 @@ void NO_INLINE Aggregator::mergeDataNullKey(
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Method, bool prefetch, typename Table>
|
||||
void NO_INLINE
|
||||
Aggregator::mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena, bool use_compiled_functions [[maybe_unused]]) const
|
||||
template <typename Method, typename Table>
|
||||
void NO_INLINE Aggregator::mergeDataImpl(
|
||||
Table & table_dst, Table & table_src, Arena * arena, bool use_compiled_functions [[maybe_unused]], bool prefetch) const
|
||||
{
|
||||
if constexpr (Method::low_cardinality_optimization || Method::one_key_nullable_optimization)
|
||||
mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
|
||||
@ -2625,7 +2539,10 @@ Aggregator::mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena, b
|
||||
src = nullptr;
|
||||
};
|
||||
|
||||
table_src.template mergeToViaEmplace<decltype(merge), prefetch>(table_dst, std::move(merge));
|
||||
if (prefetch)
|
||||
table_src.template mergeToViaEmplace<decltype(merge), true>(table_dst, std::move(merge));
|
||||
else
|
||||
table_src.template mergeToViaEmplace<decltype(merge), false>(table_dst, std::move(merge));
|
||||
table_src.clearAndShrink();
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
@ -2779,16 +2696,18 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
|
||||
|
||||
if (!no_more_keys)
|
||||
{
|
||||
bool use_compiled_functions = false;
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
use_compiled_functions = compiled_aggregate_functions_holder != nullptr;
|
||||
#endif
|
||||
if (prefetch)
|
||||
mergeDataImpl<Method, true>(
|
||||
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, use_compiled_functions);
|
||||
if (compiled_aggregate_functions_holder)
|
||||
{
|
||||
mergeDataImpl<Method>(
|
||||
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, true, prefetch);
|
||||
}
|
||||
else
|
||||
mergeDataImpl<Method, false>(
|
||||
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, use_compiled_functions);
|
||||
#endif
|
||||
{
|
||||
mergeDataImpl<Method>(
|
||||
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, false, prefetch);
|
||||
}
|
||||
}
|
||||
else if (res->without_key)
|
||||
{
|
||||
@ -2833,22 +2752,22 @@ void NO_INLINE Aggregator::mergeBucketImpl(
|
||||
return;
|
||||
|
||||
AggregatedDataVariants & current = *data[result_num];
|
||||
bool use_compiled_functions = false;
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
use_compiled_functions = compiled_aggregate_functions_holder != nullptr;
|
||||
#endif
|
||||
if (prefetch)
|
||||
mergeDataImpl<Method, true>(
|
||||
getDataVariant<Method>(*res).data.impls[bucket],
|
||||
getDataVariant<Method>(current).data.impls[bucket],
|
||||
arena,
|
||||
use_compiled_functions);
|
||||
if (compiled_aggregate_functions_holder)
|
||||
{
|
||||
mergeDataImpl<Method>(
|
||||
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena, true, prefetch);
|
||||
}
|
||||
else
|
||||
mergeDataImpl<Method, false>(
|
||||
#endif
|
||||
{
|
||||
mergeDataImpl<Method>(
|
||||
getDataVariant<Method>(*res).data.impls[bucket],
|
||||
getDataVariant<Method>(current).data.impls[bucket],
|
||||
arena,
|
||||
use_compiled_functions);
|
||||
false,
|
||||
prefetch);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -3560,7 +3479,4 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) cons
|
||||
throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant.");
|
||||
}
|
||||
|
||||
|
||||
template Aggregator::ConvertToBlockRes<false>
|
||||
Aggregator::prepareBlockAndFillSingleLevel<false>(AggregatedDataVariants & data_variants, bool final) const;
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user