Merge pull request #61257 from CurtizJ/fix-uniq-exact

Fix possible incorrect result of aggregate function `uniqExact`
This commit is contained in:
Anton Popov 2024-03-15 14:34:16 +01:00 committed by GitHub
commit e50ebe13f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 107 additions and 4 deletions

View File

@ -483,6 +483,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
{
@ -576,6 +577,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
{

View File

@ -142,6 +142,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{

View File

@ -165,6 +165,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{

View File

@ -111,6 +111,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{

View File

@ -152,6 +152,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return nested_function->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_function->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{

View File

@ -92,6 +92,7 @@ public:
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{

View File

@ -162,6 +162,10 @@ public:
/// Tells if merge() with thread pool parameter could be used.
virtual bool isAbleToParallelizeMerge() const { return false; }
/// Return true if it is allowed to replace call of `addBatch`
/// to `addBatchSinglePlace` for ranges of consecutive equal keys.
virtual bool canOptimizeEqualKeysRanges() const { return true; }
/// Should be used only if isAbleToParallelizeMerge() returned true.
virtual void
merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, Arena * /*arena*/) const

View File

@ -62,7 +62,6 @@ struct LastElementCache
bool check(const Key & key) const { return value.first == key; }
bool hasOnlyOneValue() const { return found && misses == 1; }
UInt64 getMisses() const { return misses; }
};
template <typename Data>
@ -232,7 +231,7 @@ public:
ALWAYS_INLINE UInt64 getCacheMissesSinceLastReset() const
{
if constexpr (consecutive_keys_optimization)
return cache.getMisses();
return cache.misses;
return 0;
}

View File

@ -534,6 +534,7 @@ The server successfully detected this situation and will download merged part fr
\
M(AggregationPreallocatedElementsInHashTables, "How many elements were preallocated in hash tables for aggregation.") \
M(AggregationHashTablesInitializedAsTwoLevel, "How many hash tables were inited as two-level for aggregation.") \
M(AggregationOptimizedEqualRangesOfKeys, "For how many blocks optimization of equal ranges of keys was applied") \
\
M(MetadataFromKeeperCacheHit, "Number of times an object storage metadata request was answered from cache without making request to Keeper") \
M(MetadataFromKeeperCacheMiss, "Number of times an object storage metadata request had to be answered from Keeper") \

View File

@ -53,6 +53,7 @@ namespace ProfileEvents
extern const Event OverflowThrow;
extern const Event OverflowBreak;
extern const Event OverflowAny;
extern const Event AggregationOptimizedEqualRangesOfKeys;
}
namespace CurrentMetrics
@ -1344,6 +1345,7 @@ void NO_INLINE Aggregator::executeImplBatch(
if (use_compiled_functions)
{
std::vector<ColumnData> columns_data;
bool can_optimize_equal_keys_ranges = true;
for (size_t i = 0; i < aggregate_functions.size(); ++i)
{
@ -1352,13 +1354,15 @@ void NO_INLINE Aggregator::executeImplBatch(
AggregateFunctionInstruction * inst = aggregate_instructions + i;
size_t arguments_size = inst->that->getArgumentTypes().size(); // NOLINT
can_optimize_equal_keys_ranges &= inst->can_optimize_equal_keys_ranges;
for (size_t argument_index = 0; argument_index < arguments_size; ++argument_index)
columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
}
if (all_keys_are_const || (!no_more_keys && state.hasOnlyOneValueSinceLastReset()))
if (all_keys_are_const || (can_optimize_equal_keys_ranges && state.hasOnlyOneValueSinceLastReset()))
{
ProfileEvents::increment(ProfileEvents::AggregationOptimizedEqualRangesOfKeys);
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), places[key_start]);
}
@ -1380,10 +1384,15 @@ void NO_INLINE Aggregator::executeImplBatch(
AggregateFunctionInstruction * inst = aggregate_instructions + i;
if (all_keys_are_const || (!no_more_keys && state.hasOnlyOneValueSinceLastReset()))
if (all_keys_are_const || (inst->can_optimize_equal_keys_ranges && state.hasOnlyOneValueSinceLastReset()))
{
ProfileEvents::increment(ProfileEvents::AggregationOptimizedEqualRangesOfKeys);
addBatchSinglePlace(row_begin, row_end, inst, places[key_start] + inst->state_offset, aggregates_pool);
}
else
{
addBatch(row_begin, row_end, inst, places.get(), aggregates_pool);
}
}
}
@ -1573,6 +1582,7 @@ void Aggregator::prepareAggregateInstructions(
}
aggregate_functions_instructions[i].has_sparse_arguments = has_sparse_arguments;
aggregate_functions_instructions[i].can_optimize_equal_keys_ranges = aggregate_functions[i]->canOptimizeEqualKeysRanges();
aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];

View File

@ -1221,6 +1221,7 @@ public:
const IColumn ** batch_arguments{};
const UInt64 * offsets{};
bool has_sparse_arguments = false;
bool can_optimize_equal_keys_ranges = true;
};
/// Used for optimize_aggregation_in_order:

View File

@ -0,0 +1,16 @@
0 30000
1 30000
2 30000
0 30000
1 30000
2 30000
0 449985000
1 449985000
2 449985000
0 449985000
1 449985000
2 449985000
sum 1 1
sum 16 1
uniqExact 1 1
uniqExact 16 0

View File

@ -0,0 +1,29 @@
-- Regression test for the equal-key-ranges aggregation optimization:
-- the addBatchSinglePlace fast path must be skipped for aggregate functions
-- whose merge can be parallelized (e.g. uniqExact with many threads), but
-- must still fire for functions that allow it (e.g. sum).
DROP TABLE IF EXISTS t_optimize_equal_ranges;
CREATE TABLE t_optimize_equal_ranges (a UInt64, b String, c UInt64) ENGINE = MergeTree ORDER BY a;
-- Small blocks plus disabled external aggregation, in-order aggregation and
-- projections keep the queries on the plain hash-aggregation code path.
SET max_block_size = 1024;
SET max_bytes_before_external_group_by = 0;
SET optimize_aggregation_in_order = 0;
SET optimize_use_projections = 0;
-- Three key groups of 30000 rows each; the table is sorted by key, so blocks
-- contain long runs of equal keys that trigger the optimization.
INSERT INTO t_optimize_equal_ranges SELECT 0, toString(number), number FROM numbers(30000);
INSERT INTO t_optimize_equal_ranges SELECT 1, toString(number), number FROM numbers(30000);
INSERT INTO t_optimize_equal_ranges SELECT 2, toString(number), number FROM numbers(30000);
-- Results must be identical regardless of thread count.
SELECT a, uniqExact(b) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 16;
SELECT a, uniqExact(b) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 1;
SELECT a, sum(c) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 16;
SELECT a, sum(c) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 1;
SYSTEM FLUSH LOGS;
-- Check via the AggregationOptimizedEqualRangesOfKeys profile event which of
-- the four queries above actually used the optimization: sum may use it with
-- any thread count, uniqExact only single-threaded (its parallel merge
-- disables the fast path).
SELECT
    used_aggregate_functions[1] AS func,
    Settings['max_threads'] AS threads,
    ProfileEvents['AggregationOptimizedEqualRangesOfKeys'] > 0
FROM system.query_log
WHERE type = 'QueryFinish' AND current_database = currentDatabase() AND query LIKE '%SELECT%FROM%t_optimize_equal_ranges%'
ORDER BY func, threads;
DROP TABLE t_optimize_equal_ranges;

View File

@ -0,0 +1,36 @@
-- Regression test: uniqExact must produce the same result whether or not the
-- consecutive-keys optimization is applied. Each EXCEPT query below must
-- return an empty result (the two settings values give identical answers).
DROP TABLE IF EXISTS t_uniq_exact;
CREATE TABLE t_uniq_exact (a UInt64, b String, c UInt64) ENGINE = MergeTree ORDER BY a;
-- Tiny two-level thresholds plus several threads force two-level hash tables
-- and a multi-threaded merge of partial aggregation states.
SET group_by_two_level_threshold_bytes = 1;
SET group_by_two_level_threshold = 1;
SET max_threads = 4;
SET max_bytes_before_external_group_by = 0;
SET optimize_aggregation_in_order = 0;
-- Ten key groups of 300000 random rows each, merged into one part so blocks
-- contain long runs of equal keys.
INSERT INTO t_uniq_exact SELECT 0, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 1, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 2, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 3, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 4, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 5, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 6, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 7, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 8, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 9, randomPrintableASCII(5), rand() FROM numbers(300000);
OPTIMIZE TABLE t_uniq_exact FINAL;
-- min_hit_rate_to_use_consecutive_keys_optimization = 1.0 effectively disables
-- the optimization, 0.5 enables it; EXCEPT of the two runs must be empty.
SELECT a, uniqExact(b) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 1.0
EXCEPT
SELECT a, uniqExact(b) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 0.5;
-- Same cross-check for a simple additive aggregate.
SELECT a, sum(c) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 1.0
EXCEPT
SELECT a, sum(c) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 0.5;
DROP TABLE t_uniq_exact;