Improve memory usage during memory efficient merging of aggregation results (#39429)

This commit is contained in:
Nikita Taranov 2022-08-03 17:56:59 +02:00 committed by GitHub
parent 1c0d267767
commit 4943202921
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 117 additions and 30 deletions

View File

@ -911,7 +911,7 @@ void Aggregator::mergeOnBlockSmall(
mergeStreamsImpl(result.aggregates_pool, *result.NAME, result.NAME->data, \
result.without_key, /* no_more_keys= */ false, \
row_begin, row_end, \
aggregate_columns_data, key_columns);
aggregate_columns_data, key_columns, result.aggregates_pool);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -2647,19 +2647,23 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);
if (!arena_for_keys)
arena_for_keys = aggregates_pool;
for (size_t i = row_begin; i < row_end; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
if (!no_more_keys)
{
auto emplace_result = state.emplaceKey(data, i, *aggregates_pool);
auto emplace_result = state.emplaceKey(data, i, *arena_for_keys); // NOLINT
if (emplace_result.isInserted())
{
emplace_result.setMapped(nullptr);
@ -2674,7 +2678,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
}
else
{
auto find_result = state.findKey(data, i, *aggregates_pool);
auto find_result = state.findKey(data, i, *arena_for_keys);
if (find_result.isFound())
aggregate_data = find_result.getMapped();
}
@ -2703,21 +2707,14 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
Method & method,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const
bool no_more_keys,
Arena * arena_for_keys) const
{
const AggregateColumnsConstData & aggregate_columns_data = params.makeAggregateColumnsData(block);
const ColumnRawPtrs & key_columns = params.makeRawKeyColumns(block);
mergeStreamsImpl<Method, Table>(
aggregates_pool,
method,
data,
overflow_row,
no_more_keys,
0,
block.rows(),
aggregate_columns_data,
key_columns);
aggregates_pool, method, data, overflow_row, no_more_keys, 0, block.rows(), aggregate_columns_data, key_columns, arena_for_keys);
}
template <typename Method, typename Table>
@ -2730,12 +2727,15 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const
{
if (!no_more_keys)
mergeStreamsImplCase<false>(aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns);
mergeStreamsImplCase<false>(
aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns, arena_for_keys);
else
mergeStreamsImplCase<true>(aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns);
mergeStreamsImplCase<true>(
aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns, arena_for_keys);
}
@ -3015,17 +3015,26 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
size_t source_rows = 0;
/// In some aggregation methods (e.g. serialized) aggregates pools are used also to store serialized aggregation keys.
/// Memory occupied by them will have the same lifetime as aggregate function states, while it is not actually necessary and leads to excessive memory consumption.
/// To avoid this we use a separate arena to allocate memory for aggregation keys. Its memory will be freed at this function return.
auto arena_for_keys = std::make_shared<Arena>();
for (Block & block : blocks)
{
source_rows += block.rows();
if (bucket_num >= 0 && block.info.bucket_num != bucket_num)
bucket_num = -1;
if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false, arena_for_keys.get());
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -3049,9 +3058,15 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
size_t rows = block.rows();
size_t bytes = block.bytes();
double elapsed_seconds = watch.elapsedSeconds();
LOG_DEBUG(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({:.3f} rows/sec., {}/sec.)",
rows, ReadableSize(bytes),
elapsed_seconds, rows / elapsed_seconds,
LOG_DEBUG(
log,
"Merged partially aggregated blocks for bucket #{}. Got {} rows, {} from {} source rows in {} sec. ({:.3f} rows/sec., {}/sec.)",
bucket_num,
rows,
ReadableSize(bytes),
source_rows,
elapsed_seconds,
rows / elapsed_seconds,
ReadableSize(bytes / elapsed_seconds));
block.info.bucket_num = bucket_num;

View File

@ -1348,8 +1348,11 @@ private:
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const;
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const;
/// `arena_for_keys` used to store serialized aggregation keys (in methods like `serialized`) to save some space.
/// If not provided, aggregates_pool is used instead. Refer to mergeBlocks() for an usage example.
template <typename Method, typename Table>
void mergeStreamsImpl(
Block block,
@ -1357,7 +1360,9 @@ private:
Method & method,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const;
bool no_more_keys,
Arena * arena_for_keys = nullptr) const;
template <typename Method, typename Table>
void mergeStreamsImpl(
Arena * aggregates_pool,
@ -1368,7 +1373,8 @@ private:
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const;
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const;
void mergeBlockWithoutKeyStreamsImpl(
Block block,

View File

@ -1,9 +1,10 @@
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <limits>
#include <Interpreters/Aggregator.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <QueryPipeline/Pipe.h>
#include <Interpreters/Aggregator.h>
namespace DB
{
@ -367,7 +368,7 @@ SortingAggregatedTransform::SortingAggregatedTransform(size_t num_inputs_, Aggre
: IProcessor(InputPorts(num_inputs_, params_->getHeader()), {params_->getHeader()})
, num_inputs(num_inputs_)
, params(std::move(params_))
, last_bucket_number(num_inputs, -1)
, last_bucket_number(num_inputs, std::numeric_limits<Int32>::min())
, is_input_finished(num_inputs, false)
{
}
@ -462,7 +463,13 @@ IProcessor::Status SortingAggregatedTransform::prepare()
continue;
}
//all_finished = false;
/// We want to keep not more than `num_inputs` buckets in memory (and there will be only a single chunk with the given `bucket_id`).
const bool bucket_from_this_input_still_in_memory = chunks.contains(last_bucket_number[input_num]);
if (bucket_from_this_input_still_in_memory)
{
all_finished = false;
continue;
}
in->setNeeded();

View File

@ -1,2 +1,22 @@
49999995000000 10000000
499999500000 1000000 15
100033 2
100034 2
100035 2
100036 2
100037 2
100038 2
100039 2
10004 2
100040 2
100041 2
100033 2
100034 2
100035 2
100036 2
100037 2
100038 2
100039 2
10004 2
100040 2
100041 2

View File

@ -7,3 +7,18 @@ SET group_by_two_level_threshold_bytes = 50000000;
SELECT sum(k), sum(c) FROM (SELECT number AS k, count() AS c FROM (SELECT * FROM system.numbers LIMIT 10000000) GROUP BY k);
SELECT sum(k), sum(c), max(u) FROM (SELECT number AS k, count() AS c, uniqArray(range(number % 16)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k);
SET group_by_two_level_threshold = 100000;
SET max_bytes_before_external_group_by = '1Mi';
-- method: key_string & key_string_two_level
CREATE TABLE t_00284_str(s String) ENGINE = MergeTree() ORDER BY tuple();
INSERT INTO t_00284_str SELECT toString(number) FROM numbers_mt(1e6);
INSERT INTO t_00284_str SELECT toString(number) FROM numbers_mt(1e6);
SELECT s, count() FROM t_00284_str GROUP BY s ORDER BY s LIMIT 10 OFFSET 42;
-- method: low_cardinality_key_string & low_cardinality_key_string_two_level
CREATE TABLE t_00284_lc_str(s LowCardinality(String)) ENGINE = MergeTree() ORDER BY tuple();
INSERT INTO t_00284_lc_str SELECT toString(number) FROM numbers_mt(1e6);
INSERT INTO t_00284_lc_str SELECT toString(number) FROM numbers_mt(1e6);
SELECT s, count() FROM t_00284_lc_str GROUP BY s ORDER BY s LIMIT 10 OFFSET 42;

View File

@ -0,0 +1,23 @@
-- Tags: long, no-tsan, no-msan, no-asan, no-ubsan
create table t_2354_dist_with_external_aggr(a UInt64, b String, c FixedString(100)) engine = MergeTree order by tuple();
insert into t_2354_dist_with_external_aggr select number, toString(number) as s, toFixedString(s, 100) from numbers_mt(5e7);
set max_bytes_before_external_group_by = '2G',
max_threads = 16,
aggregation_memory_efficient_merge_threads = 16,
distributed_aggregation_memory_efficient = 1,
prefer_localhost_replica = 1,
group_by_two_level_threshold = 100000;
select a, b, c, sum(a) as s
from remote('127.0.0.{1,2}', currentDatabase(), t_2354_dist_with_external_aggr)
group by a, b, c
format Null;
system flush logs;
select memory_usage < 4 * 1024 * 1024 * 1024 -- whole aggregation state of local aggregation uncompressed is 5.8G
from system.query_log
where event_time >= now() - interval '15 minute' and type = 'QueryFinish' and is_initial_query and query like '%t_2354_dist_with_external_aggr%group_by%' and current_database = currentDatabase();