From 417ab909b8d5a358d3602a0b150bebda3e427c21 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 6 Jun 2015 03:28:37 +0300 Subject: [PATCH] dbms: added functions for manipulation of aggregation states [#METR-16677]. --- .../DB/Functions/FunctionsMiscellaneous.h | 102 ++++++++++++++++++ dbms/src/Functions/FunctionsMiscellaneous.cpp | 3 + 2 files changed, 105 insertions(+) diff --git a/dbms/include/DB/Functions/FunctionsMiscellaneous.h b/dbms/include/DB/Functions/FunctionsMiscellaneous.h index 77b55120b44..de7d94274dd 100644 --- a/dbms/include/DB/Functions/FunctionsMiscellaneous.h +++ b/dbms/include/DB/Functions/FunctionsMiscellaneous.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -20,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -58,6 +60,11 @@ namespace DB * bar(x, min, max, width) - рисует полосу из количества символов, пропорционального (x - min) и равного width при x == max. * * version() - возвращает текущую версию сервера в строке. + * + * finalizeAggregation(agg_state) - по состоянию агрегации получить результат. + * + * runningAccumulate(agg_state) - принимает состояния агрегатной функции и возвращает столбец со значениями, + * являющимися результатом накопления этих состояний для множества строк блока, от первой до текущей строки. */ @@ -890,6 +897,7 @@ using FunctionIsFinite = FunctionNumericPredicate; using FunctionIsInfinite = FunctionNumericPredicate; using FunctionIsNaN = FunctionNumericPredicate; + class FunctionVersion : public IFunction { public: @@ -920,4 +928,98 @@ private: } }; + +/** Весьма необычная функция. + * Принимает состояние агрегатной функции (например runningAccumulate(uniqState(UserID))), + * и для каждой строки блока, возвращает результат агрегатной функции по объединению состояний от всех предыдущих строк блока и текущей строки. + * + * То есть, функция зависит от разбиения данных на блоки и от порядка строк в блоке. + */ +class FunctionRunningAccumulate : public IFunction +{ +public: + static constexpr auto name = "runningAccumulate"; + static IFunction * create(const Context & context) { return new FunctionRunningAccumulate; } + + String getName() const override { return name; } + + DataTypePtr getReturnType(const DataTypes & arguments) const override + { + if (arguments.size() != 1) + throw Exception("Function " + getName() + " requires exactly one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + const DataTypeAggregateFunction * type = typeid_cast(&*arguments[0]); + if (!type) + throw Exception("Argument for function " + getName() + " must have type AggregateFunction - state of aggregate function.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return type->getReturnType()->clone(); + } + + void execute(Block & block, const ColumnNumbers & arguments, size_t result) override + { + const ColumnAggregateFunction * column_with_states = typeid_cast(&*block.getByPosition(arguments.at(0)).column); + if (!column_with_states) + throw Exception( + "Illegal column " + block.getByPosition(arguments.at(0)).column->getName() + " of first argument of function " + getName(), + ErrorCodes::ILLEGAL_COLUMN); + + AggregateFunctionPtr aggregate_function_ptr = column_with_states->getAggregateFunction(); + const IAggregateFunction & agg_func = *aggregate_function_ptr; + + auto deleter = [&agg_func] (char * ptr) { agg_func.destroy(ptr); free(ptr); }; + std::unique_ptr place { reinterpret_cast(malloc(agg_func.sizeOfData())), deleter }; + + agg_func.create(place.get()); /// Немного не exception-safe. Если здесь выкинется исключение, то зря вызовется destroy. + + ColumnPtr result_column_ptr = agg_func.getReturnType()->createColumn(); + block.getByPosition(result).column = result_column_ptr; + IColumn & result_column = *result_column_ptr; + result_column.reserve(column_with_states->size()); + + const auto & states = column_with_states->getData(); + for (const auto & state_to_add : states) + { + agg_func.merge(place.get(), state_to_add); + agg_func.insertResultInto(place.get(), result_column); + } + } +}; + + +/** Принимает состояние агрегатной функции. Возвращает результат агрегации. + */ +class FunctionFinalizeAggregation : public IFunction +{ +public: + static constexpr auto name = "finalizeAggregation"; + static IFunction * create(const Context & context) { return new FunctionFinalizeAggregation; } + + String getName() const override { return name; } + + DataTypePtr getReturnType(const DataTypes & arguments) const override + { + if (arguments.size() != 1) + throw Exception("Function " + getName() + " requires exactly one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + const DataTypeAggregateFunction * type = typeid_cast(&*arguments[0]); + if (!type) + throw Exception("Argument for function " + getName() + " must have type AggregateFunction - state of aggregate function.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return type->getReturnType()->clone(); + } + + void execute(Block & block, const ColumnNumbers & arguments, size_t result) override + { + ColumnAggregateFunction * column_with_states = typeid_cast(&*block.getByPosition(arguments.at(0)).column); + if (!column_with_states) + throw Exception( + "Illegal column " + block.getByPosition(arguments.at(0)).column->getName() + " of first argument of function " + getName(), + ErrorCodes::ILLEGAL_COLUMN); + + block.getByPosition(result).column = column_with_states->convertToValues(); + } +}; + } diff --git a/dbms/src/Functions/FunctionsMiscellaneous.cpp b/dbms/src/Functions/FunctionsMiscellaneous.cpp index b0cbe7b283a..59dd89481f1 100644 --- a/dbms/src/Functions/FunctionsMiscellaneous.cpp +++ b/dbms/src/Functions/FunctionsMiscellaneous.cpp @@ -340,6 +340,9 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory) factory.registerFunction(); factory.registerFunction(); + + factory.registerFunction(); + factory.registerFunction(); } }