Merge pull request #7608 from amosbird/batchreduce

Optimize arrayReduce, -Array and -State combinators
This commit is contained in:
alexey-milovidov 2019-11-10 22:10:03 +03:00 committed by GitHub
commit 1d910c5071
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 233 additions and 52 deletions

View File

@ -129,6 +129,8 @@ public:
return nested_func->allocatesMemoryInArena();
}
/// Returns the nested aggregate function held in nested_func.
/// Callers use it to look through the combinator and reach the underlying function.
AggregateFunctionPtr getNestedFunction() const { return nested_func; }
const char * getHeaderFilePath() const override { return __FILE__; }
};

View File

@ -119,23 +119,56 @@ public:
*/
virtual bool isState() const { return false; }
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
/** Pointer to a function that contains a loop with calls to the "add" function. You can collect
* arguments into the array "places" and do a single call to "addBatch" for devirtualization and inlining.
* When "offsets" is not null, it behaves like AddBatchArrayFunc: places[i] receives the rows
* between offsets[i-1] and offsets[i] (this is used to work around unknown regressions).
*/
using AddBatchFunc = void (*)(
const IAggregateFunction *,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena);
/** The same, but every row of the batch is added to one single place.
*/
using AddBatchSinglePlaceFunc
= void (*)(const IAggregateFunction *, size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena);
/** In addition to the arguments of the method above, this variant accepts an array of "offsets",
* which allows adding multiple rows of arguments to the same element of the array "places":
* places[i] receives the rows between offsets[i-1] and offsets[i]. It is used for arrayReduce
* and might be used generally to break data dependency when the array "places" contains
* a large number of identical values consecutively.
*/
using AddBatchArrayFunc = void (*)(
const IAggregateFunction *,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena);
/// Bundle of function pointers for adding values to aggregation states.
/// The member order matters: implementations fill this struct positionally with a braced initializer.
struct AddFuncs
{
/// Adds a single row to a single place.
AddFunc add;
/// Adds a batch of rows to an array of places (optionally driven by offsets).
AddBatchFunc add_batch;
/// Adds a batch of rows to one place.
AddBatchSinglePlaceFunc add_batch_single_place;
/// Offsets-driven batch variant; may be nullptr, so check before calling.
AddBatchArrayFunc add_batch_array;
};
/** The inner loop that uses the function pointer is better than using the virtual function.
* The reason is that in the case of virtual functions GCC 5.1.2 generates code,
* which, at each iteration of the loop, reloads the function address (the offset value in the virtual function table) from memory to the register.
* This gives a performance drop on simple queries around 12%.
* After the appearance of better compilers, the code can be removed.
*/
using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
virtual AddFunc getAddressOfAddFunction() const = 0;
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
* and do a single call to "addBatch" for devirtualization and inlining.
*/
virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;
/** The same for single place.
*/
virtual void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
/// Returns the function pointers (single-row and batch variants) used to add values to a state
/// of this aggregate function. Calling through them avoids a virtual call inside inner loops.
virtual AddFuncs getAddressOfAddFunctions() const = 0;
/** This is used for runtime code generation to determine, which header files to include in generated source.
* Always implement it as
@ -162,23 +195,58 @@ private:
static_cast<const Derived &>(*that).add(place, columns, row_num, arena);
}
/** Batch version of "add", intended to be called through a function pointer.
* Without offsets: row i of "columns" is added to the state at places[i] + place_offset.
* With offsets: for each i, the rows from the previous offset (0 for the first place)
* up to offsets[i] (exclusive) are added to the state at places[i] + place_offset.
* NOTE: the loops are deliberately kept in this exact shape; comments elsewhere in this class
* mention performance regressions of unknown origin when the code is arranged differently.
*/
static void addBatch(
const IAggregateFunction * that,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
const UInt64 * offsets,
Arena * arena)
{
if (offsets)
{
/// Start of the current range of rows; offsets[i] is the exclusive end of range i.
size_t current_offset = 0;
for (size_t i = 0; i < batch_size; ++i)
{
size_t next_offset = offsets[i];
for (size_t j = current_offset; j < next_offset; ++j)
static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, j, arena);
current_offset = next_offset;
}
}
else
/// One row per place.
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, i, arena);
}
static void
addBatchSinglePlaceFree(const IAggregateFunction * that, size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena)
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(that)->add(place, columns, i, arena);
}
/// TODO: We cannot use this function directly as it slows down aggregate functions like uniqCombined due to unknown reasons.
static void addBatchArrayFree(const IAggregateFunction * that,
size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
{
size_t current_offset = 0;
for (size_t i = 0; i < batch_size; ++i)
{
size_t next_offset = offsets[i];
for (size_t j = current_offset; j < next_offset; ++j)
static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, j, arena);
current_offset = next_offset;
}
}
public:
/// Forwards the argument types and parameters to the IAggregateFunction base class.
IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
: IAggregateFunction(argument_types_, parameters_) {}
/// Returns the address of the static addFree wrapper, so that callers can invoke "add"
/// through a plain function pointer.
AddFunc getAddressOfAddFunction() const override { return &addFree; }
void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
/// Returns pointers to the static helpers of this class, in AddFuncs member order:
/// add, add_batch, add_batch_single_place; add_batch_array is intentionally left null.
/// If we return addBatchArrayFree instead of nullptr, it leads to regression.
AddFuncs getAddressOfAddFunctions() const override { return {&addFree, &addBatch, &addBatchSinglePlaceFree, nullptr}; }
};

