From a47d15ab626d4d1b69a00b7cf0d3998fa7957e6a Mon Sep 17 00:00:00 2001
From: Nikita Taranov
Date: Fri, 19 Jan 2024 14:17:04 +0100
Subject: [PATCH 1/4] impl

---
 .../AggregateFunctionUniq.h                   |  2 +-
 src/AggregateFunctions/UniqExactSet.h         | 26 ++++++++++++++++++-
 src/Interpreters/Aggregator.cpp               | 19 +++++++++++++-
 3 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/src/AggregateFunctions/AggregateFunctionUniq.h b/src/AggregateFunctions/AggregateFunctionUniq.h
index f20fb8cb933..8ac75e4451c 100644
--- a/src/AggregateFunctions/AggregateFunctionUniq.h
+++ b/src/AggregateFunctions/AggregateFunctionUniq.h
@@ -350,7 +350,7 @@ struct Adder
 
         if constexpr (Data::is_able_to_parallelize_merge)
         {
-            if (data.set.isSingleLevel() && data.set.size() > 100'000)
+            if (data.set.isSingleLevel() && data.set.worthConvertingToTwoLevel(data.set.size()))
                 data.set.convertToTwoLevel();
         }
     }
diff --git a/src/AggregateFunctions/UniqExactSet.h b/src/AggregateFunctions/UniqExactSet.h
index 06157405cc5..b0255131117 100644
--- a/src/AggregateFunctions/UniqExactSet.h
+++ b/src/AggregateFunctions/UniqExactSet.h
@@ -15,6 +15,7 @@ template <typename SingleLevelSet, typename TwoLevelSet>
 class UniqExactSet
 {
     static_assert(std::is_same_v<typename SingleLevelSet::value_type, typename TwoLevelSet::value_type>);
+    static_assert(std::is_same_v<typename SingleLevelSet::Cell, typename TwoLevelSet::Cell>);
 
 public:
     using value_type = typename SingleLevelSet::value_type;
@@ -147,7 +148,28 @@ public:
         }
     }
 
-    void read(ReadBuffer & in) { asSingleLevel().read(in); }
+    void read(ReadBuffer & in)
+    {
+        size_t new_size = 0;
+        auto * const position = in.position();
+        DB::readVarUInt(new_size, in);
+
+        if (worthConvertingToTwoLevel(new_size))
+        {
+            two_level_set = std::make_shared<TwoLevelSet>(new_size);
+            for (size_t i = 0; i < new_size; ++i)
+            {
+                typename SingleLevelSet::Cell x;
+                x.read(in);
+                asTwoLevel().insert(x.getValue());
+            }
+        }
+        else
+        {
+            in.position() = position; // Rollback position
+            asSingleLevel().read(in);
+        }
+    }
 
     void write(WriteBuffer & out) const
     {
@@ -166,6 +188,8 @@ public:
         return two_level_set ? two_level_set : std::make_shared<TwoLevelSet>(asSingleLevel());
     }
 
+    static bool worthConvertingToTwoLevel(size_t size) { return size > 100'000; }
+
     void convertToTwoLevel()
     {
         two_level_set = getTwoLevelSet();
diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp
index e4856c33988..4171818d3e6 100644
--- a/src/Interpreters/Aggregator.cpp
+++ b/src/Interpreters/Aggregator.cpp
@@ -3054,6 +3054,8 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
     size_t row_end,
     const AggregateColumnsConstData & aggregate_columns_data) const
 {
+    using namespace CurrentMetrics;
+
     AggregatedDataWithoutKey & res = result.without_key;
     if (!res)
     {
@@ -3062,11 +3064,26 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
         res = place;
     }
 
+    ThreadPool thread_pool{AggregatorThreads, AggregatorThreadsActive, AggregatorThreadsScheduled, params.max_threads};
+
     for (size_t row = row_begin; row < row_end; ++row)
     {
         /// Adding Values
         for (size_t i = 0; i < params.aggregates_size; ++i)
-            aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], result.aggregates_pool);
+        {
+            if (aggregate_functions[i]->isParallelizeMergePrepareNeeded())
+            {
+                std::vector<AggregateDataPtr> data_vec{res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row]};
+                aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool);
+            }
+
+            if (aggregate_functions[i]->isAbleToParallelizeMerge())
+                aggregate_functions[i]->merge(
+                    res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], thread_pool, result.aggregates_pool);
+            else
+                aggregate_functions[i]->merge(
+                    res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], result.aggregates_pool);
+        }
     }
 }
 

