diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 6e101005599..e59605890d3 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -1951,13 +1951,7 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal( }); } - -template -Block Aggregator::prepareBlockAndFill( - AggregatedDataVariants & data_variants, - bool final, - size_t rows, - Filler && filler) const +Aggregator::OutputBlockColumns Aggregator::prepareOutputBlockColumns(Arenas & aggregates_pools, bool final, size_t rows) const { MutableColumns key_columns(params.keys_size); MutableColumns aggregate_columns(params.aggregates_size); @@ -1982,7 +1976,7 @@ Block Aggregator::prepareBlockAndFill( /// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states. ColumnAggregateFunction & column_aggregate_func = assert_cast(*aggregate_columns[i]); - for (auto & pool : data_variants.aggregates_pools) + for (auto & pool : aggregates_pools) column_aggregate_func.addArena(pool); aggregate_columns_data[i] = &column_aggregate_func.getData(); @@ -1997,22 +1991,49 @@ Block Aggregator::prepareBlockAndFill( { /// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states. if (auto * column_aggregate_func = typeid_cast(final_aggregate_columns[i].get())) - for (auto & pool : data_variants.aggregates_pools) + for (auto & pool : aggregates_pools) column_aggregate_func->addArena(pool); /// Aggregate state can be wrapped into array if aggregate function ends with -Resample combinator. - final_aggregate_columns[i]->forEachSubcolumn([&data_variants](auto & subcolumn) - { - if (auto * column_aggregate_func = typeid_cast(subcolumn.get())) - for (auto & pool : data_variants.aggregates_pools) - column_aggregate_func->addArena(pool); - }); + final_aggregate_columns[i]->forEachSubcolumn( + [&aggregates_pools](auto & subcolumn) + { + if (auto * column_aggregate_func = typeid_cast(subcolumn.get())) + for (auto & pool : aggregates_pools) + column_aggregate_func->addArena(pool); + }); } } } + return { + .key_columns = std::move(key_columns), + .aggregate_columns = std::move(aggregate_columns), + .final_aggregate_columns = std::move(final_aggregate_columns), + .aggregate_columns_data = std::move(aggregate_columns_data), + }; +} + +template +Block Aggregator::prepareBlockAndFill( + AggregatedDataVariants & data_variants, + bool final, + size_t rows, + Filler && filler) const +{ + auto && out_cols = prepareOutputBlockColumns(data_variants.aggregates_pools, final, rows); + auto && [key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols; + filler(key_columns, aggregate_columns_data, final_aggregate_columns, final); + return finalizeBlock(std::move(out_cols), final, rows); +} + +Block Aggregator::finalizeBlock(OutputBlockColumns && out_cols, bool final, size_t rows) const +{ + auto && [key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols; + + Block res_header = getHeader(final); Block res = res_header.cloneEmpty(); for (size_t i = 0; i < params.keys_size; ++i) diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 3e8b25c1a8c..6e37c5a63d4 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1306,6 +1306,25 @@ private: std::vector key_columns, AggregateColumnsData & aggregate_columns) const; + struct OutputBlockColumns + { + /*OutputBlockColumns(size_t keys_size, size_t aggregates_size) + : key_columns(keys_size) + , aggregate_columns(aggregates_size) + , final_aggregate_columns(aggregates_size) + , aggregate_columns_data(aggregates_size) + { + }*/ + + MutableColumns key_columns; + MutableColumns aggregate_columns; + MutableColumns final_aggregate_columns; + AggregateColumnsData aggregate_columns_data; + }; + + OutputBlockColumns prepareOutputBlockColumns(Arenas & aggregates_pools, bool final, size_t rows) const; + Block finalizeBlock(OutputBlockColumns && out_cols, bool final, size_t rows) const; + template Block prepareBlockAndFill( AggregatedDataVariants & data_variants,