View File

@ -83,7 +83,7 @@ private:
/// Stores the aggregate function and its column number, caches the address of the function's
/// "add" routine, and resets the state buffer using the function's data size and alignment.
/// NOTE(review): add_function is assigned twice in this listing; only the second assignment
/// (via getAddressOfAddFunctions().add) takes effect.
SimpleAggregateDescription(const AggregateFunctionPtr & function_, const size_t column_number_) : function(function_), column_number(column_number_)
{
add_function = function->getAddressOfAddFunction();
add_function = function->getAddressOfAddFunctions().add;
state.reset(function->sizeOfData(), function->alignOfData());
}

View File

@ -84,7 +84,7 @@ private:
/// Looks up the aggregate function by name and argument types in AggregateFunctionFactory,
/// caches the address of its "add" routine, and resets the state buffer using the function's
/// data size and alignment.
/// NOTE(review): add_function is assigned twice in this listing; only the second assignment
/// (via getAddressOfAddFunctions().add) takes effect.
void init(const char * function_name, const DataTypes & argument_types)
{
function = AggregateFunctionFactory::instance().get(function_name, argument_types);
add_function = function->getAddressOfAddFunction();
add_function = function->getAddressOfAddFunctions().add;
state.reset(function->sizeOfData(), function->alignOfData());
}

View File

@ -18,6 +18,7 @@ namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
}
const ColumnConst * checkAndGetColumnConstStringOrFixedString(const IColumn * column)
@ -118,4 +119,28 @@ void validateArgumentType(const IFunction & func, const DataTypes & arguments,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
std::pair<std::vector<const IColumn *>, const ColumnArray::Offset *>
checkAndGetNestedArrayOffset(const IColumn ** columns, size_t num_arguments)
{
assert(num_arguments > 0);
std::vector<const IColumn *> nested_columns(num_arguments);
const ColumnArray::Offsets * offsets = nullptr;
for (size_t i = 0; i < num_arguments; ++i)
{
const ColumnArray::Offsets * offsets_i = nullptr;
if (const ColumnArray * arr = checkAndGetColumn<const ColumnArray>(columns[i]))
{
nested_columns[i] = &arr->getData();
offsets_i = &arr->getOffsets();
}
else
throw Exception("Illegal column " + columns[i]->getName() + " as argument of function", ErrorCodes::ILLEGAL_COLUMN);
if (i == 0)
offsets = offsets_i;
else if (*offsets_i != *offsets)
throw Exception("Lengths of all arrays passed to aggregate function must be equal.", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
}
return {nested_columns, offsets->data()};
}
}

View File

@ -4,6 +4,7 @@
#include <Common/assert_cast.h>
#include <DataTypes/IDataType.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Core/Block.h>
#include <Core/ColumnNumbers.h>
@ -89,4 +90,8 @@ void validateArgumentType(const IFunction & func, const DataTypes & arguments,
size_t argument_index, bool (* validator_func)(const IDataType &),
const char * expected_type_description);
/// Checks that all of the given columns are array columns with equal offsets.
/// Returns a pair of (nested data columns, pointer to the shared offsets) if so; otherwise throws.
/// num_arguments is the number of entries in columns and must be greater than zero.
std::pair<std::vector<const IColumn *>, const ColumnArray::Offset *>
checkAndGetNestedArrayOffset(const IColumn ** columns, size_t num_arguments);
}

