Merge pull request #6435 from yandex/batch-aggregator

Batch aggregator (experimental)
This commit is contained in:
alexey-milovidov 2019-08-12 04:52:22 +03:00 committed by GitHub
commit 27c46be554
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 96 additions and 80 deletions

View File

@ -62,12 +62,6 @@ public:
static_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}
/// May be used for optimization.
void addDelta(AggregateDataPtr place, UInt64 x) const
{
data(place).count += x;
}
const char * getHeaderFilePath() const override { return __FILE__; }
};

View File

@ -128,6 +128,15 @@ public:
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
virtual AddFunc getAddressOfAddFunction() const = 0;
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
* and do a single call to "addBatch" for devirtualization and inlining.
*/
virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;
/** The same for single place.
*/
virtual void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
/** This is used for runtime code generation to determine, which header files to include in generated source.
* Always implement it as
* const char * getHeaderFilePath() const override { return __FILE__; }
@ -156,7 +165,20 @@ private:
public:
IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
: IAggregateFunction(argument_types_, parameters_) {}
AddFunc getAddressOfAddFunction() const override { return &addFree; }
void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
};

View File

@ -110,8 +110,7 @@ ParallelAggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(co
void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num)
{
parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num],
parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].key, parent.no_more_keys);
parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns, parent.no_more_keys);
parent.threads_data[thread_num].src_rows += block.rows();
parent.threads_data[thread_num].src_bytes += block.bytes();
@ -205,8 +204,7 @@ void ParallelAggregatingBlockInputStream::execute()
/// To do this, we pass a block with zero rows to aggregate.
if (total_src_rows == 0 && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
aggregator.executeOnBlock(children.at(0)->getHeader(), *many_data[0],
threads_data[0].key_columns, threads_data[0].aggregate_columns,
threads_data[0].key, no_more_keys);
threads_data[0].key_columns, threads_data[0].aggregate_columns, no_more_keys);
}
}

View File

@ -80,13 +80,11 @@ private:
size_t src_rows = 0;
size_t src_bytes = 0;
StringRefs key;
ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
ThreadData(size_t keys_size_, size_t aggregates_size_)
{
key.resize(keys_size_);
key_columns.resize(keys_size_);
aggregate_columns.resize(aggregates_size_);
}

View File

@ -11,7 +11,6 @@
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NullBlockInputStream.h>
@ -289,7 +288,7 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
"template void Aggregator::executeSpecialized<\n"
" " << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n"
" " << method_typename << " &, Arena *, size_t, ColumnRawPtrs &,\n"
" AggregateColumns &, StringRefs &, bool, AggregateDataPtr) const;\n"
" AggregateColumns &, bool, AggregateDataPtr) const;\n"
"\n"
"static void wrapper" << suffix << "(\n"
" const Aggregator & aggregator,\n"
@ -298,13 +297,12 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
" size_t rows,\n"
" ColumnRawPtrs & key_columns,\n"
" Aggregator::AggregateColumns & aggregate_columns,\n"
" StringRefs & keys,\n"
" bool no_more_keys,\n"
" AggregateDataPtr overflow_row)\n"
"{\n"
" aggregator.executeSpecialized<\n"
" " << method_typename << ", TypeList<" << aggregate_functions_typenames << ">>(\n"
" method, arena, rows, key_columns, aggregate_columns, keys, no_more_keys, overflow_row);\n"
" method, arena, rows, key_columns, aggregate_columns, no_more_keys, overflow_row);\n"
"}\n"
"\n"
"void * getPtr" << suffix << "() __attribute__((__visibility__(\"default\")));\n"
@ -583,16 +581,16 @@ void NO_INLINE Aggregator::executeImpl(
size_t rows,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
StringRefs & keys,
bool no_more_keys,
AggregateDataPtr overflow_row) const
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
if (!no_more_keys)
executeImplCase<false>(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row);
//executeImplCase<false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions);
else
executeImplCase<true>(method, state, aggregates_pool, rows, key_columns, aggregate_instructions, keys, overflow_row);
executeImplCase<true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}
@ -602,9 +600,7 @@ void NO_INLINE Aggregator::executeImplCase(
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
ColumnRawPtrs & /*key_columns*/,
AggregateFunctionInstruction * aggregate_instructions,
StringRefs & /*keys*/,
AggregateDataPtr overflow_row) const
{
/// NOTE When editing this code, also pay attention to SpecializedAggregator.h.
@ -655,34 +651,60 @@ void NO_INLINE Aggregator::executeImplCase(
}
template <typename Method>
void NO_INLINE Aggregator::executeImplBatch(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const
{
PODArray<AggregateDataPtr> places(rows);
/// For all rows.
for (size_t i = 0; i < rows; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(aggregate_data);
emplace_result.setMapped(aggregate_data);
}
else
aggregate_data = emplace_result.getMapped();
places[i] = aggregate_data;
}
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
inst->that->addBatch(rows, places.data(), inst->state_offset, inst->arguments, aggregates_pool);
}
void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const
{
/// Optimization in the case of a single aggregate function `count`.
AggregateFunctionCount * agg_count = params.aggregates_size == 1
? typeid_cast<AggregateFunctionCount *>(aggregate_functions[0])
: nullptr;
if (agg_count)
agg_count->addDelta(res, rows);
else
{
for (size_t i = 0; i < rows; ++i)
{
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
(*inst->func)(inst->that, res + inst->state_offset, inst->arguments, i, arena);
}
}
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
inst->that->addBatchSinglePlace(rows, res + inst->state_offset, inst->arguments, arena);
}
bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, StringRefs & key,
bool & no_more_keys)
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
if (isCancelled())
return true;
@ -796,9 +818,9 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
reinterpret_cast<void (*)( \
const Aggregator &, decltype(result.NAME)::element_type &, \
Arena *, size_t, ColumnRawPtrs &, AggregateColumns &, \
StringRefs &, bool, AggregateDataPtr)>(compiled_data->compiled_method_ptr) \
bool, AggregateDataPtr)>(compiled_data->compiled_method_ptr) \
(*this, *result.NAME, result.aggregates_pool, rows, key_columns, aggregate_columns, \
key, no_more_keys, overflow_row_ptr);
no_more_keys, overflow_row_ptr);
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)
@ -812,9 +834,9 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
reinterpret_cast<void (*)( \
const Aggregator &, decltype(result.NAME)::element_type &, \
Arena *, size_t, ColumnRawPtrs &, AggregateColumns &, \
StringRefs &, bool, AggregateDataPtr)>(compiled_data->compiled_two_level_method_ptr) \
bool, AggregateDataPtr)>(compiled_data->compiled_two_level_method_ptr) \
(*this, *result.NAME, result.aggregates_pool, rows, key_columns, aggregate_columns, \
key, no_more_keys, overflow_row_ptr);
no_more_keys, overflow_row_ptr);
if (false) {}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@ -826,7 +848,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, aggregate_functions_instructions.data(), \
key, no_more_keys, overflow_row_ptr);
no_more_keys, overflow_row_ptr);
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)
@ -1032,7 +1054,6 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
if (isCancelled())
return;
StringRefs key(params.keys_size);
ColumnRawPtrs key_columns(params.keys_size);
AggregateColumns aggregate_columns(params.aggregates_size);
@ -1059,14 +1080,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
src_rows += block.rows();
src_bytes += block.bytes();
if (!executeOnBlock(block, result, key_columns, aggregate_columns, key, no_more_keys))
if (!executeOnBlock(block, result, key_columns, aggregate_columns, no_more_keys))
break;
}
/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, key, no_more_keys);
executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, no_more_keys);
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.sizeWithoutOverflowRow();
@ -2321,7 +2342,6 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
Method & method,
Arena * pool,
ColumnRawPtrs & key_columns,
StringRefs & keys [[maybe_unused]],
const Block & source,
std::vector<Block> & destinations) const
{
@ -2383,7 +2403,6 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
AggregatedDataVariants data;
StringRefs key(params.keys_size);
ColumnRawPtrs key_columns(params.keys_size);
/// Remember the columns we will work with
@ -2423,7 +2442,7 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
#define M(NAME) \
else if (data.type == AggregatedDataVariants::Type::NAME) \
convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, \
key_columns, key, block, splitted_blocks);
key_columns, block, splitted_blocks);
if (false) {}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)

