Interpreters/Aggregator: cleaner interface for block release during merge

Suggested-by: @amosbird
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-06-15 23:45:38 +03:00
parent 3e833b403d
commit 0d4f78639e
2 changed files with 11 additions and 17 deletions

View File

@ -2694,7 +2694,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImpl(
Block & block,
Block block,
Arena * aggregates_pool,
Method & method,
Table & data,
@ -2714,9 +2714,6 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
block.rows(),
aggregate_columns_data,
key_columns);
/// Early release memory.
block.clear();
}
template <typename Method, typename Table>
@ -2739,14 +2736,11 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
void NO_INLINE Aggregator::mergeBlockWithoutKeyStreamsImpl(
Block & block,
Block block,
AggregatedDataVariants & result) const
{
AggregateColumnsConstData aggregate_columns = params.makeAggregateColumnsData(block);
mergeWithoutKeyStreamsImpl(result, 0, block.rows(), aggregate_columns);
/// Early release memory.
block.clear();
}
void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
AggregatedDataVariants & result,
@ -2786,10 +2780,10 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
}
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
mergeBlockWithoutKeyStreamsImpl(block, result);
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -2905,7 +2899,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
{
#define M(NAME) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
mergeStreamsImpl(std::move(block), aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@ -2956,11 +2950,11 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
break;
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
mergeBlockWithoutKeyStreamsImpl(block, result);
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -3023,11 +3017,11 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
bucket_num = -1;
if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
mergeBlockWithoutKeyStreamsImpl(block, result);
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M

View File

@ -1365,7 +1365,7 @@ private:
template <typename Method, typename Table>
void mergeStreamsImpl(
Block & block,
Block block,
Arena * aggregates_pool,
Method & method,
Table & data,
@ -1384,7 +1384,7 @@ private:
const ColumnRawPtrs & key_columns) const;
void mergeBlockWithoutKeyStreamsImpl(
Block & block,
Block block,
AggregatedDataVariants & result) const;
void mergeWithoutKeyStreamsImpl(
AggregatedDataVariants & result,