From db0110fd7a1a62312ac7b93f67e4651573905def Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Tue, 12 Jul 2022 17:06:20 +0200 Subject: [PATCH] more accurate crutch --- src/Interpreters/Aggregator.cpp | 36 ++++++++++--------- src/Interpreters/Aggregator.h | 27 +++++++++----- .../AggregatingInOrderTransform.cpp | 6 ++-- .../Transforms/AggregatingTransform.cpp | 2 +- 4 files changed, 42 insertions(+), 29 deletions(-) diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 063348f23a9..56f69c1f940 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -1578,9 +1578,8 @@ Block Aggregator::convertOneBucketToBlock( bool final, size_t bucket) const { - Block block = convertToBlockImpl( - method, method.data.impls[bucket], arena, data_variants.aggregates_pools, final, method.data.impls[bucket].size()) - .front(); + Block block = convertToBlockImpl( + method, method.data.impls[bucket], arena, data_variants.aggregates_pools, final, method.data.impls[bucket].size()); block.info.bucket_num = bucket; return block; @@ -1686,8 +1685,8 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const } -template -BlocksList +template +Aggregator::ConvertToBlockRes Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const { if (data.empty()) @@ -1696,7 +1695,7 @@ Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Are return {finalizeBlock(std::move(out_cols), final, rows)}; } - BlocksList res; + ConvertToBlockRes res; if (final) { @@ -1704,17 +1703,17 @@ Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Are if (compiled_aggregate_functions_holder) { static constexpr bool use_compiled_functions = !Method::low_cardinality_optimization; - res = convertToBlockImplFinal(method, data, arena, aggregates_pools, rows); + res = convertToBlockImplFinal(method, data, arena, aggregates_pools, rows); } else #endif { - res = convertToBlockImplFinal(method, data, arena, aggregates_pools, rows); + res = convertToBlockImplFinal(method, data, arena, aggregates_pools, rows); } } else { - res = convertToBlockImplNotFinal(method, data, aggregates_pools, rows); + res = convertToBlockImplNotFinal(method, data, aggregates_pools, rows); } /// In order to release memory early. @@ -1789,8 +1788,8 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu } -template -BlocksList NO_INLINE +template +Aggregator::ConvertToBlockRes NO_INLINE Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t rows) const { auto && out_cols = prepareOutputBlockColumns(aggregates_pools, /* final */ true, rows); @@ -1913,8 +1912,9 @@ Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena return {finalizeBlock(std::move(out_cols), /* final */ true, rows)}; } -template -BlocksList NO_INLINE Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const +template +Aggregator::ConvertToBlockRes NO_INLINE +Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const { auto && out_cols = prepareOutputBlockColumns(aggregates_pools, /* final */ false, rows); auto && [key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols; @@ -2167,14 +2167,16 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va return block; } -BlocksList Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const +template +Aggregator::ConvertToBlockRes +Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const { // clang-format off const size_t rows = data_variants.sizeWithoutOverflowRow(); #define M(NAME) \ else if (data_variants.type == AggregatedDataVariants::Type::NAME) \ { \ - return convertToBlockImpl( \ + return convertToBlockImpl( \ *data_variants.NAME, data_variants.NAME->data, data_variants.aggregates_pool, data_variants.aggregates_pools, final, rows); \ } @@ -2306,7 +2308,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b if (data_variants.type != AggregatedDataVariants::Type::without_key) { if (!data_variants.isTwoLevel()) - blocks.emplace_back(prepareBlockAndFillSingleLevel(data_variants, final).front()); + blocks.splice(blocks.end(), prepareBlockAndFillSingleLevel(data_variants, final)); else blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get())); } @@ -3075,7 +3077,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) if (result.type == AggregatedDataVariants::Type::without_key || is_overflows) block = prepareBlockAndFillWithoutKey(result, final, is_overflows); else - block = prepareBlockAndFillSingleLevel(result, final).front(); + block = prepareBlockAndFillSingleLevel(result, final); /// NOTE: two-level data is not possible here - chooseAggregationMethod chooses only among single-level methods. if (!final) diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 74cbcee433a..4f06ee67624 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1,8 +1,9 @@ #pragma once -#include -#include #include +#include +#include +#include #include @@ -1275,8 +1276,12 @@ private: void mergeSingleLevelDataImpl( ManyAggregatedDataVariants & non_empty_data) const; - template - BlocksList convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const; + template + using ConvertToBlockRes = std::conditional_t; + + template + ConvertToBlockRes + convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const; template void insertAggregatesIntoColumns( @@ -1284,11 +1289,13 @@ private: MutableColumns & final_aggregate_columns, Arena * arena) const; - template - BlocksList convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t rows) const; + template + ConvertToBlockRes + convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t rows) const; - template - BlocksList convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const; + template + ConvertToBlockRes + convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const; struct OutputBlockColumns { @@ -1332,9 +1339,11 @@ private: std::atomic * is_cancelled = nullptr) const; Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const; - BlocksList prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const; BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const; + template + ConvertToBlockRes prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const; + template BlocksList prepareBlocksAndFillTwoLevelImpl( AggregatedDataVariants & data_variants, diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.cpp b/src/Processors/Transforms/AggregatingInOrderTransform.cpp index 6b8707113ac..9137a50aba5 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp +++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp @@ -182,7 +182,8 @@ void AggregatingInOrderTransform::consume(Chunk chunk) if (cur_block_size >= max_block_size || cur_block_bytes + current_memory_usage >= max_block_bytes) { if (group_by_key) - group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false).front(); + group_by_block + = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false); cur_block_bytes += current_memory_usage; finalizeCurrentChunk(std::move(chunk), key_end); return; @@ -293,7 +294,8 @@ void AggregatingInOrderTransform::generate() if (cur_block_size && is_consume_finished) { if (group_by_key) - group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false).front(); + group_by_block + = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false); else params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns); variants.invalidate(); diff --git a/src/Processors/Transforms/AggregatingTransform.cpp b/src/Processors/Transforms/AggregatingTransform.cpp index a2be680c6c2..fb1dd45544d 100644 --- a/src/Processors/Transforms/AggregatingTransform.cpp +++ b/src/Processors/Transforms/AggregatingTransform.cpp @@ -356,7 +356,7 @@ private: else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); - auto blocks = params->aggregator.prepareBlockAndFillSingleLevel(*first, params->final); + auto blocks = params->aggregator.prepareBlockAndFillSingleLevel(*first, params->final); for (auto & block : blocks) single_level_chunks.emplace_back(convertToChunk(block));