View File

@ -842,7 +842,6 @@ public:
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
StringRefs & keys, /// - pass the corresponding objects that are initially empty.
bool & no_more_keys);
/** Convert the aggregation data structure into a block.
@ -1003,7 +1002,6 @@ protected:
size_t rows,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
StringRefs & keys,
bool no_more_keys,
AggregateDataPtr overflow_row) const;
@ -1014,11 +1012,17 @@ protected:
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
StringRefs & keys,
AggregateDataPtr overflow_row) const;
template <typename Method>
void executeImplBatch(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const;
/// For case when there are no keys (all aggregate into one row).
void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
@ -1042,7 +1046,6 @@ public:
size_t rows,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
StringRefs & keys,
bool no_more_keys,
AggregateDataPtr overflow_row) const;
@ -1052,9 +1055,7 @@ public:
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
StringRefs & keys,
AggregateDataPtr overflow_row) const;
template <typename AggregateFunctionsList>
@ -1179,7 +1180,6 @@ protected:
Method & method,
Arena * pool,
ColumnRawPtrs & key_columns,
StringRefs & keys,
const Block & source,
std::vector<Block> & destinations) const;

View File

@ -103,7 +103,6 @@ void NO_INLINE Aggregator::executeSpecialized(
size_t rows,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
StringRefs & keys,
bool no_more_keys,
AggregateDataPtr overflow_row) const
{
@ -111,10 +110,10 @@ void NO_INLINE Aggregator::executeSpecialized(
if (!no_more_keys)
executeSpecializedCase<false, Method, AggregateFunctionsList>(
method, state, aggregates_pool, rows, key_columns, aggregate_columns, keys, overflow_row);
method, state, aggregates_pool, rows, aggregate_columns, overflow_row);
else
executeSpecializedCase<true, Method, AggregateFunctionsList>(
method, state, aggregates_pool, rows, key_columns, aggregate_columns, keys, overflow_row);
method, state, aggregates_pool, rows, aggregate_columns, overflow_row);
}
#pragma GCC diagnostic push
@ -126,9 +125,7 @@ void NO_INLINE Aggregator::executeSpecializedCase(
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
ColumnRawPtrs & /*key_columns*/,
AggregateColumns & aggregate_columns,
StringRefs & /*keys*/,
AggregateDataPtr overflow_row) const
{
/// For all rows.
@ -184,20 +181,10 @@ void NO_INLINE Aggregator::executeSpecializedWithoutKey(
AggregateColumns & aggregate_columns,
Arena * arena) const
{
/// Optimization in the case of a single aggregate function `count`.
AggregateFunctionCount * agg_count = params.aggregates_size == 1
? typeid_cast<AggregateFunctionCount *>(aggregate_functions[0])
: nullptr;
if (agg_count)
agg_count->addDelta(res, rows);
else
for (size_t i = 0; i < rows; ++i)
{
for (size_t i = 0; i < rows; ++i)
{
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, res, i, arena));
}
AggregateFunctionsList::forEach(AggregateFunctionsUpdater(
aggregate_functions, offsets_of_aggregate_states, aggregate_columns, res, i, arena));
}
}