View File

@ -7,11 +7,14 @@
#include <Columns/ColumnAggregateFunction.h>
#include <IO/WriteHelpers.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <Common/AlignedBuffer.h>
#include <Common/Arena.h>
#include <ext/scope_guard.h>
namespace DB
{
@ -106,10 +109,7 @@ DataTypePtr FunctionArrayReduce::getReturnTypeImpl(const ColumnsWithTypeAndName
void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
{
IAggregateFunction & agg_func = *aggregate_function.get();
AlignedBuffer place_holder(agg_func.sizeOfData(), agg_func.alignOfData());
AggregateDataPtr place = place_holder.data();
std::unique_ptr<Arena> arena = agg_func.allocatesMemoryInArena() ? std::make_unique<Arena>() : nullptr;
std::unique_ptr<Arena> arena = std::make_unique<Arena>();
/// Aggregate functions do not support constant columns. Therefore, we materialize them.
std::vector<ColumnPtr> materialized_columns;
@ -157,32 +157,41 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum
throw Exception("State function " + agg_func.getName() + " inserts results into non-state column "
+ block.getByPosition(result).type->getName(), ErrorCodes::ILLEGAL_COLUMN);
ColumnArray::Offset current_offset = 0;
PODArray<AggregateDataPtr> places(input_rows_count);
for (size_t i = 0; i < input_rows_count; ++i)
{
agg_func.create(place);
ColumnArray::Offset next_offset = (*offsets)[i];
places[i] = arena->alignedAlloc(agg_func.sizeOfData(), agg_func.alignOfData());
try
{
for (size_t j = current_offset; j < next_offset; ++j)
agg_func.add(place, aggregate_arguments, j, arena.get());
if (!res_col_aggregate_function)
agg_func.insertResultInto(place, res_col);
else
res_col_aggregate_function->insertFrom(place);
agg_func.create(places[i]);
}
catch (...)
{
agg_func.destroy(place);
agg_func.destroy(places[i]);
throw;
}
agg_func.destroy(place);
current_offset = next_offset;
}
SCOPE_EXIT({
for (size_t i = 0; i < input_rows_count; ++i)
agg_func.destroy(places[i]);
});
{
auto that = &agg_func;
/// Unnest consecutive trailing -State combinators
while (auto func = typeid_cast<AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
that->getAddressOfAddFunctions().add_batch(
that, input_rows_count, places.data(), 0, aggregate_arguments, offsets->data(), arena.get());
}
for (size_t i = 0; i < input_rows_count; ++i)
if (!res_col_aggregate_function)
agg_func.insertResultInto(places[i], res_col);
else
res_col_aggregate_function->insertFrom(places[i]);
block.getByPosition(result).column = std::move(result_holder);
}

View File

@ -26,6 +26,8 @@
#include <Common/assert_cast.h>
#include <common/demangle.h>
#include <common/config_common.h>
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h>
namespace ProfileEvents
@ -450,7 +452,7 @@ void NO_INLINE Aggregator::executeImplCase(
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
(*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
(*inst->funcs.add)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
}
}
@ -492,7 +494,10 @@ void NO_INLINE Aggregator::executeImplBatch(
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
inst->that->addBatch(rows, places.data(), inst->state_offset, inst->arguments, aggregates_pool);
{
(*inst->batch_funcs.add_batch)(
inst->batch_that, rows, places.data(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
}
}
@ -504,7 +509,13 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
{
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
inst->that->addBatchSinglePlace(rows, res + inst->state_offset, inst->arguments, arena);
{
if (inst->offsets)
(*inst->batch_funcs.add_batch_single_place)(
inst->batch_that, inst->offsets[static_cast<ssize_t>(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena);
else
(*inst->batch_funcs.add_batch_single_place)(inst->batch_that, rows, res + inst->state_offset, inst->batch_arguments, arena);
}
}
@ -564,6 +575,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
aggregate_functions_instructions[params.aggregates_size].that = nullptr;
std::vector<std::vector<const IColumn *>> nested_columns_holder;
for (size_t i = 0; i < params.aggregates_size; ++i)
{
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
@ -579,10 +591,31 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
}
}
aggregate_functions_instructions[i].that = aggregate_functions[i];
aggregate_functions_instructions[i].func = aggregate_functions[i]->getAddressOfAddFunction();
aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
auto that = aggregate_functions[i];
/// Unnest consecutive trailing -State combinators
while (auto func = typeid_cast<const AggregateFunctionState *>(that))
that = func->getNestedFunction().get();
aggregate_functions_instructions[i].that = that;
aggregate_functions_instructions[i].funcs = that->getAddressOfAddFunctions();
if (auto func = typeid_cast<const AggregateFunctionArray *>(that))
{
/// Unnest consecutive -State combinators before -Array
that = func->getNestedFunction().get();
while (auto nested_func = typeid_cast<const AggregateFunctionState *>(that))
that = nested_func->getNestedFunction().get();
auto [nested_columns, offsets] = checkAndGetNestedArrayOffset(aggregate_columns[i].data(), that->getArgumentTypes().size());
nested_columns_holder.push_back(std::move(nested_columns));
aggregate_functions_instructions[i].batch_arguments = nested_columns_holder.back().data();
aggregate_functions_instructions[i].offsets = offsets;
}
else
aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].batch_that = that;
aggregate_functions_instructions[i].batch_funcs = that->getAddressOfAddFunctions();
}
if (isCancelled())

View File

@ -1005,9 +1005,13 @@ protected:
/// Everything needed to feed one aggregate function while processing a block.
/// A list of instructions is terminated by an element whose "that" is nullptr.
struct AggregateFunctionInstruction
{
/// Function used for row-by-row adding (trailing -State combinators unwrapped).
const IAggregateFunction * that;
/// Address of the single-row "add" function.
IAggregateFunction::AddFunc func;
/// Add functions of "that".
IAggregateFunction::AddFuncs funcs;
/// Offset of this function's state inside the aggregation data of a key.
size_t state_offset;
/// Argument columns passed to the row-by-row "add".
const IColumn ** arguments;
/// Function used for batch adding; for -Array it is the nested function (with -State unwrapped).
const IAggregateFunction * batch_that;
/// Add functions of "batch_that".
IAggregateFunction::AddFuncs batch_funcs;
/// Argument columns for batch adding; for -Array these are the nested columns of the arrays.
const IColumn ** batch_arguments;
/// Array offsets for -Array functions; nullptr when the function is not an -Array combinator.
const UInt64 * offsets = nullptr;
};
using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>;

View File

@ -0,0 +1,35 @@
<test>
<!-- Performance test for the -State and -Array aggregate function combinators and for arrayReduce. -->
<type>loop</type>
<stop_conditions>
<all_of>
<total_time_ms>30000</total_time_ms>
</all_of>
<any_of>
<average_speed_not_changing_for_ms>6000</average_speed_not_changing_for_ms>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<min_time/>
</main_metric>
<settings>
<!-- Single thread, so that the measured time is not affected by parallelism. -->
<max_threads>1</max_threads>
</settings>
<!-- 10 million rows, each holding an array of 16 identical UInt64 values; 1024 distinct keys. -->
<create_query>CREATE TABLE array_data(k UInt16, v Array(UInt64)) ENGINE Log</create_query>
<fill_query>INSERT INTO array_data SELECT number % 1024, arrayWithConstant(16, number) from numbers(10000000)</fill_query>
<!-- -State combinator: without keys and with GROUP BY. -->
<query>SELECT countMerge(v) FROM (SELECT countState() v FROM numbers(1000000000)) FORMAT Null</query>
<query>SELECT countMerge(v) FROM (SELECT number % 1024 k, countState() v FROM numbers(1000000000) GROUP BY k) FORMAT Null</query>
<!-- -Array combinator: without keys and with GROUP BY. -->
<query>SELECT sumArray(v) FROM array_data FORMAT Null</query>
<query>SELECT k, sumArray(v) FROM array_data GROUP BY k FORMAT Null</query>
<!-- arrayReduce function. -->
<query>SELECT arrayReduce('avg', v) FROM array_data FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS array_data</drop_query>
</test>