Batch aggregation (experimental)

This commit is contained in:
Alexey Milovidov 2019-08-11 01:36:55 +03:00
parent cce3ab08bb
commit efa51a6cd9
4 changed files with 74 additions and 9 deletions

View File

@ -0,0 +1,17 @@
#include <AggregateFunctions/IAggregateFunction.h>
namespace DB
{
void IAggregateFunction::addBatch(
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena) const
{
for (size_t i = 0; i < batch_size; ++i)
add(places[i] + place_offset, columns, i, arena);
}
}

View File

@ -128,6 +128,11 @@ public:
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
virtual AddFunc getAddressOfAddFunction() const = 0;
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
* and do a single call to "addBatch" for devirtualization and inlining.
*/
void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const;
/** This is used for runtime code generation to determine, which header files to include in generated source.
* Always implement it as
* const char * getHeaderFilePath() const override { return __FILE__; }

View File

@ -583,16 +583,16 @@ void NO_INLINE Aggregator::executeImpl(
size_t rows,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
StringRefs & keys,
bool no_more_keys,
AggregateDataPtr overflow_row) const
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
if (!no_more_keys)
executeImplCase<false>(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row);
//executeImplCase<false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions);
else
executeImplCase<true>(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row);
executeImplCase<true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}
@ -602,9 +602,7 @@ void NO_INLINE Aggregator::executeImplCase(
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
ColumnRawPtrs & /*key_columns*/,
AggregateFunctionInstruction * aggregate_instructions,
StringRefs & /*keys*/,
AggregateDataPtr overflow_row) const
{
/// NOTE When editing this code, also pay attention to SpecializedAggregator.h.
@ -655,6 +653,46 @@ void NO_INLINE Aggregator::executeImplCase(
}
template <typename Method>
void NO_INLINE Aggregator::executeImplBatch(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const
{
PODArray<AggregateDataPtr> places(rows);
/// For all rows.
for (size_t i = 0; i < rows; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(aggregate_data);
emplace_result.setMapped(aggregate_data);
}
else
aggregate_data = emplace_result.getMapped();
places[i] = aggregate_data;
}
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
inst->that->addBatch(rows, places.data(), inst->state_offset, inst->arguments, aggregates_pool);
}
void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
@ -826,7 +864,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, aggregate_functions_instructions.data(), \
key, no_more_keys, overflow_row_ptr);
no_more_keys, overflow_row_ptr);
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)

View File

@ -1003,7 +1003,6 @@ protected:
size_t rows,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
StringRefs & keys,
bool no_more_keys,
AggregateDataPtr overflow_row) const;
@ -1014,11 +1013,17 @@ protected:
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
StringRefs & keys,
AggregateDataPtr overflow_row) const;
template <typename Method>
void executeImplBatch(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const;
/// For case when there are no keys (all aggregate into one row).
void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,