Merge pull request #11496 from ClickHouse/fix-aggregation-state-exception-safety-memory-leak

Fix memory leak when exception is thrown in the middle of aggregation with -State functions
This commit is contained in:
alexey-milovidov 2020-06-08 22:03:52 +03:00 committed by GitHub
commit 2bff9b4607
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 122 additions and 24 deletions

View File

@ -999,6 +999,73 @@ void Aggregator::convertToBlockImpl(
data.clearAndShrink();
}
template <typename Mapped>
inline void Aggregator::insertAggregatesIntoColumns(
    Mapped & mapped,
    MutableColumns & final_aggregate_columns) const
{
    /** Writes the final value of every aggregate function stored behind `mapped`
      * into the matching column of `final_aggregate_columns`, then releases the
      * states that are no longer needed and resets `mapped` to nullptr.
      *
      * A nullptr in the data marks a cell whose states are already gone, so the
      * destructor of Aggregator will not destroy them a second time
      * (cells that are still non-null are destroyed there in case of exception).
      *
      * Two things make this tricky:
      *
      * 1. A single pointer in the data covers the states of several aggregate functions.
      *    If inserting the result fails halfway through, the exception is caught here,
      *    every state that is no longer needed is still destroyed, and only then
      *    the exception is rethrown, so the data stays consistent.
      *
      * 2. Aggregate functions with the "-State" modifier insert a pointer to their state
      *    into ColumnAggregateFunction, which takes ownership of that state.
      *    Such a state must not be destroyed here once it has been handed over,
      *    but the data must still stop owning it.
      */

    /// Address of the state of the i-th aggregate function inside this cell.
    auto state_at = [&](size_t i) { return mapped + offsets_of_aggregate_states[i]; };

    /// Number of aggregate functions whose result was inserted successfully.
    size_t inserted = 0;
    std::exception_ptr pending_error;

    try
    {
        /// Insert final values of aggregate functions into columns.
        while (inserted < params.aggregates_size)
        {
            aggregate_functions[inserted]->insertResultInto(
                state_at(inserted),
                *final_aggregate_columns[inserted]);
            ++inserted;
        }
    }
    catch (...)
    {
        /// Remember the failure; cleanup below has to run first.
        pending_error = std::current_exception();
    }

    /** Destroy states that are no longer needed. This loop does not throw.
      *
      * States of "-State" aggregate functions are skipped, because their ownership
      * has been transferred to ColumnAggregateFunction, which will take care of them.
      * That only holds for states that were actually inserted before the exception
      * (if any) was thrown; the remaining ones are destroyed here like any other state.
      */
    for (size_t idx = 0; idx < params.aggregates_size; ++idx)
    {
        const bool ownership_transferred = idx < inserted && aggregate_functions[idx]->isState();
        if (ownership_transferred)
            continue;

        aggregate_functions[idx]->destroy(state_at(idx));
    }

    /// Mark the cell as destroyed so it will not be destroyed in destructor.
    mapped = nullptr;

    if (pending_error)
        std::rethrow_exception(pending_error);
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::convertToBlockImplFinal(
Method & method,
@ -1011,25 +1078,15 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
if (data.hasNullKeyData())
{
key_columns[0]->insertDefault();
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
data.getNullKeyData() + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
insertAggregatesIntoColumns(data.getNullKeyData(), final_aggregate_columns);
}
}
data.forEachValue([&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(key, key_columns, key_sizes);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
mapped + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
insertAggregatesIntoColumns(mapped, final_aggregate_columns);
});
destroyImpl<Method>(data);
}
template <typename Method, typename Table>
@ -1047,6 +1104,8 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
data.getNullKeyData() = nullptr;
}
}
@ -1187,16 +1246,16 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
{
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (!final_)
aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
else
aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
}
if (!final_)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
data = nullptr;
}
else
{
insertAggregatesIntoColumns(data, final_aggregate_columns);
}
if (params.overflow_row)
for (size_t i = 0; i < params.keys_size; ++i)
@ -2387,8 +2446,7 @@ void NO_INLINE Aggregator::destroyImpl(Table & table) const
return;
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
data = nullptr;
});
@ -2402,8 +2460,7 @@ void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const
if (nullptr != res_data)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
res_data = nullptr;
}

View File

@ -1166,6 +1166,11 @@ protected:
MutableColumns & final_aggregate_columns,
bool final) const;
template <typename Mapped>
void insertAggregatesIntoColumns(
Mapped & mapped,
MutableColumns & final_aggregate_columns) const;
template <typename Method, typename Table>
void convertToBlockImplFinal(
Method & method,

View File

@ -0,0 +1,2 @@
Memory limit (for query) exceeded
Ok

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash

# Regression test for a memory leak that appears when a query using a "-State"
# aggregate function is aborted by an exception in the middle of aggregation.
# Each query is run with a 1G memory limit; the only output kept is the
# de-duplicated 'Memory limit (for query) exceeded' message followed by 'Ok'.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quote the expansion so a checkout path containing spaces or glob characters
# is not word-split before the file is sourced.
. "$CURDIR"/../shell_config.sh

# Runs the same aggregation query 1000 times in a row.
# NOTE(review): CLICKHOUSE_CLIENT is deliberately left unquoted here - presumably
# it is set by shell_config.sh and may carry extra arguments; confirm before quoting.
function test()
{
    for i in {1..1000}; do
        $CLICKHOUSE_CLIENT --max_memory_usage 1G <<< "SELECT uniqExactState(number) FROM system.numbers_mt GROUP BY number % 10";
    done
}

# The function has to be exported because it is executed by a child bash below.
export -f test;

# If the memory leak exists, it will lead to OOM fairly quickly.
# The run is capped at 30 seconds; stderr is merged into stdout so that the
# error text can be matched, and uniq collapses the repeated message.
timeout 30 bash -c test 2>&1 | grep -o -F 'Memory limit (for query) exceeded' | uniq

echo 'Ok'

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash

# Regression test for a memory leak that appears when an exception is thrown
# in the middle of aggregation with a "-State" aggregate function.
# The query calls throwIf(number > 10000000, 'Ok'); the only output kept is the
# de-duplicated 'Ok' text matched in the combined client output.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quote the expansion so a checkout path containing spaces or glob characters
# is not word-split before the file is sourced.
. "$CURDIR"/../shell_config.sh

# Runs the same failing aggregation query 250 times in a row.
# NOTE(review): CLICKHOUSE_CLIENT is deliberately left unquoted here - presumably
# it is set by shell_config.sh and may carry extra arguments; confirm before quoting.
function test()
{
    for i in {1..250}; do
        $CLICKHOUSE_CLIENT --query "SELECT groupArrayIfState(('Hello, world' AS s) || s || s || s || s || s || s || s || s || s, NOT throwIf(number > 10000000, 'Ok')) FROM system.numbers_mt GROUP BY number % 10";
    done
}

# The function has to be exported because it is executed by a child bash below.
export -f test;

# If the memory leak exists, it will lead to OOM fairly quickly.
# The run is capped at 30 seconds; stderr is merged into stdout so that the
# error text can be matched, and uniq collapses the repeated message.
timeout 30 bash -c test 2>&1 | grep -o -F 'Ok' | uniq