Convert hashSets in parallel before merge (#50748)

* Convert hashSets in parallel before merge

Before merge, if one of the lhs and rhs is singleLevelSet and the other is twoLevelSet,
then the SingleLevelSet will call convertToTwoLevel(). The convert process is not in parallel
and it will cost lots of cycle if it cosume all the singleLevelSet.

The idea of the patch is to convert all the singleLevelSets to twoLevelSets in parallel if
the hashsets are not all singleLevel or not all twoLevel.

I have tested the patch on Intel 2 x 112 vCPUs SPR server with clickbench and latest upstream
ClickHouse.
Q5 has got a big 264% performance improvement and 24 queries have got at least 5% performance
gain. The overall geomean of 43 queries has gained 7.4% more than the base code.

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>

* add resize() for the data_vec in parallelizeMergePrepare()

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>

* Add the performance test prepare_hash_before_merge.xml

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>

* Fit the CI to rename the data set from hits_v1 to test.hits.

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>

* remove the redundant branch in UniqExactSet

Co-authored-by: Nikita Taranov <nickita.taranov@gmail.com>

* Remove the empty methods and add throw exception in parallelizeMergePrepare()

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>

---------

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
Co-authored-by: Nikita Taranov <nickita.taranov@gmail.com>
This commit is contained in:
Jiebin Sun 2023-07-27 21:06:34 +08:00 committed by GitHub
parent 33300a978e
commit 78f3a575f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 116 additions and 0 deletions

View File

@ -29,6 +29,10 @@
#include <AggregateFunctions/UniqVariadicHash.h>
#include <AggregateFunctions/UniquesHashSet.h>
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
namespace DB
{
@ -42,6 +46,7 @@ struct AggregateFunctionUniqUniquesHashSetData
Set set;
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;
static String getName() { return "uniq"; }
@ -55,6 +60,7 @@ struct AggregateFunctionUniqUniquesHashSetDataForVariadic
Set set;
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
@ -72,6 +78,7 @@ struct AggregateFunctionUniqHLL12Data
Set set;
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;
static String getName() { return "uniqHLL12"; }
@ -84,6 +91,7 @@ struct AggregateFunctionUniqHLL12Data<String, false>
Set set;
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;
static String getName() { return "uniqHLL12"; }
@ -96,6 +104,7 @@ struct AggregateFunctionUniqHLL12Data<UUID, false>
Set set;
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;
static String getName() { return "uniqHLL12"; }
@ -108,6 +117,7 @@ struct AggregateFunctionUniqHLL12Data<IPv6, false>
Set set;
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;
static String getName() { return "uniqHLL12"; }
@ -120,6 +130,7 @@ struct AggregateFunctionUniqHLL12DataForVariadic
Set set;
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
@ -143,6 +154,7 @@ struct AggregateFunctionUniqExactData
Set set;
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = false;
static String getName() { return "uniqExact"; }
@ -162,6 +174,7 @@ struct AggregateFunctionUniqExactData<String, is_able_to_parallelize_merge_>
Set set;
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = false;
static String getName() { return "uniqExact"; }
@ -181,6 +194,7 @@ struct AggregateFunctionUniqExactData<IPv6, is_able_to_parallelize_merge_>
Set set;
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = false;
static String getName() { return "uniqExact"; }
@ -190,6 +204,7 @@ template <bool is_exact_, bool argument_is_tuple_, bool is_able_to_parallelize_m
struct AggregateFunctionUniqExactDataForVariadic : AggregateFunctionUniqExactData<String, is_able_to_parallelize_merge_>
{
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
@ -204,6 +219,7 @@ struct AggregateFunctionUniqThetaData
Set set;
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;
static String getName() { return "uniqTheta"; }
@ -213,6 +229,7 @@ template <bool is_exact_, bool argument_is_tuple_>
struct AggregateFunctionUniqThetaDataForVariadic : AggregateFunctionUniqThetaData
{
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
@ -384,8 +401,10 @@ template <typename T, typename Data>
class AggregateFunctionUniq final : public IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>>
{
private:
using DataSet = typename Data::Set;
static constexpr size_t num_args = 1;
static constexpr bool is_able_to_parallelize_merge = Data::is_able_to_parallelize_merge;
static constexpr bool is_parallelize_merge_prepare_needed = Data::is_parallelize_merge_prepare_needed;
public:
explicit AggregateFunctionUniq(const DataTypes & argument_types_)
@ -439,6 +458,26 @@ public:
detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_begin, row_end, flags, null_map);
}
bool isParallelizeMergePrepareNeeded() const override { return is_parallelize_merge_prepare_needed;}
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool) const override
{
if constexpr (is_parallelize_merge_prepare_needed)
{
std::vector<DataSet *> data_vec;
data_vec.resize(places.size());
for (unsigned long i = 0; i < data_vec.size(); i++)
data_vec[i] = &this->data(places[i]).set;
DataSet::parallelizeMergePrepare(data_vec, thread_pool);
}
else
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "parallelizeMergePrepare() is only implemented when is_parallelize_merge_prepare_needed is true for {} ", getName());
}
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).set.merge(this->data(rhs).set);

