Better addBatchArray aggregator

Amos Bird 2019-11-11 16:36:19 +08:00
parent 1d910c5071
commit 3707da4fbf
10 changed files with 69 additions and 104 deletions

View File

@@ -219,7 +219,8 @@ public:
return std::make_shared<DataTypeUInt64>();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
/// ALWAYS_INLINE is required to have better code layout for uniqHLL12 function
void ALWAYS_INLINE add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
detail::OneAdder<T, Data>::add(this->data(place), *columns[0], row_num);
}
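A note on the ALWAYS_INLINE hint added in this hunk: it forces the compiler to inline the per-row add(), so the hot loop in addBatch compiles into one contiguous region instead of a call per row, which is the "better code layout" the comment refers to. Below is a minimal sketch of the idea; the macro definition is an assumption based on the usual GCC/Clang attribute, not copied from the project's headers.

#include <cstddef>
#include <cstdint>

#if defined(__GNUC__) || defined(__clang__)
    #define ALWAYS_INLINE __attribute__((__always_inline__))   /// assumed definition
#else
    #define ALWAYS_INLINE
#endif

/// Hypothetical per-row update that we want folded into the caller's loop body.
static inline void ALWAYS_INLINE addOne(uint64_t & acc, const uint64_t * column, size_t row)
{
    acc += column[row];
}

/// With the attribute applied, this loop compiles to straight-line code
/// instead of a function call per row.
uint64_t addBatch(const uint64_t * column, size_t rows)
{
    uint64_t acc = 0;
    for (size_t i = 0; i < rows; ++i)
        addOne(acc, column, i);
    return acc;
}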

View File

@@ -48,7 +48,8 @@ struct __attribute__((__packed__)) AggregateFunctionUniqUpToData
}
/// threshold - for how many elements there is room in a `data`.
void insert(T x, UInt8 threshold)
/// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
void ALWAYS_INLINE insert(T x, UInt8 threshold)
{
/// The state is already full - nothing needs to be done.
if (count > threshold)
@@ -100,7 +101,8 @@ struct __attribute__((__packed__)) AggregateFunctionUniqUpToData
rb.read(reinterpret_cast<char *>(data), count * sizeof(data[0]));
}
void add(const IColumn & column, size_t row_num, UInt8 threshold)
/// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
void ALWAYS_INLINE add(const IColumn & column, size_t row_num, UInt8 threshold)
{
insert(assert_cast<const ColumnVector<T> &>(column).getData()[row_num], threshold);
}
@@ -111,7 +113,8 @@ struct __attribute__((__packed__)) AggregateFunctionUniqUpToData
template <>
struct AggregateFunctionUniqUpToData<String> : AggregateFunctionUniqUpToData<UInt64>
{
void add(const IColumn & column, size_t row_num, UInt8 threshold)
/// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
void ALWAYS_INLINE add(const IColumn & column, size_t row_num, UInt8 threshold)
{
/// Keep in mind that calculations are approximate.
StringRef value = column.getDataAt(row_num);
@@ -122,7 +125,8 @@ struct AggregateFunctionUniqUpToData<String> : AggregateFunctionUniqUpToData<UIn
template <>
struct AggregateFunctionUniqUpToData<UInt128> : AggregateFunctionUniqUpToData<UInt64>
{
void add(const IColumn & column, size_t row_num, UInt8 threshold)
/// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
void ALWAYS_INLINE add(const IColumn & column, size_t row_num, UInt8 threshold)
{
UInt128 value = assert_cast<const ColumnVector<UInt128> &>(column).getData()[row_num];
insert(sipHash64(value), threshold);
@@ -155,7 +159,8 @@ public:
return std::make_shared<DataTypeUInt64>();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
/// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
void ALWAYS_INLINE add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
this->data(place).add(*columns[0], row_num, threshold);
}
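For orientation, the uniqUpTo state that these add/insert overloads feed is conceptually a tiny inline array of at most `threshold` distinct values plus a counter that saturates at threshold + 1. The following is a simplified, self-contained model of that idea, not the real packed AggregateFunctionUniqUpToData layout:

#include <cstddef>
#include <cstdint>

/// Simplified "count distinct values, but only up to a limit" state.
/// Assumes threshold <= MaxThreshold; the real struct is __packed__ and
/// sizes its storage from the declared threshold.
template <typename T, size_t MaxThreshold = 100>
struct UniqUpToSketch
{
    uint8_t count = 0;       /// number of stored values; threshold + 1 means "overflowed"
    T data[MaxThreshold]{};  /// the distinct values seen so far

    void insert(T x, uint8_t threshold)
    {
        if (count > threshold)       /// already overflowed - nothing to do
            return;
        for (size_t i = 0; i < count; ++i)
            if (data[i] == x)        /// duplicate - nothing to do
                return;
        if (count < threshold)
            data[count] = x;         /// remember the new value
        ++count;                     /// may become threshold + 1, marking overflow
    }

    /// uniqUpTo result: exact count while it fits, otherwise threshold + 1.
    uint8_t size() const { return count; }
};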

View File

@@ -119,56 +119,34 @@ public:
*/
virtual bool isState() const { return false; }
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
* and do a single call to "addBatch" for devirtualization and inlining. When offsets is not
* null, behave like AddBatchArrayFunc (it's used to work around unknown regressions).
*/
using AddBatchFunc = void (*)(
const IAggregateFunction *,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena);
/** The same for single place.
*/
using AddBatchSinglePlaceFunc
= void (*)(const IAggregateFunction *, size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena);
/** In addition to the above method, this variant accepts an array of "offsets" which allows
* collecting multiple rows of arguments into array "places" as long as they are between
* offsets[i-1] and offsets[i]. It is used for arrayReduce and might be used generally to
* break data dependency when array "places" contains a large number of same values
* consecutively.
*/
using AddBatchArrayFunc = void (*)(
const IAggregateFunction *,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena);
struct AddFuncs
{
AddFunc add;
AddBatchFunc add_batch;
AddBatchSinglePlaceFunc add_batch_single_place;
AddBatchArrayFunc add_batch_array;
};
/** The inner loop that uses the function pointer is better than using the virtual function.
* The reason is that in the case of virtual functions GCC 5.1.2 generates code,
* which, at each iteration of the loop, reloads the function address (the offset value in the virtual function table) from memory to the register.
* This gives a performance drop on simple queries around 12%.
* After the appearance of better compilers, the code can be removed.
*/
virtual AddFuncs getAddressOfAddFunctions() const = 0;
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
virtual AddFunc getAddressOfAddFunction() const = 0;
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
* and do a single call to "addBatch" for devirtualization and inlining.
*/
virtual void
addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena)
const = 0;
/** The same for single place.
*/
virtual void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
/** In addition to addBatch, this method collects multiple rows of arguments into array "places"
* as long as they are between offsets[i-1] and offsets[i]. This is used for arrayReduce and
* -Array combinator. It might also be used generally to break data dependency when array
* "places" contains a large number of same values consecutively.
*/
virtual void
addBatchArray(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
const = 0;
/** This is used for runtime code generation to determine, which header files to include in generated source.
* Always implement it as
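To make the offsets contract of addBatchArray concrete: offsets[i] is the exclusive end of the i-th group of rows in the flattened column, so rows [offsets[i - 1], offsets[i]) are all added into places[i]. A small stand-alone illustration with a toy sum state (names and types are illustrative, not the real interface):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

/// Toy "aggregate state": just a sum, addressed through a pointer like AggregateDataPtr.
using Place = uint64_t *;

static void add(Place place, const uint64_t * column, size_t row)
{
    *place += column[row];
}

/// The addBatchArray contract: rows between offsets[i - 1] and offsets[i]
/// (with offsets[-1] taken as 0) all go into places[i].
static void addBatchArray(size_t batch_size, Place * places, const uint64_t * column, const uint64_t * offsets)
{
    size_t current_offset = 0;
    for (size_t i = 0; i < batch_size; ++i)
    {
        size_t next_offset = offsets[i];
        for (size_t j = current_offset; j < next_offset; ++j)
            add(places[i], column, j);
        current_offset = next_offset;
    }
}

int main()
{
    /// Three "arrays" of lengths 2, 3 and 1, flattened into one column of 6 rows.
    std::vector<uint64_t> column{1, 2, 10, 20, 30, 7};
    std::vector<uint64_t> offsets{2, 5, 6};

    std::vector<uint64_t> states(3, 0);
    std::vector<Place> places{&states[0], &states[1], &states[2]};

    addBatchArray(places.size(), places.data(), column.data(), offsets.data());

    for (uint64_t s : states)
        std::cout << s << '\n';   /// prints 3, 60, 7
}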
@@ -195,58 +173,37 @@ private:
static_cast<const Derived &>(*that).add(place, columns, row_num, arena);
}
static void addBatch(
const IAggregateFunction * that,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena)
{
if (offsets)
{
size_t current_offset = 0;
for (size_t i = 0; i < batch_size; ++i)
{
size_t next_offset = offsets[i];
for (size_t j = current_offset; j < next_offset; ++j)
static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, j, arena);
current_offset = next_offset;
}
}
else
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, i, arena);
}
public:
IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
: IAggregateFunction(argument_types_, parameters_) {}
static void
addBatchSinglePlaceFree(const IAggregateFunction * that, size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena)
AddFunc getAddressOfAddFunction() const override { return &addFree; }
void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(that)->add(place, columns, i, arena);
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
/// TODO: We cannot use this function directly as it slows down aggregate functions like uniqCombined due to unknown reasons.
static void addBatchArrayFree(const IAggregateFunction * that,
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
void addBatchArray(
size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
const override
{
size_t current_offset = 0;
for (size_t i = 0; i < batch_size; ++i)
{
size_t next_offset = offsets[i];
for (size_t j = current_offset; j < next_offset; ++j)
static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, j, arena);
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, j, arena);
current_offset = next_offset;
}
}
public:
IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
: IAggregateFunction(argument_types_, parameters_) {}
/// If we return addBatchArrayFree instead of nullptr, it leads to regression.
AddFuncs getAddressOfAddFunctions() const override { return {&addFree, &addBatch, &addBatchSinglePlaceFree, nullptr}; }
};
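The helper above is the CRTP trick that makes the new virtual batch methods cheap: IAggregateFunctionHelper<Derived> implements the loop once and casts this to Derived, so the per-row add() is resolved statically and can be inlined, leaving one virtual dispatch per batch instead of one per row. A minimal sketch of the pattern with simplified signatures (not the real interface):

#include <cstddef>
#include <cstdint>
#include <iostream>

struct IAgg
{
    virtual ~IAgg() = default;
    /// One virtual call per batch; the per-row work happens inside the override.
    virtual void addBatch(uint64_t & state, const uint64_t * column, size_t rows) const = 0;
};

/// CRTP helper: implements the loop once, dispatches to Derived::add statically.
template <typename Derived>
struct AggHelper : IAgg
{
    void addBatch(uint64_t & state, const uint64_t * column, size_t rows) const override
    {
        for (size_t i = 0; i < rows; ++i)
            static_cast<const Derived *>(this)->add(state, column, i);  /// no virtual call, inlinable
    }
};

struct SumAgg : AggHelper<SumAgg>
{
    void add(uint64_t & state, const uint64_t * column, size_t row) const { state += column[row]; }
};

int main()
{
    uint64_t column[4] = {1, 2, 3, 4};
    uint64_t state = 0;
    SumAgg agg;
    static_cast<const IAgg &>(agg).addBatch(state, column, 4);  /// single virtual dispatch
    std::cout << state << '\n';  /// 10
}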

View File

@@ -293,7 +293,8 @@ private:
public:
using value_type = Value;
void insert(Value value)
/// ALWAYS_INLINE is required to have better code layout for uniqCombined function
void ALWAYS_INLINE insert(Value value)
{
HashValueType hash = getHash(value);
@@ -420,7 +421,8 @@ private:
}
/// Update maximum rank for current bucket.
void update(HashValueType bucket, UInt8 rank)
/// ALWAYS_INLINE is required to have better code layout for uniqCombined function
void ALWAYS_INLINE update(HashValueType bucket, UInt8 rank)
{
typename RankStore::Locus content = rank_store[bucket];
UInt8 cur_rank = static_cast<UInt8>(content);
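The update() touched here is the standard HyperLogLog bucket update: each bucket remembers only the maximum "rank" observed among the hashes that mapped to it. A simplified sketch using a plain byte array instead of the packed RankStore, with one common rank convention (trailing zero bits plus one); the real CombinedCardinalityEstimator differs in details:

#include <cstddef>
#include <cstdint>

/// Simplified HLL-style rank store: 2^precision buckets, each holding the max rank seen.
template <unsigned precision>
struct RankStoreSketch
{
    static constexpr size_t bucket_count = size_t(1) << precision;
    uint8_t ranks[bucket_count]{};

    /// Keep the maximum rank per bucket - the only thing HLL needs to remember.
    void update(size_t bucket, uint8_t rank)
    {
        if (rank > ranks[bucket])
            ranks[bucket] = rank;
    }

    void insert(uint64_t hash)
    {
        size_t bucket = hash & (bucket_count - 1);   /// low bits pick the bucket
        uint64_t rest = hash >> precision;           /// remaining bits define the rank
        /// __builtin_ctzll is GCC/Clang-specific; rank = position of the first set bit.
        uint8_t rank = rest ? static_cast<uint8_t>(__builtin_ctzll(rest) + 1)
                            : static_cast<uint8_t>(64 - precision + 1);
        update(bucket, rank);
    }
};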

View File

@@ -56,7 +56,8 @@ public:
delete large;
}
void insert(Key value)
/// ALWAYS_INLINE is required to have better code layout for uniqHLL12 function
void ALWAYS_INLINE insert(Key value)
{
if (!isLarge())
{
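For context, this container is the "small set first" pattern: values are kept exactly in a tiny inline set, and only when it overflows is the heap-allocated large state (the HyperLogLog) created and back-filled, which is why the destructor has to delete large. A hedged sketch of that switch-over, with an unordered_set standing in for the large state just to stay self-contained:

#include <cstddef>
#include <cstdint>
#include <memory>
#include <set>
#include <unordered_set>

/// Simplified two-mode cardinality state: an exact small set until it overflows,
/// then a heap-allocated "large" structure (a real implementation would use
/// HyperLogLog; an unordered_set stands in here only to keep the sketch runnable).
struct SmallSetThenLarge
{
    static constexpr size_t small_capacity = 16;

    std::set<uint64_t> small;
    std::unique_ptr<std::unordered_set<uint64_t>> large;

    bool isLarge() const { return large != nullptr; }

    void insert(uint64_t value)
    {
        if (!isLarge())
        {
            small.insert(value);
            if (small.size() > small_capacity)
            {
                /// Overflow: create the large state and move everything into it.
                large = std::make_unique<std::unordered_set<uint64_t>>(small.begin(), small.end());
                small.clear();
            }
        }
        else
            large->insert(value);
    }

    size_t size() const { return isLarge() ? large->size() : small.size(); }
};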

View File

@@ -83,7 +83,7 @@ private:
SimpleAggregateDescription(const AggregateFunctionPtr & function_, const size_t column_number_) : function(function_), column_number(column_number_)
{
add_function = function->getAddressOfAddFunctions().add;
add_function = function->getAddressOfAddFunction();
state.reset(function->sizeOfData(), function->alignOfData());
}

View File

@@ -84,7 +84,7 @@ private:
void init(const char * function_name, const DataTypes & argument_types)
{
function = AggregateFunctionFactory::instance().get(function_name, argument_types);
add_function = function->getAddressOfAddFunctions().add;
add_function = function->getAddressOfAddFunction();
state.reset(function->sizeOfData(), function->alignOfData());
}

View File

@@ -183,8 +183,7 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum
while (auto func = typeid_cast<AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
that->getAddressOfAddFunctions().add_batch(
that, input_rows_count, places.data(), 0, aggregate_arguments, offsets->data(), arena.get());
that->addBatchArray(input_rows_count, places.data(), 0, aggregate_arguments, offsets->data(), arena.get());
}
for (size_t i = 0; i < input_rows_count; ++i)

View File

@@ -452,7 +452,7 @@ void NO_INLINE Aggregator::executeImplCase(
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
(*inst->funcs.add)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
(*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
}
}
@@ -495,8 +495,10 @@ void NO_INLINE Aggregator::executeImplBatch(
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
(*inst->batch_funcs.add_batch)(
inst->batch_that, rows, places.data(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
if (inst->offsets)
inst->batch_that->addBatchArray(rows, places.data(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else
inst->batch_that->addBatch(rows, places.data(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
}
@@ -511,10 +513,10 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
if (inst->offsets)
(*inst->batch_funcs.add_batch_single_place)(
inst->batch_that, inst->offsets[static_cast<ssize_t>(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena);
inst->batch_that->addBatchSinglePlace(
inst->offsets[static_cast<ssize_t>(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena);
else
(*inst->batch_funcs.add_batch_single_place)(inst->batch_that, rows, res + inst->state_offset, inst->batch_arguments, arena);
inst->batch_that->addBatchSinglePlace(rows, res + inst->state_offset, inst->batch_arguments, arena);
}
}
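Summing up the dispatch in the two hunks above: an instruction produced by the -Array combinator carries a non-null offsets pointer, in which case the batch walks the flattened nested column via addBatchArray, and in the keyless path the total number of nested rows is simply offsets[rows - 1]. A hedged sketch of that branching with toy types (not the real Aggregator):

#include <cstddef>
#include <cstdint>

/// Minimal stand-ins for the pieces the dispatch needs.
struct Func
{
    /// Plain batch: one state per row.
    void addBatch(size_t rows, uint64_t * states, const uint64_t * column) const
    {
        for (size_t i = 0; i < rows; ++i)
            states[i] += column[i];
    }

    /// Array batch: rows [offsets[i - 1], offsets[i]) of the flattened column go to states[i].
    void addBatchArray(size_t rows, uint64_t * states, const uint64_t * column, const uint64_t * offsets) const
    {
        size_t current = 0;
        for (size_t i = 0; i < rows; ++i)
        {
            for (size_t j = current; j < offsets[i]; ++j)
                states[i] += column[j];
            current = offsets[i];
        }
    }

    /// Single place: every row feeds the same state.
    void addBatchSinglePlace(size_t rows, uint64_t & state, const uint64_t * column) const
    {
        for (size_t i = 0; i < rows; ++i)
            state += column[i];
    }
};

struct Instruction
{
    const Func * that = nullptr;
    const uint64_t * arguments = nullptr;   /// flattened column when offsets != nullptr
    const uint64_t * offsets = nullptr;     /// set only by the -Array combinator path
};

/// Grouped aggregation: pick the array or plain batch per instruction.
void executeBatch(const Instruction & inst, size_t rows, uint64_t * states)
{
    if (inst.offsets)
        inst.that->addBatchArray(rows, states, inst.arguments, inst.offsets);
    else
        inst.that->addBatch(rows, states, inst.arguments);
}

/// Keyless aggregation: with offsets, the flattened row count is offsets[rows - 1].
void executeWithoutKey(const Instruction & inst, size_t rows, uint64_t & state)
{
    if (inst.offsets)
        inst.that->addBatchSinglePlace(inst.offsets[rows - 1], state, inst.arguments);
    else
        inst.that->addBatchSinglePlace(rows, state, inst.arguments);
}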
@@ -598,7 +600,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
while (auto func = typeid_cast<const AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
aggregate_functions_instructions[i].that = that;
aggregate_functions_instructions[i].funcs = that->getAddressOfAddFunctions();
aggregate_functions_instructions[i].func = that->getAddressOfAddFunction();
if (auto func = typeid_cast<const AggregateFunctionArray *>(that))
{
@@ -615,7 +617,6 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].batch_that = that;
aggregate_functions_instructions[i].batch_funcs = that->getAddressOfAddFunctions();
}
if (isCancelled())

View File

@@ -1005,11 +1005,10 @@ protected:
struct AggregateFunctionInstruction
{
const IAggregateFunction * that;
IAggregateFunction::AddFuncs funcs;
IAggregateFunction::AddFunc func;
size_t state_offset;
const IColumn ** arguments;
const IAggregateFunction * batch_that;
IAggregateFunction::AddFuncs batch_funcs;
const IColumn ** batch_arguments;
const UInt64 * offsets = nullptr;
};