From 7beb58b0cf2cd04fdf7548ce0574bbb1ec6280a5 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Thu, 17 Nov 2022 13:19:02 +0100 Subject: [PATCH] Optimize merge of uniqExact without_key (#43072) * impl for uniqExact * rm unused (read|write)Text methods * fix style * small fixes * impl for variadic uniqExact * refactor * fix style * more agressive inlining * disable if max_threads=1 * small improvements * review fixes * Revert "rm unused (read|write)Text methods" This reverts commit a7e74805842572f6fa2c28ea111ab8ca7c19ad21. * encapsulate is_able_to_parallelize_merge in Data * encapsulate is_exact & argument_is_tuple in Data --- .../AggregateFunctionUniq.cpp | 56 ++-- .../AggregateFunctionUniq.h | 307 +++++++++++++++--- src/AggregateFunctions/Helpers.h | 13 + src/AggregateFunctions/IAggregateFunction.h | 15 +- src/AggregateFunctions/UniqExactSet.h | 112 +++++++ src/AggregateFunctions/UniquesHashSet.h | 2 +- src/Common/HashTable/HashSet.h | 57 ++++ src/Common/HashTable/HashTable.h | 20 +- src/Common/HashTable/TwoLevelHashTable.h | 8 +- src/Common/examples/small_table.cpp | 2 +- src/Common/tests/gtest_hash_table.cpp | 56 ++++ src/Interpreters/Aggregator.cpp | 12 +- .../test_aggregate_function_state.py | 228 +++++++++++++ .../test_aggregate_function_state_avg.py | 82 ----- tests/performance/uniq_without_key.xml | 33 ++ 15 files changed, 832 insertions(+), 171 deletions(-) create mode 100644 src/AggregateFunctions/UniqExactSet.h create mode 100644 tests/integration/test_backward_compatibility/test_aggregate_function_state.py delete mode 100644 tests/integration/test_backward_compatibility/test_aggregate_function_state_avg.py create mode 100644 tests/performance/uniq_without_key.xml diff --git a/src/AggregateFunctions/AggregateFunctionUniq.cpp b/src/AggregateFunctions/AggregateFunctionUniq.cpp index 0d1c831c839..1c90767131c 100644 --- a/src/AggregateFunctions/AggregateFunctionUniq.cpp +++ b/src/AggregateFunctions/AggregateFunctionUniq.cpp @@ -9,6 +9,7 @@ #include #include +#include namespace DB { @@ -28,8 +29,9 @@ namespace /** `DataForVariadic` is a data structure that will be used for `uniq` aggregate function of multiple arguments. * It differs, for example, in that it uses a trivial hash function, since `uniq` of many arguments first hashes them out itself. */ -template -AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *) +template typename DataForVariadic> +AggregateFunctionPtr +createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *) { assertNoParameters(name, params); @@ -61,21 +63,22 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const else if (which.isTuple()) { if (use_exact_hash_function) - return std::make_shared>(argument_types); + return std::make_shared>>(argument_types); else - return std::make_shared>(argument_types); + return std::make_shared>>(argument_types); } } /// "Variadic" method also works as a fallback generic case for single argument. if (use_exact_hash_function) - return std::make_shared>(argument_types); + return std::make_shared>>(argument_types); else - return std::make_shared>(argument_types); + return std::make_shared>>(argument_types); } -template class Data, typename DataForVariadic> -AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *) +template typename Data, template typename DataForVariadic, bool is_able_to_parallelize_merge> +AggregateFunctionPtr +createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *) { assertNoParameters(name, params); @@ -91,35 +94,35 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const { const IDataType & argument_type = *argument_types[0]; - AggregateFunctionPtr res(createWithNumericType(*argument_types[0], argument_types)); + AggregateFunctionPtr res(createWithNumericType(*argument_types[0], argument_types)); WhichDataType which(argument_type); if (res) return res; else if (which.isDate()) - return std::make_shared>>(argument_types); + return std::make_shared>>(argument_types); else if (which.isDate32()) - return std::make_shared>>(argument_types); + return std::make_shared>>(argument_types); else if (which.isDateTime()) - return std::make_shared>>(argument_types); + return std::make_shared>>(argument_types); else if (which.isStringOrFixedString()) - return std::make_shared>>(argument_types); + return std::make_shared>>(argument_types); else if (which.isUUID()) - return std::make_shared>>(argument_types); + return std::make_shared>>(argument_types); else if (which.isTuple()) { if (use_exact_hash_function) - return std::make_shared>(argument_types); + return std::make_shared>>(argument_types); else - return std::make_shared>(argument_types); + return std::make_shared>>(argument_types); } } /// "Variadic" method also works as a fallback generic case for single argument. if (use_exact_hash_function) - return std::make_shared>(argument_types); + return std::make_shared>>(argument_types); else - return std::make_shared>(argument_types); + return std::make_shared>>(argument_types); } } @@ -132,14 +135,23 @@ void registerAggregateFunctionsUniq(AggregateFunctionFactory & factory) {createAggregateFunctionUniq, properties}); factory.registerFunction("uniqHLL12", - {createAggregateFunctionUniq, properties}); + {createAggregateFunctionUniq, properties}); - factory.registerFunction("uniqExact", - {createAggregateFunctionUniq>, properties}); + auto assign_bool_param = [](const std::string & name, const DataTypes & argument_types, const Array & params, const Settings * settings) + { + /// Using two level hash set if we wouldn't be able to merge in parallel can cause ~10% slowdown. + if (settings && settings->max_threads > 1) + return createAggregateFunctionUniq< + true, AggregateFunctionUniqExactData, AggregateFunctionUniqExactDataForVariadic, true /* is_able_to_parallelize_merge */>(name, argument_types, params, settings); + else + return createAggregateFunctionUniq< + true, AggregateFunctionUniqExactData, AggregateFunctionUniqExactDataForVariadic, false /* is_able_to_parallelize_merge */>(name, argument_types, params, settings); + }; + factory.registerFunction("uniqExact", {assign_bool_param, properties}); #if USE_DATASKETCHES factory.registerFunction("uniqTheta", - {createAggregateFunctionUniq, properties}); + {createAggregateFunctionUniq, properties}); #endif } diff --git a/src/AggregateFunctions/AggregateFunctionUniq.h b/src/AggregateFunctions/AggregateFunctionUniq.h index fe2530800cc..1a98bfc8456 100644 --- a/src/AggregateFunctions/AggregateFunctionUniq.h +++ b/src/AggregateFunctions/AggregateFunctionUniq.h @@ -1,7 +1,10 @@ #pragma once -#include +#include +#include #include +#include +#include #include @@ -13,17 +16,18 @@ #include +#include #include #include #include -#include -#include #include +#include -#include #include #include +#include #include +#include namespace DB @@ -37,94 +41,128 @@ struct AggregateFunctionUniqUniquesHashSetData using Set = UniquesHashSet>; Set set; + constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_variadic = false; + static String getName() { return "uniq"; } }; /// For a function that takes multiple arguments. Such a function pre-hashes them in advance, so TrivialHash is used here. +template struct AggregateFunctionUniqUniquesHashSetDataForVariadic { using Set = UniquesHashSet; Set set; + constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_variadic = true; + constexpr static bool is_exact = is_exact_; + constexpr static bool argument_is_tuple = argument_is_tuple_; + static String getName() { return "uniq"; } }; /// uniqHLL12 -template +template struct AggregateFunctionUniqHLL12Data { using Set = HyperLogLogWithSmallSetOptimization; Set set; - static String getName() { return "uniqHLL12"; } -}; - -template <> -struct AggregateFunctionUniqHLL12Data -{ - using Set = HyperLogLogWithSmallSetOptimization; - Set set; + constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_variadic = false; static String getName() { return "uniqHLL12"; } }; template <> -struct AggregateFunctionUniqHLL12Data +struct AggregateFunctionUniqHLL12Data { using Set = HyperLogLogWithSmallSetOptimization; Set set; + constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_variadic = false; + static String getName() { return "uniqHLL12"; } }; +template <> +struct AggregateFunctionUniqHLL12Data +{ + using Set = HyperLogLogWithSmallSetOptimization; + Set set; + + constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_variadic = false; + + static String getName() { return "uniqHLL12"; } +}; + +template struct AggregateFunctionUniqHLL12DataForVariadic { using Set = HyperLogLogWithSmallSetOptimization; Set set; + constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_variadic = true; + constexpr static bool is_exact = is_exact_; + constexpr static bool argument_is_tuple = argument_is_tuple_; + static String getName() { return "uniqHLL12"; } }; /// uniqExact -template +template struct AggregateFunctionUniqExactData { using Key = T; /// When creating, the hash table must be small. - using Set = HashSet< - Key, - HashCRC32, - HashTableGrower<4>, - HashTableAllocatorWithStackMemory>; + using SingleLevelSet = HashSet, HashTableGrower<4>, HashTableAllocatorWithStackMemory>; + using TwoLevelSet = TwoLevelHashSet>; + using Set = UniqExactSet; Set set; + constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_variadic = false; + static String getName() { return "uniqExact"; } }; /// For rows, we put the SipHash values (128 bits) into the hash table. -template <> -struct AggregateFunctionUniqExactData +template +struct AggregateFunctionUniqExactData { using Key = UInt128; /// When creating, the hash table must be small. - using Set = HashSet< - Key, - UInt128TrivialHash, - HashTableGrower<3>, - HashTableAllocatorWithStackMemory>; + using SingleLevelSet = HashSet, HashTableAllocatorWithStackMemory>; + using TwoLevelSet = TwoLevelHashSet; + using Set = UniqExactSet; Set set; + constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_variadic = false; + static String getName() { return "uniqExact"; } }; +template +struct AggregateFunctionUniqExactDataForVariadic : AggregateFunctionUniqExactData +{ + constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_variadic = true; + constexpr static bool is_exact = is_exact_; + constexpr static bool argument_is_tuple = argument_is_tuple_; +}; /// uniqTheta #if USE_DATASKETCHES @@ -134,14 +172,37 @@ struct AggregateFunctionUniqThetaData using Set = ThetaSketchData; Set set; + constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_variadic = false; + static String getName() { return "uniqTheta"; } }; +template +struct AggregateFunctionUniqThetaDataForVariadic : AggregateFunctionUniqThetaData +{ + constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_variadic = true; + constexpr static bool is_exact = is_exact_; + constexpr static bool argument_is_tuple = argument_is_tuple_; +}; + #endif namespace detail { +template +struct IsUniqExactSet : std::false_type +{ +}; + +template +struct IsUniqExactSet> : std::true_type +{ +}; + + /** Hash function for uniq. */ template struct AggregateFunctionUniqTraits @@ -162,17 +223,31 @@ template struct AggregateFunctionUniqTraits }; -/** The structure for the delegation work to add one element to the `uniq` aggregate functions. +/** The structure for the delegation work to add elements to the `uniq` aggregate functions. * Used for partial specialization to add strings. */ template -struct OneAdder +struct Adder { - static void ALWAYS_INLINE add(Data & data, const IColumn & column, size_t row_num) + /// We have to introduce this template parameter (and a bunch of ugly code dealing with it), because we cannot + /// add runtime branches in whatever_hash_set::insert - it will immediately pop up in the perf top. + template + static void ALWAYS_INLINE add(Data & data, const IColumn ** columns, size_t num_args, size_t row_num) { - if constexpr (std::is_same_v - || std::is_same_v>) + if constexpr (Data::is_variadic) { + if constexpr (IsUniqExactSet::value) + data.set.template insert( + UniqVariadicHash::apply(num_args, columns, row_num)); + else + data.set.insert(T{UniqVariadicHash::apply(num_args, columns, row_num)}); + } + else if constexpr ( + std::is_same_v< + Data, + AggregateFunctionUniqUniquesHashSetData> || std::is_same_v>) + { + const auto & column = *columns[0]; if constexpr (!std::is_same_v) { using ValueType = typename decltype(data.set)::value_type; @@ -185,11 +260,13 @@ struct OneAdder data.set.insert(CityHash_v1_0_2::CityHash64(value.data, value.size)); } } - else if constexpr (std::is_same_v>) + else if constexpr (std::is_same_v>) { + const auto & column = *columns[0]; if constexpr (!std::is_same_v) { - data.set.insert(assert_cast &>(column).getData()[row_num]); + data.set.template insert( + assert_cast &>(column).getData()[row_num]); } else { @@ -200,16 +277,72 @@ struct OneAdder hash.update(value.data, value.size); hash.get128(key); - data.set.insert(key); + data.set.template insert(key); } } #if USE_DATASKETCHES else if constexpr (std::is_same_v) { + const auto & column = *columns[0]; data.set.insertOriginal(column.getDataAt(row_num)); } #endif } + + static void ALWAYS_INLINE + add(Data & data, const IColumn ** columns, size_t num_args, size_t row_begin, size_t row_end, const char8_t * flags, const UInt8 * null_map) + { + bool use_single_level_hash_table = true; + if constexpr (Data::is_able_to_parallelize_merge) + use_single_level_hash_table = data.set.isSingleLevel(); + + if (use_single_level_hash_table) + addImpl(data, columns, num_args, row_begin, row_end, flags, null_map); + else + addImpl(data, columns, num_args, row_begin, row_end, flags, null_map); + + if constexpr (Data::is_able_to_parallelize_merge) + { + if (data.set.isSingleLevel() && data.set.size() > 100'000) + data.set.convertToTwoLevel(); + } + } + +private: + template + static void ALWAYS_INLINE + addImpl(Data & data, const IColumn ** columns, size_t num_args, size_t row_begin, size_t row_end, const char8_t * flags, const UInt8 * null_map) + { + if (!flags) + { + if (!null_map) + { + for (size_t row = row_begin; row < row_end; ++row) + add(data, columns, num_args, row); + } + else + { + for (size_t row = row_begin; row < row_end; ++row) + if (!null_map[row]) + add(data, columns, num_args, row); + } + } + else + { + if (!null_map) + { + for (size_t row = row_begin; row < row_end; ++row) + if (flags[row]) + add(data, columns, num_args, row); + } + else + { + for (size_t row = row_begin; row < row_end; ++row) + if (!null_map[row] && flags[row]) + add(data, columns, num_args, row); + } + } + } }; } @@ -219,9 +352,15 @@ struct OneAdder template class AggregateFunctionUniq final : public IAggregateFunctionDataHelper> { +private: + static constexpr size_t num_args = 1; + static constexpr bool is_able_to_parallelize_merge = Data::is_able_to_parallelize_merge; + public: - AggregateFunctionUniq(const DataTypes & argument_types_) - : IAggregateFunctionDataHelper>(argument_types_, {}) {} + explicit AggregateFunctionUniq(const DataTypes & argument_types_) + : IAggregateFunctionDataHelper>(argument_types_, {}) + { + } String getName() const override { return Data::getName(); } @@ -235,7 +374,18 @@ public: /// ALWAYS_INLINE is required to have better code layout for uniqHLL12 function void ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override { - detail::OneAdder::add(this->data(place), *columns[0], row_num); + detail::Adder::add(this->data(place), columns, num_args, row_num); + } + + void ALWAYS_INLINE addBatchSinglePlace( + size_t row_begin, size_t row_end, AggregateDataPtr __restrict place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) + const override + { + const char8_t * flags = nullptr; + if (if_argument_pos >= 0) + flags = assert_cast(*columns[if_argument_pos]).getData().data(); + + detail::Adder::add(this->data(place), columns, num_args, row_begin, row_end, flags, nullptr /* null_map */); } void addManyDefaults( @@ -244,7 +394,23 @@ public: size_t /*length*/, Arena * /*arena*/) const override { - detail::OneAdder::add(this->data(place), *columns[0], 0); + detail::Adder::add(this->data(place), columns, num_args, 0); + } + + void addBatchSinglePlaceNotNull( + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** columns, + const UInt8 * null_map, + Arena *, + ssize_t if_argument_pos) const override + { + const char8_t * flags = nullptr; + if (if_argument_pos >= 0) + flags = assert_cast(*columns[if_argument_pos]).getData().data(); + + detail::Adder::add(this->data(place), columns, num_args, row_begin, row_end, flags, null_map); } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override @@ -252,6 +418,16 @@ public: this->data(place).set.merge(this->data(rhs).set); } + bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override + { + if constexpr (is_able_to_parallelize_merge) + this->data(place).set.merge(this->data(rhs).set, &thread_pool); + else + this->data(place).set.merge(this->data(rhs).set); + } + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override { this->data(place).set.write(buf); @@ -273,15 +449,20 @@ public: * You can pass multiple arguments as is; You can also pass one argument - a tuple. * But (for the possibility of efficient implementation), you can not pass several arguments, among which there are tuples. */ -template -class AggregateFunctionUniqVariadic final : public IAggregateFunctionDataHelper> +template +class AggregateFunctionUniqVariadic final : public IAggregateFunctionDataHelper> { private: + using T = typename Data::Set::value_type; + + static constexpr size_t is_able_to_parallelize_merge = Data::is_able_to_parallelize_merge; + static constexpr size_t argument_is_tuple = Data::argument_is_tuple; + size_t num_args = 0; public: - AggregateFunctionUniqVariadic(const DataTypes & arguments) - : IAggregateFunctionDataHelper>(arguments, {}) + explicit AggregateFunctionUniqVariadic(const DataTypes & arguments) + : IAggregateFunctionDataHelper>(arguments, {}) { if (argument_is_tuple) num_args = typeid_cast(*arguments[0]).getElements().size(); @@ -300,8 +481,34 @@ public: void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override { - this->data(place).set.insert(typename Data::Set::value_type( - UniqVariadicHash::apply(num_args, columns, row_num))); + detail::Adder::add(this->data(place), columns, num_args, row_num); + } + + void addBatchSinglePlace( + size_t row_begin, size_t row_end, AggregateDataPtr __restrict place, const IColumn ** columns, Arena *, ssize_t if_argument_pos) + const override + { + const char8_t * flags = nullptr; + if (if_argument_pos >= 0) + flags = assert_cast(*columns[if_argument_pos]).getData().data(); + + detail::Adder::add(this->data(place), columns, num_args, row_begin, row_end, flags, nullptr /* null_map */); + } + + void addBatchSinglePlaceNotNull( + size_t row_begin, + size_t row_end, + AggregateDataPtr __restrict place, + const IColumn ** columns, + const UInt8 * null_map, + Arena *, + ssize_t if_argument_pos) const override + { + const char8_t * flags = nullptr; + if (if_argument_pos >= 0) + flags = assert_cast(*columns[if_argument_pos]).getData().data(); + + detail::Adder::add(this->data(place), columns, num_args, row_begin, row_end, flags, null_map); } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override @@ -309,6 +516,16 @@ public: this->data(place).set.merge(this->data(rhs).set); } + bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override + { + if constexpr (is_able_to_parallelize_merge) + this->data(place).set.merge(this->data(rhs).set, &thread_pool); + else + this->data(place).set.merge(this->data(rhs).set); + } + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override { this->data(place).set.write(buf); diff --git a/src/AggregateFunctions/Helpers.h b/src/AggregateFunctions/Helpers.h index 6e140f4b9cf..c97733571a3 100644 --- a/src/AggregateFunctions/Helpers.h +++ b/src/AggregateFunctions/Helpers.h @@ -74,6 +74,19 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ return nullptr; } +template