View File

@ -47,6 +47,7 @@ using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;
using AggregateDataPtr = char *;
using AggregateDataPtrs = std::vector<AggregateDataPtr>;
using ConstAggregateDataPtr = const char *;
class IAggregateFunction;
@ -148,6 +149,13 @@ public:
/// Default values must be a the 0-th positions in columns.
virtual void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t length, Arena * arena) const = 0;
virtual bool isParallelizeMergePrepareNeeded() const { return false; }
virtual void parallelizeMergePrepare(AggregateDataPtrs & /*places*/, ThreadPool & /*thread_pool*/) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "parallelizeMergePrepare() with thread pool parameter isn't implemented for {} ", getName());
}
/// Merges state (on which place points to) with other state of current aggregation function.
virtual void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;

View File

@ -28,6 +28,57 @@ public:
asTwoLevel().insert(std::forward<Arg>(arg));
}
/// In merge, if one of the lhs and rhs is twolevelset and the other is singlelevelset, then the singlelevelset will need to convertToTwoLevel().
/// It's not in parallel and will cost extra large time if the thread_num is large.
/// This method will convert all the SingleLevelSet to TwoLevelSet in parallel if the hashsets are not all singlelevel or not all twolevel.
static void parallelizeMergePrepare(const std::vector<UniqExactSet *> & data_vec, ThreadPool & thread_pool)
{
unsigned long single_level_set_num = 0;
for (auto ele : data_vec)
{
if (ele->isSingleLevel())
single_level_set_num ++;
}
if (single_level_set_num > 0 && single_level_set_num < data_vec.size())
{
try
{
auto data_vec_atomic_index = std::make_shared<std::atomic_uint32_t>(0);
auto thread_func = [data_vec, data_vec_atomic_index, thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("UniqExaConvert");
while (true)
{
const auto i = data_vec_atomic_index->fetch_add(1);
if (i >= data_vec.size())
return;
if (data_vec[i]->isSingleLevel())
data_vec[i]->convertToTwoLevel();
}
};
for (size_t i = 0; i < std::min<size_t>(thread_pool.getMaxThreads(), single_level_set_num); ++i)
thread_pool.scheduleOrThrowOnError(thread_func);
thread_pool.wait();
}
catch (...)
{
thread_pool.wait();
throw;
}
}
}
auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr)
{
if (isSingleLevel() && other.isTwoLevel())

View File

@ -2603,6 +2603,20 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
AggregatedDataVariantsPtr & res = non_empty_data[0];
for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (aggregate_functions[i]->isParallelizeMergePrepareNeeded())
{
size_t size = non_empty_data.size();
std::vector<AggregateDataPtr> data_vec;
for (size_t result_num = 0; result_num < size; ++result_num)
data_vec.emplace_back(non_empty_data[result_num]->without_key + offsets_of_aggregate_states[i]);
aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool);
}
}
/// We merge all aggregation results to the first.
for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
{

View File

@ -0,0 +1,4 @@
<test>
<query>SELECT COUNT(DISTINCT Title) FROM test.hits SETTINGS max_threads = 24</query>
<query>SELECT COUNT(DISTINCT Referer) FROM test.hits SETTINGS max_threads = 22</query>
</test>