From 2b5482be8c9d67edd19d8d4168d9f2e845f49860 Mon Sep 17 00:00:00 2001
From: Nikita Taranov
Date: Fri, 19 Jan 2024 17:57:11 +0100
Subject: [PATCH 2/4] add perf test

---
 tests/performance/uniq_without_key_dist.xml | 22 +++++++++++++++++++++
 1 file changed, 22 insertions(+)
 create mode 100644 tests/performance/uniq_without_key_dist.xml

diff --git a/tests/performance/uniq_without_key_dist.xml b/tests/performance/uniq_without_key_dist.xml
new file mode 100644
index 00000000000..600b378a7f7
--- /dev/null
+++ b/tests/performance/uniq_without_key_dist.xml
@@ -0,0 +1,22 @@
+<test>
+    <substitutions>
+        <substitution>
+            <name>uniq_keys</name>
+            <values>
+                <value>100000</value>
+                <value>250000</value>
+                <value>500000</value>
+                <value>1000000</value>
+                <value>5000000</value>
+            </values>
+        </substitution>
+    </substitutions>
+
+    <create_query>create table t_{uniq_keys}(a UInt64) engine=MergeTree order by tuple()</create_query>
+
+    <fill_query>insert into t_{uniq_keys} select number % {uniq_keys} from numbers_mt(5e7)</fill_query>
+
+    <query>SELECT uniqExact(a) FROM remote('127.0.0.{{1,2}}', default, t_{uniq_keys}) SETTINGS max_threads=5</query>
+
+    <drop_query>drop table t_{uniq_keys}</drop_query>
+</test>

From f1efb29a66a662f0c7af34ef317be4a734571bc1 Mon Sep 17 00:00:00 2001
From: Nikita Taranov
Date: Fri, 19 Jan 2024 18:13:03 +0100
Subject: [PATCH 3/4] better

---
 src/AggregateFunctions/UniqExactSet.h | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/AggregateFunctions/UniqExactSet.h b/src/AggregateFunctions/UniqExactSet.h
index b0255131117..159e4a74182 100644
--- a/src/AggregateFunctions/UniqExactSet.h
+++ b/src/AggregateFunctions/UniqExactSet.h
@@ -152,7 +152,10 @@ public:
     {
         size_t new_size = 0;
         auto * const position = in.position();
-        DB::readVarUInt(new_size, in);
+        readVarUInt(new_size, in);
+        if (new_size > 100'000'000'000)
+            throw DB::Exception(
+                DB::ErrorCodes::TOO_LARGE_ARRAY_SIZE, "The size of serialized hash table is suspiciously large: {}", new_size);
 
         if (worthConvertingToTwoLevel(new_size))
         {

From 60a910e94ff51d89cfaabffe096647b23c253ba8 Mon Sep 17 00:00:00 2001
From: Nikita Taranov
Date: Fri, 19 Jan 2024 18:47:21 +0100
Subject: [PATCH 4/4] better

---
 src/AggregateFunctions/UniqExactSet.h | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/AggregateFunctions/UniqExactSet.h b/src/AggregateFunctions/UniqExactSet.h
index 159e4a74182..e8c0de660ff 100644
--- a/src/AggregateFunctions/UniqExactSet.h
+++ b/src/AggregateFunctions/UniqExactSet.h
@@ -11,6 +11,11 @@
 namespace DB
 {
 
+namespace ErrorCodes
+{
+extern const int TOO_LARGE_ARRAY_SIZE;
+}
+
 template <typename SingleLevelSet, typename TwoLevelSet>
 class UniqExactSet
 {