diff --git a/src/Common/ColumnsHashingImpl.h b/src/Common/ColumnsHashingImpl.h
index c584f160581..7116160e94c 100644
--- a/src/Common/ColumnsHashingImpl.h
+++ b/src/Common/ColumnsHashingImpl.h
@@ -35,6 +35,12 @@ struct LastElementCacheStats
 {
     UInt64 hits = 0;
     UInt64 misses = 0;
+
+    void update(size_t num_tries, size_t num_misses)
+    {
+        hits += num_tries - num_misses;
+        misses += num_misses;
+    }
 };
 
 namespace columns_hashing_impl
@@ -48,15 +54,15 @@ struct LastElementCache
     Value value;
     bool empty = true;
     bool found = false;
-    LastElementCacheStats stats;
+    UInt64 misses = 0;
 
     bool check(const Value & value_) const { return value == value_; }
 
     template <typename Key>
     bool check(const Key & key) const { return value.first == key; }
 
-    bool hasOnlyOneValue() const { return !empty && found && stats.misses == 0; }
-    LastElementCacheStats getStats() const { return stats; }
+    bool hasOnlyOneValue() const { return found && misses == 1; }
+    UInt64 getMisses() const { return misses; }
 };
 
 template <typename Value>
@@ -212,7 +218,7 @@ public:
         {
             cache.empty = true;
             cache.found = false;
-            cache.stats = {};
+            cache.misses = 0;
         }
     }
 
@@ -223,11 +229,11 @@ public:
         return false;
     }
 
-    ALWAYS_INLINE LastElementCacheStats getCacheStatsSinceLastReset() const
+    ALWAYS_INLINE UInt64 getCacheMissesSinceLastReset() const
     {
         if constexpr (consecutive_keys_optimization)
-            return cache.getStats();
-        return {0, 0};
+            return cache.getMisses();
+        return 0;
     }
 
     ALWAYS_INLINE bool isNullAt(size_t row) const
@@ -271,20 +277,12 @@ protected:
     {
         if constexpr (consecutive_keys_optimization)
        {
-            if (!cache.empty)
+            if (cache.found && cache.check(keyHolderGetKey(key_holder)))
             {
-                if (cache.found && cache.check(keyHolderGetKey(key_holder)))
-                {
-                    ++cache.stats.hits;
-                    if constexpr (has_mapped)
-                        return EmplaceResult(cache.value.second, cache.value.second, false);
-                    else
-                        return EmplaceResult(false);
-                }
+                if constexpr (has_mapped)
+                    return EmplaceResult(cache.value.second, cache.value.second, false);
                 else
-                {
-                    ++cache.stats.misses;
-                }
+                    return EmplaceResult(false);
             }
         }
 
@@ -308,6 +306,7 @@ protected:
         {
             cache.found = true;
             cache.empty = false;
+            ++cache.misses;
 
             if constexpr (has_mapped)
             {
@@ -330,25 +329,17 @@ protected:
    template <typename Data, typename Key>
    ALWAYS_INLINE FindResult findKeyImpl(Key key, Data & data)
    {
-        if constexpr (Cache::consecutive_keys_optimization)
+        if constexpr (consecutive_keys_optimization)
        {
            /// It's possible to support such a combination, but the code will become more complex.
            /// For now, there's no place where we need these options enabled together.
            static_assert(!FindResult::has_offset, "`consecutive_keys_optimization` and `has_offset` are conflicting options");
 
-            if (!cache.empty)
+            if (likely(!cache.empty) && cache.check(key))
            {
-                if (cache.check(key))
-                {
-                    ++cache.stats.hits;
-                    if constexpr (has_mapped)
-                        return FindResult(&cache.value.second, cache.found, 0);
-                    else
-                        return FindResult(cache.found, 0);
-                }
+                if constexpr (has_mapped)
+                    return FindResult(&cache.value.second, cache.found, 0);
                 else
-                {
-                    ++cache.stats.misses;
-                }
+                    return FindResult(cache.found, 0);
            }
        }
 
@@ -358,6 +349,7 @@ protected:
        {
            cache.found = it != nullptr;
            cache.empty = false;
+            ++cache.misses;
 
            if constexpr (has_mapped)
            {
diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp
index 49956054d18..415461b3963 100644
--- a/src/Interpreters/Aggregator.cpp
+++ b/src/Interpreters/Aggregator.cpp
@@ -1014,7 +1014,9 @@ void Aggregator::mergeOnBlockSmall(
 #define M(NAME, IS_TWO_LEVEL) \
     else if (result.type == AggregatedDataVariants::Type::NAME) \
         mergeStreamsImpl(result.aggregates_pool, *result.NAME, result.NAME->data, \
-            result.without_key, /* no_more_keys= */ false, \
+            result.without_key, \
+            result.consecutive_keys_cache_stats, \
+            /* no_more_keys= */ false, \
             row_begin, row_end, \
             aggregate_columns_data, key_columns, result.aggregates_pool);
 
@@ -1067,10 +1069,7 @@ void NO_INLINE Aggregator::executeImpl(
     {
         typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
         executeImpl(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, no_more_keys, all_keys_are_const, overflow_row);
-
-        auto current_stats = state.getCacheStatsSinceLastReset();
-        consecutive_keys_cache_stats.hits += current_stats.hits;
-        consecutive_keys_cache_stats.misses += current_stats.misses;
+        consecutive_keys_cache_stats.update(row_end - row_begin, state.getCacheMissesSinceLastReset());
     }
     else
     {
@@ -2921,20 +2920,17 @@ ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedData
     return non_empty_data;
 }
 
-template <bool no_more_keys, typename Method, typename Table>
+template <bool no_more_keys, typename State, typename Table>
 void NO_INLINE Aggregator::mergeStreamsImplCase(
     Arena * aggregates_pool,
-    Method & method [[maybe_unused]],
+    State & state,
     Table & data,
     AggregateDataPtr overflow_row,
     size_t row_begin,
     size_t row_end,
     const AggregateColumnsConstData & aggregate_columns_data,
-    const ColumnRawPtrs & key_columns,
     Arena * arena_for_keys) const
 {
-    typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
-
     std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);
 
     if (!arena_for_keys)
@@ -2990,6 +2986,7 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
     Method & method,
     Table & data,
     AggregateDataPtr overflow_row,
+    LastElementCacheStats & consecutive_keys_cache_stats,
     bool no_more_keys,
     Arena * arena_for_keys) const
 {
@@ -2997,15 +2994,17 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
     const ColumnRawPtrs & key_columns = params.makeRawKeyColumns(block);
 
     mergeStreamsImpl(
-        aggregates_pool, method, data, overflow_row, no_more_keys, 0, block.rows(), aggregate_columns_data, key_columns, arena_for_keys);
+        aggregates_pool, method, data, overflow_row, consecutive_keys_cache_stats,
+        no_more_keys, 0, block.rows(), aggregate_columns_data, key_columns, arena_for_keys);
 }
 
 template <typename Method, typename Table>
 void NO_INLINE Aggregator::mergeStreamsImpl(
     Arena * aggregates_pool,
-    Method & method,
+    Method & method [[maybe_unused]],
     Table & data,
     AggregateDataPtr overflow_row,
+    LastElementCacheStats & consecutive_keys_cache_stats,
     bool no_more_keys,
     size_t row_begin,
    size_t row_end,
@@ -3013,12 +3012,30 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
     const AggregateColumnsConstData & aggregate_columns_data,
     const ColumnRawPtrs & key_columns,
     Arena * arena_for_keys) const
 {
-    if (!no_more_keys)
-        mergeStreamsImplCase<false>(
-            aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns, arena_for_keys);
+    UInt64 total_rows = consecutive_keys_cache_stats.hits + consecutive_keys_cache_stats.misses;
+    double cache_hit_rate = total_rows ? static_cast<double>(consecutive_keys_cache_stats.hits) / total_rows : 1.0;
+    bool use_cache = cache_hit_rate >= 0.5;
+
+    if (use_cache)
+    {
+        typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
+
+        if (!no_more_keys)
+            mergeStreamsImplCase<false>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
+        else
+            mergeStreamsImplCase<true>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
+
+        consecutive_keys_cache_stats.update(row_end - row_begin, state.getCacheMissesSinceLastReset());
+    }
     else
-        mergeStreamsImplCase<true>(
-            aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns, arena_for_keys);
+    {
+        typename Method::StateNoCache state(key_columns, key_sizes, aggregation_state_cache);
+
+        if (!no_more_keys)
+            mergeStreamsImplCase<false>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
+        else
+            mergeStreamsImplCase<true>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
+    }
 }
 
@@ -3078,7 +3095,7 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
         mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
 
 #define M(NAME, IS_TWO_LEVEL) \
     else if (result.type == AggregatedDataVariants::Type::NAME) \
-        mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
+        mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, result.consecutive_keys_cache_stats, no_more_keys);
 
     APPLY_FOR_AGGREGATED_VARIANTS(M)
 #undef M
@@ -3183,7 +3200,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
             {
 #define M(NAME) \
                 else if (result.type == AggregatedDataVariants::Type::NAME) \
-                    mergeStreamsImpl(std::move(block), aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
+                    mergeStreamsImpl(std::move(block), aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, result.consecutive_keys_cache_stats, false);
 
                 if (false) {} // NOLINT
                 APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@@ -3238,7 +3255,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
 
 #define M(NAME, IS_TWO_LEVEL) \
     else if (result.type == AggregatedDataVariants::Type::NAME) \
-        mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
+        mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, result.consecutive_keys_cache_stats, no_more_keys);
 
     APPLY_FOR_AGGREGATED_VARIANTS(M)
 #undef M
@@ -3316,7 +3333,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
 
 #define M(NAME, IS_TWO_LEVEL) \
     else if (result.type == AggregatedDataVariants::Type::NAME) \
-        mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false, arena_for_keys.get());
+        mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, result.consecutive_keys_cache_stats, false, arena_for_keys.get());
 
     APPLY_FOR_AGGREGATED_VARIANTS(M)
 #undef M
diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h
index 7544ed239fa..7270ad73bdf 100644
--- a/src/Interpreters/Aggregator.h
+++ b/src/Interpreters/Aggregator.h
@@ -1467,16 +1467,15 @@ private:
         bool final,
         ThreadPool * thread_pool) const;
 
-    template <bool no_more_keys, typename Method, typename Table>
+    template <bool no_more_keys, typename State, typename Table>
     void mergeStreamsImplCase(
         Arena * aggregates_pool,
-        Method & method,
+        State & state,
         Table & data,
         AggregateDataPtr overflow_row,
         size_t row_begin,
         size_t row_end,
         const AggregateColumnsConstData & aggregate_columns_data,
-        const ColumnRawPtrs & key_columns,
         Arena * arena_for_keys) const;
 
     /// `arena_for_keys` used to store serialized aggregation keys (in methods like `serialized`) to save some space.
@@ -1488,6 +1487,7 @@ private:
         Method & method,
         Table & data,
         AggregateDataPtr overflow_row,
+        LastElementCacheStats & consecutive_keys_cache_stats,
         bool no_more_keys,
         Arena * arena_for_keys = nullptr) const;
 
@@ -1497,6 +1497,7 @@ private:
         Method & method,
         Table & data,
         AggregateDataPtr overflow_row,
+        LastElementCacheStats & consecutive_keys_cache_stats,
         bool no_more_keys,
         size_t row_begin,
         size_t row_end,
@@ -1507,6 +1508,7 @@ private:
     void mergeBlockWithoutKeyStreamsImpl(
         Block block,
         AggregatedDataVariants & result) const;
+
     void mergeWithoutKeyStreamsImpl(
         AggregatedDataVariants & result,
         size_t row_begin,
diff --git a/tests/performance/group_by_consecutive_keys.xml b/tests/performance/group_by_consecutive_keys.xml
new file mode 100644
index 00000000000..c5c885d2bb6
--- /dev/null
+++ b/tests/performance/group_by_consecutive_keys.xml
@@ -0,0 +1,8 @@
+<test>
+    <query>SELECT toUInt64(intDiv(number, 1000000)) AS n, count(), sum(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
+    <query>SELECT toString(intDiv(number, 1000000)) AS n, count(), sum(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
+    <query>SELECT toUInt64(intDiv(number, 1000000)) AS n, count(), uniq(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
+    <query>SELECT toUInt64(intDiv(number, 100000)) AS n, count(), sum(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
+    <query>SELECT toUInt64(intDiv(number, 100)) AS n, count(), sum(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
+    <query>SELECT toUInt64(intDiv(number, 10)) AS n, count(), sum(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
+</test>
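Reviewer notes (illustrative sketches, not part of the patch):

The patch inverts the cache bookkeeping: instead of incrementing per-lookup hit/miss counters on the hot path, `LastElementCache` now counts only misses, including the miss that fills the cache for the first time, which is why `hasOnlyOneValue()` becomes `misses == 1`. `LastElementCacheStats::update()` later derives hits as `num_tries - num_misses`. The following standalone sketch mirrors that accounting outside ClickHouse; all names other than `update()`'s arithmetic are illustrative.

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

struct CacheStats
{
    uint64_t hits = 0;
    uint64_t misses = 0;

    /// Same arithmetic as LastElementCacheStats::update() in the patch:
    /// the caller reports how many lookups it ran and how many missed.
    void update(std::size_t num_tries, std::size_t num_misses)
    {
        hits += num_tries - num_misses;
        misses += num_misses;
    }
};

int main()
{
    /// Three runs of consecutive keys -> exactly three cache misses.
    std::vector<uint64_t> keys = {1, 1, 1, 2, 2, 3, 3, 3, 3};
    std::unordered_map<uint64_t, uint64_t> counts;

    bool cache_empty = true;
    uint64_t cached_key = 0;
    uint64_t * cached_count = nullptr;
    uint64_t local_misses = 0;

    for (uint64_t key : keys)
    {
        if (!cache_empty && cached_key == key)
        {
            ++*cached_count; /// Hit: the hash table is not touched at all.
            continue;
        }
        ++local_misses; /// The very first lookup also counts as a miss.
        cached_count = &counts[key];
        cached_key = key;
        cache_empty = false;
        ++*cached_count;
    }

    CacheStats stats;
    stats.update(keys.size(), local_misses);
    std::cout << "hits=" << stats.hits << " misses=" << stats.misses << '\n'; /// hits=6 misses=3
}
```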
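The merge path now gets the same adaptivity as `executeImpl`: `mergeStreamsImpl` instantiates either `Method::State` (cache enabled) or `Method::StateNoCache` depending on the hit rate observed so far, so merges whose keys are not consecutive stop paying for cache checks. A sketch of the gating rule follows; `shouldUseCache` is a hypothetical name for logic the patch writes inline.

```cpp
#include <cstdint>

/// Keep the consecutive-keys cache only while it has paid off so far.
bool shouldUseCache(uint64_t hits, uint64_t misses)
{
    const uint64_t total_rows = hits + misses;
    /// No history yet: optimistically start with the cache enabled.
    const double cache_hit_rate = total_rows ? static_cast<double>(hits) / total_rows : 1.0;
    return cache_hit_rate >= 0.5;
}

/// shouldUseCache(600000, 400000) -> true  (hit rate 0.6: use Method::State)
/// shouldUseCache(100000, 900000) -> false (hit rate 0.1: use Method::StateNoCache)
```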
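The new performance test varies the `intDiv` divisor to control run length: over `numbers(10000000)`, `intDiv(number, 1000000)` yields runs of one million identical consecutive keys (expected hit rate close to 1), while `intDiv(number, 10)` yields runs of ten (expected hit rate around 0.9, assuming rows arrive in order), so all six queries should keep the cache enabled and benefit from the cheaper miss-only bookkeeping.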