Release memory for aggregation earlier.

This commit is contained in:
Nikolai Kochetov 2023-12-08 18:18:45 +00:00
parent 30148972ed
commit ff0340b0fc
3 changed files with 29 additions and 0 deletions

View File

@ -123,7 +123,10 @@ protected:
UInt32 bucket_num = shared_data->next_bucket_to_merge.fetch_add(1); UInt32 bucket_num = shared_data->next_bucket_to_merge.fetch_add(1);
if (bucket_num >= NUM_BUCKETS) if (bucket_num >= NUM_BUCKETS)
{
data.reset();
return {}; return {};
}
Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num, &shared_data->is_cancelled); Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num, &shared_data->is_cancelled);
Chunk chunk = convertToChunk(block); Chunk chunk = convertToChunk(block);
@ -170,6 +173,8 @@ protected:
return convertToChunk(block); return convertToChunk(block);
} }
variant.reset();
return {}; return {};
} }
@ -489,6 +494,7 @@ private:
single_level_chunks.emplace_back(convertToChunk(block)); single_level_chunks.emplace_back(convertToChunk(block));
finished = true; finished = true;
data.reset();
} }
void createSources() void createSources()
@ -504,6 +510,8 @@ private:
processors.emplace_back(std::move(source)); processors.emplace_back(std::move(source));
} }
data.reset();
} }
}; };
@ -710,7 +718,10 @@ void AggregatingTransform::initGenerate()
} }
if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size()) if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size())
{
many_data.reset();
return; return;
}
if (!params->aggregator.hasTemporaryData()) if (!params->aggregator.hasTemporaryData())
{ {
@ -807,6 +818,8 @@ void AggregatingTransform::initGenerate()
processors = Pipe::detachProcessors(std::move(pipe)); processors = Pipe::detachProcessors(std::move(pipe));
} }
many_data.reset();
} }
} }

View File

@ -0,0 +1,3 @@
Spin up a long running query
1 1 1 1 1
0

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
# Tags: long, no-random-settings
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
query_id="group-by-mem-usage-$CLICKHOUSE_DATABASE"
echo "Spin up a long running query"
${CLICKHOUSE_CLIENT} --query "with q as (select length(groupArray(toString(number))) as x from numbers_mt(2e6) group by number order by x limit 1), q1 as (select * from q), q2 as (select * from q), q3 as (select * from q), q4 as (select * from q) select * from q, q1, q2, q3, q4 settings max_bytes_before_external_group_by='1G', max_memory_usage='2G'" --query_id "$query_id"
${CLICKHOUSE_CLIENT} --query "system flush logs"
${CLICKHOUSE_CLIENT} --query "select ProfileEvents['ExternalAggregationWritePart'] from system.query_log where type = 'QueryFinish' and query_id = '$query_id' and event_date >= today() - 1"