View File

@ -101,7 +101,6 @@ AggregatingTransform::AggregatingTransform(
Block header, AggregatingTransformParamsPtr params_, ManyAggregatedDataPtr many_data_,
size_t current_variant, size_t temporary_data_merge_threads_, size_t max_threads_)
: IProcessor({std::move(header)}, {params_->getHeader()}), params(std::move(params_))
, key(params->params.keys_size)
, key_columns(params->params.keys_size)
, aggregate_columns(params->params.aggregates_size)
, many_data(std::move(many_data_))
@ -212,7 +211,7 @@ void AggregatingTransform::consume(Chunk chunk)
auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
if (!params->aggregator.executeOnBlock(block, variants, key_columns, aggregate_columns, key, no_more_keys))
if (!params->aggregator.executeOnBlock(block, variants, key_columns, aggregate_columns, no_more_keys))
is_consume_finished = true;
}
@ -226,7 +225,7 @@ void AggregatingTransform::initGenerate()
/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set)
params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, key, no_more_keys);
params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, no_more_keys);
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = variants.sizeWithoutOverflowRow();

View File

@ -71,7 +71,6 @@ private:
AggregatingTransformParamsPtr params;
Logger * log = &Logger::get("AggregatingTransform");
StringRefs key;
ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
bool no_more_keys = false;