diff --git a/src/AggregateFunctions/AggregateFunctionUniq.h b/src/AggregateFunctions/AggregateFunctionUniq.h index 8ac75e4451c..891f2ac4284 100644 --- a/src/AggregateFunctions/AggregateFunctionUniq.h +++ b/src/AggregateFunctions/AggregateFunctionUniq.h @@ -483,6 +483,7 @@ public: } bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; } + bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override { @@ -576,6 +577,7 @@ public: } bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; } + bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override { diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionArray.h b/src/AggregateFunctions/Combinators/AggregateFunctionArray.h index 7f38453f86b..6b918926d0d 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionArray.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionArray.h @@ -142,6 +142,7 @@ public: } bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); } + bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override { diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionIf.h b/src/AggregateFunctions/Combinators/AggregateFunctionIf.h index e81f2203e7b..df23398a10d 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionIf.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionIf.h @@ -165,6 +165,7 @@ public: } bool isAbleToParallelizeMerge() const override { return 
nested_func->isAbleToParallelizeMerge(); } + bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override { diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionMerge.h b/src/AggregateFunctions/Combinators/AggregateFunctionMerge.h index 5b9e8e606af..53c24bd60c1 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionMerge.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionMerge.h @@ -111,6 +111,7 @@ public: } bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); } + bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override { diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionNull.h b/src/AggregateFunctions/Combinators/AggregateFunctionNull.h index 8b614f68540..ba72f960852 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionNull.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionNull.h @@ -152,6 +152,7 @@ public: } bool isAbleToParallelizeMerge() const override { return nested_function->isAbleToParallelizeMerge(); } + bool canOptimizeEqualKeysRanges() const override { return nested_function->canOptimizeEqualKeysRanges(); } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override { diff --git a/src/AggregateFunctions/Combinators/AggregateFunctionState.h b/src/AggregateFunctions/Combinators/AggregateFunctionState.h index 8335d21cb1e..b0ab6d49604 100644 --- a/src/AggregateFunctions/Combinators/AggregateFunctionState.h +++ b/src/AggregateFunctions/Combinators/AggregateFunctionState.h @@ -92,6 +92,7 @@ public: } bool isAbleToParallelizeMerge() const 
override { return nested_func->isAbleToParallelizeMerge(); } + bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); } void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override { diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h index 94bb121893d..499185320e6 100644 --- a/src/AggregateFunctions/IAggregateFunction.h +++ b/src/AggregateFunctions/IAggregateFunction.h @@ -162,6 +162,10 @@ public: /// Tells if merge() with thread pool parameter could be used. virtual bool isAbleToParallelizeMerge() const { return false; } + /// Returns true if it is allowed to replace a call to `addBatch` + /// with a call to `addBatchSinglePlace` for ranges of consecutive equal keys. + virtual bool canOptimizeEqualKeysRanges() const { return true; } + /// Should be used only if isAbleToParallelizeMerge() returned true. virtual void merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, Arena * /*arena*/) const diff --git a/src/Common/ColumnsHashingImpl.h b/src/Common/ColumnsHashingImpl.h index 7116160e94c..d68171a6566 100644 --- a/src/Common/ColumnsHashingImpl.h +++ b/src/Common/ColumnsHashingImpl.h @@ -62,7 +62,6 @@ struct LastElementCache bool check(const Key & key) const { return value.first == key; } bool hasOnlyOneValue() const { return found && misses == 1; } - UInt64 getMisses() const { return misses; } }; template @@ -232,7 +231,7 @@ public: ALWAYS_INLINE UInt64 getCacheMissesSinceLastReset() const { if constexpr (consecutive_keys_optimization) - return cache.getMisses(); + return cache.misses; return 0; } diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index ab1a16a3edf..e91b5adec87 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -534,6 +534,7 @@ The server successfully detected this situation and will download merged 
part fr \ M(AggregationPreallocatedElementsInHashTables, "How many elements were preallocated in hash tables for aggregation.") \ M(AggregationHashTablesInitializedAsTwoLevel, "How many hash tables were inited as two-level for aggregation.") \ + M(AggregationOptimizedEqualRangesOfKeys, "For how many blocks optimization of equal ranges of keys was applied") \ \ M(MetadataFromKeeperCacheHit, "Number of times an object storage metadata request was answered from cache without making request to Keeper") \ M(MetadataFromKeeperCacheMiss, "Number of times an object storage metadata request had to be answered from Keeper") \ diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index a9578b5540f..39b63f6b635 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -53,6 +53,7 @@ namespace ProfileEvents extern const Event OverflowThrow; extern const Event OverflowBreak; extern const Event OverflowAny; + extern const Event AggregationOptimizedEqualRangesOfKeys; } namespace CurrentMetrics @@ -1344,6 +1345,7 @@ void NO_INLINE Aggregator::executeImplBatch( if (use_compiled_functions) { std::vector columns_data; + bool can_optimize_equal_keys_ranges = true; for (size_t i = 0; i < aggregate_functions.size(); ++i) { @@ -1352,13 +1354,15 @@ void NO_INLINE Aggregator::executeImplBatch( AggregateFunctionInstruction * inst = aggregate_instructions + i; size_t arguments_size = inst->that->getArgumentTypes().size(); // NOLINT + can_optimize_equal_keys_ranges &= inst->can_optimize_equal_keys_ranges; for (size_t argument_index = 0; argument_index < arguments_size; ++argument_index) columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index])); } - if (all_keys_are_const || (!no_more_keys && state.hasOnlyOneValueSinceLastReset())) + if (all_keys_are_const || (can_optimize_equal_keys_ranges && state.hasOnlyOneValueSinceLastReset())) { + ProfileEvents::increment(ProfileEvents::AggregationOptimizedEqualRangesOfKeys); auto 
add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place; add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), places[key_start]); } @@ -1380,10 +1384,15 @@ void NO_INLINE Aggregator::executeImplBatch( AggregateFunctionInstruction * inst = aggregate_instructions + i; - if (all_keys_are_const || (!no_more_keys && state.hasOnlyOneValueSinceLastReset())) + if (all_keys_are_const || (inst->can_optimize_equal_keys_ranges && state.hasOnlyOneValueSinceLastReset())) + { + ProfileEvents::increment(ProfileEvents::AggregationOptimizedEqualRangesOfKeys); addBatchSinglePlace(row_begin, row_end, inst, places[key_start] + inst->state_offset, aggregates_pool); + } else + { addBatch(row_begin, row_end, inst, places.get(), aggregates_pool); + } } } @@ -1573,6 +1582,7 @@ void Aggregator::prepareAggregateInstructions( } aggregate_functions_instructions[i].has_sparse_arguments = has_sparse_arguments; + aggregate_functions_instructions[i].can_optimize_equal_keys_ranges = aggregate_functions[i]->canOptimizeEqualKeysRanges(); aggregate_functions_instructions[i].arguments = aggregate_columns[i].data(); aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i]; diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 6c357623003..eeb2355d370 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1221,6 +1221,7 @@ public: const IColumn ** batch_arguments{}; const UInt64 * offsets{}; bool has_sparse_arguments = false; + bool can_optimize_equal_keys_ranges = true; }; /// Used for optimize_aggregation_in_order: diff --git a/tests/queries/0_stateless/03008_optimize_equal_ranges.reference b/tests/queries/0_stateless/03008_optimize_equal_ranges.reference new file mode 100644 index 00000000000..fc7a4f3c118 --- /dev/null +++ 
b/tests/queries/0_stateless/03008_optimize_equal_ranges.reference @@ -0,0 +1,16 @@ +0 30000 +1 30000 +2 30000 +0 30000 +1 30000 +2 30000 +0 449985000 +1 449985000 +2 449985000 +0 449985000 +1 449985000 +2 449985000 +sum 1 1 +sum 16 1 +uniqExact 1 1 +uniqExact 16 0 diff --git a/tests/queries/0_stateless/03008_optimize_equal_ranges.sql b/tests/queries/0_stateless/03008_optimize_equal_ranges.sql new file mode 100644 index 00000000000..6d769c7382a --- /dev/null +++ b/tests/queries/0_stateless/03008_optimize_equal_ranges.sql @@ -0,0 +1,29 @@ +DROP TABLE IF EXISTS t_optimize_equal_ranges; + +CREATE TABLE t_optimize_equal_ranges (a UInt64, b String, c UInt64) ENGINE = MergeTree ORDER BY a; + +SET max_block_size = 1024; +SET max_bytes_before_external_group_by = 0; +SET optimize_aggregation_in_order = 0; +SET optimize_use_projections = 0; + +INSERT INTO t_optimize_equal_ranges SELECT 0, toString(number), number FROM numbers(30000); +INSERT INTO t_optimize_equal_ranges SELECT 1, toString(number), number FROM numbers(30000); +INSERT INTO t_optimize_equal_ranges SELECT 2, toString(number), number FROM numbers(30000); + +SELECT a, uniqExact(b) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 16; +SELECT a, uniqExact(b) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 1; +SELECT a, sum(c) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 16; +SELECT a, sum(c) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 1; + +SYSTEM FLUSH LOGS; + +SELECT + used_aggregate_functions[1] AS func, + Settings['max_threads'] AS threads, + ProfileEvents['AggregationOptimizedEqualRangesOfKeys'] > 0 +FROM system.query_log +WHERE type = 'QueryFinish' AND current_database = currentDatabase() AND query LIKE '%SELECT%FROM%t_optimize_equal_ranges%' +ORDER BY func, threads; + +DROP TABLE t_optimize_equal_ranges; diff --git a/tests/queries/0_stateless/03008_uniq_exact_equal_ranges.reference 
b/tests/queries/0_stateless/03008_uniq_exact_equal_ranges.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03008_uniq_exact_equal_ranges.sql b/tests/queries/0_stateless/03008_uniq_exact_equal_ranges.sql new file mode 100644 index 00000000000..2e708f28cac --- /dev/null +++ b/tests/queries/0_stateless/03008_uniq_exact_equal_ranges.sql @@ -0,0 +1,36 @@ +DROP TABLE IF EXISTS t_uniq_exact; + +CREATE TABLE t_uniq_exact (a UInt64, b String, c UInt64) ENGINE = MergeTree ORDER BY a; + +SET group_by_two_level_threshold_bytes = 1; +SET group_by_two_level_threshold = 1; +SET max_threads = 4; +SET max_bytes_before_external_group_by = 0; +SET optimize_aggregation_in_order = 0; + +INSERT INTO t_uniq_exact SELECT 0, randomPrintableASCII(5), rand() FROM numbers(300000); +INSERT INTO t_uniq_exact SELECT 1, randomPrintableASCII(5), rand() FROM numbers(300000); +INSERT INTO t_uniq_exact SELECT 2, randomPrintableASCII(5), rand() FROM numbers(300000); +INSERT INTO t_uniq_exact SELECT 3, randomPrintableASCII(5), rand() FROM numbers(300000); +INSERT INTO t_uniq_exact SELECT 4, randomPrintableASCII(5), rand() FROM numbers(300000); +INSERT INTO t_uniq_exact SELECT 5, randomPrintableASCII(5), rand() FROM numbers(300000); +INSERT INTO t_uniq_exact SELECT 6, randomPrintableASCII(5), rand() FROM numbers(300000); +INSERT INTO t_uniq_exact SELECT 7, randomPrintableASCII(5), rand() FROM numbers(300000); +INSERT INTO t_uniq_exact SELECT 8, randomPrintableASCII(5), rand() FROM numbers(300000); +INSERT INTO t_uniq_exact SELECT 9, randomPrintableASCII(5), rand() FROM numbers(300000); + +OPTIMIZE TABLE t_uniq_exact FINAL; + +SELECT a, uniqExact(b) FROM t_uniq_exact GROUP BY a ORDER BY a +SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 1.0 +EXCEPT +SELECT a, uniqExact(b) FROM t_uniq_exact GROUP BY a ORDER BY a +SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 0.5; + +SELECT a, sum(c) FROM t_uniq_exact GROUP BY a ORDER BY a 
+SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 1.0 +EXCEPT +SELECT a, sum(c) FROM t_uniq_exact GROUP BY a ORDER BY a +SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 0.5; + +DROP TABLE t_uniq_exact;