Merge pull request #68441 from jiebinn/parallelMergeWithKey

Add thread pool and cancellation to support parallel merge with key
This commit is contained in:
Nikita Taranov 2024-09-09 10:41:45 +00:00 committed by GitHub
commit 57e3812087
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 36 additions and 15 deletions

View File

@ -459,6 +459,8 @@ public:
/// Override: returns the value of is_parallelize_merge_prepare_needed.
bool isParallelizeMergePrepareNeeded() const override { return is_parallelize_merge_prepare_needed; }
/// Compile-time flag; this class reports true.
/// NOTE(review): presumably this is what opts the class into the merge() overload taking a thread pool
/// and a cancellation flag — confirm against the caller that tests parallelizeMergeWithKey().
constexpr static bool parallelizeMergeWithKey() { return true; }
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool, std::atomic<bool> & is_cancelled) const override
{
if constexpr (is_parallelize_merge_prepare_needed)

View File

@ -145,6 +145,8 @@ public:
/// Default implementation: returns false. Virtual, so a derived class may return something else.
virtual bool isParallelizeMergePrepareNeeded() const { return false; }
/// Default value of the compile-time flag: false.
/// mergeAndDestroyBatch() tests Derived::parallelizeMergeWithKey() with `if constexpr` to decide whether
/// to call the merge() overload that takes thread_pool and is_cancelled.
constexpr static bool parallelizeMergeWithKey() { return false; }
virtual void parallelizeMergePrepare(AggregateDataPtrs & /*places*/, ThreadPool & /*thread_pool*/, std::atomic<bool> & /*is_cancelled*/) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "parallelizeMergePrepare() with thread pool parameter isn't implemented for {} ", getName());
@ -169,7 +171,7 @@ public:
/// Merges the states that src places point to into the states that dst places point to (for the current aggregate function),
/// then destroys the states that src places point to.
virtual void mergeAndDestroyBatch(AggregateDataPtr * dst_places, AggregateDataPtr * src_places, size_t size, size_t offset, Arena * arena) const = 0;
virtual void mergeAndDestroyBatch(AggregateDataPtr * dst_places, AggregateDataPtr * src_places, size_t size, size_t offset, ThreadPool & thread_pool, std::atomic<bool> & is_cancelled, Arena * arena) const = 0;
/// Serializes state (to transmit it over the network, for example).
virtual void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version = std::nullopt) const = 0; /// NOLINT
@ -499,11 +501,15 @@ public:
static_cast<const Derived *>(this)->merge(places[i] + place_offset, rhs[i], arena);
}
/// Merges each source state into the matching destination state, then destroys the source state.
/// For every i in [0, size) the states are addressed as dst_places[i] + offset and rhs_places[i] + offset.
/// thread_pool and is_cancelled are forwarded to merge() only when Derived::parallelizeMergeWithKey() is true;
/// arena is passed to merge() in both cases.
void mergeAndDestroyBatch(AggregateDataPtr * dst_places, AggregateDataPtr * rhs_places, size_t size, size_t offset, Arena * arena) const override
void mergeAndDestroyBatch(AggregateDataPtr * dst_places, AggregateDataPtr * rhs_places, size_t size, size_t offset, ThreadPool & thread_pool, std::atomic<bool> & is_cancelled, Arena * arena) const override
{
for (size_t i = 0; i < size; ++i)
{
static_cast<const Derived *>(this)->merge(dst_places[i] + offset, rhs_places[i] + offset, arena);
/// Compile-time dispatch on the Derived class: only one of the two merge() calls below is instantiated.
if constexpr (Derived::parallelizeMergeWithKey())
static_cast<const Derived *>(this)->merge(dst_places[i] + offset, rhs_places[i] + offset, thread_pool, is_cancelled, arena);
else
static_cast<const Derived *>(this)->merge(dst_places[i] + offset, rhs_places[i] + offset, arena);
/// The source state is destroyed right after it has been merged.
static_cast<const Derived *>(this)->destroy(rhs_places[i] + offset);
}
}

View File

