#include <algorithm>
#include <memory>
#include <mutex>
#include <numeric>
#include <optional>
#include <type_traits>
#include <vector>

#ifdef OS_LINUX
#    include <unistd.h>
#endif

#include <Interpreters/Aggregator.h>
#include <Parsers/ASTSelectQuery.h>
#include <Common/CacheBase.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/TwoLevelHashMap.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Common/logger_useful.h>

namespace ProfileEvents
{
    extern const Event ExternalAggregationWritePart;
    extern const Event ExternalAggregationCompressedBytes;
    extern const Event ExternalAggregationUncompressedBytes;
    extern const Event ExternalProcessingCompressedBytesTotal;
    extern const Event ExternalProcessingUncompressedBytesTotal;
    extern const Event AggregationPreallocatedElementsInHashTables;
    extern const Event AggregationHashTablesInitializedAsTwoLevel;
    extern const Event OverflowThrow;
    extern const Event OverflowBreak;
    extern const Event OverflowAny;
    extern const Event AggregationOptimizedEqualRangesOfKeys;
}

namespace CurrentMetrics
{
    extern const Metric TemporaryFilesForAggregation;
    extern const Metric AggregatorThreads;
    extern const Metric AggregatorThreadsActive;
    extern const Metric AggregatorThreadsScheduled;
}

namespace DB
{
namespace ErrorCodes
{
    extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
    extern const int TOO_MANY_ROWS;
    extern const int EMPTY_DATA_PASSED;
    extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS;
    extern const int LOGICAL_ERROR;
}
}

namespace
{
/** Collects observed HashMap-s sizes to avoid redundant intermediate resizes. */
class HashTablesStatistics
{
public:
    struct Entry
    {
        size_t sum_of_sizes; // used to determine if it's better to convert aggregation to two-level from the beginning
        size_t median_size; // roughly the size we're going to preallocate on each thread
    };

    using Cache = DB::CacheBase<UInt64, Entry>;
    using CachePtr = std::shared_ptr<Cache>;
    using Params = DB::Aggregator::Params::StatsCollectingParams;

    /// Collection and use of the statistics should be enabled.
    std::optional<Entry> getSizeHint(const Params & params)
    {
        if (!params.isCollectionAndUseEnabled())
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");

        std::lock_guard lock(mutex);
        const auto cache = getHashTableStatsCache(params, lock);
        if (const auto hint = cache->get(params.key))
        {
            LOG_TRACE(
                getLogger("Aggregator"),
                "An entry for key={} found in cache: sum_of_sizes={}, median_size={}",
                params.key,
                hint->sum_of_sizes,
                hint->median_size);
            return *hint;
        }
        return std::nullopt;
    }

    /// Collection and use of the statistics should be enabled.
    void update(size_t sum_of_sizes, size_t median_size, const Params & params)
    {
        if (!params.isCollectionAndUseEnabled())
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");

        std::lock_guard lock(mutex);
        const auto cache = getHashTableStatsCache(params, lock);
        const auto hint = cache->get(params.key);
        // We'll maintain the maximum among all the observed values until the next prediction turns out to be too wrong.
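        // The condition below treats the cached entry as stale when a fresh observation is either larger than the
        // stored one (we keep the maximum seen so far) or smaller than half of it (the prediction became too pessimistic).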
        if (!hint || sum_of_sizes < hint->sum_of_sizes / 2 || hint->sum_of_sizes < sum_of_sizes || median_size < hint->median_size / 2
            || hint->median_size < median_size)
        {
            LOG_TRACE(
                getLogger("Aggregator"),
                "Statistics updated for key={}: new sum_of_sizes={}, median_size={}",
                params.key,
                sum_of_sizes,
                median_size);
            cache->set(params.key, std::make_shared<Entry>(Entry{.sum_of_sizes = sum_of_sizes, .median_size = median_size}));
        }
    }

    std::optional<DB::HashTablesCacheStatistics> getCacheStats() const
    {
        std::lock_guard lock(mutex);
        if (hash_table_stats)
        {
            size_t hits = 0, misses = 0;
            hash_table_stats->getStats(hits, misses);
            return DB::HashTablesCacheStatistics{.entries = hash_table_stats->count(), .hits = hits, .misses = misses};
        }
        return std::nullopt;
    }

    static size_t calculateCacheKey(const DB::ASTPtr & select_query)
    {
        if (!select_query)
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Query ptr cannot be null");

        const auto & select = select_query->as<DB::ASTSelectQuery &>();

        // It may happen in some corner cases like `select 1 as num group by num`.
        if (!select.tables())
            return 0;

        SipHash hash;
        hash.update(select.tables()->getTreeHash(/*ignore_aliases=*/ true));
        if (const auto where = select.where())
            hash.update(where->getTreeHash(/*ignore_aliases=*/ true));
        if (const auto group_by = select.groupBy())
            hash.update(group_by->getTreeHash(/*ignore_aliases=*/ true));
        return hash.get64();
    }

private:
    CachePtr getHashTableStatsCache(const Params & params, const std::lock_guard<std::mutex> &)
    {
        if (!hash_table_stats || hash_table_stats->maxSizeInBytes() != params.max_entries_for_hash_table_stats)
            hash_table_stats = std::make_shared<Cache>(params.max_entries_for_hash_table_stats);
        return hash_table_stats;
    }

    mutable std::mutex mutex;
    CachePtr hash_table_stats;
};

HashTablesStatistics & getHashTablesStatistics()
{
    static HashTablesStatistics hash_tables_stats;
    return hash_tables_stats;
}

bool worthConvertToTwoLevel(
    size_t group_by_two_level_threshold, size_t result_size, size_t group_by_two_level_threshold_bytes, auto result_size_bytes)
{
    // params.group_by_two_level_threshold will be equal to 0 if we have only one thread to execute aggregation (refer to AggregatingStep::transformPipeline).
    return (group_by_two_level_threshold && result_size >= group_by_two_level_threshold)
        || (group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(group_by_two_level_threshold_bytes));
}

DB::AggregatedDataVariants::Type convertToTwoLevelTypeIfPossible(DB::AggregatedDataVariants::Type type)
{
    using Type = DB::AggregatedDataVariants::Type;

    switch (type)
    {
#define M(NAME) \
    case Type::NAME: \
        return Type::NAME##_two_level;

        APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
        default:
            return type;
    }
    UNREACHABLE();
}

void initDataVariantsWithSizeHint(
    DB::AggregatedDataVariants & result, DB::AggregatedDataVariants::Type method_chosen, const DB::Aggregator::Params & params)
{
    const auto & stats_collecting_params = params.stats_collecting_params;
    if (stats_collecting_params.isCollectionAndUseEnabled())
    {
        if (auto hint = getHashTablesStatistics().getSizeHint(stats_collecting_params))
        {
            const auto max_threads = params.group_by_two_level_threshold != 0 ? std::max(params.max_threads, 1ul) : 1;
            const auto lower_limit = hint->sum_of_sizes / max_threads;
            const auto upper_limit = stats_collecting_params.max_size_to_preallocate_for_aggregation / max_threads;
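            // lower_limit is the expected per-thread size if the predicted total is spread evenly across threads;
            // upper_limit caps how many elements we are willing to preallocate per thread
            // (derived from the 'max_size_to_preallocate_for_aggregation' setting).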
            if (hint->median_size > upper_limit)
            {
                /// Since we cannot afford to preallocate as much as we want, we will likely need to do resize anyway.
                /// But we will also work with the big (i.e. not so cache friendly) HT from the beginning which may result in a slight slowdown.
                /// So let's just do nothing.
                LOG_TRACE(
                    getLogger("Aggregator"),
                    "No space was preallocated in hash tables because 'max_size_to_preallocate_for_aggregation' has too small value: {}, "
                    "should be at least {}",
                    stats_collecting_params.max_size_to_preallocate_for_aggregation,
                    hint->median_size * max_threads);
            }
            /// https://github.com/ClickHouse/ClickHouse/issues/44402#issuecomment-1359920703
            else if ((max_threads > 1 && hint->sum_of_sizes > 100'000) || hint->sum_of_sizes > 500'000)
            {
                const auto adjusted = std::max(lower_limit, hint->median_size);
                if (worthConvertToTwoLevel(
                        params.group_by_two_level_threshold,
                        hint->sum_of_sizes,
                        /*group_by_two_level_threshold_bytes*/ 0,
                        /*result_size_bytes*/ 0))
                    method_chosen = convertToTwoLevelTypeIfPossible(method_chosen);
                result.init(method_chosen, adjusted);
                ProfileEvents::increment(ProfileEvents::AggregationHashTablesInitializedAsTwoLevel, result.isTwoLevel());
                return;
            }
        }
    }

    result.init(method_chosen);
}

/// Collection and use of the statistics should be enabled.
void updateStatistics(const DB::ManyAggregatedDataVariants & data_variants, const DB::Aggregator::Params::StatsCollectingParams & params)
{
    if (!params.isCollectionAndUseEnabled())
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");

    std::vector<size_t> sizes(data_variants.size());
    for (size_t i = 0; i < data_variants.size(); ++i)
        sizes[i] = data_variants[i]->size();
    const auto median_size = sizes.begin() + sizes.size() / 2; // not precisely though...
    std::nth_element(sizes.begin(), median_size, sizes.end());
    const auto sum_of_sizes = std::accumulate(sizes.begin(), sizes.end(), 0ull);
    getHashTablesStatistics().update(sum_of_sizes, *median_size, params);
}

// The std::is_constructible trait isn't suitable here because some classes have template constructors with semantics different from
// providing size hints.
// Also string hash table variants are not supported due to the fact that both local perf tests and tests in CI showed slowdowns for them.
template <typename...>
struct HasConstructorOfNumberOfElements : std::false_type
{
};

template <typename... Ts>
struct HasConstructorOfNumberOfElements<HashMapTable<Ts...>> : std::true_type
{
};

template <typename Key, typename Cell, typename Hash, typename Grower, typename Allocator, template <typename...> typename ImplTable>
struct HasConstructorOfNumberOfElements<TwoLevelHashMapTable<Key, Cell, Hash, Grower, Allocator, ImplTable>> : std::true_type
{
};

template <typename... Ts>
struct HasConstructorOfNumberOfElements<HashTable<Ts...>> : std::true_type
{
};

template <typename... Ts>
struct HasConstructorOfNumberOfElements<TwoLevelHashTable<Ts...>> : std::true_type
{
};

template