diff --git a/dbms/src/AggregateFunctions/AggregateFunctionArray.h b/dbms/src/AggregateFunctions/AggregateFunctionArray.h
index 2a4780b6c9d..66dbcd865d5 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionArray.h
+++ b/dbms/src/AggregateFunctions/AggregateFunctionArray.h
@@ -129,6 +129,8 @@ public:
         return nested_func->allocatesMemoryInArena();
     }
 
+    AggregateFunctionPtr getNestedFunction() const { return nested_func; }
+
     const char * getHeaderFilePath() const override { return __FILE__; }
 };
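Note on the accessor above: combinator wrappers such as -Array and -State hold the wrapped function in `nested_func`, and exposing `getNestedFunction()` is what lets the later hunks peel combinators off before choosing a batch entry point. A minimal standalone sketch of that unwrapping pattern (toy types, not the real ClickHouse classes):

```cpp
#include <memory>

// Toy stand-ins for IAggregateFunction and a combinator wrapper; the real types
// live in AggregateFunctions/. Only the unwrapping pattern is the point here.
struct IAggFunc { virtual ~IAggFunc() = default; };
using AggFuncPtr = std::shared_ptr<IAggFunc>;

struct StateCombinator : IAggFunc
{
    explicit StateCombinator(AggFuncPtr nested_) : nested(std::move(nested_)) {}
    AggFuncPtr getNestedFunction() const { return nested; }  // same shape as the accessor added above
    AggFuncPtr nested;
};

// Peel consecutive trailing -State combinators, as the later hunks do with typeid_cast.
const IAggFunc * unnestState(const IAggFunc * f)
{
    while (auto * wrapper = dynamic_cast<const StateCombinator *>(f))
        f = wrapper->getNestedFunction().get();
    return f;
}
```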
diff --git a/dbms/src/AggregateFunctions/IAggregateFunction.h b/dbms/src/AggregateFunctions/IAggregateFunction.h
index e35041ab560..99a6da781d3 100644
--- a/dbms/src/AggregateFunctions/IAggregateFunction.h
+++ b/dbms/src/AggregateFunctions/IAggregateFunction.h
@@ -119,23 +119,56 @@ public:
       */
     virtual bool isState() const { return false; }
 
+    using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
+
+    /** Contains a loop with calls to the "add" function. You can collect arguments into array "places"
+      * and do a single call to "addBatch" for devirtualization and inlining. When "offsets" is not
+      * null, behaves like AddBatchArrayFunc (this is used to work around unknown regressions).
+      */
+    using AddBatchFunc = void (*)(
+        const IAggregateFunction *,
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const IColumn ** columns,
+        const UInt64 * offsets,
+        Arena * arena);
+
+    /** The same for a single place.
+      */
+    using AddBatchSinglePlaceFunc
+        = void (*)(const IAggregateFunction *, size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena);
+
+    /** In addition to the above, this variant accepts an array of "offsets" which allows
+      * collecting multiple rows of arguments into array "places" as long as they are between
+      * offsets[i - 1] and offsets[i]. It is used for arrayReduce and might be used generally to
+      * break data dependency when array "places" contains a large number of same values
+      * consecutively.
+      */
+    using AddBatchArrayFunc = void (*)(
+        const IAggregateFunction *,
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const IColumn ** columns,
+        const UInt64 * offsets,
+        Arena * arena);
+
+    struct AddFuncs
+    {
+        AddFunc add;
+        AddBatchFunc add_batch;
+        AddBatchSinglePlaceFunc add_batch_single_place;
+        AddBatchArrayFunc add_batch_array;
+    };
+
     /** The inner loop that uses the function pointer is better than using the virtual function.
       * The reason is that in the case of virtual functions GCC 5.1.2 generates code,
      * which, at each iteration of the loop, reloads the function address (the offset value in the virtual function table) from memory to the register.
       * This gives a performance drop on simple queries around 12%.
       * After the appearance of better compilers, the code can be removed.
       */
-    using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
-    virtual AddFunc getAddressOfAddFunction() const = 0;
-
-    /** Contains a loop with calls to "add" function. You can collect arguments into array "places"
-      * and do a single call to "addBatch" for devirtualization and inlining.
-      */
-    virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;
-
-    /** The same for single place.
-      */
-    virtual void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
+    virtual AddFuncs getAddressOfAddFunctions() const = 0;
 
     /** This is used for runtime code generation to determine, which header files to include in generated source.
       * Always implement it as
@@ -162,23 +195,58 @@ private:
        static_cast<const Derived &>(*that).add(place, columns, row_num, arena);
     }
 
+    static void addBatch(
+        const IAggregateFunction * that,
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const IColumn ** columns,
+        const UInt64 * offsets,
+        Arena * arena)
+    {
+        if (offsets)
+        {
+            size_t current_offset = 0;
+            for (size_t i = 0; i < batch_size; ++i)
+            {
+                size_t next_offset = offsets[i];
+                for (size_t j = current_offset; j < next_offset; ++j)
+                    static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, j, arena);
+                current_offset = next_offset;
+            }
+        }
+        else
+            for (size_t i = 0; i < batch_size; ++i)
+                static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, i, arena);
+    }
+
+    static void
+    addBatchSinglePlaceFree(const IAggregateFunction * that, size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena)
+    {
+        for (size_t i = 0; i < batch_size; ++i)
+            static_cast<const Derived *>(that)->add(place, columns, i, arena);
+    }
+
+    /// TODO: We cannot use this function directly as it slows down aggregate functions like uniqCombined for unknown reasons.
+    static void addBatchArrayFree(const IAggregateFunction * that,
+        size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
+    {
+        size_t current_offset = 0;
+        for (size_t i = 0; i < batch_size; ++i)
+        {
+            size_t next_offset = offsets[i];
+            for (size_t j = current_offset; j < next_offset; ++j)
+                static_cast<const Derived *>(that)->add(places[i] + place_offset, columns, j, arena);
+            current_offset = next_offset;
+        }
+    }
+
 public:
     IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
         : IAggregateFunction(argument_types_, parameters_) {}
 
-    AddFunc getAddressOfAddFunction() const override { return &addFree; }
-
-    void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
-    {
-        for (size_t i = 0; i < batch_size; ++i)
-            static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
-    }
-
-    void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
-    {
-        for (size_t i = 0; i < batch_size; ++i)
-            static_cast<const Derived *>(this)->add(place, columns, i, arena);
-    }
+    /// If we return addBatchArrayFree instead of nullptr, it leads to a regression.
+    AddFuncs getAddressOfAddFunctions() const override { return {&addFree, &addBatch, &addBatchSinglePlaceFree, nullptr}; }
 };
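The AddFuncs table above exists to hoist virtual dispatch out of the per-row loop: one virtual call fetches the pointers, and each static `*Free` function casts `that` back to the concrete `Derived` so the compiler can inline `add`. A compilable toy model of the same scheme (all names invented for illustration):

```cpp
#include <cstddef>

// Toy model of the AddFuncs dispatch: IFunc mirrors IAggregateFunction,
// FuncHelper mirrors IAggregateFunctionHelper<Derived>.
struct IFunc
{
    using AddBatchFunc = void (*)(const IFunc *, double * place, const double * col, size_t n);
    struct AddFuncs { AddBatchFunc add_batch; };
    virtual AddFuncs getAddressOfAddFunctions() const = 0;
    virtual ~IFunc() = default;
};

template <typename Derived>
struct FuncHelper : IFunc
{
    // Static, so the call through the pointer is direct; the cast to Derived
    // lets the compiler inline Derived::add in the loop body.
    static void addBatchFree(const IFunc * that, double * place, const double * col, size_t n)
    {
        for (size_t i = 0; i < n; ++i)
            static_cast<const Derived *>(that)->add(place, col[i]);
    }
    AddFuncs getAddressOfAddFunctions() const override { return {&addBatchFree}; }
};

struct Sum : FuncHelper<Sum>
{
    void add(double * place, double v) const { *place += v; }
};
```

A caller pays one virtual call per batch (`f.getAddressOfAddFunctions()`), then loops through the static pointer instead of reloading a vtable slot on every row, which is the regression the retained comment describes.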
diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h
index 0cf4bd64d87..63e2e7cb37a 100644
--- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h
+++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h
@@ -83,7 +83,7 @@ private:
         SimpleAggregateDescription(const AggregateFunctionPtr & function_, const size_t column_number_) : function(function_), column_number(column_number_)
         {
-            add_function = function->getAddressOfAddFunction();
+            add_function = function->getAddressOfAddFunctions().add;
             state.reset(function->sizeOfData(), function->alignOfData());
         }
 
diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.h b/dbms/src/DataStreams/SummingSortedBlockInputStream.h
index 4412e5529f8..e041a9b7300 100644
--- a/dbms/src/DataStreams/SummingSortedBlockInputStream.h
+++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.h
@@ -84,7 +84,7 @@ private:
         void init(const char * function_name, const DataTypes & argument_types)
         {
             function = AggregateFunctionFactory::instance().get(function_name, argument_types);
-            add_function = function->getAddressOfAddFunction();
+            add_function = function->getAddressOfAddFunctions().add;
             state.reset(function->sizeOfData(), function->alignOfData());
         }
 
diff --git a/dbms/src/Functions/FunctionHelpers.cpp b/dbms/src/Functions/FunctionHelpers.cpp
index 1edfbfebf78..5f625ec6d26 100644
--- a/dbms/src/Functions/FunctionHelpers.cpp
+++ b/dbms/src/Functions/FunctionHelpers.cpp
@@ -18,6 +18,7 @@ namespace ErrorCodes
 {
     extern const int ILLEGAL_COLUMN;
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+    extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
 }
 
 const ColumnConst * checkAndGetColumnConstStringOrFixedString(const IColumn * column)
@@ -118,4 +119,28 @@ void validateArgumentType(const IFunction & func, const DataTypes & arguments,
             ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
 }
 
+std::pair<std::vector<const IColumn *>, const ColumnArray::Offset *>
+checkAndGetNestedArrayOffset(const IColumn ** columns, size_t num_arguments)
+{
+    assert(num_arguments > 0);
+    std::vector<const IColumn *> nested_columns(num_arguments);
+    const ColumnArray::Offsets * offsets = nullptr;
+    for (size_t i = 0; i < num_arguments; ++i)
+    {
+        const ColumnArray::Offsets * offsets_i = nullptr;
+        if (const ColumnArray * arr = checkAndGetColumn<ColumnArray>(columns[i]))
+        {
+            nested_columns[i] = &arr->getData();
+            offsets_i = &arr->getOffsets();
+        }
+        else
+            throw Exception("Illegal column " + columns[i]->getName() + " as argument of function", ErrorCodes::ILLEGAL_COLUMN);
+        if (i == 0)
+            offsets = offsets_i;
+        else if (*offsets_i != *offsets)
+            throw Exception("Lengths of all arrays passed to aggregate function must be equal.", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
+    }
+    return {nested_columns, offsets->data()};
+}
+
 }
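For context on the offsets check in `checkAndGetNestedArrayOffset` above: ClickHouse array columns store cumulative end offsets, so row `i` of the flattened data spans `[offsets[i - 1], offsets[i])`, and two array columns line up row by row exactly when their offsets arrays are equal. A simplified sketch with plain vectors standing in for `ColumnArray` (illustrative names):

```cpp
#include <cstdint>
#include <stdexcept>
#include <vector>

// Offsets stands in for ColumnArray::Offsets: cumulative end positions,
// with row i spanning [offsets[i - 1], offsets[i]) and offsets[-1] == 0.
using Offsets = std::vector<uint64_t>;

const uint64_t * checkEqualOffsets(const std::vector<const Offsets *> & arg_offsets)
{
    const Offsets * first = arg_offsets.at(0);
    for (const Offsets * offsets : arg_offsets)
        if (*offsets != *first)
            throw std::runtime_error("Lengths of all arrays passed to aggregate function must be equal.");
    return first->data();  // the shared offsets drive the batched add loop
}
```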
diff --git a/dbms/src/Functions/FunctionHelpers.h b/dbms/src/Functions/FunctionHelpers.h
index ac116510b7e..827ea53217a 100644
--- a/dbms/src/Functions/FunctionHelpers.h
+++ b/dbms/src/Functions/FunctionHelpers.h
@@ -4,6 +4,7 @@
 #include <DataTypes/IDataType.h>
 #include <DataTypes/DataTypeNullable.h>
 #include <Columns/IColumn.h>
+#include <Columns/ColumnArray.h>
 #include <Columns/ColumnConst.h>
 #include <Columns/ColumnNullable.h>
 #include <Core/Block.h>
@@ -89,4 +90,8 @@ void validateArgumentType(const IFunction & func, const DataTypes & arguments,
                           size_t argument_index, bool (* validator_func)(const IDataType &),
                           const char * expected_type_description);
 
+/// Checks if a list of array columns has equal offsets. Returns a pair of nested columns and offsets if so; otherwise throws.
+std::pair<std::vector<const IColumn *>, const ColumnArray::Offset *>
+checkAndGetNestedArrayOffset(const IColumn ** columns, size_t num_arguments);
+
 }
diff --git a/dbms/src/Functions/array/arrayReduce.cpp b/dbms/src/Functions/array/arrayReduce.cpp
index 516449a4872..ef566345acc 100644
--- a/dbms/src/Functions/array/arrayReduce.cpp
+++ b/dbms/src/Functions/array/arrayReduce.cpp
@@ -7,11 +7,14 @@
 #include <Columns/ColumnArray.h>
 #include <Columns/ColumnAggregateFunction.h>
 #include <AggregateFunctions/AggregateFunctionFactory.h>
+#include <AggregateFunctions/AggregateFunctionState.h>
 #include <AggregateFunctions/IAggregateFunction.h>
 #include <AggregateFunctions/parseAggregateFunctionParameters.h>
 #include <Common/AlignedBuffer.h>
 #include <Common/Arena.h>
+
+#include <ext/scope_guard.h>
 
 
 namespace DB
 {
@@ -106,10 +109,7 @@ DataTypePtr FunctionArrayReduce::getReturnTypeImpl(const ColumnsWithTypeAndName
 void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
 {
     IAggregateFunction & agg_func = *aggregate_function.get();
-    AlignedBuffer place_holder(agg_func.sizeOfData(), agg_func.alignOfData());
-    AggregateDataPtr place = place_holder.data();
-
-    std::unique_ptr<Arena> arena = agg_func.allocatesMemoryInArena() ? std::make_unique<Arena>() : nullptr;
+    std::unique_ptr<Arena> arena = std::make_unique<Arena>();
 
     /// Aggregate functions do not support constant columns. Therefore, we materialize them.
     std::vector<ColumnPtr> materialized_columns;
@@ -157,32 +157,41 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum
             throw Exception("State function " + agg_func.getName() + " inserts results into non-state column "
                             + block.getByPosition(result).type->getName(), ErrorCodes::ILLEGAL_COLUMN);
 
-    ColumnArray::Offset current_offset = 0;
+    PODArray<AggregateDataPtr> places(input_rows_count);
     for (size_t i = 0; i < input_rows_count; ++i)
     {
-        agg_func.create(place);
-        ColumnArray::Offset next_offset = (*offsets)[i];
-
+        places[i] = arena->alignedAlloc(agg_func.sizeOfData(), agg_func.alignOfData());
         try
         {
-            for (size_t j = current_offset; j < next_offset; ++j)
-                agg_func.add(place, aggregate_arguments, j, arena.get());
-
-            if (!res_col_aggregate_function)
-                agg_func.insertResultInto(place, res_col);
-            else
-                res_col_aggregate_function->insertFrom(place);
+            agg_func.create(places[i]);
         }
         catch (...)
         {
-            agg_func.destroy(place);
+            agg_func.destroy(places[i]);
             throw;
         }
-
-        agg_func.destroy(place);
-        current_offset = next_offset;
     }
 
+    SCOPE_EXIT({
+        for (size_t i = 0; i < input_rows_count; ++i)
+            agg_func.destroy(places[i]);
+    });
+
+    {
+        auto that = &agg_func;
+        /// Unnest consecutive trailing -State combinators
+        while (auto func = typeid_cast<const AggregateFunctionState *>(that))
+            that = func->getNestedFunction().get();
+
+        that->getAddressOfAddFunctions().add_batch(
+            that, input_rows_count, places.data(), 0, aggregate_arguments, offsets->data(), arena.get());
+    }
+
+    for (size_t i = 0; i < input_rows_count; ++i)
+        if (!res_col_aggregate_function)
+            agg_func.insertResultInto(places[i], res_col);
+        else
+            res_col_aggregate_function->insertFrom(places[i]);
 
     block.getByPosition(result).column = std::move(result_holder);
 }
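The rewritten `executeImpl` above replaces the per-row create/add/insert/destroy cycle with one arena-allocated state per output row and a single offsets-driven `add_batch` call. Roughly what that batched inner loop does, with toy types standing in for `IAggregateFunction`, `Arena`, and the column arguments:

```cpp
#include <cstddef>
#include <cstdint>

// Toy version of the offsets-driven add_batch that arrayReduce now calls once:
// places[i] is the pre-created state for output row i, and offsets[] carves the
// flattened array data into per-row slices. State/double are illustrative stand-ins.
using State = double;

void addBatchWithOffsets(size_t batch_size, State * const * places,
                         const double * flat_data, const uint64_t * offsets)
{
    size_t current_offset = 0;
    for (size_t i = 0; i < batch_size; ++i)
    {
        size_t next_offset = offsets[i];
        for (size_t j = current_offset; j < next_offset; ++j)
            *places[i] += flat_data[j];  // stands in for agg_func.add(places[i], ..., j, arena)
        current_offset = next_offset;
    }
}
```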
diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp
index fc358c22189..f0c0aec8ee6 100644
--- a/dbms/src/Interpreters/Aggregator.cpp
+++ b/dbms/src/Interpreters/Aggregator.cpp
@@ -26,6 +26,8 @@
 #include <Common/typeid_cast.h>
 #include <Common/assert_cast.h>
 #include <common/demangle.h>
+#include <AggregateFunctions/AggregateFunctionArray.h>
+#include <AggregateFunctions/AggregateFunctionState.h>
 
 
 namespace ProfileEvents
@@ -450,7 +452,7 @@ void NO_INLINE Aggregator::executeImplCase(
 
         /// Add values to the aggregate functions.
         for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
-            (*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
+            (*inst->funcs.add)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
     }
 }
 
@@ -492,7 +494,10 @@ void NO_INLINE Aggregator::executeImplBatch(
 
     /// Add values to the aggregate functions.
     for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
-        inst->that->addBatch(rows, places.data(), inst->state_offset, inst->arguments, aggregates_pool);
+    {
+        (*inst->batch_funcs.add_batch)(
+            inst->batch_that, rows, places.data(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
+    }
 }
 
@@ -504,7 +509,13 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
 
     /// Adding values
     for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
-        inst->that->addBatchSinglePlace(rows, res + inst->state_offset, inst->arguments, arena);
+    {
+        if (inst->offsets)
+            (*inst->batch_funcs.add_batch_single_place)(
+                inst->batch_that, inst->offsets[static_cast<ssize_t>(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena);
+        else
+            (*inst->batch_funcs.add_batch_single_place)(inst->batch_that, rows, res + inst->state_offset, inst->batch_arguments, arena);
+    }
 }
 
@@ -564,6 +575,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
     AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
     aggregate_functions_instructions[params.aggregates_size].that = nullptr;
 
+    std::vector<std::vector<const IColumn *>> nested_columns_holder;
     for (size_t i = 0; i < params.aggregates_size; ++i)
     {
         for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
@@ -579,10 +591,31 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
             }
         }
 
-        aggregate_functions_instructions[i].that = aggregate_functions[i];
-        aggregate_functions_instructions[i].func = aggregate_functions[i]->getAddressOfAddFunction();
-        aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
         aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
+        aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
+        auto that = aggregate_functions[i];
+        /// Unnest consecutive trailing -State combinators
+        while (auto func = typeid_cast<const AggregateFunctionState *>(that))
+            that = func->getNestedFunction().get();
+        aggregate_functions_instructions[i].that = that;
+        aggregate_functions_instructions[i].funcs = that->getAddressOfAddFunctions();
+
+        if (auto func = typeid_cast<const AggregateFunctionArray *>(that))
+        {
+            /// Unnest consecutive -State combinators before -Array
+            that = func->getNestedFunction().get();
+            while (auto nested_func = typeid_cast<const AggregateFunctionState *>(that))
+                that = nested_func->getNestedFunction().get();
+            auto [nested_columns, offsets] = checkAndGetNestedArrayOffset(aggregate_columns[i].data(), that->getArgumentTypes().size());
+            nested_columns_holder.push_back(std::move(nested_columns));
+            aggregate_functions_instructions[i].batch_arguments = nested_columns_holder.back().data();
+            aggregate_functions_instructions[i].offsets = offsets;
+        }
+        else
+            aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();
+
+        aggregate_functions_instructions[i].batch_that = that;
+        aggregate_functions_instructions[i].batch_funcs = that->getAddressOfAddFunctions();
     }
 
     if (isCancelled())
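About the `inst->offsets[rows - 1]` fast path in `executeWithoutKeyImpl` above: offsets are cumulative, so the last entry is the total length of the flattened nested column, and a keyless -Array aggregation collapses into one flat batch over the nested data with no per-array bookkeeping. A small illustration (hypothetical sumArray over doubles, toy types):

```cpp
#include <cstddef>
#include <cstdint>

// offsets[rows - 1] is the total number of nested elements across all `rows`
// arrays, so summing every array reduces to one pass over the flattened column.
double sumArraySinglePlace(size_t rows, const double * nested, const uint64_t * offsets)
{
    double state = 0;
    const size_t nested_rows = rows ? offsets[rows - 1] : 0;
    for (size_t i = 0; i < nested_rows; ++i)
        state += nested[i];  // stands in for add_batch_single_place over the nested column
    return state;
}
```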
diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h
index 359b9f14c2a..fe139c775ff 100644
--- a/dbms/src/Interpreters/Aggregator.h
+++ b/dbms/src/Interpreters/Aggregator.h
@@ -1005,9 +1005,13 @@ protected:
     struct AggregateFunctionInstruction
     {
         const IAggregateFunction * that;
-        IAggregateFunction::AddFunc func;
+        IAggregateFunction::AddFuncs funcs;
         size_t state_offset;
         const IColumn ** arguments;
+        const IAggregateFunction * batch_that;
+        IAggregateFunction::AddFuncs batch_funcs;
+        const IColumn ** batch_arguments;
+        const UInt64 * offsets = nullptr;
     };
 
     using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>;
diff --git a/dbms/tests/performance/vectorize_aggregation_combinators.xml b/dbms/tests/performance/vectorize_aggregation_combinators.xml
new file mode 100644
index 00000000000..a1afb2e6cc8
--- /dev/null
+++ b/dbms/tests/performance/vectorize_aggregation_combinators.xml
@@ -0,0 +1,35 @@
+<test>
+    <type>loop</type>
+
+    <stop_conditions>
+        <all_of>
+            <total_time_ms>30000</total_time_ms>
+        </all_of>
+        <any_of>
+            <average_speed_not_changing_for_ms>6000</average_speed_not_changing_for_ms>
+            <total_time_ms>60000</total_time_ms>
+        </any_of>
+    </stop_conditions>
+
+    <main_metric>
+        <min_time/>
+    </main_metric>
+
+    <settings>
+        <max_threads>1</max_threads>
+    </settings>
+
+    <create_query>CREATE TABLE array_data(k UInt16, v Array(UInt64)) ENGINE Log</create_query>
+
+    <fill_query>INSERT INTO array_data SELECT number % 1024, arrayWithConstant(16, number) from numbers(10000000)</fill_query>
+
+    <query>SELECT countMerge(v) FROM (SELECT countState() v FROM numbers(1000000000)) FORMAT Null</query>
+    <query>SELECT countMerge(v) FROM (SELECT number % 1024 k, countState() v FROM numbers(1000000000) GROUP BY k) FORMAT Null</query>
+
+    <query>SELECT sumArray(v) FROM array_data FORMAT Null</query>
+    <query>SELECT k, sumArray(v) FROM array_data GROUP BY k FORMAT Null</query>
+    <query>SELECT arrayReduce('avg', v) FROM array_data FORMAT Null</query>
+
+    <drop_query>DROP TABLE IF EXISTS array_data</drop_query>
+</test>