Merge pull request #34234 from azat/agg-leak

Fix memory leak in AggregatingInOrderTransform
This commit is contained in:
Maksim Kita 2022-02-09 12:42:04 +01:00 committed by GitHub
commit 9f29c977de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 12 deletions

View File

@ -855,12 +855,18 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
AggregatedDataWithoutKey & res,
AggregatedDataVariants & data_variants,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena)
Arena * arena) const
{
/// `data_variants` will destroy the states of aggregate functions in the destructor
data_variants.aggregator = this;
data_variants.init(AggregatedDataVariants::Type::without_key);
AggregatedDataWithoutKey & res = data_variants.without_key;
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
@ -1623,15 +1629,32 @@ Block Aggregator::prepareBlockAndFill(
}
void Aggregator::addSingleKeyToAggregateColumns(
const AggregatedDataVariants & data_variants,
AggregatedDataVariants & data_variants,
MutableColumns & aggregate_columns) const
{
const auto & data = data_variants.without_key;
for (size_t i = 0; i < params.aggregates_size; ++i)
auto & data = data_variants.without_key;
size_t i = 0;
try
{
auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]);
for (i = 0; i < params.aggregates_size; ++i)
{
auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]);
}
}
catch (...)
{
/// Rollback
for (size_t rollback_i = 0; rollback_i < i; ++rollback_i)
{
auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[rollback_i]);
column_aggregate_func.getData().pop_back();
}
throw;
}
data = nullptr;
}
void Aggregator::addArenasToAggregateColumns(

View File

@ -1138,12 +1138,12 @@ private:
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;
static void executeOnIntervalWithoutKeyImpl(
AggregatedDataWithoutKey & res,
void executeOnIntervalWithoutKeyImpl(
AggregatedDataVariants & data_variants,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena);
Arena * arena) const;
template <typename Method>
void writeToTemporaryFileImpl(
@ -1307,7 +1307,7 @@ private:
NestedColumnsHolder & nested_columns_holder) const;
void addSingleKeyToAggregateColumns(
const AggregatedDataVariants & data_variants,
AggregatedDataVariants & data_variants,
MutableColumns & aggregate_columns) const;
void addArenasToAggregateColumns(

View File

@ -121,7 +121,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
/// Add data to aggr. state if interval is not empty. Empty when haven't found current key in new block.
if (key_begin != key_end)
params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
params->aggregator.executeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
current_memory_usage = getCurrentMemoryUsage() - initial_memory_usage;