From e32d02ebb50bb5375e540a14f1634aded64aa4d9 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Tue, 25 Aug 2015 19:34:28 +0300 Subject: [PATCH] dbms: Server: Added an optional parameter to runningAccumulate to make it possible to order merging operations within a block according to a key. It is used only for research and testing purposes. [#METR-17276] --- .../DB/Functions/FunctionsMiscellaneous.h | 136 +++++++++++++++--- 1 file changed, 118 insertions(+), 18 deletions(-) diff --git a/dbms/include/DB/Functions/FunctionsMiscellaneous.h b/dbms/include/DB/Functions/FunctionsMiscellaneous.h index 3fe496f847a..d06394d6c62 100644 --- a/dbms/include/DB/Functions/FunctionsMiscellaneous.h +++ b/dbms/include/DB/Functions/FunctionsMiscellaneous.h @@ -964,26 +964,14 @@ private: */ class FunctionRunningAccumulate : public IFunction { -public: - static constexpr auto name = "runningAccumulate"; - static IFunction * create(const Context & context) { return new FunctionRunningAccumulate; } - - String getName() const override { return name; } - - DataTypePtr getReturnType(const DataTypes & arguments) const override +private: + template + bool checkType(const IDataType * type) const { - if (arguments.size() != 1) - throw Exception("Function " + getName() + " requires exactly one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - const DataTypeAggregateFunction * type = typeid_cast(&*arguments[0]); - if (!type) - throw Exception("Argument for function " + getName() + " must have type AggregateFunction - state of aggregate function.", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - - return type->getReturnType()->clone(); + return typeid_cast(type) != nullptr; } - void execute(Block & block, const ColumnNumbers & arguments, size_t result) override + void executeWithoutKey(Block & block, const ColumnNumbers & arguments, size_t result) { const ColumnAggregateFunction * column_with_states = typeid_cast(&*block.getByPosition(arguments.at(0)).column); if (!column_with_states) @@ -1011,8 +999,120 @@ public: agg_func.insertResultInto(place.get(), result_column); } } -}; + template + bool executeForType(Block & block, const ColumnNumbers & arguments, size_t result) + { + using ColumnType = ColumnVector; + if (const auto * column_keys = typeid_cast(&*block.getByPosition(arguments[1]).column)) + { + const ColumnAggregateFunction * column_with_states = typeid_cast(&*block.getByPosition(arguments.at(0)).column); + if (!column_with_states) + throw Exception( + "Illegal column " + block.getByPosition(arguments.at(0)).column->getName() + " of first argument of function " + getName(), + ErrorCodes::ILLEGAL_COLUMN); + + AggregateFunctionPtr aggregate_function_ptr = column_with_states->getAggregateFunction(); + const IAggregateFunction & agg_func = *aggregate_function_ptr; + + auto deleter = [&agg_func] (char * ptr) { agg_func.destroy(ptr); free(ptr); }; + std::unique_ptr place { reinterpret_cast(malloc(agg_func.sizeOfData())), deleter }; + + agg_func.create(place.get()); /// Немного не exception-safe. Если здесь выкинется исключение, то зря вызовется destroy. + + ColumnPtr result_column_ptr = agg_func.getReturnType()->createColumn(); + block.getByPosition(result).column = result_column_ptr; + IColumn & result_column = *result_column_ptr; + result_column.reserve(column_with_states->size()); + + const auto & states = column_with_states->getData(); + const auto & keys = column_keys->getData(); + size_t size = states.size(); + + using Position = typename std::pair; + std::vector positions; + positions.reserve(size); + + for (size_t i = 0; i < size; ++i) + positions.push_back(std::make_pair(keys[i], i)); + std::sort(positions.begin(), positions.end(), [](const Position & lhs, const Position & rhs) + { + return lhs.first < rhs.first; + }); + + for (const auto & pos : positions) + { + size_t index = pos.second; + agg_func.merge(place.get(), states[index]); + agg_func.insertResultInto(place.get(), result_column); + } + + return true; + } + else + return false; + } + +public: + static constexpr auto name = "runningAccumulate"; + static IFunction * create(const Context & context) { return new FunctionRunningAccumulate; } + + String getName() const override { return name; } + + DataTypePtr getReturnType(const DataTypes & arguments) const override + { + if ((arguments.size() < 1) || (arguments.size() > 2)) + throw Exception("Function " + getName() + " requires 1 or 2 arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + if (arguments.size() == 2) + { + const IDataType * type = &*arguments[1]; + if (! (checkType(type) + || checkType(type) + || checkType(type) + || checkType(type) + || checkType(type) + || checkType(type) + || checkType(type) + || checkType(type))) + { + throw Exception("Illegal type in second argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + } + + const DataTypeAggregateFunction * type = typeid_cast(&*arguments[0]); + if (!type) + throw Exception("Argument for function " + getName() + " must have type AggregateFunction - state of aggregate function.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return type->getReturnType()->clone(); + } + + void execute(Block & block, const ColumnNumbers & arguments, size_t result) override + { + if (arguments.size() == 1) + executeWithoutKey(block, arguments, result); + else if (arguments.size() == 2) + { + if (!( executeForType(block, arguments, result) + || executeForType(block, arguments, result) + || executeForType(block, arguments, result) + || executeForType(block, arguments, result) + || executeForType(block, arguments, result) + || executeForType(block, arguments, result) + || executeForType(block, arguments, result) + || executeForType(block, arguments, result))) + { + throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName() + + " of argument of function " + getName(), + ErrorCodes::ILLEGAL_COLUMN); + } + } + else + throw Exception("Function " + getName() + " requires 1 or 2 arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + } +}; /** Принимает состояние агрегатной функции. Возвращает результат агрегации. */