dbms: added functions for manipulation of aggregation states [#METR-16677].

This commit is contained in:
Alexey Milovidov 2015-06-06 03:28:37 +03:00
parent 890ee9c0fa
commit 417ab909b8
2 changed files with 105 additions and 0 deletions

View File

@ -12,6 +12,7 @@
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypeTuple.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Columns/ColumnConst.h>
@ -20,6 +21,7 @@
#include <DB/Columns/ColumnTuple.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnReplicated.h>
#include <DB/Columns/ColumnAggregateFunction.h>
#include <DB/Common/UnicodeBar.h>
#include <DB/Functions/IFunction.h>
#include <DB/Interpreters/ExpressionActions.h>
@ -58,6 +60,11 @@ namespace DB
* bar(x, min, max, width) - рисует полосу из количества символов, пропорционального (x - min) и равного width при x == max.
*
* version() - возвращает текущую версию сервера в строке.
*
* finalizeAggregation(agg_state) - по состоянию агрегации получить результат.
*
* runningAccumulate(agg_state) - принимает состояния агрегатной функции и возвращает столбец со значениями,
* являющимися результатом накопления этих состояний для множества строк блока, от первой до текущей строки.
*/
@ -890,6 +897,7 @@ using FunctionIsFinite = FunctionNumericPredicate<IsFiniteImpl>;
using FunctionIsInfinite = FunctionNumericPredicate<IsInfiniteImpl>;
using FunctionIsNaN = FunctionNumericPredicate<IsNaNImpl>;
class FunctionVersion : public IFunction
{
public:
@ -920,4 +928,98 @@ private:
}
};
/** Весьма необычная функция.
* Принимает состояние агрегатной функции (например runningAccumulate(uniqState(UserID))),
* и для каждой строки блока, возвращает результат агрегатной функции по объединению состояний от всех предыдущих строк блока и текущей строки.
*
* То есть, функция зависит от разбиения данных на блоки и от порядка строк в блоке.
*/
class FunctionRunningAccumulate : public IFunction
{
public:
static constexpr auto name = "runningAccumulate";
static IFunction * create(const Context & context) { return new FunctionRunningAccumulate; }
String getName() const override { return name; }
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
if (arguments.size() != 1)
throw Exception("Function " + getName() + " requires exactly one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const DataTypeAggregateFunction * type = typeid_cast<const DataTypeAggregateFunction *>(&*arguments[0]);
if (!type)
throw Exception("Argument for function " + getName() + " must have type AggregateFunction - state of aggregate function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return type->getReturnType()->clone();
}
void execute(Block & block, const ColumnNumbers & arguments, size_t result) override
{
const ColumnAggregateFunction * column_with_states = typeid_cast<const ColumnAggregateFunction *>(&*block.getByPosition(arguments.at(0)).column);
if (!column_with_states)
throw Exception(
"Illegal column " + block.getByPosition(arguments.at(0)).column->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
AggregateFunctionPtr aggregate_function_ptr = column_with_states->getAggregateFunction();
const IAggregateFunction & agg_func = *aggregate_function_ptr;
auto deleter = [&agg_func] (char * ptr) { agg_func.destroy(ptr); free(ptr); };
std::unique_ptr<char, decltype(deleter)> place { reinterpret_cast<char *>(malloc(agg_func.sizeOfData())), deleter };
agg_func.create(place.get()); /// Немного не exception-safe. Если здесь выкинется исключение, то зря вызовется destroy.
ColumnPtr result_column_ptr = agg_func.getReturnType()->createColumn();
block.getByPosition(result).column = result_column_ptr;
IColumn & result_column = *result_column_ptr;
result_column.reserve(column_with_states->size());
const auto & states = column_with_states->getData();
for (const auto & state_to_add : states)
{
agg_func.merge(place.get(), state_to_add);
agg_func.insertResultInto(place.get(), result_column);
}
}
};
/** Принимает состояние агрегатной функции. Возвращает результат агрегации.
*/
class FunctionFinalizeAggregation : public IFunction
{
public:
static constexpr auto name = "finalizeAggregation";
static IFunction * create(const Context & context) { return new FunctionFinalizeAggregation; }
String getName() const override { return name; }
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
if (arguments.size() != 1)
throw Exception("Function " + getName() + " requires exactly one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const DataTypeAggregateFunction * type = typeid_cast<const DataTypeAggregateFunction *>(&*arguments[0]);
if (!type)
throw Exception("Argument for function " + getName() + " must have type AggregateFunction - state of aggregate function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return type->getReturnType()->clone();
}
void execute(Block & block, const ColumnNumbers & arguments, size_t result) override
{
ColumnAggregateFunction * column_with_states = typeid_cast<ColumnAggregateFunction *>(&*block.getByPosition(arguments.at(0)).column);
if (!column_with_states)
throw Exception(
"Illegal column " + block.getByPosition(arguments.at(0)).column->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
block.getByPosition(result).column = column_with_states->convertToValues();
}
};
}

View File

@ -340,6 +340,9 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
factory.registerFunction<FunctionIsNaN>();
factory.registerFunction<FunctionVersion>();
factory.registerFunction<FunctionRunningAccumulate>();
factory.registerFunction<FunctionFinalizeAggregation>();
}
}