ClickHouse/src/Functions/runningAccumulate.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

149 lines
4.8 KiB
C++
Raw Normal View History

2021-05-17 07:30:42 +00:00
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Columns/ColumnAggregateFunction.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Common/AlignedBuffer.h>
#include <Common/Arena.h>
2022-04-27 15:05:45 +00:00
#include <Common/scope_guard_safe.h>
namespace DB
{
/// Error codes thrown by FunctionRunningAccumulate below.
/// They are only declared here (`extern`); their definitions are not part of this file.
namespace ErrorCodes
{
/// Thrown by executeImpl when the first argument's column is not a ColumnAggregateFunction.
extern const int ILLEGAL_COLUMN;
/// Thrown by getReturnTypeImpl when the first argument's type is not DataTypeAggregateFunction.
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
2019-12-27 20:55:42 +00:00
/// Thrown by getReturnTypeImpl when the function is called with zero or more than two arguments.
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
2020-09-07 18:00:37 +00:00
namespace
{
/** runningAccumulate(agg_state) - takes the states of an aggregate function and returns a column of values
  * that are the result of accumulating these states over the rows of the columns, from the first row to the current row.
  *
  * Quite an unusual function.
  * Takes the state of an aggregate function (for example, runningAccumulate(uniqState(UserID))),
  * and for each row of the columns, returns the result of the aggregate function on the merge of the states of all previous rows and the current row.
  *
  * So, the result of the function depends on how the data is partitioned into columns and on the order of the data in the columns.
  */
class FunctionRunningAccumulate : public IFunction
{
public:
static constexpr auto name = "runningAccumulate";
2021-06-01 12:20:52 +00:00
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionRunningAccumulate>();
}
String getName() const override
{
return name;
}
bool isStateful() const override
{
return true;
}
2019-12-20 20:56:39 +00:00
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool isDeterministic() const override
{
return false;
}
bool isDeterministicInScopeOfQuery() const override
{
return false;
}
2021-06-22 16:21:23 +00:00
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
2020-03-09 03:38:43 +00:00
if (arguments.empty() || arguments.size() > 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Incorrect number of arguments of function {}. Must be 1 or 2.", getName());
const DataTypeAggregateFunction * type = checkAndGetDataType<DataTypeAggregateFunction>(arguments[0].get());
if (!type)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Argument for function {} must have type AggregateFunction - state "
"of aggregate function.", getName());
return type->getReturnType();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
const ColumnAggregateFunction * column_with_states
2020-10-19 15:27:41 +00:00
= typeid_cast<const ColumnAggregateFunction *>(&*arguments.at(0).column);
2019-12-20 20:56:39 +00:00
if (!column_with_states)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}",
arguments.at(0).column->getName(), getName());
2019-12-20 20:56:39 +00:00
ColumnPtr column_with_groups;
if (arguments.size() == 2)
2020-10-19 15:27:41 +00:00
column_with_groups = arguments[1].column;
2019-12-20 20:56:39 +00:00
2022-11-30 18:48:09 +00:00
AggregateFunctionPtr aggregate_function_ptr = column_with_states->getAggregateFunction();
const IAggregateFunction & agg_func = *aggregate_function_ptr;
AlignedBuffer place(agg_func.sizeOfData(), agg_func.alignOfData());
2019-12-20 20:56:39 +00:00
/// Will pass empty arena if agg_func does not allocate memory in arena
std::unique_ptr<Arena> arena = agg_func.allocatesMemoryInArena() ? std::make_unique<Arena>() : nullptr;
2022-11-28 15:02:59 +00:00
auto result_column_ptr = agg_func.getResultType()->createColumn();
IColumn & result_column = *result_column_ptr;
result_column.reserve(column_with_states->size());
const auto & states = column_with_states->getData();
2019-12-20 20:56:39 +00:00
bool state_created = false;
SCOPE_EXIT_MEMORY_SAFE({
if (state_created)
agg_func.destroy(place.data());
});
size_t row_number = 0;
for (const auto & state_to_add : states)
{
if (row_number == 0 || (column_with_groups && column_with_groups->compareAt(row_number, row_number - 1, *column_with_groups, 1) != 0))
2019-12-20 20:56:39 +00:00
{
if (state_created)
{
agg_func.destroy(place.data());
state_created = false;
}
2021-05-08 14:09:40 +00:00
agg_func.create(place.data()); /// This function can throw.
state_created = true; //-V519
2019-12-20 20:56:39 +00:00
}
agg_func.merge(place.data(), state_to_add, arena.get());
2020-06-17 19:36:27 +00:00
agg_func.insertResultInto(place.data(), result_column, arena.get());
2019-12-20 20:56:39 +00:00
++row_number;
}
2020-10-19 15:27:41 +00:00
return result_column_ptr;
}
};
2020-09-07 18:00:37 +00:00
}
/// Registers FunctionRunningAccumulate with the function factory.
/// NOTE(review): `factory` is not declared in this file; presumably it is introduced by the REGISTER_FUNCTION macro — confirm.
REGISTER_FUNCTION(RunningAccumulate)
{
factory.registerFunction<FunctionRunningAccumulate>();
}
}