more accurate crutch

This commit is contained in:
Nikita Taranov 2022-07-12 17:06:20 +02:00
parent e5e0a24ab3
commit db0110fd7a
4 changed files with 42 additions and 29 deletions

View File

@ -1578,9 +1578,8 @@ Block Aggregator::convertOneBucketToBlock(
bool final,
size_t bucket) const
{
Block block = convertToBlockImpl(
method, method.data.impls[bucket], arena, data_variants.aggregates_pools, final, method.data.impls[bucket].size())
.front();
Block block = convertToBlockImpl</* return_single_block */ true>(
method, method.data.impls[bucket], arena, data_variants.aggregates_pools, final, method.data.impls[bucket].size());
block.info.bucket_num = bucket;
return block;
@ -1686,8 +1685,8 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
}
template <typename Method, typename Table>
BlocksList
template <bool return_single_block, typename Method, typename Table>
Aggregator::ConvertToBlockRes<return_single_block>
Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const
{
if (data.empty())
@ -1696,7 +1695,7 @@ Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Are
return {finalizeBlock(std::move(out_cols), final, rows)};
}
BlocksList res;
ConvertToBlockRes<return_single_block> res;
if (final)
{
@ -1704,17 +1703,17 @@ Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Are
if (compiled_aggregate_functions_holder)
{
static constexpr bool use_compiled_functions = !Method::low_cardinality_optimization;
res = convertToBlockImplFinal<Method, use_compiled_functions>(method, data, arena, aggregates_pools, rows);
res = convertToBlockImplFinal<Method, use_compiled_functions, return_single_block>(method, data, arena, aggregates_pools, rows);
}
else
#endif
{
res = convertToBlockImplFinal<Method, false>(method, data, arena, aggregates_pools, rows);
res = convertToBlockImplFinal<Method, false, return_single_block>(method, data, arena, aggregates_pools, rows);
}
}
else
{
res = convertToBlockImplNotFinal(method, data, aggregates_pools, rows);
res = convertToBlockImplNotFinal<return_single_block>(method, data, aggregates_pools, rows);
}
/// In order to release memory early.
@ -1789,8 +1788,8 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
}
template <typename Method, bool use_compiled_functions, typename Table>
BlocksList NO_INLINE
template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t rows) const
{
auto && out_cols = prepareOutputBlockColumns(aggregates_pools, /* final */ true, rows);
@ -1913,8 +1912,9 @@ Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena
return {finalizeBlock(std::move(out_cols), /* final */ true, rows)};
}
template <typename Method, typename Table>
BlocksList NO_INLINE Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const
template <bool return_single_block, typename Method, typename Table>
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
Aggregator::convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const
{
auto && out_cols = prepareOutputBlockColumns(aggregates_pools, /* final */ false, rows);
auto && [key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;
@ -2167,14 +2167,16 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
return block;
}
BlocksList Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
template <bool return_single_block>
Aggregator::ConvertToBlockRes<return_single_block>
Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
{
// clang-format off
const size_t rows = data_variants.sizeWithoutOverflowRow();
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
{ \
return convertToBlockImpl( \
return convertToBlockImpl<return_single_block>( \
*data_variants.NAME, data_variants.NAME->data, data_variants.aggregates_pool, data_variants.aggregates_pools, final, rows); \
}
@ -2306,7 +2308,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
if (data_variants.type != AggregatedDataVariants::Type::without_key)
{
if (!data_variants.isTwoLevel())
blocks.emplace_back(prepareBlockAndFillSingleLevel(data_variants, final).front());
blocks.splice(blocks.end(), prepareBlockAndFillSingleLevel</* return_single_block */ false>(data_variants, final));
else
blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()));
}
@ -3075,7 +3077,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
block = prepareBlockAndFillWithoutKey(result, final, is_overflows);
else
block = prepareBlockAndFillSingleLevel(result, final).front();
block = prepareBlockAndFillSingleLevel</* return_single_block */ true>(result, final);
/// NOTE: two-level data is not possible here - chooseAggregationMethod chooses only among single-level methods.
if (!final)

View File

@ -1,8 +1,9 @@
#pragma once
#include <mutex>
#include <memory>
#include <functional>
#include <memory>
#include <mutex>
#include <type_traits>
#include <Common/logger_useful.h>
@ -1275,8 +1276,12 @@ private:
void mergeSingleLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
template <typename Method, typename Table>
BlocksList convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const;
template <bool return_single_block>
using ConvertToBlockRes = std::conditional_t<return_single_block, Block, BlocksList>;
template <bool return_single_block, typename Method, typename Table>
ConvertToBlockRes<return_single_block>
convertToBlockImpl(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool final, size_t rows) const;
template <typename Mapped>
void insertAggregatesIntoColumns(
@ -1284,11 +1289,13 @@ private:
MutableColumns & final_aggregate_columns,
Arena * arena) const;
template <typename Method, bool use_compiled_functions, typename Table>
BlocksList convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t rows) const;
template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
ConvertToBlockRes<return_single_block>
convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t rows) const;
template <typename Method, typename Table>
BlocksList convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const;
template <bool return_single_block, typename Method, typename Table>
ConvertToBlockRes<return_single_block>
convertToBlockImplNotFinal(Method & method, Table & data, Arenas & aggregates_pools, size_t rows) const;
struct OutputBlockColumns
{
@ -1332,9 +1339,11 @@ private:
std::atomic<bool> * is_cancelled = nullptr) const;
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
BlocksList prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;
template <bool return_single_block>
ConvertToBlockRes<return_single_block> prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
template <typename Method>
BlocksList prepareBlocksAndFillTwoLevelImpl(
AggregatedDataVariants & data_variants,

View File

@ -182,7 +182,8 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
if (cur_block_size >= max_block_size || cur_block_bytes + current_memory_usage >= max_block_bytes)
{
if (group_by_key)
group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false).front();
group_by_block
= params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ true>(variants, /* final= */ false);
cur_block_bytes += current_memory_usage;
finalizeCurrentChunk(std::move(chunk), key_end);
return;
@ -293,7 +294,8 @@ void AggregatingInOrderTransform::generate()
if (cur_block_size && is_consume_finished)
{
if (group_by_key)
group_by_block = params->aggregator.prepareBlockAndFillSingleLevel(variants, /* final= */ false).front();
group_by_block
= params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ true>(variants, /* final= */ false);
else
params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns);
variants.invalidate();

View File

@ -356,7 +356,7 @@ private:
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
auto blocks = params->aggregator.prepareBlockAndFillSingleLevel(*first, params->final);
auto blocks = params->aggregator.prepareBlockAndFillSingleLevel</* return_single_block */ false>(*first, params->final);
for (auto & block : blocks)
single_level_chunks.emplace_back(convertToChunk(block));