split prepareBlockAndFill

This commit is contained in:
Nikita Taranov 2022-07-08 21:24:33 +02:00
parent 18c3ae5b37
commit bca28ba9f8
2 changed files with 55 additions and 15 deletions

View File

@ -1951,13 +1951,7 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
});
}
template <typename Filler>
Block Aggregator::prepareBlockAndFill(
AggregatedDataVariants & data_variants,
bool final,
size_t rows,
Filler && filler) const
Aggregator::OutputBlockColumns Aggregator::prepareOutputBlockColumns(Arenas & aggregates_pools, bool final, size_t rows) const
{
MutableColumns key_columns(params.keys_size);
MutableColumns aggregate_columns(params.aggregates_size);
@ -1982,7 +1976,7 @@ Block Aggregator::prepareBlockAndFill(
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
for (auto & pool : data_variants.aggregates_pools)
for (auto & pool : aggregates_pools)
column_aggregate_func.addArena(pool);
aggregate_columns_data[i] = &column_aggregate_func.getData();
@ -1997,22 +1991,49 @@ Block Aggregator::prepareBlockAndFill(
{
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(final_aggregate_columns[i].get()))
for (auto & pool : data_variants.aggregates_pools)
for (auto & pool : aggregates_pools)
column_aggregate_func->addArena(pool);
/// Aggregate state can be wrapped into array if aggregate function ends with -Resample combinator.
final_aggregate_columns[i]->forEachSubcolumn([&data_variants](auto & subcolumn)
final_aggregate_columns[i]->forEachSubcolumn(
[&aggregates_pools](auto & subcolumn)
{
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
for (auto & pool : data_variants.aggregates_pools)
for (auto & pool : aggregates_pools)
column_aggregate_func->addArena(pool);
});
}
}
}
return {
.key_columns = std::move(key_columns),
.aggregate_columns = std::move(aggregate_columns),
.final_aggregate_columns = std::move(final_aggregate_columns),
.aggregate_columns_data = std::move(aggregate_columns_data),
};
}
template <typename Filler>
Block Aggregator::prepareBlockAndFill(
AggregatedDataVariants & data_variants,
bool final,
size_t rows,
Filler && filler) const
{
auto && out_cols = prepareOutputBlockColumns(data_variants.aggregates_pools, final, rows);
auto && [key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
filler(key_columns, aggregate_columns_data, final_aggregate_columns, final);
return finalizeBlock(std::move(out_cols), final, rows);
}
Block Aggregator::finalizeBlock(OutputBlockColumns && out_cols, bool final, size_t rows) const
{
auto && [key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
Block res_header = getHeader(final);
Block res = res_header.cloneEmpty();
for (size_t i = 0; i < params.keys_size; ++i)

View File

@ -1306,6 +1306,25 @@ private:
std::vector<IColumn *> key_columns,
AggregateColumnsData & aggregate_columns) const;
struct OutputBlockColumns
{
/*OutputBlockColumns(size_t keys_size, size_t aggregates_size)
: key_columns(keys_size)
, aggregate_columns(aggregates_size)
, final_aggregate_columns(aggregates_size)
, aggregate_columns_data(aggregates_size)
{
}*/
MutableColumns key_columns;
MutableColumns aggregate_columns;
MutableColumns final_aggregate_columns;
AggregateColumnsData aggregate_columns_data;
};
OutputBlockColumns prepareOutputBlockColumns(Arenas & aggregates_pools, bool final, size_t rows) const;
Block finalizeBlock(OutputBlockColumns && out_cols, bool final, size_t rows) const;
template <typename Filler>
Block prepareBlockAndFill(
AggregatedDataVariants & data_variants,