mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Merge branch 'kononencheg/runningAccumulate' of https://github.com/kononencheg/ClickHouse into kononencheg-kononencheg/runningAccumulate
This commit is contained in:
commit
4f05295402
@ -15,6 +15,7 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
|
||||
@ -46,10 +47,9 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t getNumberOfArguments() const override
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
bool isVariadic() const override { return true; }
|
||||
|
||||
size_t getNumberOfArguments() const override { return 0; }
|
||||
|
||||
bool isDeterministic() const override { return false; }
|
||||
|
||||
@ -70,21 +70,32 @@ public:
|
||||
|
||||
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
|
||||
{
|
||||
size_t number_of_arguments = arguments.size();
|
||||
|
||||
if (number_of_arguments == 0)
|
||||
throw Exception("Incorrect number of arguments of function " + getName() + ". Must be 1 or 2.",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
const ColumnAggregateFunction * column_with_states
|
||||
= typeid_cast<const ColumnAggregateFunction *>(&*block.getByPosition(arguments.at(0)).column);
|
||||
|
||||
if (!column_with_states)
|
||||
throw Exception("Illegal column " + block.getByPosition(arguments.at(0)).column->getName()
|
||||
+ " of first argument of function "
|
||||
+ getName(),
|
||||
ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
ColumnPtr column_with_groups;
|
||||
|
||||
if (number_of_arguments == 2)
|
||||
column_with_groups = block.getByPosition(arguments[1]).column;
|
||||
|
||||
AggregateFunctionPtr aggregate_function_ptr = column_with_states->getAggregateFunction();
|
||||
const IAggregateFunction & agg_func = *aggregate_function_ptr;
|
||||
|
||||
AlignedBuffer place(agg_func.sizeOfData(), agg_func.alignOfData());
|
||||
agg_func.create(place.data());
|
||||
SCOPE_EXIT(agg_func.destroy(place.data()));
|
||||
|
||||
/// Will pass empty arena if agg_func does not allocate memory in arena
|
||||
std::unique_ptr<Arena> arena = agg_func.allocatesMemoryInArena() ? std::make_unique<Arena>() : nullptr;
|
||||
|
||||
auto result_column_ptr = agg_func.getReturnType()->createColumn();
|
||||
@ -92,11 +103,38 @@ public:
|
||||
result_column.reserve(column_with_states->size());
|
||||
|
||||
const auto & states = column_with_states->getData();
|
||||
|
||||
size_t i = 0;
|
||||
|
||||
SCOPE_EXIT({
|
||||
if (i > 0)
|
||||
agg_func.destroy(place.data());
|
||||
});
|
||||
|
||||
for (const auto & state_to_add : states)
|
||||
{
|
||||
/// Will pass empty arena if agg_func does not allocate memory in arena
|
||||
if (i == 0 || (column_with_groups && column_with_groups->compareAt(i, i - 1, *column_with_groups, 1) != 0))
|
||||
{
|
||||
if (i > 0)
|
||||
agg_func.destroy(place.data());
|
||||
|
||||
try
|
||||
{
|
||||
agg_func.create(place.data());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// prevent destroy after creation failure
|
||||
i = 0;
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
agg_func.merge(place.data(), state_to_add, arena.get());
|
||||
agg_func.insertResultInto(place.data(), result_column);
|
||||
|
||||
++i;
|
||||
}
|
||||
|
||||
block.getByPosition(result).column = std::move(result_column_ptr);
|
||||
|
@ -0,0 +1,30 @@
|
||||
0 0 0
|
||||
0 6 6
|
||||
0 12 18
|
||||
0 18 36
|
||||
0 24 60
|
||||
1 1 1
|
||||
1 7 8
|
||||
1 13 21
|
||||
1 19 40
|
||||
1 25 65
|
||||
2 2 2
|
||||
2 8 10
|
||||
2 14 24
|
||||
2 20 44
|
||||
2 26 70
|
||||
3 3 3
|
||||
3 9 12
|
||||
3 15 27
|
||||
3 21 48
|
||||
3 27 75
|
||||
4 4 4
|
||||
4 10 14
|
||||
4 16 30
|
||||
4 22 52
|
||||
4 28 80
|
||||
5 5 5
|
||||
5 11 16
|
||||
5 17 33
|
||||
5 23 56
|
||||
5 29 85
|
@ -0,0 +1,11 @@
|
||||
SELECT grouping,
|
||||
item,
|
||||
runningAccumulate(state, grouping)
|
||||
FROM (
|
||||
SELECT number % 6 AS grouping,
|
||||
number AS item,
|
||||
sumState(number) AS state
|
||||
FROM (SELECT number FROM system.numbers LIMIT 30)
|
||||
GROUP BY grouping, item
|
||||
ORDER BY grouping, item
|
||||
);
|
Loading…
Reference in New Issue
Block a user