Improve hash table preallocation optimisation (#43945)

* do not preallocate if max_size_to_preallocate_for_aggregation is too small

* skip optimisation for aggr without key

* increase default for max_size_to_preallocate_for_aggregation
This commit is contained in:
Nikita Taranov 2022-12-08 00:05:15 +01:00 committed by GitHub
parent db5ac17167
commit 9e2265a6ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 12 deletions

View File

@ -546,7 +546,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(Bool, collect_hash_table_stats_during_aggregation, true, "Enable collecting hash table statistics to optimize memory allocation", 0) \
M(UInt64, max_entries_for_hash_table_stats, 10'000, "How many entries hash table statistics collected during aggregation is allowed to have", 0) \
M(UInt64, max_size_to_preallocate_for_aggregation, 10'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before aggregation", 0) \
M(UInt64, max_size_to_preallocate_for_aggregation, 100'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before aggregation", 0) \
\
M(Bool, kafka_disable_num_consumers_limit, false, "Disable limit on kafka_num_consumers that depends on the number of available CPU cores", 0) \
M(Bool, enable_software_prefetch_in_aggregation, true, "Enable use of software prefetch in aggregation", 0) \

View File

@ -221,16 +221,31 @@ void initDataVariantsWithSizeHint(
const auto max_threads = params.group_by_two_level_threshold != 0 ? std::max(params.max_threads, 1ul) : 1;
const auto lower_limit = hint->sum_of_sizes / max_threads;
const auto upper_limit = stats_collecting_params.max_size_to_preallocate_for_aggregation / max_threads;
const auto adjusted = std::min(std::max(lower_limit, hint->median_size), upper_limit);
if (worthConvertToTwoLevel(
params.group_by_two_level_threshold,
hint->sum_of_sizes,
/*group_by_two_level_threshold_bytes*/ 0,
/*result_size_bytes*/ 0))
method_chosen = convertToTwoLevelTypeIfPossible(method_chosen);
result.init(method_chosen, adjusted);
ProfileEvents::increment(ProfileEvents::AggregationHashTablesInitializedAsTwoLevel, result.isTwoLevel());
return;
if (hint->median_size > upper_limit)
{
/// Since we cannot afford to preallocate as much as we want, we will likely need to do resize anyway.
/// But we will also work with the big (i.e. not so cache friendly) HT from the beginning which may result in a slight slowdown.
/// So let's just do nothing.
LOG_TRACE(
&Poco::Logger::get("Aggregator"),
"No space were preallocated in hash tables because 'max_size_to_preallocate_for_aggregation' has too small value: {}, "
"should be at least {}",
stats_collecting_params.max_size_to_preallocate_for_aggregation,
hint->median_size * max_threads);
}
else
{
const auto adjusted = std::max(lower_limit, hint->median_size);
if (worthConvertToTwoLevel(
params.group_by_two_level_threshold,
hint->sum_of_sizes,
/*group_by_two_level_threshold_bytes*/ 0,
/*result_size_bytes*/ 0))
method_chosen = convertToTwoLevelTypeIfPossible(method_chosen);
result.init(method_chosen, adjusted);
ProfileEvents::increment(ProfileEvents::AggregationHashTablesInitializedAsTwoLevel, result.isTwoLevel());
return;
}
}
}
result.init(method_chosen);
@ -930,7 +945,10 @@ void Aggregator::executeOnBlockSmall(
/// How to perform the aggregation?
if (result.empty())
{
initDataVariantsWithSizeHint(result, method_chosen, params);
if (method_chosen != AggregatedDataVariants::Type::without_key)
initDataVariantsWithSizeHint(result, method_chosen, params);
else
result.init(method_chosen);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
}