From c339c1ca665de7f87322a2522c5753fecacd8839 Mon Sep 17 00:00:00 2001 From: Sergey Kononenko Date: Fri, 20 Dec 2019 23:56:39 +0300 Subject: [PATCH 1/5] Reset column for runningAccumulate --- dbms/src/Functions/runningAccumulate.cpp | 29 ++++++++++++++---- .../01012_reset_running_accumulate.reference | 30 +++++++++++++++++++ .../01012_reset_running_accumulate.sql | 11 +++++++ 3 files changed, 65 insertions(+), 5 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/01012_reset_running_accumulate.reference create mode 100644 dbms/tests/queries/0_stateless/01012_reset_running_accumulate.sql diff --git a/dbms/src/Functions/runningAccumulate.cpp b/dbms/src/Functions/runningAccumulate.cpp index a4ccc1e1553..92572e39926 100644 --- a/dbms/src/Functions/runningAccumulate.cpp +++ b/dbms/src/Functions/runningAccumulate.cpp @@ -46,10 +46,9 @@ public: return true; } - size_t getNumberOfArguments() const override - { - return 1; - } + bool isVariadic() const override { return true; } + + size_t getNumberOfArguments() const override { return 0; } bool isDeterministic() const override { return false; } @@ -70,14 +69,22 @@ public: void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override { + size_t number_of_arguments = arguments.size(); + const ColumnAggregateFunction * column_with_states = typeid_cast(&*block.getByPosition(arguments.at(0)).column); + if (!column_with_states) throw Exception("Illegal column " + block.getByPosition(arguments.at(0)).column->getName() + " of first argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN); + ColumnPtr column_with_groups; + + if (number_of_arguments == 2) + column_with_groups = block.getByPosition(arguments[1]).column; + AggregateFunctionPtr aggregate_function_ptr = column_with_states->getAggregateFunction(); const IAggregateFunction & agg_func = *aggregate_function_ptr; @@ -85,6 +92,7 @@ public: agg_func.create(place.data()); SCOPE_EXIT(agg_func.destroy(place.data())); + /// Will pass empty arena if agg_func does not allocate memory in arena std::unique_ptr arena = agg_func.allocatesMemoryInArena() ? std::make_unique() : nullptr; auto result_column_ptr = agg_func.getReturnType()->createColumn(); @@ -92,11 +100,22 @@ public: result_column.reserve(column_with_states->size()); const auto & states = column_with_states->getData(); + + size_t i = 0; + for (const auto & state_to_add : states) { - /// Will pass empty arena if agg_func does not allocate memory in arena + if (column_with_groups && i > 0 && + column_with_groups->compareAt(i, i - 1, *column_with_groups, 1) != 0) + { + agg_func.destroy(place.data()); + agg_func.create(place.data()); + } + agg_func.merge(place.data(), state_to_add, arena.get()); agg_func.insertResultInto(place.data(), result_column); + + ++i; } block.getByPosition(result).column = std::move(result_column_ptr); diff --git a/dbms/tests/queries/0_stateless/01012_reset_running_accumulate.reference b/dbms/tests/queries/0_stateless/01012_reset_running_accumulate.reference new file mode 100644 index 00000000000..98d21902f5c --- /dev/null +++ b/dbms/tests/queries/0_stateless/01012_reset_running_accumulate.reference @@ -0,0 +1,30 @@ +0 0 0 +0 6 6 +0 12 18 +0 18 36 +0 24 60 +1 1 1 +1 7 8 +1 13 21 +1 19 40 +1 25 65 +2 2 2 +2 8 10 +2 14 24 +2 20 44 +2 26 70 +3 3 3 +3 9 12 +3 15 27 +3 21 48 +3 27 75 +4 4 4 +4 10 14 +4 16 30 +4 22 52 +4 28 80 +5 5 5 +5 11 16 +5 17 33 +5 23 56 +5 29 85 diff --git a/dbms/tests/queries/0_stateless/01012_reset_running_accumulate.sql b/dbms/tests/queries/0_stateless/01012_reset_running_accumulate.sql new file mode 100644 index 00000000000..b9336b2f50c --- /dev/null +++ b/dbms/tests/queries/0_stateless/01012_reset_running_accumulate.sql @@ -0,0 +1,11 @@ +SELECT grouping, + item, + runningAccumulate(state, grouping) +FROM ( + SELECT number % 6 AS grouping, + number AS item, + sumState(number) AS state + FROM (SELECT number FROM system.numbers LIMIT 30) + GROUP BY grouping, item + ORDER BY grouping, item +); \ No newline at end of file From 50f38613587446f082bd5331c3e3cae39382bf3f Mon Sep 17 00:00:00 2001 From: Sergey Kononenko Date: Fri, 27 Dec 2019 23:55:42 +0300 Subject: [PATCH 2/5] Check arguments count --- dbms/src/Functions/runningAccumulate.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dbms/src/Functions/runningAccumulate.cpp b/dbms/src/Functions/runningAccumulate.cpp index 92572e39926..8e0ec0f0dc3 100644 --- a/dbms/src/Functions/runningAccumulate.cpp +++ b/dbms/src/Functions/runningAccumulate.cpp @@ -15,6 +15,7 @@ namespace ErrorCodes { extern const int ILLEGAL_COLUMN; extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } @@ -71,6 +72,10 @@ public: { size_t number_of_arguments = arguments.size(); + if (number_of_arguments == 0) + throw Exception("Incorrect number of arguments of function " + getName() + ". Must be 1 or 2.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + const ColumnAggregateFunction * column_with_states = typeid_cast(&*block.getByPosition(arguments.at(0)).column); From 96302d49490375643ee5609cb2e393f207235e9e Mon Sep 17 00:00:00 2001 From: Sergey Kononenko Date: Sat, 28 Dec 2019 01:52:03 +0300 Subject: [PATCH 3/5] Prevent aggregation function destroy repeat --- dbms/src/Functions/runningAccumulate.cpp | 27 ++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/dbms/src/Functions/runningAccumulate.cpp b/dbms/src/Functions/runningAccumulate.cpp index 8e0ec0f0dc3..ebb164db188 100644 --- a/dbms/src/Functions/runningAccumulate.cpp +++ b/dbms/src/Functions/runningAccumulate.cpp @@ -94,8 +94,6 @@ public: const IAggregateFunction & agg_func = *aggregate_function_ptr; AlignedBuffer place(agg_func.sizeOfData(), agg_func.alignOfData()); - agg_func.create(place.data()); - SCOPE_EXIT(agg_func.destroy(place.data())); /// Will pass empty arena if agg_func does not allocate memory in arena std::unique_ptr arena = agg_func.allocatesMemoryInArena() ? std::make_unique() : nullptr; @@ -108,13 +106,30 @@ public: size_t i = 0; + SCOPE_EXIT({ + if (i > 0) + agg_func.destroy(place.data()); + }); + for (const auto & state_to_add : states) { - if (column_with_groups && i > 0 && - column_with_groups->compareAt(i, i - 1, *column_with_groups, 1) != 0) + if (i == 0 || (column_with_groups && column_with_groups->compareAt(i, i - 1, *column_with_groups, 1) != 0)) { - agg_func.destroy(place.data()); - agg_func.create(place.data()); + if (i > 0) { + agg_func.destroy(place.data()); + } + + try + { + agg_func.create(place.data()); + } + catch (...) + { + agg_func.destroy(place.data()); + i = 0; // prevent destroy duplication + + throw; + } } agg_func.merge(place.data(), state_to_add, arena.get()); From 0616b35ac8c494357a4c4359326f4d2cdfd5e97c Mon Sep 17 00:00:00 2001 From: Sergey Kononenko Date: Sat, 28 Dec 2019 01:54:01 +0300 Subject: [PATCH 4/5] Fix style --- dbms/src/Functions/runningAccumulate.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Functions/runningAccumulate.cpp b/dbms/src/Functions/runningAccumulate.cpp index ebb164db188..8a79ad05eaa 100644 --- a/dbms/src/Functions/runningAccumulate.cpp +++ b/dbms/src/Functions/runningAccumulate.cpp @@ -115,9 +115,8 @@ public: { if (i == 0 || (column_with_groups && column_with_groups->compareAt(i, i - 1, *column_with_groups, 1) != 0)) { - if (i > 0) { + if (i > 0) agg_func.destroy(place.data()); - } try { From d7b030dbff98b5450e561c527ffbbf006edff4fa Mon Sep 17 00:00:00 2001 From: Sergey Kononenko Date: Sat, 28 Dec 2019 02:33:37 +0300 Subject: [PATCH 5/5] Fix bad solution --- dbms/src/Functions/runningAccumulate.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Functions/runningAccumulate.cpp b/dbms/src/Functions/runningAccumulate.cpp index 8a79ad05eaa..761f3692e3d 100644 --- a/dbms/src/Functions/runningAccumulate.cpp +++ b/dbms/src/Functions/runningAccumulate.cpp @@ -124,8 +124,8 @@ public: } catch (...) { - agg_func.destroy(place.data()); - i = 0; // prevent destroy duplication + // prevent destroy after creation failure + i = 0; throw; }