diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index e7f6f16b91d..ec4a2f2ba9d 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -1016,6 +1016,8 @@ void NO_INLINE Aggregator::convertToBlockImplFinal( aggregate_functions[i]->insertResultInto( data.getNullKeyData() + offsets_of_aggregate_states[i], *final_aggregate_columns[i]); + + data.getNullKeyData() = nullptr; } } @@ -1023,13 +1025,65 @@ void NO_INLINE Aggregator::convertToBlockImplFinal( { method.insertKeyIntoColumns(key, key_columns, key_sizes); - for (size_t i = 0; i < params.aggregates_size; ++i) - aggregate_functions[i]->insertResultInto( - mapped + offsets_of_aggregate_states[i], - *final_aggregate_columns[i]); - }); + /** Final values of aggregate functions are inserted to columns. + * Then states of aggregate functions, that are not longer needed, are destroyed. + * + * We mark already destroyed states with "nullptr" in data, + * so they will not be destroyed in destructor of Aggregator + * (other values will be destroyed in destructor in case of exception). + * + * But it becomes tricky, because we have multiple aggregate states pointed by a single pointer in data. + * So, if exception is thrown in the middle of moving states for different aggregate functions, + * we have to catch exceptions and destroy all the states that are no longer needed, + * to keep the data in consistent state. + * + * It is also tricky, because there are aggregate functions with "-State" modifier. + * When we call "insertResultInto" for them, they insert a pointer to the state to ColumnAggregateFunction + * and ColumnAggregateFunction will take ownership of this state. + * So, for aggregate functions with "-State" modifier, the state must not be destroyed + * after it has been transferred to ColumnAggregateFunction. + * But we should mark that the data no longer owns these states. + */ - destroyImpl(data); + size_t insert_i = 0; + std::exception_ptr exception; + + try + { + /// Insert final values of aggregate functions into columns. + for (; insert_i < params.aggregates_size; ++insert_i) + aggregate_functions[insert_i]->insertResultInto( + mapped + offsets_of_aggregate_states[insert_i], + *final_aggregate_columns[insert_i]); + } + catch (...) + { + exception = std::current_exception(); + } + + /** Destroy states that are no longer needed. This loop does not throw. + * + * Don't destroy states for "-State" aggregate functions, + * because the ownership of this state is transferred to ColumnAggregateFunction + * and ColumnAggregateFunction will take care. + * + * But it's only for states that has been transferred to ColumnAggregateFunction + * before exception has been thrown; + */ + for (size_t destroy_i = 0; destroy_i < params.aggregates_size; ++destroy_i) + { + /// If ownership was not transferred to ColumnAggregateFunction. + if (!(destroy_i < insert_i && aggregate_functions[destroy_i]->isState())) + aggregate_functions[destroy_i]->destroy( + mapped + offsets_of_aggregate_states[destroy_i]); + } + + /// Mark the cell as destroyed so it will not be destroyed in destructor. + mapped = nullptr; + + if (exception) + std::rethrow_exception(exception); + }); } template @@ -1047,6 +1101,8 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal( for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_columns[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]); + + data.getNullKeyData() = nullptr; } } @@ -2387,8 +2443,7 @@ void NO_INLINE Aggregator::destroyImpl(Table & table) const return; for (size_t i = 0; i < params.aggregates_size; ++i) - if (!aggregate_functions[i]->isState()) - aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]); + aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]); data = nullptr; }); @@ -2402,8 +2457,7 @@ void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const if (nullptr != res_data) { for (size_t i = 0; i < params.aggregates_size; ++i) - if (!aggregate_functions[i]->isState()) - aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]); + aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]); res_data = nullptr; }