diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index c3fd8b8024a..a2f24a79e40 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -855,12 +855,18 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl( void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl( - AggregatedDataWithoutKey & res, + AggregatedDataVariants & data_variants, size_t row_begin, size_t row_end, AggregateFunctionInstruction * aggregate_instructions, - Arena * arena) + Arena * arena) const { + /// `data_variants` will destroy the states of aggregate functions in the destructor + data_variants.aggregator = this; + data_variants.init(AggregatedDataVariants::Type::without_key); + + AggregatedDataWithoutKey & res = data_variants.without_key; + /// Adding values for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) { @@ -1623,15 +1629,32 @@ Block Aggregator::prepareBlockAndFill( } void Aggregator::addSingleKeyToAggregateColumns( - const AggregatedDataVariants & data_variants, + AggregatedDataVariants & data_variants, MutableColumns & aggregate_columns) const { - const auto & data = data_variants.without_key; - for (size_t i = 0; i < params.aggregates_size; ++i) + auto & data = data_variants.without_key; + + size_t i = 0; + try { - auto & column_aggregate_func = assert_cast(*aggregate_columns[i]); - column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]); + for (i = 0; i < params.aggregates_size; ++i) + { + auto & column_aggregate_func = assert_cast(*aggregate_columns[i]); + column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]); + } } + catch (...) + { + /// Rollback + for (size_t rollback_i = 0; rollback_i < i; ++rollback_i) + { + auto & column_aggregate_func = assert_cast(*aggregate_columns[rollback_i]); + column_aggregate_func.getData().pop_back(); + } + throw; + } + + data = nullptr; } void Aggregator::addArenasToAggregateColumns( diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index c79c2c5ef64..05c9133cb35 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1138,12 +1138,12 @@ private: AggregateFunctionInstruction * aggregate_instructions, Arena * arena) const; - static void executeOnIntervalWithoutKeyImpl( - AggregatedDataWithoutKey & res, + void executeOnIntervalWithoutKeyImpl( + AggregatedDataVariants & data_variants, size_t row_begin, size_t row_end, AggregateFunctionInstruction * aggregate_instructions, - Arena * arena); + Arena * arena) const; template void writeToTemporaryFileImpl( @@ -1307,7 +1307,7 @@ private: NestedColumnsHolder & nested_columns_holder) const; void addSingleKeyToAggregateColumns( - const AggregatedDataVariants & data_variants, + AggregatedDataVariants & data_variants, MutableColumns & aggregate_columns) const; void addArenasToAggregateColumns( diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.cpp b/src/Processors/Transforms/AggregatingInOrderTransform.cpp index 857f362c4be..63497ea1af4 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp +++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp @@ -121,7 +121,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk) /// Add data to aggr. state if interval is not empty. Empty when haven't found current key in new block. if (key_begin != key_end) - params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool); + params->aggregator.executeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool); current_memory_usage = getCurrentMemoryUsage() - initial_memory_usage;