Fix memory leak in AggregatingInOrderTransform

Reproducer:

    # NOTE: we need clickhouse from 33957 since right now LSan is broken due to getauxval().
    $ url=https://s3.amazonaws.com/clickhouse-builds/33957/e04b862673644d313712607a0078f5d1c48b5377/package_asan/clickhouse
    $ wget $url -O clickhouse-asan
    $ chmod +x clickhouse-asan
    $ ./clickhouse-asan server &

    $ ./clickhouse-asan client
    :) create table data (key Int, value String) engine=MergeTree() order by key
    :) insert into data select number%5, toString(number) from numbers(10e6)

    # usually one query is enough; benchmark is used only for stability of the results
    # note: if the exception did not happen in AggregatingInOrderTransform, add --continue_on_errors and wait
    $ ./clickhouse-asan benchmark --query 'select key, uniqCombined64(value), groupArray(value) from data group by key' --optimize_aggregation_in_order=1 --memory_tracker_fault_probability=0.01 --max_untracked_memory='2Mi'
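
The reproducer relies on memory_tracker_fault_probability, which makes memory tracking throw an exception with the given probability whenever an allocation is accounted (roughly, once more than max_untracked_memory has been allocated since the last check), so the exception lands in the middle of the aggregation. A simplified illustration of the idea, with made-up names rather than the real MemoryTracker code:

    // Illustrative sketch only: any tracked allocation can randomly turn into a throw.
    #include <cstdlib>
    #include <new>
    #include <random>
    #include <stdexcept>

    static thread_local std::mt19937_64 rng{std::random_device{}()};

    void * trackedAlloc(std::size_t size, double fault_probability)
    {
        if (fault_probability > 0.0 && std::bernoulli_distribution(fault_probability)(rng))
            throw std::runtime_error("Memory tracker: fault injected (simulated)");

        void * ptr = std::malloc(size);
        if (!ptr)
            throw std::bad_alloc{};
        return ptr;
    }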

LSan report:

    ==24595==ERROR: LeakSanitizer: detected memory leaks

    Direct leak of 3932160 byte(s) in 6 object(s) allocated from:
        #0 0xcadba93 in realloc ()
        #1 0xcc108d9 in Allocator<false, false>::realloc() obj-x86_64-linux-gnu/../src/Common/Allocator.h:134:30
        #2 0xde19eae in void DB::PODArrayBase<>::realloc<DB::Arena*&>(unsigned long, DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h:161:25
        #3 0xde5f039 in void DB::PODArrayBase<>::reserveForNextSize<DB::Arena*&>(DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h
        #4 0xde5f039 in void DB::PODArray<>::push_back<>(DB::GroupArrayNodeString*&, DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h:432:19
        #5 0xde5f039 in DB::GroupArrayGeneralImpl<>::add() const obj-x86_64-linux-gnu/../src/AggregateFunctions/AggregateFunctionGroupArray.h:465:31
        #6 0xde5f039 in DB::IAggregateFunctionHelper<>::addBatchSinglePlaceFromInterval() const obj-x86_64-linux-gnu/../src/AggregateFunctions/IAggregateFunction.h:481:53
        #7 0x299df134 in DB::Aggregator::executeOnIntervalWithoutKeyImpl() obj-x86_64-linux-gnu/../src/Interpreters/Aggregator.cpp:869:31
        #8 0x2ca75f7d in DB::AggregatingInOrderTransform::consume() obj-x86_64-linux-gnu/../src/Processors/Transforms/AggregatingInOrderTransform.cpp:124:13

    ...

    SUMMARY: AddressSanitizer: 4523184 byte(s) leaked in 12 allocation(s).
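
The stack shows where the leak comes from: the fault injected by the memory tracker escapes from the aggregate function's add() as an exception, and the partially built aggregate states for the without_key variant are never destroyed, so the memory they allocated (here, groupArray's PODArray buffer) leaks. The fix below makes executeOnIntervalWithoutKeyImpl() take the whole AggregatedDataVariants and register it as the owner of the states, so they are destroyed in its destructor even while an exception unwinds. A minimal sketch of that ownership idea, with illustrative names rather than the actual ClickHouse classes:

    // Illustrative sketch only: ownership is registered before any state is built,
    // so an exception thrown while adding values still ends in the owner's destructor.
    #include <cstddef>
    #include <stdexcept>
    #include <vector>

    struct State { std::vector<char> buffer; };               // stands in for an aggregate state

    struct Variants                                            // stands in for AggregatedDataVariants
    {
        State * without_key = nullptr;
        ~Variants() { delete without_key; }                    // "destroy the states in the destructor"
    };

    void executeOnInterval(Variants & variants, std::size_t rows, std::size_t fault_at)
    {
        if (!variants.without_key)
            variants.without_key = new State;                  // ownership established up front

        for (std::size_t i = 0; i < rows; ++i)
        {
            variants.without_key->buffer.push_back('x');       // may allocate
            if (i == fault_at)
                throw std::runtime_error("fault injected");    // state is still owned by variants
        }
    }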

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

Author: Azat Khuzhin
Date:   2022-02-01 18:40:48 +03:00
Parent: 00330461d1
Commit: 4fa2ae76bc

3 changed files with 35 additions and 12 deletions

--- a/src/Interpreters/Aggregator.cpp
+++ b/src/Interpreters/Aggregator.cpp

@@ -855,12 +855,18 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
 }
 
 void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
-    AggregatedDataWithoutKey & res,
+    AggregatedDataVariants & data_variants,
     size_t row_begin,
     size_t row_end,
     AggregateFunctionInstruction * aggregate_instructions,
-    Arena * arena)
+    Arena * arena) const
 {
+    /// `data_variants` will destroy the states of aggregate functions in the destructor
+    data_variants.aggregator = this;
+    data_variants.init(AggregatedDataVariants::Type::without_key);
+
+    AggregatedDataWithoutKey & res = data_variants.without_key;
+
     /// Adding values
     for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
     {
@@ -1623,15 +1629,32 @@ Block Aggregator::prepareBlockAndFill(
 }
 
 void Aggregator::addSingleKeyToAggregateColumns(
-    const AggregatedDataVariants & data_variants,
+    AggregatedDataVariants & data_variants,
     MutableColumns & aggregate_columns) const
 {
-    const auto & data = data_variants.without_key;
-    for (size_t i = 0; i < params.aggregates_size; ++i)
+    auto & data = data_variants.without_key;
+
+    size_t i = 0;
+    try
     {
-        auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
-        column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]);
+        for (i = 0; i < params.aggregates_size; ++i)
+        {
+            auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
+            column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]);
+        }
     }
+    catch (...)
+    {
+        /// Rollback
+        for (size_t rollback_i = 0; rollback_i < i; ++rollback_i)
+        {
+            auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[rollback_i]);
+            column_aggregate_func.getData().pop_back();
+        }
+        throw;
+    }
+
+    data = nullptr;
 }
 
 void Aggregator::addArenasToAggregateColumns(
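
addSingleKeyToAggregateColumns() is the other half of the fix: the state pointers are pushed into the result columns all-or-nothing, and only after a successful transfer is data reset to nullptr, so the destructor of data_variants will not try to destroy states that have been handed over to the columns. The same pattern in isolation, with plain STL containers standing in for ColumnAggregateFunction:

    // Illustrative sketch only: push into every column or roll back, then release ownership.
    #include <cstddef>
    #include <vector>

    void transferState(std::vector<std::vector<char *>> & columns, char * & state)
    {
        std::size_t i = 0;
        try
        {
            for (i = 0; i < columns.size(); ++i)
                columns[i].push_back(state);        // may throw (e.g. an injected allocation fault)
        }
        catch (...)
        {
            // Rollback: only the columns filled so far reference the state.
            for (std::size_t rollback_i = 0; rollback_i < i; ++rollback_i)
                columns[rollback_i].pop_back();
            throw;
        }
        state = nullptr;                            // the columns own the state now; avoid a double destroy
    }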

--- a/src/Interpreters/Aggregator.h
+++ b/src/Interpreters/Aggregator.h

@@ -1138,12 +1138,12 @@ private:
         AggregateFunctionInstruction * aggregate_instructions,
         Arena * arena) const;
 
-    static void executeOnIntervalWithoutKeyImpl(
-        AggregatedDataWithoutKey & res,
+    void executeOnIntervalWithoutKeyImpl(
+        AggregatedDataVariants & data_variants,
         size_t row_begin,
         size_t row_end,
         AggregateFunctionInstruction * aggregate_instructions,
-        Arena * arena);
+        Arena * arena) const;
 
     template <typename Method>
     void writeToTemporaryFileImpl(
@@ -1307,7 +1307,7 @@ private:
         NestedColumnsHolder & nested_columns_holder) const;
 
     void addSingleKeyToAggregateColumns(
-        const AggregatedDataVariants & data_variants,
+        AggregatedDataVariants & data_variants,
         MutableColumns & aggregate_columns) const;
 
     void addArenasToAggregateColumns(

--- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp
+++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp

@@ -121,7 +121,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
 
         /// Add data to aggr. state if interval is not empty. Empty when haven't found current key in new block.
         if (key_begin != key_end)
-            params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
+            params->aggregator.executeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
 
         current_memory_usage = getCurrentMemoryUsage() - initial_memory_usage;
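
With this change the transform hands the owning AggregatedDataVariants to the aggregator instead of the bare without_key state pointer, so if an exception (for example the fault injected in the reproducer) escapes executeOnIntervalWithoutKeyImpl(), the partially built aggregate states are destroyed together with variants instead of leaking.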