@ -101,6 +101,13 @@ public:
auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr, std::atomic<bool> * is_cancelled = nullptr)
{
/// If `other` is large (more than 40000 elements), convert a single-level hash set to a two-level one so that the merge can run in parallel.
if (other.size() > 40000)
{
if (isSingleLevel())
convertToTwoLevel();
}
if (isSingleLevel() && other.isTwoLevel())
convertToTwoLevel();

View File

@ -2371,7 +2371,7 @@ void NO_INLINE Aggregator::mergeDataNullKey(
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataImpl(
Table & table_dst, Table & table_src, Arena * arena, bool use_compiled_functions [[maybe_unused]], bool prefetch) const
Table & table_dst, Table & table_src, Arena * arena, bool use_compiled_functions [[maybe_unused]], bool prefetch, ThreadPool & thread_pool, std::atomic<bool> & is_cancelled) const
{
if constexpr (Method::low_cardinality_optimization || Method::one_key_nullable_optimization)
mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
@ -2410,7 +2410,7 @@ void NO_INLINE Aggregator::mergeDataImpl(
{
if (!is_aggregate_function_compiled[i])
aggregate_functions[i]->mergeAndDestroyBatch(
dst_places.data(), src_places.data(), dst_places.size(), offsets_of_aggregate_states[i], arena);
dst_places.data(), src_places.data(), dst_places.size(), offsets_of_aggregate_states[i], thread_pool, is_cancelled, arena);
}
return;
@ -2420,7 +2420,7 @@ void NO_INLINE Aggregator::mergeDataImpl(
for (size_t i = 0; i < params.aggregates_size; ++i)
{
aggregate_functions[i]->mergeAndDestroyBatch(
dst_places.data(), src_places.data(), dst_places.size(), offsets_of_aggregate_states[i], arena);
dst_places.data(), src_places.data(), dst_places.size(), offsets_of_aggregate_states[i], thread_pool, is_cancelled, arena);
}
}
@ -2535,8 +2535,10 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
template <typename Method>
void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data) const
ManyAggregatedDataVariants & non_empty_data, std::atomic<bool> & is_cancelled) const
{
ThreadPool thread_pool{CurrentMetrics::AggregatorThreads, CurrentMetrics::AggregatorThreadsActive, CurrentMetrics::AggregatorThreadsScheduled, params.max_threads};
AggregatedDataVariantsPtr & res = non_empty_data[0];
bool no_more_keys = false;
@ -2557,13 +2559,13 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
if (compiled_aggregate_functions_holder)
{
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, true, prefetch);
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, true, prefetch, thread_pool, is_cancelled);
}
else
#endif
{
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, false, prefetch);
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, false, prefetch, thread_pool, is_cancelled);
}
}
else if (res->without_key)
@ -2589,7 +2591,7 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
/// Explicitly instantiates Aggregator::mergeSingleLevelDataImpl for the element type of each
/// AggregatedDataVariants member that APPLY_FOR_VARIANTS_SINGLE_LEVEL passes to M.
#define M(NAME) \
template void NO_INLINE Aggregator::mergeSingleLevelDataImpl<decltype(AggregatedDataVariants::NAME)::element_type>( \
ManyAggregatedDataVariants & non_empty_data) const;
ManyAggregatedDataVariants & non_empty_data, std::atomic<bool> & is_cancelled) const;
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
@ -2597,6 +2599,8 @@ template <typename Method>
void NO_INLINE Aggregator::mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena, std::atomic<bool> & is_cancelled) const
{
ThreadPool thread_pool{CurrentMetrics::AggregatorThreads, CurrentMetrics::AggregatorThreadsActive, CurrentMetrics::AggregatorThreadsScheduled, params.max_threads};
/// We merge all aggregation results to the first.
AggregatedDataVariantsPtr & res = data[0];
@ -2613,7 +2617,7 @@ void NO_INLINE Aggregator::mergeBucketImpl(
if (compiled_aggregate_functions_holder)
{
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena, true, prefetch);
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena, true, prefetch, thread_pool, is_cancelled);
}
else
#endif
@ -2623,7 +2627,9 @@ void NO_INLINE Aggregator::mergeBucketImpl(
getDataVariant<Method>(current).data.impls[bucket],
arena,
false,
prefetch);
prefetch,
thread_pool,
is_cancelled);
}
}
}

View File

@ -467,7 +467,7 @@ private:
/// Merge data from hash table `src` into `dst`.
template <typename Method, typename Table>
void mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena, bool use_compiled_functions, bool prefetch) const;
void mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena, bool use_compiled_functions, bool prefetch, ThreadPool & thread_pool, std::atomic<bool> & is_cancelled) const;
/// Merge data from hash table `src` into `dst`, but only for keys that already exist in dst. In other cases, merge the data into `overflows`.
template <typename Method, typename Table>
@ -490,7 +490,7 @@ private:
template <typename Method>
void mergeSingleLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
ManyAggregatedDataVariants & non_empty_data, std::atomic<bool> & is_cancelled) const;
template <bool return_single_block>
using ConvertToBlockRes = std::conditional_t<return_single_block, Block, BlocksList>;

View File

@ -486,7 +486,7 @@ private:
#define M(NAME) \
else if (first->type == AggregatedDataVariants::Type::NAME) \
params->aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(*data);
params->aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(*data, shared_data->is_cancelled);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M