diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp
index a99ecee43bf..fc5774735a0 100644
--- a/src/Interpreters/Aggregator.cpp
+++ b/src/Interpreters/Aggregator.cpp
@@ -911,7 +911,7 @@ void Aggregator::mergeOnBlockSmall(
         mergeStreamsImpl(result.aggregates_pool, *result.NAME, result.NAME->data, \
             result.without_key, /* no_more_keys= */ false, \
             row_begin, row_end, \
-            aggregate_columns_data, key_columns);
+            aggregate_columns_data, key_columns, result.aggregates_pool);

     APPLY_FOR_AGGREGATED_VARIANTS(M)
 #undef M
@@ -2647,19 +2647,23 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
     size_t row_begin,
     size_t row_end,
     const AggregateColumnsConstData & aggregate_columns_data,
-    const ColumnRawPtrs & key_columns) const
+    const ColumnRawPtrs & key_columns,
+    Arena * arena_for_keys) const
 {
     typename Method::State state(key_columns, key_sizes, aggregation_state_cache);

     std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);

+    if (!arena_for_keys)
+        arena_for_keys = aggregates_pool;
+
     for (size_t i = row_begin; i < row_end; ++i)
     {
         AggregateDataPtr aggregate_data = nullptr;

         if (!no_more_keys)
         {
-            auto emplace_result = state.emplaceKey(data, i, *aggregates_pool);
+            auto emplace_result = state.emplaceKey(data, i, *arena_for_keys); // NOLINT
             if (emplace_result.isInserted())
             {
                 emplace_result.setMapped(nullptr);
@@ -2674,7 +2678,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
         }
         else
         {
-            auto find_result = state.findKey(data, i, *aggregates_pool);
+            auto find_result = state.findKey(data, i, *arena_for_keys);
             if (find_result.isFound())
                 aggregate_data = find_result.getMapped();
         }
@@ -2703,21 +2707,14 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
     Method & method,
     Table & data,
     AggregateDataPtr overflow_row,
-    bool no_more_keys) const
+    bool no_more_keys,
+    Arena * arena_for_keys) const
 {
     const AggregateColumnsConstData & aggregate_columns_data = params.makeAggregateColumnsData(block);
     const ColumnRawPtrs & key_columns = params.makeRawKeyColumns(block);

     mergeStreamsImpl(
-        aggregates_pool,
-        method,
-        data,
-        overflow_row,
-        no_more_keys,
-        0,
-        block.rows(),
-        aggregate_columns_data,
-        key_columns);
+        aggregates_pool, method, data, overflow_row, no_more_keys, 0, block.rows(), aggregate_columns_data, key_columns, arena_for_keys);
 }

 template <typename Method, typename Table>
@@ -2730,12 +2727,15 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
     size_t row_begin,
     size_t row_end,
     const AggregateColumnsConstData & aggregate_columns_data,
-    const ColumnRawPtrs & key_columns) const
+    const ColumnRawPtrs & key_columns,
+    Arena * arena_for_keys) const
 {
     if (!no_more_keys)
-        mergeStreamsImplCase<false>(aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns);
+        mergeStreamsImplCase<false>(
+            aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns, arena_for_keys);
     else
-        mergeStreamsImplCase<true>(aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns);
+        mergeStreamsImplCase<true>(
+            aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns, arena_for_keys);
 }


@@ -3015,17 +3015,26 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
     result.keys_size = params.keys_size;
     result.key_sizes = key_sizes;

+    size_t source_rows = 0;
+
+    /// In some aggregation methods (e.g. serialized) aggregates pools are also used to store serialized aggregation keys.
+    /// Memory occupied by them would then have the same lifetime as the aggregate function states, which is not actually necessary and leads to excessive memory consumption.
+    /// To avoid this, we use a separate arena to allocate memory for aggregation keys. Its memory is freed when this function returns.
+    auto arena_for_keys = std::make_shared<Arena>();
+
     for (Block & block : blocks)
     {
+        source_rows += block.rows();
+
         if (bucket_num >= 0 && block.info.bucket_num != bucket_num)
             bucket_num = -1;

         if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
             mergeBlockWithoutKeyStreamsImpl(std::move(block), result);

-    #define M(NAME, IS_TWO_LEVEL) \
-        else if (result.type == AggregatedDataVariants::Type::NAME) \
-            mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
+#define M(NAME, IS_TWO_LEVEL) \
+    else if (result.type == AggregatedDataVariants::Type::NAME) \
+        mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false, arena_for_keys.get());

         APPLY_FOR_AGGREGATED_VARIANTS(M)
     #undef M
@@ -3049,9 +3058,15 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
     size_t rows = block.rows();
     size_t bytes = block.bytes();
     double elapsed_seconds = watch.elapsedSeconds();
-    LOG_DEBUG(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({:.3f} rows/sec., {}/sec.)",
-        rows, ReadableSize(bytes),
-        elapsed_seconds, rows / elapsed_seconds,
+    LOG_DEBUG(
+        log,
+        "Merged partially aggregated blocks for bucket #{}. Got {} rows, {} from {} source rows in {} sec. ({:.3f} rows/sec., {}/sec.)",
+        bucket_num,
+        rows,
+        ReadableSize(bytes),
+        source_rows,
+        elapsed_seconds,
+        rows / elapsed_seconds,
         ReadableSize(bytes / elapsed_seconds));

     block.info.bucket_num = bucket_num;
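The core idea of the change above, as a minimal standalone sketch (toy `Arena` and map types with hypothetical names, not the real ClickHouse classes): serialized keys go into a short-lived arena owned by the caller, while aggregate states stay in the long-lived pool, so the key memory can be reclaimed as soon as the merge finishes.

```cpp
#include <memory>
#include <string>
#include <unordered_map>

/// Toy stand-ins, just enough to show the lifetime split; ClickHouse's Arena
/// is a bump allocator, and these names are illustrative only.
struct Arena { /* owns bulk allocations; everything is freed on destruction */ };
using AggregatedData = std::unordered_map<std::string, long>;

/// Callee pattern: if no dedicated key arena is given, fall back to the pool
/// that also holds the aggregate states (the old behaviour).
void mergeStreams(AggregatedData & data, const std::string & key,
                  Arena * aggregates_pool, Arena * arena_for_keys = nullptr)
{
    if (!arena_for_keys)
        arena_for_keys = aggregates_pool;

    /// Serialized key bytes would be emplaced via *arena_for_keys here,
    /// while aggregate states are still allocated from *aggregates_pool.
    data[key] += 1;
}

/// Caller pattern (cf. mergeBlocks() above): the key arena lives only as long
/// as the merge itself.
void mergeBlocks(AggregatedData & data, Arena * aggregates_pool)
{
    auto arena_for_keys = std::make_shared<Arena>();
    mergeStreams(data, "example_key", aggregates_pool, arena_for_keys.get());
}   /// arena_for_keys is freed here; aggregates_pool and the states live on
```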
diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h
index 716849465de..3e8b25c1a8c 100644
--- a/src/Interpreters/Aggregator.h
+++ b/src/Interpreters/Aggregator.h
@@ -1348,8 +1348,11 @@ private:
         size_t row_begin,
         size_t row_end,
         const AggregateColumnsConstData & aggregate_columns_data,
-        const ColumnRawPtrs & key_columns) const;
+        const ColumnRawPtrs & key_columns,
+        Arena * arena_for_keys) const;

+    /// `arena_for_keys` is used to store serialized aggregation keys (in methods like `serialized`) to save some space.
+    /// If not provided, aggregates_pool is used instead. Refer to mergeBlocks() for a usage example.
     template <typename Method, typename Table>
     void mergeStreamsImpl(
         Block block,
@@ -1357,7 +1360,9 @@ private:
         Method & method,
         Table & data,
         AggregateDataPtr overflow_row,
-        bool no_more_keys) const;
+        bool no_more_keys,
+        Arena * arena_for_keys = nullptr) const;
+
     template <typename Method, typename Table>
     void mergeStreamsImpl(
         Arena * aggregates_pool,
@@ -1368,7 +1373,8 @@ private:
         Method & method,
         Table & data,
         AggregateDataPtr overflow_row,
         size_t row_begin,
         size_t row_end,
         const AggregateColumnsConstData & aggregate_columns_data,
-        const ColumnRawPtrs & key_columns) const;
+        const ColumnRawPtrs & key_columns,
+        Arena * arena_for_keys) const;

     void mergeBlockWithoutKeyStreamsImpl(
         Block block,
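Both `mergeStreamsImpl` overloads declared above funnel into `mergeStreamsImplCase`, where the runtime `no_more_keys` flag becomes a template parameter, so the per-row merge loop carries no repeated branch. A minimal sketch of that dispatch pattern, with toy names and logic (not the real signatures):

```cpp
#include <cstddef>
#include <vector>

/// The branch on `no_more_keys` is resolved at compile time, so each
/// instantiation contains only one of the two per-row paths.
template <bool no_more_keys>
void mergeRowsImpl(std::vector<int> & data, size_t row_begin, size_t row_end)
{
    for (size_t i = row_begin; i < row_end; ++i)
    {
        if constexpr (!no_more_keys)
            data.push_back(static_cast<int>(i));    /// may create new entries
        else if (!data.empty())
            data.back() += static_cast<int>(i);     /// may only update existing ones
    }
}

/// The runtime flag is translated into a template argument exactly once.
void mergeRows(std::vector<int> & data, size_t row_begin, size_t row_end, bool no_more_keys)
{
    if (!no_more_keys)
        mergeRowsImpl<false>(data, row_begin, row_end);
    else
        mergeRowsImpl<true>(data, row_begin, row_end);
}
```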
diff --git a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp
index 905620d39f9..8471139d9dc 100644
--- a/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp
+++ b/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp
@@ -1,9 +1,10 @@
-#include
+#include
+#include
 #include
 #include
 #include
+#include
 #include
-#include

 namespace DB
 {
@@ -367,7 +368,7 @@ SortingAggregatedTransform::SortingAggregatedTransform(size_t num_inputs_, AggregatingTransformParamsPtr params_)
     : IProcessor(InputPorts(num_inputs_, params_->getHeader()), {params_->getHeader()})
     , num_inputs(num_inputs_)
     , params(std::move(params_))
-    , last_bucket_number(num_inputs, -1)
+    , last_bucket_number(num_inputs, std::numeric_limits<Int32>::min())
     , is_input_finished(num_inputs, false)
 {
 }
@@ -462,7 +463,13 @@ IProcessor::Status SortingAggregatedTransform::prepare()
             continue;
         }

-        //all_finished = false;
+        /// We want to keep no more than `num_inputs` buckets in memory (and there will be only a single chunk with the given `bucket_id`).
+        const bool bucket_from_this_input_still_in_memory = chunks.contains(last_bucket_number[input_num]);
+        if (bucket_from_this_input_still_in_memory)
+        {
+            all_finished = false;
+            continue;
+        }

         in->setNeeded();
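The `bucket_from_this_input_still_in_memory` check above gives SortingAggregatedTransform a simple form of backpressure: an input is asked for its next chunk only after the bucket it delivered last has left the buffer, so at most `num_inputs` buckets are held at a time. The sentinel changes from -1 to `std::numeric_limits<Int32>::min()` because -1 is a legitimate `bucket_num` (single-level data) and would have produced a false "still in memory" match. A rough standalone sketch of the decision, with toy types (hypothetical names, not the real processor API):

```cpp
#include <cstdint>
#include <limits>
#include <map>
#include <vector>

/// Simplified model of the scheduling decision: pull from an input only if
/// the last bucket it delivered has already been pushed downstream.
struct InputScheduler
{
    std::map<int32_t, int> chunks;               /// bucket_num -> buffered chunk (toy payload)
    std::vector<int32_t> last_bucket_number;     /// per input

    explicit InputScheduler(size_t num_inputs)
        /// The sentinel never collides with a real bucket number (including -1),
        /// so fresh inputs are always allowed to make progress.
        : last_bucket_number(num_inputs, std::numeric_limits<int32_t>::min()) {}

    bool mayPull(size_t input_num) const
    {
        return !chunks.contains(last_bucket_number[input_num]);
    }
};
```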
diff --git a/tests/queries/0_stateless/00284_external_aggregation.reference b/tests/queries/0_stateless/00284_external_aggregation.reference
index 48e30e781e0..be0db217a97 100644
--- a/tests/queries/0_stateless/00284_external_aggregation.reference
+++ b/tests/queries/0_stateless/00284_external_aggregation.reference
@@ -1,2 +1,22 @@
 49999995000000 10000000
 499999500000 1000000 15
+100033 2
+100034 2
+100035 2
+100036 2
+100037 2
+100038 2
+100039 2
+10004 2
+100040 2
+100041 2
+100033 2
+100034 2
+100035 2
+100036 2
+100037 2
+100038 2
+100039 2
+10004 2
+100040 2
+100041 2
diff --git a/tests/queries/0_stateless/00284_external_aggregation.sql b/tests/queries/0_stateless/00284_external_aggregation.sql
index 057cb749521..a42dd91b6a5 100644
--- a/tests/queries/0_stateless/00284_external_aggregation.sql
+++ b/tests/queries/0_stateless/00284_external_aggregation.sql
@@ -7,3 +7,18 @@ SET group_by_two_level_threshold_bytes = 50000000;
 SELECT sum(k), sum(c) FROM (SELECT number AS k, count() AS c FROM (SELECT * FROM system.numbers LIMIT 10000000) GROUP BY k);
 SELECT sum(k), sum(c), max(u) FROM (SELECT number AS k, count() AS c, uniqArray(range(number % 16)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k);
+
+SET group_by_two_level_threshold = 100000;
+SET max_bytes_before_external_group_by = '1Mi';
+
+-- method: key_string & key_string_two_level
+CREATE TABLE t_00284_str(s String) ENGINE = MergeTree() ORDER BY tuple();
+INSERT INTO t_00284_str SELECT toString(number) FROM numbers_mt(1e6);
+INSERT INTO t_00284_str SELECT toString(number) FROM numbers_mt(1e6);
+SELECT s, count() FROM t_00284_str GROUP BY s ORDER BY s LIMIT 10 OFFSET 42;
+
+-- method: low_cardinality_key_string & low_cardinality_key_string_two_level
+CREATE TABLE t_00284_lc_str(s LowCardinality(String)) ENGINE = MergeTree() ORDER BY tuple();
+INSERT INTO t_00284_lc_str SELECT toString(number) FROM numbers_mt(1e6);
+INSERT INTO t_00284_lc_str SELECT toString(number) FROM numbers_mt(1e6);
+SELECT s, count() FROM t_00284_lc_str GROUP BY s ORDER BY s LIMIT 10 OFFSET 42;
diff --git a/tests/queries/0_stateless/02354_distributed_with_external_aggregation_memory_usage.reference b/tests/queries/0_stateless/02354_distributed_with_external_aggregation_memory_usage.reference
new file mode 100644
index 00000000000..d00491fd7e5
--- /dev/null
+++ b/tests/queries/0_stateless/02354_distributed_with_external_aggregation_memory_usage.reference
@@ -0,0 +1 @@
+1
diff --git a/tests/queries/0_stateless/02354_distributed_with_external_aggregation_memory_usage.sql b/tests/queries/0_stateless/02354_distributed_with_external_aggregation_memory_usage.sql
new file mode 100644
index 00000000000..340eee038ba
--- /dev/null
+++ b/tests/queries/0_stateless/02354_distributed_with_external_aggregation_memory_usage.sql
@@ -0,0 +1,23 @@
+-- Tags: long, no-tsan, no-msan, no-asan, no-ubsan
+
+create table t_2354_dist_with_external_aggr(a UInt64, b String, c FixedString(100)) engine = MergeTree order by tuple();
+
+insert into t_2354_dist_with_external_aggr select number, toString(number) as s, toFixedString(s, 100) from numbers_mt(5e7);
+
+set max_bytes_before_external_group_by = '2G',
+    max_threads = 16,
+    aggregation_memory_efficient_merge_threads = 16,
+    distributed_aggregation_memory_efficient = 1,
+    prefer_localhost_replica = 1,
+    group_by_two_level_threshold = 100000;
+
+select a, b, c, sum(a) as s
+from remote('127.0.0.{1,2}', currentDatabase(), t_2354_dist_with_external_aggr)
+group by a, b, c
+format Null;
+
+system flush logs;
+
+select memory_usage < 4 * 1024 * 1024 * 1024 -- whole aggregation state of local aggregation uncompressed is 5.8G
+from system.query_log
+where event_time >= now() - interval '15 minute' and type = 'QueryFinish' and is_initial_query and query like '%t_2354_dist_with_external_aggr%group_by%' and current_database = currentDatabase();