Merge pull request #57691 from ClickHouse/reset-agg-state-whenever-possible

Release memory for aggregation earlier.
This commit is contained in:
Nikolai Kochetov 2023-12-12 15:36:03 +01:00 committed by GitHub
commit 23ef8444d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 17 deletions

View File

@ -123,7 +123,10 @@ protected:
UInt32 bucket_num = shared_data->next_bucket_to_merge.fetch_add(1); UInt32 bucket_num = shared_data->next_bucket_to_merge.fetch_add(1);
if (bucket_num >= NUM_BUCKETS) if (bucket_num >= NUM_BUCKETS)
{
data.reset();
return {}; return {};
}
Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num, &shared_data->is_cancelled); Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num, &shared_data->is_cancelled);
Chunk chunk = convertToChunk(block); Chunk chunk = convertToChunk(block);
@ -170,6 +173,8 @@ protected:
return convertToChunk(block); return convertToChunk(block);
} }
variant.reset();
return {}; return {};
} }
@ -400,26 +405,28 @@ private:
} }
} }
if (!shared_data->is_bucket_processed[current_bucket_num]) while (current_bucket_num < NUM_BUCKETS)
return Status::NeedData;
if (!two_level_chunks[current_bucket_num])
return Status::NeedData;
auto chunk = std::move(two_level_chunks[current_bucket_num]);
const auto has_rows = chunk.hasRows();
if (has_rows)
output.push(std::move(chunk));
++current_bucket_num;
if (current_bucket_num == NUM_BUCKETS)
{ {
output.finish(); if (!shared_data->is_bucket_processed[current_bucket_num])
/// Do not close inputs, they must be finished. return Status::NeedData;
return Status::Finished;
if (!two_level_chunks[current_bucket_num])
return Status::NeedData;
auto chunk = std::move(two_level_chunks[current_bucket_num]);
++current_bucket_num;
const auto has_rows = chunk.hasRows();
if (has_rows)
{
output.push(std::move(chunk));
return Status::PortFull;
}
} }
return has_rows ? Status::PortFull : Status::Ready; output.finish();
/// Do not close inputs, they must be finished.
return Status::Finished;
} }
AggregatingTransformParamsPtr params; AggregatingTransformParamsPtr params;
@ -489,6 +496,7 @@ private:
single_level_chunks.emplace_back(convertToChunk(block)); single_level_chunks.emplace_back(convertToChunk(block));
finished = true; finished = true;
data.reset();
} }
void createSources() void createSources()
@ -504,6 +512,8 @@ private:
processors.emplace_back(std::move(source)); processors.emplace_back(std::move(source));
} }
data.reset();
} }
}; };
@ -710,7 +720,12 @@ void AggregatingTransform::initGenerate()
} }
if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size()) if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size())
{
/// Note: we reset aggregation state here to release memory earlier.
/// It might cause extra memory usage for complex queries othervise.
many_data.reset();
return; return;
}
if (!params->aggregator.hasTemporaryData()) if (!params->aggregator.hasTemporaryData())
{ {
@ -807,6 +822,8 @@ void AggregatingTransform::initGenerate()
processors = Pipe::detachProcessors(std::move(pipe)); processors = Pipe::detachProcessors(std::move(pipe));
} }
many_data.reset();
} }
} }

View File

@ -0,0 +1,3 @@
Spin up a long running query
1 1 1 1 1
0

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
# Tags: long, no-random-settings
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
query_id="group-by-mem-usage-$CLICKHOUSE_DATABASE"
echo "Spin up a long running query"
${CLICKHOUSE_CLIENT} --query "with q as (select length(groupArray(toString(number))) as x from numbers_mt(2e6) group by number order by x limit 1), q1 as (select * from q), q2 as (select * from q), q3 as (select * from q), q4 as (select * from q) select * from q, q1, q2, q3, q4 settings max_bytes_before_external_group_by='1G', max_memory_usage='2G'" --query_id "$query_id"
${CLICKHOUSE_CLIENT} --query "system flush logs"
${CLICKHOUSE_CLIENT} --query "select ProfileEvents['ExternalAggregationWritePart'] from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = '$query_id' and event_date >= today() - 1"