From 78f3a575f9ddbfd47e46e8169b63979e3d2aa72f Mon Sep 17 00:00:00 2001 From: Jiebin Sun Date: Thu, 27 Jul 2023 21:06:34 +0800 Subject: [PATCH] Convert hashSets in parallel before merge (#50748) * Convert hashSets in parallel before merge Before merge, if one of the lhs and rhs is singleLevelSet and the other is twoLevelSet, then the SingleLevelSet will call convertToTwoLevel(). The convert process is not in parallel and it will cost lots of cycle if it cosume all the singleLevelSet. The idea of the patch is to convert all the singleLevelSets to twoLevelSets in parallel if the hashsets are not all singleLevel or not all twoLevel. I have tested the patch on Intel 2 x 112 vCPUs SPR server with clickbench and latest upstream ClickHouse. Q5 has got a big 264% performance improvement and 24 queries have got at least 5% performance gain. The overall geomean of 43 queries has gained 7.4% more than the base code. Signed-off-by: Jiebin Sun * add resize() for the data_vec in parallelizeMergePrepare() Signed-off-by: Jiebin Sun * Add the performance test prepare_hash_before_merge.xml Signed-off-by: Jiebin Sun * Fit the CI to rename the data set from hits_v1 to test.hits. Signed-off-by: Jiebin Sun * remove the redundant branch in UniqExactSet Co-authored-by: Nikita Taranov * Remove the empty methods and add throw exception in parallelizeMergePrepare() Signed-off-by: Jiebin Sun --------- Signed-off-by: Jiebin Sun Co-authored-by: Nikita Taranov --- .../AggregateFunctionUniq.h | 39 ++++++++++++++ src/AggregateFunctions/IAggregateFunction.h | 8 +++ src/AggregateFunctions/UniqExactSet.h | 51 +++++++++++++++++++ src/Interpreters/Aggregator.cpp | 14 +++++ .../performance/prepare_hash_before_merge.xml | 4 ++ 5 files changed, 116 insertions(+) create mode 100644 tests/performance/prepare_hash_before_merge.xml diff --git a/src/AggregateFunctions/AggregateFunctionUniq.h b/src/AggregateFunctions/AggregateFunctionUniq.h index de68e9076a0..2810051a82f 100644 --- a/src/AggregateFunctions/AggregateFunctionUniq.h +++ b/src/AggregateFunctions/AggregateFunctionUniq.h @@ -29,6 +29,10 @@ #include #include +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} namespace DB { @@ -42,6 +46,7 @@ struct AggregateFunctionUniqUniquesHashSetData Set set; constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_parallelize_merge_prepare_needed = false; constexpr static bool is_variadic = false; static String getName() { return "uniq"; } @@ -55,6 +60,7 @@ struct AggregateFunctionUniqUniquesHashSetDataForVariadic Set set; constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_parallelize_merge_prepare_needed = false; constexpr static bool is_variadic = true; constexpr static bool is_exact = is_exact_; constexpr static bool argument_is_tuple = argument_is_tuple_; @@ -72,6 +78,7 @@ struct AggregateFunctionUniqHLL12Data Set set; constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_parallelize_merge_prepare_needed = false; constexpr static bool is_variadic = false; static String getName() { return "uniqHLL12"; } @@ -84,6 +91,7 @@ struct AggregateFunctionUniqHLL12Data Set set; constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_parallelize_merge_prepare_needed = false; constexpr static bool is_variadic = false; static String getName() { return "uniqHLL12"; } @@ -96,6 +104,7 @@ struct AggregateFunctionUniqHLL12Data Set set; constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_parallelize_merge_prepare_needed = false; constexpr static bool is_variadic = false; static String getName() { return "uniqHLL12"; } @@ -108,6 +117,7 @@ struct AggregateFunctionUniqHLL12Data Set set; constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_parallelize_merge_prepare_needed = false; constexpr static bool is_variadic = false; static String getName() { return "uniqHLL12"; } @@ -120,6 +130,7 @@ struct AggregateFunctionUniqHLL12DataForVariadic Set set; constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_parallelize_merge_prepare_needed = false; constexpr static bool is_variadic = true; constexpr static bool is_exact = is_exact_; constexpr static bool argument_is_tuple = argument_is_tuple_; @@ -143,6 +154,7 @@ struct AggregateFunctionUniqExactData Set set; constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_parallelize_merge_prepare_needed = true; constexpr static bool is_variadic = false; static String getName() { return "uniqExact"; } @@ -162,6 +174,7 @@ struct AggregateFunctionUniqExactData Set set; constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_parallelize_merge_prepare_needed = true; constexpr static bool is_variadic = false; static String getName() { return "uniqExact"; } @@ -181,6 +194,7 @@ struct AggregateFunctionUniqExactData Set set; constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_parallelize_merge_prepare_needed = true; constexpr static bool is_variadic = false; static String getName() { return "uniqExact"; } @@ -190,6 +204,7 @@ template { constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_; + constexpr static bool is_parallelize_merge_prepare_needed = true; constexpr static bool is_variadic = true; constexpr static bool is_exact = is_exact_; constexpr static bool argument_is_tuple = argument_is_tuple_; @@ -204,6 +219,7 @@ struct AggregateFunctionUniqThetaData Set set; constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_parallelize_merge_prepare_needed = false; constexpr static bool is_variadic = false; static String getName() { return "uniqTheta"; } @@ -213,6 +229,7 @@ template struct AggregateFunctionUniqThetaDataForVariadic : AggregateFunctionUniqThetaData { constexpr static bool is_able_to_parallelize_merge = false; + constexpr static bool is_parallelize_merge_prepare_needed = false; constexpr static bool is_variadic = true; constexpr static bool is_exact = is_exact_; constexpr static bool argument_is_tuple = argument_is_tuple_; @@ -384,8 +401,10 @@ template class AggregateFunctionUniq final : public IAggregateFunctionDataHelper> { private: + using DataSet = typename Data::Set; static constexpr size_t num_args = 1; static constexpr bool is_able_to_parallelize_merge = Data::is_able_to_parallelize_merge; + static constexpr bool is_parallelize_merge_prepare_needed = Data::is_parallelize_merge_prepare_needed; public: explicit AggregateFunctionUniq(const DataTypes & argument_types_) @@ -439,6 +458,26 @@ public: detail::Adder::add(this->data(place), columns, num_args, row_begin, row_end, flags, null_map); } + bool isParallelizeMergePrepareNeeded() const override { return is_parallelize_merge_prepare_needed;} + + void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool) const override + { + if constexpr (is_parallelize_merge_prepare_needed) + { + std::vector data_vec; + data_vec.resize(places.size()); + + for (unsigned long i = 0; i < data_vec.size(); i++) + data_vec[i] = &this->data(places[i]).set; + + DataSet::parallelizeMergePrepare(data_vec, thread_pool); + } + else + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "parallelizeMergePrepare() is only implemented when is_parallelize_merge_prepare_needed is true for {} ", getName()); + } + } + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).set.merge(this->data(rhs).set); diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h index df08b6f2109..b460a66ea22 100644 --- a/src/AggregateFunctions/IAggregateFunction.h +++ b/src/AggregateFunctions/IAggregateFunction.h @@ -47,6 +47,7 @@ using DataTypePtr = std::shared_ptr; using DataTypes = std::vector; using AggregateDataPtr = char *; +using AggregateDataPtrs = std::vector; using ConstAggregateDataPtr = const char *; class IAggregateFunction; @@ -148,6 +149,13 @@ public: /// Default values must be a the 0-th positions in columns. virtual void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t length, Arena * arena) const = 0; + virtual bool isParallelizeMergePrepareNeeded() const { return false; } + + virtual void parallelizeMergePrepare(AggregateDataPtrs & /*places*/, ThreadPool & /*thread_pool*/) const + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "parallelizeMergePrepare() with thread pool parameter isn't implemented for {} ", getName()); + } + /// Merges state (on which place points to) with other state of current aggregation function. virtual void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const = 0; diff --git a/src/AggregateFunctions/UniqExactSet.h b/src/AggregateFunctions/UniqExactSet.h index 90cfe700179..0d99b29686f 100644 --- a/src/AggregateFunctions/UniqExactSet.h +++ b/src/AggregateFunctions/UniqExactSet.h @@ -28,6 +28,57 @@ public: asTwoLevel().insert(std::forward(arg)); } + /// In merge, if one of the lhs and rhs is twolevelset and the other is singlelevelset, then the singlelevelset will need to convertToTwoLevel(). + /// It's not in parallel and will cost extra large time if the thread_num is large. + /// This method will convert all the SingleLevelSet to TwoLevelSet in parallel if the hashsets are not all singlelevel or not all twolevel. + static void parallelizeMergePrepare(const std::vector & data_vec, ThreadPool & thread_pool) + { + unsigned long single_level_set_num = 0; + + for (auto ele : data_vec) + { + if (ele->isSingleLevel()) + single_level_set_num ++; + } + + if (single_level_set_num > 0 && single_level_set_num < data_vec.size()) + { + try + { + auto data_vec_atomic_index = std::make_shared(0); + auto thread_func = [data_vec, data_vec_atomic_index, thread_group = CurrentThread::getGroup()]() + { + SCOPE_EXIT_SAFE( + if (thread_group) + CurrentThread::detachFromGroupIfNotDetached(); + ); + if (thread_group) + CurrentThread::attachToGroupIfDetached(thread_group); + + setThreadName("UniqExaConvert"); + + while (true) + { + const auto i = data_vec_atomic_index->fetch_add(1); + if (i >= data_vec.size()) + return; + if (data_vec[i]->isSingleLevel()) + data_vec[i]->convertToTwoLevel(); + } + }; + for (size_t i = 0; i < std::min(thread_pool.getMaxThreads(), single_level_set_num); ++i) + thread_pool.scheduleOrThrowOnError(thread_func); + + thread_pool.wait(); + } + catch (...) + { + thread_pool.wait(); + throw; + } + } + } + auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr) { if (isSingleLevel() && other.isTwoLevel()) diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 36cd32910b5..c2914c938b5 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -2603,6 +2603,20 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl( AggregatedDataVariantsPtr & res = non_empty_data[0]; + for (size_t i = 0; i < params.aggregates_size; ++i) + { + if (aggregate_functions[i]->isParallelizeMergePrepareNeeded()) + { + size_t size = non_empty_data.size(); + std::vector data_vec; + + for (size_t result_num = 0; result_num < size; ++result_num) + data_vec.emplace_back(non_empty_data[result_num]->without_key + offsets_of_aggregate_states[i]); + + aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool); + } + } + /// We merge all aggregation results to the first. for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num) { diff --git a/tests/performance/prepare_hash_before_merge.xml b/tests/performance/prepare_hash_before_merge.xml new file mode 100644 index 00000000000..e99f762927f --- /dev/null +++ b/tests/performance/prepare_hash_before_merge.xml @@ -0,0 +1,4 @@ + + SELECT COUNT(DISTINCT Title) FROM test.hits SETTINGS max_threads = 24 + SELECT COUNT(DISTINCT Referer) FROM test.hits SETTINGS max_threads = 22 +