diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.cpp b/src/Processors/Transforms/AggregatingInOrderTransform.cpp
index 7edeff65ec8..0db95bc3b20 100644
--- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp
+++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp
@@ -4,6 +4,12 @@
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+
 AggregatingInOrderTransform::AggregatingInOrderTransform(
     Block header, AggregatingTransformParamsPtr params_,
     const SortDescription & group_by_description_, size_t res_block_size_)
@@ -140,6 +146,24 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
                 block_end_reached = true;
                 need_generate = true;
                 cur_block_size = 0;
+
+                /// Arenas cannot be destroyed here: FinalizingSimpleTransform will
+                /// later call finalizeChunk(), and even after finalizeChunk()
+                /// some memory from the Arena is still in use.
+                /// So we attach the arenas to the Chunk, and they are
+                /// destroyed once the Chunk has been consumed.
+                if (params->final)
+                {
+                    if (variants.aggregates_pools.size() != 1)
+                        throw Exception("Too much arenas", ErrorCodes::LOGICAL_ERROR);
+
+                    Arenas arenas(1, std::make_shared<Arena>());
+                    std::swap(variants.aggregates_pools, arenas);
+                    variants.aggregates_pool = variants.aggregates_pools.at(0).get();
+
+                    chunk.setChunkInfo(std::make_shared<AggregatedArenasChunkInfo>(std::move(arenas)));
+                }
+
                 return;
             }
 
diff --git a/src/Processors/Transforms/AggregatingTransform.h b/src/Processors/Transforms/AggregatingTransform.h
index 235d01ebc77..ab18992151a 100644
--- a/src/Processors/Transforms/AggregatingTransform.h
+++ b/src/Processors/Transforms/AggregatingTransform.h
@@ -8,6 +8,16 @@
 namespace DB
 {
 
+/// Chunk info that owns the Arenas handed over by in-order aggregation, keeping them alive as long as the Chunk info exists.
+class AggregatedArenasChunkInfo : public ChunkInfo
+{
+public:
+    Arenas arenas;
+    explicit AggregatedArenasChunkInfo(Arenas arenas_)
+        : arenas(std::move(arenas_))
+    {}
+};
+
 class AggregatedChunkInfo : public ChunkInfo
 {
 public:
diff --git a/tests/queries/0_stateless/01513_optimize_aggregation_in_order_memory.reference b/tests/queries/0_stateless/01513_optimize_aggregation_in_order_memory.reference
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/queries/0_stateless/01513_optimize_aggregation_in_order_memory.sql b/tests/queries/0_stateless/01513_optimize_aggregation_in_order_memory.sql
new file mode 100644
index 00000000000..38920262fba
--- /dev/null
+++ b/tests/queries/0_stateless/01513_optimize_aggregation_in_order_memory.sql
@@ -0,0 +1,16 @@
+drop table if exists data_01513;
+create table data_01513 (key String) engine=MergeTree() order by key;
+-- 10e3 groups, 200 rows each (2e6 rows in total)
+insert into data_01513 select number%10e3 from numbers(toUInt64(2e6));
+-- reduce number of parts to 1
+optimize table data_01513 final;
+
+-- this is enough to trigger non-reusable Chunk in Arena.
+set max_memory_usage='500M';
+set max_threads=1;
+set max_block_size=500;
+
+select key, groupArray(repeat('a', 200)), count() from data_01513 group by key format Null; -- { serverError 241; }
+select key, groupArray(repeat('a', 200)), count() from data_01513 group by key format Null settings optimize_aggregation_in_order=1;
+-- for WITH TOTALS previous groups should be kept.
+select key, groupArray(repeat('a', 200)), count() from data_01513 group by key with totals format Null settings optimize_aggregation_in_order=1; -- { serverError 241; }