From 4739b87732449af62b8b7dff26bc8103075d859d Mon Sep 17 00:00:00 2001 From: Sofia Antipushina Date: Thu, 14 May 2020 05:14:50 +0300 Subject: [PATCH 01/19] Add -Distinct combinator --- .../AggregateFunctionDistinct.cpp | 53 +++++++++ .../AggregateFunctionDistinct.h | 108 ++++++++++++++++++ .../registerAggregateFunctions.cpp | 1 + .../registerAggregateFunctions.h | 1 + 4 files changed, 163 insertions(+) create mode 100644 src/AggregateFunctions/AggregateFunctionDistinct.cpp create mode 100644 src/AggregateFunctions/AggregateFunctionDistinct.h diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.cpp b/src/AggregateFunctions/AggregateFunctionDistinct.cpp new file mode 100644 index 00000000000..d477a04568f --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionDistinct.cpp @@ -0,0 +1,53 @@ +#include +#include +#include +#include "registerAggregateFunctions.h" + +namespace DB +{ + + namespace ErrorCodes + { + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; + } + + class AggregateFunctionCombinatorDistinct final : public IAggregateFunctionCombinator + { + public: + String getName() const override { return "Distinct"; } + + DataTypes transformArguments(const DataTypes & arguments) const override + { + if (arguments.empty()) + throw Exception("Incorrect number of arguments for aggregate function with " + getName() + " suffix", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + +// return DataTypes(arguments.begin(), std::prev(arguments.end())); + DataTypes nested_arguments; + for (const auto & type : arguments) + { + nested_arguments.push_back(type); +// if (const DataTypeArray * array = typeid_cast(type.get())) +// nested_arguments.push_back(array->getNestedType()); +// else +// throw Exception("Illegal type " + type->getName() + " of argument" +// " for aggregate function with " + getName() + " suffix. Must be array.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + + return nested_arguments; + } + + AggregateFunctionPtr transformAggregateFunction( + const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const Array &) const override + { + return std::make_shared(nested_function, arguments); + } + }; + + void registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFactory & factory) + { + factory.registerCombinator(std::make_shared()); + } + +} diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h new file mode 100644 index 00000000000..160e113d23b --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -0,0 +1,108 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +/** Adaptor for aggregate functions. + * Adding -Distinct suffix to aggregate function +**/ + +class AggregateFunctionDistinct final : public IAggregateFunctionHelper { +private: + mutable std::mutex mutex; + AggregateFunctionPtr nested_func; + mutable HashSet< + UInt128, + UInt128TrivialHash, + HashTableGrower<3>, + HashTableAllocatorWithStackMemory> storage; + +public: + AggregateFunctionDistinct(AggregateFunctionPtr nested, const DataTypes & arguments) + : IAggregateFunctionHelper(arguments, {}) + , nested_func(nested) + { + if (arguments.empty()) + throw Exception("Aggregate function " + getName() + " require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + } + + String getName() const override { + return nested_func->getName() + "Distinct"; + } + + DataTypePtr getReturnType() const override { + return nested_func->getReturnType(); + } + + void create(AggregateDataPtr place) const override + { + nested_func->create(place); + } + + void destroy(AggregateDataPtr place) const noexcept override { + nested_func->destroy(place); + } + + size_t sizeOfData() const override + { + return nested_func->sizeOfData(); + } + + size_t alignOfData() const override + { + return nested_func->alignOfData(); + } + + bool hasTrivialDestructor() const override { + return nested_func->hasTrivialDestructor(); + } + + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override { + UInt128 key; + SipHash hash; + columns[0]->updateHashWithValue(row_num, hash); + hash.get128(key.low, key.high); + { + std::lock_guard lock(mutex); + if (!storage.insert(key).second) { + return; + } + } + nested_func->add(place, columns, row_num, arena); + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override { + nested_func->merge(place, rhs, arena); + } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { + nested_func->serialize(place, buf); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override { + nested_func->deserialize(place, buf, arena); + } + + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override { + nested_func->insertResultInto(place, to); + } + + bool allocatesMemoryInArena() const override { + return nested_func->allocatesMemoryInArena(); + } +}; + +} diff --git a/src/AggregateFunctions/registerAggregateFunctions.cpp b/src/AggregateFunctions/registerAggregateFunctions.cpp index a9ab1d4f8ea..a8d0cf6e37c 100644 --- a/src/AggregateFunctions/registerAggregateFunctions.cpp +++ b/src/AggregateFunctions/registerAggregateFunctions.cpp @@ -58,6 +58,7 @@ void registerAggregateFunctions() registerAggregateFunctionCombinatorNull(factory); registerAggregateFunctionCombinatorOrFill(factory); registerAggregateFunctionCombinatorResample(factory); + registerAggregateFunctionCombinatorDistinct(factory); } } diff --git a/src/AggregateFunctions/registerAggregateFunctions.h b/src/AggregateFunctions/registerAggregateFunctions.h index 88cdf4a504d..981273141f9 100644 --- a/src/AggregateFunctions/registerAggregateFunctions.h +++ b/src/AggregateFunctions/registerAggregateFunctions.h @@ -45,6 +45,7 @@ void registerAggregateFunctionCombinatorMerge(AggregateFunctionCombinatorFactory void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory &); void registerAggregateFunctionCombinatorOrFill(AggregateFunctionCombinatorFactory &); void registerAggregateFunctionCombinatorResample(AggregateFunctionCombinatorFactory &); +void registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFactory &); void registerAggregateFunctions(); From 6e2b93e5af00317f9612fbc9535cd6c8e00a5406 Mon Sep 17 00:00:00 2001 From: Sofia Antipushina Date: Thu, 14 May 2020 22:37:53 +0300 Subject: [PATCH 02/19] Stylefix --- .../AggregateFunctionDistinct.h | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index 160e113d23b..bab78aa88bf 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -11,24 +11,35 @@ namespace DB { -namespace ErrorCodes -{ +namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } +struct AggregateFunctionDistinctData { + using Key = UInt128; + + HashSet< + Key, + UInt128TrivialHash, + HashTableGrower<3>, + HashTableAllocatorWithStackMemory + > data; + std::mutex mutex; + + bool ALWAYS_INLINE TryToInsert(const Key& key) { + std::lock_guard lock(mutex); + return data.insert(key).second; + } +}; + /** Adaptor for aggregate functions. * Adding -Distinct suffix to aggregate function **/ class AggregateFunctionDistinct final : public IAggregateFunctionHelper { private: - mutable std::mutex mutex; AggregateFunctionPtr nested_func; - mutable HashSet< - UInt128, - UInt128TrivialHash, - HashTableGrower<3>, - HashTableAllocatorWithStackMemory> storage; + mutable AggregateFunctionDistinctData storage; public: AggregateFunctionDistinct(AggregateFunctionPtr nested, const DataTypes & arguments) @@ -71,17 +82,14 @@ public: } void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override { - UInt128 key; SipHash hash; columns[0]->updateHashWithValue(row_num, hash); + + UInt128 key; hash.get128(key.low, key.high); - { - std::lock_guard lock(mutex); - if (!storage.insert(key).second) { - return; - } - } - nested_func->add(place, columns, row_num, arena); + + if (storage.TryToInsert(key)) + nested_func->add(place, columns, row_num, arena); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override { From da81c56b5e5a33e1b36c5949775f22c5a78c350f Mon Sep 17 00:00:00 2001 From: Sofia Antipushina Date: Thu, 14 May 2020 22:46:01 +0300 Subject: [PATCH 03/19] Delete extra lines --- src/AggregateFunctions/AggregateFunctionDistinct.cpp | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.cpp b/src/AggregateFunctions/AggregateFunctionDistinct.cpp index d477a04568f..369b4a5f7df 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.cpp +++ b/src/AggregateFunctions/AggregateFunctionDistinct.cpp @@ -23,16 +23,9 @@ namespace DB throw Exception("Incorrect number of arguments for aggregate function with " + getName() + " suffix", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); -// return DataTypes(arguments.begin(), std::prev(arguments.end())); DataTypes nested_arguments; - for (const auto & type : arguments) - { + for (const auto & type : arguments) { nested_arguments.push_back(type); -// if (const DataTypeArray * array = typeid_cast(type.get())) -// nested_arguments.push_back(array->getNestedType()); -// else -// throw Exception("Illegal type " + type->getName() + " of argument" -// " for aggregate function with " + getName() + " suffix. Must be array.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); } return nested_arguments; From 13224c22ab7c7078e4a41457a72bd971792d1dc9 Mon Sep 17 00:00:00 2001 From: Sofia Antipushina Date: Fri, 15 May 2020 05:02:57 +0300 Subject: [PATCH 04/19] Stylecheck fix --- .../AggregateFunctionDistinct.cpp | 3 +- .../AggregateFunctionDistinct.h | 36 ++++++++++++------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.cpp b/src/AggregateFunctions/AggregateFunctionDistinct.cpp index 369b4a5f7df..b01bd2226c7 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.cpp +++ b/src/AggregateFunctions/AggregateFunctionDistinct.cpp @@ -24,7 +24,8 @@ namespace DB ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); DataTypes nested_arguments; - for (const auto & type : arguments) { + for (const auto & type : arguments) + { nested_arguments.push_back(type); } diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index bab78aa88bf..5580cc3b4df 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -11,11 +11,13 @@ namespace DB { -namespace ErrorCodes { +namespace ErrorCodes +{ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } -struct AggregateFunctionDistinctData { +struct AggregateFunctionDistinctData +{ using Key = UInt128; HashSet< @@ -36,7 +38,8 @@ struct AggregateFunctionDistinctData { * Adding -Distinct suffix to aggregate function **/ -class AggregateFunctionDistinct final : public IAggregateFunctionHelper { +class AggregateFunctionDistinct final : public IAggregateFunctionHelper +{ private: AggregateFunctionPtr nested_func; mutable AggregateFunctionDistinctData storage; @@ -50,11 +53,13 @@ public: throw Exception("Aggregate function " + getName() + " require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } - String getName() const override { + String getName() const override + { return nested_func->getName() + "Distinct"; } - DataTypePtr getReturnType() const override { + DataTypePtr getReturnType() const override + { return nested_func->getReturnType(); } @@ -77,11 +82,13 @@ public: return nested_func->alignOfData(); } - bool hasTrivialDestructor() const override { + bool hasTrivialDestructor() const override + { return nested_func->hasTrivialDestructor(); } - void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override { + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { SipHash hash; columns[0]->updateHashWithValue(row_num, hash); @@ -92,23 +99,28 @@ public: nested_func->add(place, columns, row_num, arena); } - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override { + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override + { nested_func->merge(place, rhs, arena); } - void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override + { nested_func->serialize(place, buf); } - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override { + void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override + { nested_func->deserialize(place, buf, arena); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override { + void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + { nested_func->insertResultInto(place, to); } - bool allocatesMemoryInArena() const override { + bool allocatesMemoryInArena() const override + { return nested_func->allocatesMemoryInArena(); } }; From 7c6322c5b03232a0dfd603a54b2e0037bf817122 Mon Sep 17 00:00:00 2001 From: Sofia Antipushina Date: Sat, 16 May 2020 02:06:25 +0300 Subject: [PATCH 05/19] Add support for many columns --- src/AggregateFunctions/AggregateFunctionDistinct.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index 5580cc3b4df..b87183f15d6 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -42,12 +42,13 @@ class AggregateFunctionDistinct final : public IAggregateFunctionHelper(arguments, {}) - , nested_func(nested) + , nested_func(nested), num_arguments(arguments.size()) { if (arguments.empty()) throw Exception("Aggregate function " + getName() + " require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); @@ -90,7 +91,8 @@ public: void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override { SipHash hash; - columns[0]->updateHashWithValue(row_num, hash); + for (size_t i = 0; i < num_arguments; ++i) + columns[i]->updateHashWithValue(row_num, hash); UInt128 key; hash.get128(key.low, key.high); From fa38cf780c0071e1d05e4e79cf0face02628516e Mon Sep 17 00:00:00 2001 From: Sofia Antipushina Date: Sat, 16 May 2020 03:02:55 +0300 Subject: [PATCH 06/19] Add tests for -Distinct combinator --- tests/queries/0_stateless/01259_combinator_distinct.reference | 4 ++++ tests/queries/0_stateless/01259_combinator_distinct.sql | 4 ++++ 2 files changed, 8 insertions(+) create mode 100644 tests/queries/0_stateless/01259_combinator_distinct.reference create mode 100644 tests/queries/0_stateless/01259_combinator_distinct.sql diff --git a/tests/queries/0_stateless/01259_combinator_distinct.reference b/tests/queries/0_stateless/01259_combinator_distinct.reference new file mode 100644 index 00000000000..34d13676466 --- /dev/null +++ b/tests/queries/0_stateless/01259_combinator_distinct.reference @@ -0,0 +1,4 @@ +499500 +78 +[0,1,2,3,4,5,6,7,8,9,10,11,12] +5.669227916063075e-17 diff --git a/tests/queries/0_stateless/01259_combinator_distinct.sql b/tests/queries/0_stateless/01259_combinator_distinct.sql new file mode 100644 index 00000000000..e3c4bb114a3 --- /dev/null +++ b/tests/queries/0_stateless/01259_combinator_distinct.sql @@ -0,0 +1,4 @@ +SELECT sum(DISTINCT x) FROM (SELECT number AS x FROM system.numbers LIMIT 1000); +SELECT sum(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers LIMIT 1000); +SELECT groupArray(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers LIMIT 1000); +SELECT corrStableDistinct(DISTINCT x, y) FROM (SELECT number % 11 AS x, number % 13 AS y FROM system.numbers LIMIT 1000); \ No newline at end of file From aeb195950c65f7e4d9ab8ec374c599e8e1466442 Mon Sep 17 00:00:00 2001 From: Sofia Antipushina Date: Sat, 16 May 2020 03:15:44 +0300 Subject: [PATCH 07/19] Checkstyle fix --- .../AggregateFunctionDistinct.cpp | 61 +++++++++---------- .../AggregateFunctionDistinct.h | 3 +- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.cpp b/src/AggregateFunctions/AggregateFunctionDistinct.cpp index b01bd2226c7..820c2f0f72c 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.cpp +++ b/src/AggregateFunctions/AggregateFunctionDistinct.cpp @@ -6,42 +6,41 @@ namespace DB { - namespace ErrorCodes - { - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; - extern const int ILLEGAL_TYPE_OF_ARGUMENT; - } +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} - class AggregateFunctionCombinatorDistinct final : public IAggregateFunctionCombinator - { - public: - String getName() const override { return "Distinct"; } +class AggregateFunctionCombinatorDistinct final : public IAggregateFunctionCombinator +{ +public: + String getName() const override { return "Distinct"; } - DataTypes transformArguments(const DataTypes & arguments) const override + DataTypes transformArguments(const DataTypes & arguments) const override + { + if (arguments.empty()) + throw Exception("Incorrect number of arguments for aggregate function with " + getName() + " suffix", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + DataTypes nested_arguments; + for (const auto & type : arguments) { - if (arguments.empty()) - throw Exception("Incorrect number of arguments for aggregate function with " + getName() + " suffix", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - DataTypes nested_arguments; - for (const auto & type : arguments) - { - nested_arguments.push_back(type); - } - - return nested_arguments; + nested_arguments.push_back(type); } - AggregateFunctionPtr transformAggregateFunction( - const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const Array &) const override - { - return std::make_shared(nested_function, arguments); - } - }; - - void registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFactory & factory) - { - factory.registerCombinator(std::make_shared()); + return nested_arguments; } + AggregateFunctionPtr transformAggregateFunction( + const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const Array &) const override + { + return std::make_shared(nested_function, arguments); + } +}; + +void registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFactory & factory) +{ + factory.registerCombinator(std::make_shared()); +} + } diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index b87183f15d6..cc4c52ea5ff 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -28,7 +28,8 @@ struct AggregateFunctionDistinctData > data; std::mutex mutex; - bool ALWAYS_INLINE TryToInsert(const Key& key) { + bool ALWAYS_INLINE TryToInsert(const Key& key) + { std::lock_guard lock(mutex); return data.insert(key).second; } From f4369381c97fca9394bd9f0673a5bb91b0983d29 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Sat, 16 May 2020 18:00:33 +0300 Subject: [PATCH 08/19] Fix build --- src/AggregateFunctions/AggregateFunctionDistinct.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index cc4c52ea5ff..e7ccbc62c57 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -117,7 +117,7 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to) const override { nested_func->insertResultInto(place, to); } From 7135b8491c5b66788a6dd8c3e0536e175a78a616 Mon Sep 17 00:00:00 2001 From: Sofia Antipushina Date: Mon, 25 May 2020 15:12:50 +0300 Subject: [PATCH 09/19] Base memory data storage --- .../AggregateFunctionDistinct.h | 52 +++++++++++++------ 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index e7ccbc62c57..7e86364ab0d 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -25,13 +25,15 @@ struct AggregateFunctionDistinctData UInt128TrivialHash, HashTableGrower<3>, HashTableAllocatorWithStackMemory - > data; + > set; std::mutex mutex; - bool ALWAYS_INLINE TryToInsert(const Key& key) + bool ALWAYS_INLINE tryToInsert(const Key& key) { std::lock_guard lock(mutex); - return data.insert(key).second; + bool a = set.insert(key).second; + if (a) std::cerr << key.high << ' ' << key.low << ' ' << a << std::endl; + return a; } }; @@ -39,18 +41,30 @@ struct AggregateFunctionDistinctData * Adding -Distinct suffix to aggregate function **/ -class AggregateFunctionDistinct final : public IAggregateFunctionHelper +class AggregateFunctionDistinct final : public IAggregateFunctionDataHelper { private: AggregateFunctionPtr nested_func; size_t num_arguments; - mutable AggregateFunctionDistinctData storage; + size_t prefix_size; + + AggregateDataPtr getNestedPlace(AggregateDataPtr place) const noexcept + { + return place + prefix_size; + } + + ConstAggregateDataPtr getNestedPlace(ConstAggregateDataPtr place) const noexcept + { + return place + prefix_size; + } public: AggregateFunctionDistinct(AggregateFunctionPtr nested, const DataTypes & arguments) - : IAggregateFunctionHelper(arguments, {}) + : IAggregateFunctionDataHelper(arguments, {}) , nested_func(nested), num_arguments(arguments.size()) { + prefix_size = 640'000'000; + if (arguments.empty()) throw Exception("Aggregate function " + getName() + " require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } @@ -67,16 +81,19 @@ public: void create(AggregateDataPtr place) const override { - nested_func->create(place); + new (place) AggregateFunctionDistinctData; + nested_func->create(getNestedPlace(place)); } - void destroy(AggregateDataPtr place) const noexcept override { - nested_func->destroy(place); + void destroy(AggregateDataPtr place) const noexcept override + { + data(place).~AggregateFunctionDistinctData(); + nested_func->destroy(getNestedPlace(place)); } size_t sizeOfData() const override { - return nested_func->sizeOfData(); + return prefix_size + nested_func->sizeOfData(); } size_t alignOfData() const override @@ -92,34 +109,35 @@ public: void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override { SipHash hash; - for (size_t i = 0; i < num_arguments; ++i) + for (size_t i = 0; i < num_arguments; ++i) { columns[i]->updateHashWithValue(row_num, hash); + } UInt128 key; hash.get128(key.low, key.high); - if (storage.TryToInsert(key)) - nested_func->add(place, columns, row_num, arena); + if (this->data(place).tryToInsert(key)) + nested_func->add(getNestedPlace(place), columns, row_num, arena); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override { - nested_func->merge(place, rhs, arena); + nested_func->merge(getNestedPlace(place), rhs, arena); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { - nested_func->serialize(place, buf); + nested_func->serialize(getNestedPlace(place), buf); } void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override { - nested_func->deserialize(place, buf, arena); + nested_func->deserialize(getNestedPlace(place), buf, arena); } void insertResultInto(AggregateDataPtr place, IColumn & to) const override { - nested_func->insertResultInto(place, to); + nested_func->insertResultInto(getNestedPlace(place), to); } bool allocatesMemoryInArena() const override From f206d74b63f00b2037a82257291bd721decce8ff Mon Sep 17 00:00:00 2001 From: Sofia Antipushina Date: Mon, 25 May 2020 17:02:55 +0300 Subject: [PATCH 10/19] fix align of data && add test --- src/AggregateFunctions/AggregateFunctionDistinct.h | 7 ++----- .../0_stateless/01259_combinator_distinct.reference | 1 + tests/queries/0_stateless/01259_combinator_distinct.sql | 1 + 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index 7e86364ab0d..57e17ffb13c 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -30,10 +30,7 @@ struct AggregateFunctionDistinctData bool ALWAYS_INLINE tryToInsert(const Key& key) { - std::lock_guard lock(mutex); - bool a = set.insert(key).second; - if (a) std::cerr << key.high << ' ' << key.low << ' ' << a << std::endl; - return a; + return set.insert(key).second; } }; @@ -63,7 +60,7 @@ public: : IAggregateFunctionDataHelper(arguments, {}) , nested_func(nested), num_arguments(arguments.size()) { - prefix_size = 640'000'000; + prefix_size = sizeof(AggregateFunctionDistinctData); if (arguments.empty()) throw Exception("Aggregate function " + getName() + " require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); diff --git a/tests/queries/0_stateless/01259_combinator_distinct.reference b/tests/queries/0_stateless/01259_combinator_distinct.reference index 34d13676466..739d225ad67 100644 --- a/tests/queries/0_stateless/01259_combinator_distinct.reference +++ b/tests/queries/0_stateless/01259_combinator_distinct.reference @@ -1,4 +1,5 @@ 499500 78 [0,1,2,3,4,5,6,7,8,9,10,11,12] +[0,1,2,3,4,5,6,7,8,9,10,11,12] 5.669227916063075e-17 diff --git a/tests/queries/0_stateless/01259_combinator_distinct.sql b/tests/queries/0_stateless/01259_combinator_distinct.sql index e3c4bb114a3..3f07dc443dd 100644 --- a/tests/queries/0_stateless/01259_combinator_distinct.sql +++ b/tests/queries/0_stateless/01259_combinator_distinct.sql @@ -1,4 +1,5 @@ SELECT sum(DISTINCT x) FROM (SELECT number AS x FROM system.numbers LIMIT 1000); SELECT sum(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers LIMIT 1000); SELECT groupArray(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers LIMIT 1000); +SELECT groupArray(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers_mt LIMIT 1000); SELECT corrStableDistinct(DISTINCT x, y) FROM (SELECT number % 11 AS x, number % 13 AS y FROM system.numbers LIMIT 1000); \ No newline at end of file From 3543da3ca463fc4deb1002e5b5bf91df13306931 Mon Sep 17 00:00:00 2001 From: Sofia Antipushina Date: Sun, 31 May 2020 17:44:49 +0300 Subject: [PATCH 11/19] fix stylecheck --- src/AggregateFunctions/AggregateFunctionDistinct.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index 57e17ffb13c..32f5df6d8f0 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -106,7 +106,8 @@ public: void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override { SipHash hash; - for (size_t i = 0; i < num_arguments; ++i) { + for (size_t i = 0; i < num_arguments; ++i) + { columns[i]->updateHashWithValue(row_num, hash); } From 5abbaeecf5e7185b8bd7da57ed5fd6d2850b4943 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 17 Jun 2020 13:02:04 +0300 Subject: [PATCH 12/19] distinct combinator for single numeric arguments --- .../AggregateFunctionDistinct.cpp | 29 ++- .../AggregateFunctionDistinct.h | 205 ++++++++++++------ .../AggregateFunctionGroupUniqArray.h | 38 +--- src/AggregateFunctions/KeyHolderHelpers.h | 34 +++ .../01259_combinator_distinct.reference | 1 + .../0_stateless/01259_combinator_distinct.sql | 11 +- 6 files changed, 206 insertions(+), 112 deletions(-) create mode 100644 src/AggregateFunctions/KeyHolderHelpers.h diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.cpp b/src/AggregateFunctions/AggregateFunctionDistinct.cpp index 820c2f0f72c..1661277d525 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.cpp +++ b/src/AggregateFunctions/AggregateFunctionDistinct.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include "registerAggregateFunctions.h" @@ -9,6 +10,7 @@ namespace DB namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; } class AggregateFunctionCombinatorDistinct final : public IAggregateFunctionCombinator @@ -22,19 +24,30 @@ public: throw Exception("Incorrect number of arguments for aggregate function with " + getName() + " suffix", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - DataTypes nested_arguments; - for (const auto & type : arguments) - { - nested_arguments.push_back(type); - } - - return nested_arguments; + return arguments; } AggregateFunctionPtr transformAggregateFunction( const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const Array &) const override { - return std::make_shared(nested_function, arguments); + AggregateFunctionPtr res; + if (arguments.size() == 1) + { + res = AggregateFunctionPtr(createWithNumericType(*arguments[0], nested_function, arguments)); + if (res) + return res; + + if (arguments[0]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion()) + return std::make_shared>(nested_function, arguments); + else + return std::make_shared>(nested_function, arguments); + } + + if (!res) + throw Exception("Illegal type " /* + argument_type->getName() + */ + " of argument for aggregate function " + nested_function->getName() + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return res; } }; diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index 32f5df6d8f0..72099a33cfd 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -16,34 +17,22 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } -struct AggregateFunctionDistinctData +template +struct AggregateFunctionDistinctSingleNumericData { - using Key = UInt128; - - HashSet< - Key, - UInt128TrivialHash, - HashTableGrower<3>, - HashTableAllocatorWithStackMemory - > set; - std::mutex mutex; - - bool ALWAYS_INLINE tryToInsert(const Key& key) - { - return set.insert(key).second; - } + /// When creating, the hash table must be small. + using Set = HashSetWithStackMemory, 4>; + Set value; }; -/** Adaptor for aggregate functions. - * Adding -Distinct suffix to aggregate function -**/ - -class AggregateFunctionDistinct final : public IAggregateFunctionDataHelper +template +class AggregateFunctionDistinctBase : public IAggregateFunctionDataHelper { -private: +protected: + static constexpr size_t prefix_size = sizeof(Data); AggregateFunctionPtr nested_func; size_t num_arguments; - size_t prefix_size; + AggregateDataPtr getNestedPlace(AggregateDataPtr place) const noexcept { @@ -56,14 +45,22 @@ private: } public: - AggregateFunctionDistinct(AggregateFunctionPtr nested, const DataTypes & arguments) - : IAggregateFunctionDataHelper(arguments, {}) - , nested_func(nested), num_arguments(arguments.size()) - { - prefix_size = sizeof(AggregateFunctionDistinctData); - if (arguments.empty()) - throw Exception("Aggregate function " + getName() + " require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + size_t sizeOfData() const override + { + return prefix_size + nested_func->sizeOfData(); + } + + void create(AggregateDataPtr place) const override + { + new (place) Data; + nested_func->create(getNestedPlace(place)); + } + + void destroy(AggregateDataPtr place) const noexcept override + { + this->data(place).~Data(); + nested_func->destroy(getNestedPlace(place)); } String getName() const override @@ -76,71 +73,151 @@ public: return nested_func->getReturnType(); } - void create(AggregateDataPtr place) const override + bool allocatesMemoryInArena() const override { - new (place) AggregateFunctionDistinctData; - nested_func->create(getNestedPlace(place)); + return true; } - void destroy(AggregateDataPtr place) const noexcept override + AggregateFunctionDistinctBase(AggregateFunctionPtr nested, const DataTypes & arguments) + : IAggregateFunctionDataHelper(arguments, {}) + , nested_func(nested), num_arguments(arguments.size()) { - data(place).~AggregateFunctionDistinctData(); - nested_func->destroy(getNestedPlace(place)); + if (arguments.empty()) + throw Exception("Aggregate function " + getName() + " require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } +}; - size_t sizeOfData() const override - { - return prefix_size + nested_func->sizeOfData(); - } - size_t alignOfData() const override - { - return nested_func->alignOfData(); - } +/** Adaptor for aggregate functions. + * Adding -Distinct suffix to aggregate function +**/ +template +class AggregateFunctionDistinctSingleNumericImpl final + : public AggregateFunctionDistinctBase, + AggregateFunctionDistinctSingleNumericImpl> +{ +public: - bool hasTrivialDestructor() const override - { - return nested_func->hasTrivialDestructor(); - } + AggregateFunctionDistinctSingleNumericImpl(AggregateFunctionPtr nested, const DataTypes & arguments) + : AggregateFunctionDistinctBase< + AggregateFunctionDistinctSingleNumericData, + AggregateFunctionDistinctSingleNumericImpl>(nested, arguments) {} void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override { - SipHash hash; - for (size_t i = 0; i < num_arguments; ++i) - { - columns[i]->updateHashWithValue(row_num, hash); - } - - UInt128 key; - hash.get128(key.low, key.high); - - if (this->data(place).tryToInsert(key)) - nested_func->add(getNestedPlace(place), columns, row_num, arena); + const auto & vec = assert_cast &>(*columns[0]).getData(); + if (this->data(place).value.insert(vec[row_num]).second) + this->nested_func->add(this->getNestedPlace(place), columns, row_num, arena); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override { - nested_func->merge(getNestedPlace(place), rhs, arena); + auto & cur_set = this->data(place).value; + auto & rhs_set = this->data(rhs).value; + + auto arguments = this->argument_types[0]->createColumn(); + for (auto & elem : rhs_set) + if (cur_set.insert(elem.getValue()).second) + arguments->insert(elem.getValue()); + + const auto * arguments_ptr = arguments.get(); + if (!arguments->empty()) + this->nested_func->addBatchSinglePlace(arguments->size(), this->getNestedPlace(place), &arguments_ptr, arena); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { - nested_func->serialize(getNestedPlace(place), buf); + this->data(place).value.write(buf); + this->nested_func->serialize(this->getNestedPlace(place), buf); } void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override { - nested_func->deserialize(getNestedPlace(place), buf, arena); + this->data(place).value.read(buf); + this->nested_func->deserialize(this->getNestedPlace(place), buf, arena); } void insertResultInto(AggregateDataPtr place, IColumn & to) const override { - nested_func->insertResultInto(getNestedPlace(place), to); + this->nested_func->insertResultInto(this->getNestedPlace(place), to); + } +}; + +struct AggregateFunctionDistinctSingleGenericData +{ + using Set = HashSetWithSavedHashWithStackMemory; + Set value; +}; + +template +class AggregateFunctionDistinctSingleGenericImpl final + : public AggregateFunctionDistinctBase> +{ +public: + using Data = AggregateFunctionDistinctSingleGenericData; + + AggregateFunctionDistinctSingleGenericImpl(AggregateFunctionPtr nested, const DataTypes & arguments) + : AggregateFunctionDistinctBase< + AggregateFunctionDistinctSingleGenericData, + AggregateFunctionDistinctSingleGenericImpl>(nested, arguments) {} + + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + auto & set = this->data(place).value; + + Data::Set::LookupResult it; + bool inserted; + auto key_holder = getKeyHolder(*columns[0], row_num, *arena); + set.emplace(key_holder, it, inserted); + if (inserted) + this->nested_func->add(this->getNestedPlace(place), columns, row_num, arena); } - bool allocatesMemoryInArena() const override + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override { - return nested_func->allocatesMemoryInArena(); + auto & cur_set = this->data(place).value; + auto & rhs_set = this->data(rhs).value; + + Data::Set::LookupResult it; + bool inserted; + auto arguments = this->argument_types[0]->createColumn(); + for (auto & elem : rhs_set) + { + cur_set.emplace(ArenaKeyHolder{elem.getValue(), *arena}, it, inserted); + if (inserted) + deserializeAndInsert(elem.getValue(), *arguments); + } + + const auto * arguments_ptr = arguments.get(); + if (!arguments->empty()) + this->nested_func->addBatchSinglePlace(arguments->size(), this->getNestedPlace(place), &arguments_ptr, arena); + } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override + { + auto & set = this->data(place).value; + writeVarUInt(set.size(), buf); + for (const auto & elem : set) + writeStringBinary(elem.getValue(), buf); + + this->nested_func->serialize(this->getNestedPlace(place), buf); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override + { + auto & set = this->data(place).value; + size_t size; + readVarUInt(size, buf); + for (size_t i = 0; i < size; ++i) + set.insert(readStringBinaryInto(*arena, buf)); + + this->nested_func->deserialize(this->getNestedPlace(place), buf, arena); + } + + void insertResultInto(AggregateDataPtr place, IColumn & to) const override + { + this->nested_func->insertResultInto(this->getNestedPlace(place), to); } }; diff --git a/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h b/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h index 88b1c87f526..b6683567404 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h +++ b/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h @@ -16,6 +16,7 @@ #include #include +#include #define AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE 0xFFFFFF @@ -147,26 +148,6 @@ class AggregateFunctionGroupUniqArrayGeneric using State = AggregateFunctionGroupUniqArrayGenericData; - static auto getKeyHolder(const IColumn & column, size_t row_num, Arena & arena) - { - if constexpr (is_plain_column) - { - return ArenaKeyHolder{column.getDataAt(row_num), arena}; - } - else - { - const char * begin = nullptr; - StringRef serialized = column.serializeValueIntoArena(row_num, arena, begin); - assert(serialized.data != nullptr); - return SerializedKeyHolder{serialized, arena}; - } - } - - static void deserializeAndInsert(StringRef str, IColumn & data_to) - { - return deserializeAndInsertImpl(str, data_to); - } - public: AggregateFunctionGroupUniqArrayGeneric(const DataTypePtr & input_data_type_, UInt64 max_elems_ = std::numeric_limits::max()) : IAggregateFunctionDataHelper>({input_data_type_}, {}) @@ -215,7 +196,7 @@ public: bool inserted; State::Set::LookupResult it; - auto key_holder = getKeyHolder(*columns[0], row_num, *arena); + auto key_holder = getKeyHolder(*columns[0], row_num, *arena); set.emplace(key_holder, it, inserted); } @@ -247,22 +228,9 @@ public: offsets_to.push_back(offsets_to.back() + set.size()); for (auto & elem : set) - deserializeAndInsert(elem.getValue(), data_to); + deserializeAndInsert(elem.getValue(), data_to); } }; - -template <> -inline void deserializeAndInsertImpl(StringRef str, IColumn & data_to) -{ - data_to.deserializeAndInsertFromArena(str.data); -} - -template <> -inline void deserializeAndInsertImpl(StringRef str, IColumn & data_to) -{ - data_to.insertData(str.data, str.size); -} - #undef AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE } diff --git a/src/AggregateFunctions/KeyHolderHelpers.h b/src/AggregateFunctions/KeyHolderHelpers.h new file mode 100644 index 00000000000..01b3cf2a369 --- /dev/null +++ b/src/AggregateFunctions/KeyHolderHelpers.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include + +namespace DB +{ + +template +static auto getKeyHolder(const IColumn & column, size_t row_num, Arena & arena) +{ + if constexpr (is_plain_column) + { + return ArenaKeyHolder{column.getDataAt(row_num), arena}; + } + else + { + const char * begin = nullptr; + StringRef serialized = column.serializeValueIntoArena(row_num, arena, begin); + assert(serialized.data != nullptr); + return SerializedKeyHolder{serialized, arena}; + } +} + +template +static void deserializeAndInsert(StringRef str, IColumn & data_to) +{ + if constexpr (is_plain_column) + data_to.insertData(str.data, str.size); + else + data_to.deserializeAndInsertFromArena(str.data); +} + +} diff --git a/tests/queries/0_stateless/01259_combinator_distinct.reference b/tests/queries/0_stateless/01259_combinator_distinct.reference index 739d225ad67..83756ffdaa4 100644 --- a/tests/queries/0_stateless/01259_combinator_distinct.reference +++ b/tests/queries/0_stateless/01259_combinator_distinct.reference @@ -2,4 +2,5 @@ 78 [0,1,2,3,4,5,6,7,8,9,10,11,12] [0,1,2,3,4,5,6,7,8,9,10,11,12] +20 5.669227916063075e-17 diff --git a/tests/queries/0_stateless/01259_combinator_distinct.sql b/tests/queries/0_stateless/01259_combinator_distinct.sql index 3f07dc443dd..adfddeb34e4 100644 --- a/tests/queries/0_stateless/01259_combinator_distinct.sql +++ b/tests/queries/0_stateless/01259_combinator_distinct.sql @@ -1,5 +1,6 @@ -SELECT sum(DISTINCT x) FROM (SELECT number AS x FROM system.numbers LIMIT 1000); -SELECT sum(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers LIMIT 1000); -SELECT groupArray(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers LIMIT 1000); -SELECT groupArray(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers_mt LIMIT 1000); -SELECT corrStableDistinct(DISTINCT x, y) FROM (SELECT number % 11 AS x, number % 13 AS y FROM system.numbers LIMIT 1000); \ No newline at end of file +SELECT sum(DISTINCT x) FROM (SELECT number AS x FROM system.numbers_mt LIMIT 100000); +SELECT sum(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers_mt LIMIT 100000); +SELECT groupArray(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers_mt LIMIT 100000); +SELECT groupArray(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers_mt LIMIT 100000); +SELECT finalizeAggregation(countState(DISTINCT toString(number % 20))) FROM numbers_mt (100000); +-- SELECT corrStableDistinct(DISTINCT x, y) FROM (SELECT number % 11 AS x, number % 13 AS y FROM system.numbers LIMIT 1000); From 88b325dcdc373dd8c34d0479c7cc482b618da6fe Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 17 Jun 2020 22:36:27 +0300 Subject: [PATCH 13/19] rework distinct combinator --- .../AggregateFunctionAggThrow.cpp | 2 +- .../AggregateFunctionArgMinMax.h | 2 +- .../AggregateFunctionArray.h | 4 +- src/AggregateFunctions/AggregateFunctionAvg.h | 2 +- .../AggregateFunctionBitwise.h | 2 +- .../AggregateFunctionBoundingRatio.h | 2 +- ...egateFunctionCategoricalInformationValue.h | 4 +- .../AggregateFunctionCount.h | 4 +- .../AggregateFunctionDistinct.h | 67 +++++++------------ .../AggregateFunctionEntropy.h | 2 +- .../AggregateFunctionForEach.h | 4 +- .../AggregateFunctionGroupArray.h | 6 +- .../AggregateFunctionGroupArrayInsertAt.h | 2 +- .../AggregateFunctionGroupArrayMoving.h | 2 +- .../AggregateFunctionGroupBitmap.h | 4 +- .../AggregateFunctionGroupUniqArray.h | 5 +- .../AggregateFunctionHistogram.h | 2 +- src/AggregateFunctions/AggregateFunctionIf.h | 4 +- .../AggregateFunctionMLMethod.h | 2 +- .../AggregateFunctionMaxIntersections.h | 2 +- .../AggregateFunctionMerge.h | 4 +- .../AggregateFunctionMinMaxAny.h | 2 +- .../AggregateFunctionNothing.h | 2 +- .../AggregateFunctionNull.h | 6 +- .../AggregateFunctionOrFill.h | 9 +-- .../AggregateFunctionQuantile.h | 2 +- .../AggregateFunctionResample.h | 5 +- .../AggregateFunctionRetention.h | 2 +- .../AggregateFunctionSequenceMatch.h | 4 +- .../AggregateFunctionSimpleLinearRegression.h | 4 +- .../AggregateFunctionState.h | 2 +- .../AggregateFunctionStatistics.h | 4 +- .../AggregateFunctionStatisticsSimple.h | 2 +- src/AggregateFunctions/AggregateFunctionSum.h | 2 +- .../AggregateFunctionSumMap.h | 2 +- .../AggregateFunctionTimeSeriesGroupSum.h | 2 +- .../AggregateFunctionTopK.h | 4 +- .../AggregateFunctionUniq.h | 4 +- .../AggregateFunctionUniqCombined.h | 4 +- .../AggregateFunctionUniqUpTo.h | 4 +- .../AggregateFunctionWindowFunnel.h | 2 +- src/AggregateFunctions/IAggregateFunction.h | 2 +- src/Columns/ColumnAggregateFunction.cpp | 2 +- src/Functions/array/arrayReduce.cpp | 2 +- src/Functions/array/arrayReduceInRanges.cpp | 2 +- src/Functions/runningAccumulate.cpp | 2 +- src/Interpreters/Aggregator.cpp | 28 +++++--- src/Interpreters/Aggregator.h | 7 +- .../Algorithms/AggregatingSortedAlgorithm.cpp | 2 +- .../GraphiteRollupSortedAlgorithm.cpp | 2 +- .../Algorithms/SummingSortedAlgorithm.cpp | 2 +- .../01259_combinator_distinct.reference | 4 +- .../0_stateless/01259_combinator_distinct.sql | 9 ++- 53 files changed, 128 insertions(+), 132 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionAggThrow.cpp b/src/AggregateFunctions/AggregateFunctionAggThrow.cpp index ea3eb9b1a20..fada039e20a 100644 --- a/src/AggregateFunctions/AggregateFunctionAggThrow.cpp +++ b/src/AggregateFunctions/AggregateFunctionAggThrow.cpp @@ -93,7 +93,7 @@ public: buf.read(c); } - void insertResultInto(AggregateDataPtr, IColumn & to) const override + void insertResultInto(AggregateDataPtr, IColumn & to, Arena *) const override { to.insertDefault(); } diff --git a/src/AggregateFunctions/AggregateFunctionArgMinMax.h b/src/AggregateFunctions/AggregateFunctionArgMinMax.h index 9a0c428d75b..9470b1b8692 100644 --- a/src/AggregateFunctions/AggregateFunctionArgMinMax.h +++ b/src/AggregateFunctions/AggregateFunctionArgMinMax.h @@ -85,7 +85,7 @@ public: return Data::allocatesMemoryInArena(); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).result.insertResultInto(to); } diff --git a/src/AggregateFunctions/AggregateFunctionArray.h b/src/AggregateFunctions/AggregateFunctionArray.h index 4fe5e459ae1..24b07010707 100644 --- a/src/AggregateFunctions/AggregateFunctionArray.h +++ b/src/AggregateFunctions/AggregateFunctionArray.h @@ -119,9 +119,9 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { - nested_func->insertResultInto(place, to); + nested_func->insertResultInto(place, to, arena); } bool allocatesMemoryInArena() const override diff --git a/src/AggregateFunctions/AggregateFunctionAvg.h b/src/AggregateFunctions/AggregateFunctionAvg.h index d9ef8647b82..1f3426160cb 100644 --- a/src/AggregateFunctions/AggregateFunctionAvg.h +++ b/src/AggregateFunctions/AggregateFunctionAvg.h @@ -80,7 +80,7 @@ public: readBinary(this->data(place).denominator, buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & column = static_cast(to); column.getData().push_back(this->data(place).template result()); diff --git a/src/AggregateFunctions/AggregateFunctionBitwise.h b/src/AggregateFunctions/AggregateFunctionBitwise.h index a4e5f7ddafa..6d9eb3c36e1 100644 --- a/src/AggregateFunctions/AggregateFunctionBitwise.h +++ b/src/AggregateFunctions/AggregateFunctionBitwise.h @@ -74,7 +74,7 @@ public: readBinary(this->data(place).value, buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast &>(to).getData().push_back(this->data(place).value); } diff --git a/src/AggregateFunctions/AggregateFunctionBoundingRatio.h b/src/AggregateFunctions/AggregateFunctionBoundingRatio.h index 81846db4bac..9ceb7976f4a 100644 --- a/src/AggregateFunctions/AggregateFunctionBoundingRatio.h +++ b/src/AggregateFunctions/AggregateFunctionBoundingRatio.h @@ -150,7 +150,7 @@ public: data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(getBoundingRatio(data(place))); } diff --git a/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h b/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h index 1c397c26631..aa205a71c97 100644 --- a/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h +++ b/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h @@ -119,8 +119,8 @@ public: void insertResultInto( AggregateDataPtr place, - IColumn & to - ) const override + IColumn & to, + Arena *) const override { auto & col = static_cast(to); auto & data_col = static_cast(col.getData()); diff --git a/src/AggregateFunctions/AggregateFunctionCount.h b/src/AggregateFunctions/AggregateFunctionCount.h index e54f014f7a4..51040bdcfad 100644 --- a/src/AggregateFunctions/AggregateFunctionCount.h +++ b/src/AggregateFunctions/AggregateFunctionCount.h @@ -57,7 +57,7 @@ public: readVarUInt(data(place).count, buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(data(place).count); } @@ -114,7 +114,7 @@ public: readVarUInt(data(place).count, buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(data(place).count); } diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index 72099a33cfd..5c663bb6441 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -32,7 +32,6 @@ protected: static constexpr size_t prefix_size = sizeof(Data); AggregateFunctionPtr nested_func; size_t num_arguments; - AggregateDataPtr getNestedPlace(AggregateDataPtr place) const noexcept { @@ -103,43 +102,37 @@ public: AggregateFunctionDistinctSingleNumericData, AggregateFunctionDistinctSingleNumericImpl>(nested, arguments) {} - void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override { const auto & vec = assert_cast &>(*columns[0]).getData(); - if (this->data(place).value.insert(vec[row_num]).second) - this->nested_func->add(this->getNestedPlace(place), columns, row_num, arena); + this->data(place).value.insert(vec[row_num]); } - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { - auto & cur_set = this->data(place).value; - auto & rhs_set = this->data(rhs).value; - - auto arguments = this->argument_types[0]->createColumn(); - for (auto & elem : rhs_set) - if (cur_set.insert(elem.getValue()).second) - arguments->insert(elem.getValue()); - - const auto * arguments_ptr = arguments.get(); - if (!arguments->empty()) - this->nested_func->addBatchSinglePlace(arguments->size(), this->getNestedPlace(place), &arguments_ptr, arena); + this->data(place).value.merge(this->data(rhs).value); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).value.write(buf); - this->nested_func->serialize(this->getNestedPlace(place), buf); } - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override + void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).value.read(buf); - this->nested_func->deserialize(this->getNestedPlace(place), buf, arena); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { - this->nested_func->insertResultInto(this->getNestedPlace(place), to); + const auto & set = this->data(place).value; + auto arguments = this->argument_types[0]->createColumn(); + for (const auto & elem : set) + arguments->insert(elem.getValue()); + + const auto * arguments_ptr = arguments.get(); + this->nested_func->addBatchSinglePlace(arguments->size(), this->getNestedPlace(place), &arguments_ptr, arena); + this->nested_func->insertResultInto(this->getNestedPlace(place), to, arena); } }; @@ -170,38 +163,25 @@ public: bool inserted; auto key_holder = getKeyHolder(*columns[0], row_num, *arena); set.emplace(key_holder, it, inserted); - if (inserted) - this->nested_func->add(this->getNestedPlace(place), columns, row_num, arena); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override { auto & cur_set = this->data(place).value; - auto & rhs_set = this->data(rhs).value; + const auto & rhs_set = this->data(rhs).value; Data::Set::LookupResult it; bool inserted; - auto arguments = this->argument_types[0]->createColumn(); - for (auto & elem : rhs_set) - { + for (const auto & elem : rhs_set) cur_set.emplace(ArenaKeyHolder{elem.getValue(), *arena}, it, inserted); - if (inserted) - deserializeAndInsert(elem.getValue(), *arguments); - } - - const auto * arguments_ptr = arguments.get(); - if (!arguments->empty()) - this->nested_func->addBatchSinglePlace(arguments->size(), this->getNestedPlace(place), &arguments_ptr, arena); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { - auto & set = this->data(place).value; + const auto & set = this->data(place).value; writeVarUInt(set.size(), buf); for (const auto & elem : set) writeStringBinary(elem.getValue(), buf); - - this->nested_func->serialize(this->getNestedPlace(place), buf); } void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override @@ -211,13 +191,18 @@ public: readVarUInt(size, buf); for (size_t i = 0; i < size; ++i) set.insert(readStringBinaryInto(*arena, buf)); - - this->nested_func->deserialize(this->getNestedPlace(place), buf, arena); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { - this->nested_func->insertResultInto(this->getNestedPlace(place), to); + const auto & set = this->data(place).value; + auto arguments = this->argument_types[0]->createColumn(); + for (const auto & elem : set) + deserializeAndInsert(elem.getValue(), *arguments); + + const auto * arguments_ptr = arguments.get(); + this->nested_func->addBatchSinglePlace(arguments->size(), this->getNestedPlace(place), &arguments_ptr, arena); + this->nested_func->insertResultInto(this->getNestedPlace(place), to, arena); } }; diff --git a/src/AggregateFunctions/AggregateFunctionEntropy.h b/src/AggregateFunctions/AggregateFunctionEntropy.h index ff233a5ac93..656aca43f60 100644 --- a/src/AggregateFunctions/AggregateFunctionEntropy.h +++ b/src/AggregateFunctions/AggregateFunctionEntropy.h @@ -132,7 +132,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & column = assert_cast &>(to); column.getData().push_back(this->data(place).get()); diff --git a/src/AggregateFunctions/AggregateFunctionForEach.h b/src/AggregateFunctions/AggregateFunctionForEach.h index 23a3487de47..19f2994d3f1 100644 --- a/src/AggregateFunctions/AggregateFunctionForEach.h +++ b/src/AggregateFunctions/AggregateFunctionForEach.h @@ -225,7 +225,7 @@ public: } } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { AggregateFunctionForEachData & state = data(place); @@ -236,7 +236,7 @@ public: char * nested_state = state.array_of_aggregate_datas; for (size_t i = 0; i < state.dynamic_array_size; ++i) { - nested_func->insertResultInto(nested_state, elems_to); + nested_func->insertResultInto(nested_state, elems_to, arena); nested_state += nested_size_of_data; } diff --git a/src/AggregateFunctions/AggregateFunctionGroupArray.h b/src/AggregateFunctions/AggregateFunctionGroupArray.h index b76efd9f6c2..f3d31eb599b 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArray.h +++ b/src/AggregateFunctions/AggregateFunctionGroupArray.h @@ -282,7 +282,7 @@ public: // if constexpr (Trait::sampler == Sampler::DETERMINATOR) } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { const auto & value = this->data(place).value; size_t size = value.size(); @@ -600,7 +600,7 @@ public: // if constexpr (Trait::sampler == Sampler::DETERMINATOR) } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & column_array = assert_cast(to); @@ -815,7 +815,7 @@ public: data(place).last = prev; } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & column_array = assert_cast(to); diff --git a/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h b/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h index 0eec38c51a7..d84c99aec57 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h +++ b/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h @@ -179,7 +179,7 @@ public: } } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { ColumnArray & to_array = assert_cast(to); IColumn & to_data = to_array.getData(); diff --git a/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h b/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h index 8f93a7eb25a..19562b37a12 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h +++ b/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h @@ -158,7 +158,7 @@ public: this->data(place).sum = value.back(); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { const auto & data = this->data(place); size_t size = data.value.size(); diff --git a/src/AggregateFunctions/AggregateFunctionGroupBitmap.h b/src/AggregateFunctions/AggregateFunctionGroupBitmap.h index 766479cc08d..a6470aa6943 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupBitmap.h +++ b/src/AggregateFunctions/AggregateFunctionGroupBitmap.h @@ -48,7 +48,7 @@ public: this->data(place).rbs.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast &>(to).getData().push_back(this->data(place).rbs.size()); } @@ -113,7 +113,7 @@ public: this->data(place).rbs.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast &>(to).getData().push_back(this->data(place).rbs.size()); } diff --git a/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h b/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h index b6683567404..2ee9d0f6e1c 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h +++ b/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h @@ -98,7 +98,7 @@ public: this->data(place).value.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); @@ -218,7 +218,7 @@ public: } } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); @@ -231,6 +231,7 @@ public: deserializeAndInsert(elem.getValue(), data_to); } }; + #undef AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE } diff --git a/src/AggregateFunctions/AggregateFunctionHistogram.h b/src/AggregateFunctions/AggregateFunctionHistogram.h index 8eaa42fdac4..bc9c95ecf2a 100644 --- a/src/AggregateFunctions/AggregateFunctionHistogram.h +++ b/src/AggregateFunctions/AggregateFunctionHistogram.h @@ -353,7 +353,7 @@ public: this->data(place).read(buf, max_bins); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & data = this->data(place); diff --git a/src/AggregateFunctions/AggregateFunctionIf.h b/src/AggregateFunctions/AggregateFunctionIf.h index bf4f0b24de3..f04450c9142 100644 --- a/src/AggregateFunctions/AggregateFunctionIf.h +++ b/src/AggregateFunctions/AggregateFunctionIf.h @@ -95,9 +95,9 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { - nested_func->insertResultInto(place, to); + nested_func->insertResultInto(place, to, arena); } bool allocatesMemoryInArena() const override diff --git a/src/AggregateFunctions/AggregateFunctionMLMethod.h b/src/AggregateFunctions/AggregateFunctionMLMethod.h index a11ca9032a5..8a93b66ab3b 100644 --- a/src/AggregateFunctions/AggregateFunctionMLMethod.h +++ b/src/AggregateFunctions/AggregateFunctionMLMethod.h @@ -388,7 +388,7 @@ public: /** This function is called if aggregate function without State modifier is selected in a query. * Inserts all weights of the model into the column 'to', so user may use such information if needed */ - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).returnWeights(to); } diff --git a/src/AggregateFunctions/AggregateFunctionMaxIntersections.h b/src/AggregateFunctions/AggregateFunctionMaxIntersections.h index 050c5fd78ea..b8a4dd63eea 100644 --- a/src/AggregateFunctions/AggregateFunctionMaxIntersections.h +++ b/src/AggregateFunctions/AggregateFunctionMaxIntersections.h @@ -129,7 +129,7 @@ public: buf.read(reinterpret_cast(value.data()), size * sizeof(value[0])); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { Int64 current_intersections = 0; Int64 max_intersections = 0; diff --git a/src/AggregateFunctions/AggregateFunctionMerge.h b/src/AggregateFunctions/AggregateFunctionMerge.h index 51a3c11118f..066f7a762f8 100644 --- a/src/AggregateFunctions/AggregateFunctionMerge.h +++ b/src/AggregateFunctions/AggregateFunctionMerge.h @@ -93,9 +93,9 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { - nested_func->insertResultInto(place, to); + nested_func->insertResultInto(place, to, arena); } bool allocatesMemoryInArena() const override diff --git a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h index 69504f7b249..a21a64af9a4 100644 --- a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h +++ b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h @@ -746,7 +746,7 @@ public: return Data::allocatesMemoryInArena(); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).insertResultInto(to); } diff --git a/src/AggregateFunctions/AggregateFunctionNothing.h b/src/AggregateFunctions/AggregateFunctionNothing.h index 511dbbecd38..af90dfb5179 100644 --- a/src/AggregateFunctions/AggregateFunctionNothing.h +++ b/src/AggregateFunctions/AggregateFunctionNothing.h @@ -67,7 +67,7 @@ public: { } - void insertResultInto(AggregateDataPtr, IColumn & to) const override + void insertResultInto(AggregateDataPtr, IColumn & to, Arena *) const override { to.insertDefault(); } diff --git a/src/AggregateFunctions/AggregateFunctionNull.h b/src/AggregateFunctions/AggregateFunctionNull.h index d6f0079232c..2f2c23fdc8b 100644 --- a/src/AggregateFunctions/AggregateFunctionNull.h +++ b/src/AggregateFunctions/AggregateFunctionNull.h @@ -150,14 +150,14 @@ public: } } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { if constexpr (result_is_nullable) { ColumnNullable & to_concrete = assert_cast(to); if (getFlag(place)) { - nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn()); + nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena); to_concrete.getNullMapData().push_back(0); } else @@ -167,7 +167,7 @@ public: } else { - nested_function->insertResultInto(nestedPlace(place), to); + nested_function->insertResultInto(nestedPlace(place), to, arena); } } diff --git a/src/AggregateFunctions/AggregateFunctionOrFill.h b/src/AggregateFunctions/AggregateFunctionOrFill.h index 1bbf2ea3135..333f07d5e33 100644 --- a/src/AggregateFunctions/AggregateFunctionOrFill.h +++ b/src/AggregateFunctions/AggregateFunctionOrFill.h @@ -148,7 +148,8 @@ public: void insertResultInto( AggregateDataPtr place, - IColumn & to) const override + IColumn & to, + Arena * arena) const override { if (place[size_of_data]) { @@ -157,20 +158,20 @@ public: // -OrNull if (inner_nullable) - nested_function->insertResultInto(place, to); + nested_function->insertResultInto(place, to, arena); else { ColumnNullable & col = typeid_cast(to); col.getNullMapColumn().insertDefault(); - nested_function->insertResultInto(place, col.getNestedColumn()); + nested_function->insertResultInto(place, col.getNestedColumn(), arena); } } else { // -OrDefault - nested_function->insertResultInto(place, to); + nested_function->insertResultInto(place, to, arena); } } else diff --git a/src/AggregateFunctions/AggregateFunctionQuantile.h b/src/AggregateFunctions/AggregateFunctionQuantile.h index 7bdfc13295c..536d9d5683f 100644 --- a/src/AggregateFunctions/AggregateFunctionQuantile.h +++ b/src/AggregateFunctions/AggregateFunctionQuantile.h @@ -138,7 +138,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { /// const_cast is required because some data structures apply finalizaton (like sorting) for obtain a result. auto & data = this->data(place); diff --git a/src/AggregateFunctions/AggregateFunctionResample.h b/src/AggregateFunctions/AggregateFunctionResample.h index 49cc312287e..043e094a688 100644 --- a/src/AggregateFunctions/AggregateFunctionResample.h +++ b/src/AggregateFunctions/AggregateFunctionResample.h @@ -174,13 +174,14 @@ public: void insertResultInto( AggregateDataPtr place, - IColumn & to) const override + IColumn & to, + Arena * arena) const override { auto & col = assert_cast(to); auto & col_offsets = assert_cast(col.getOffsetsColumn()); for (size_t i = 0; i < total; ++i) - nested_function->insertResultInto(place + i * size_of_data, col.getData()); + nested_function->insertResultInto(place + i * size_of_data, col.getData(), arena); col_offsets.getData().push_back(col.getData().size()); } diff --git a/src/AggregateFunctions/AggregateFunctionRetention.h b/src/AggregateFunctions/AggregateFunctionRetention.h index 3a76ba9f055..b742dcdf77f 100644 --- a/src/AggregateFunctions/AggregateFunctionRetention.h +++ b/src/AggregateFunctions/AggregateFunctionRetention.h @@ -123,7 +123,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & data_to = assert_cast(assert_cast(to).getData()).getData(); auto & offsets_to = assert_cast(to).getOffsets(); diff --git a/src/AggregateFunctions/AggregateFunctionSequenceMatch.h b/src/AggregateFunctions/AggregateFunctionSequenceMatch.h index 416786f8fcb..79463e890e4 100644 --- a/src/AggregateFunctions/AggregateFunctionSequenceMatch.h +++ b/src/AggregateFunctions/AggregateFunctionSequenceMatch.h @@ -560,7 +560,7 @@ public: DataTypePtr getReturnType() const override { return std::make_shared(); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).sort(); @@ -588,7 +588,7 @@ public: DataTypePtr getReturnType() const override { return std::make_shared(); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { const_cast(this->data(place)).sort(); assert_cast(to).getData().push_back(count(place)); diff --git a/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h b/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h index d1405172e27..8c029855a26 100644 --- a/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h +++ b/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h @@ -170,8 +170,8 @@ public: void insertResultInto( AggregateDataPtr place, - IColumn & to - ) const override + IColumn & to, + Arena *) const override { Ret k = this->data(place).getK(); Ret b = this->data(place).getB(k); diff --git a/src/AggregateFunctions/AggregateFunctionState.h b/src/AggregateFunctions/AggregateFunctionState.h index 126d63573af..51a31677723 100644 --- a/src/AggregateFunctions/AggregateFunctionState.h +++ b/src/AggregateFunctions/AggregateFunctionState.h @@ -80,7 +80,7 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(place); } diff --git a/src/AggregateFunctions/AggregateFunctionStatistics.h b/src/AggregateFunctions/AggregateFunctionStatistics.h index 7f6de43f5e1..b0ff57665da 100644 --- a/src/AggregateFunctions/AggregateFunctionStatistics.h +++ b/src/AggregateFunctions/AggregateFunctionStatistics.h @@ -143,7 +143,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).publish(to); } @@ -395,7 +395,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).publish(to); } diff --git a/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h b/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h index 96c07cc3d41..7962453cb35 100644 --- a/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h +++ b/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h @@ -455,7 +455,7 @@ public: this->data(place).read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { const auto & data = this->data(place); auto & dst = static_cast(to).getData(); diff --git a/src/AggregateFunctions/AggregateFunctionSum.h b/src/AggregateFunctions/AggregateFunctionSum.h index 9d3d559ecee..6f921dbb78b 100644 --- a/src/AggregateFunctions/AggregateFunctionSum.h +++ b/src/AggregateFunctions/AggregateFunctionSum.h @@ -305,7 +305,7 @@ public: this->data(place).read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & column = static_cast(to); column.getData().push_back(this->data(place).get()); diff --git a/src/AggregateFunctions/AggregateFunctionSumMap.h b/src/AggregateFunctions/AggregateFunctionSumMap.h index e2aef611955..8209170791e 100644 --- a/src/AggregateFunctions/AggregateFunctionSumMap.h +++ b/src/AggregateFunctions/AggregateFunctionSumMap.h @@ -242,7 +242,7 @@ public: } } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { // Final step does compaction of keys that have zero values, this mutates the state auto & merged_maps = this->data(place).merged_maps; diff --git a/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h b/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h index ad83324e483..3ec40455cf3 100644 --- a/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h +++ b/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h @@ -253,7 +253,7 @@ public: void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { const auto & value = this->data(place).result; size_t size = value.size(); diff --git a/src/AggregateFunctions/AggregateFunctionTopK.h b/src/AggregateFunctions/AggregateFunctionTopK.h index 23eb0e7ff09..68317d0bdf0 100644 --- a/src/AggregateFunctions/AggregateFunctionTopK.h +++ b/src/AggregateFunctions/AggregateFunctionTopK.h @@ -79,7 +79,7 @@ public: set.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); @@ -200,7 +200,7 @@ public: this->data(place).value.merge(this->data(rhs).value); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); diff --git a/src/AggregateFunctions/AggregateFunctionUniq.h b/src/AggregateFunctions/AggregateFunctionUniq.h index 1588611b8a2..fe0e96f036b 100644 --- a/src/AggregateFunctions/AggregateFunctionUniq.h +++ b/src/AggregateFunctions/AggregateFunctionUniq.h @@ -240,7 +240,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } @@ -300,7 +300,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } diff --git a/src/AggregateFunctions/AggregateFunctionUniqCombined.h b/src/AggregateFunctions/AggregateFunctionUniqCombined.h index a92caa4a551..e34cc602ccd 100644 --- a/src/AggregateFunctions/AggregateFunctionUniqCombined.h +++ b/src/AggregateFunctions/AggregateFunctionUniqCombined.h @@ -167,7 +167,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } @@ -229,7 +229,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } diff --git a/src/AggregateFunctions/AggregateFunctionUniqUpTo.h b/src/AggregateFunctions/AggregateFunctionUniqUpTo.h index 4c71215141c..2a48e0fb182 100644 --- a/src/AggregateFunctions/AggregateFunctionUniqUpTo.h +++ b/src/AggregateFunctions/AggregateFunctionUniqUpTo.h @@ -180,7 +180,7 @@ public: this->data(place).read(buf, threshold); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).size()); } @@ -242,7 +242,7 @@ public: this->data(place).read(buf, threshold); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).size()); } diff --git a/src/AggregateFunctions/AggregateFunctionWindowFunnel.h b/src/AggregateFunctions/AggregateFunctionWindowFunnel.h index b5704203ade..3f41046c20e 100644 --- a/src/AggregateFunctions/AggregateFunctionWindowFunnel.h +++ b/src/AggregateFunctions/AggregateFunctionWindowFunnel.h @@ -280,7 +280,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(getEventLevel(this->data(place))); } diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h index 439a5e07c2e..32dcce908c6 100644 --- a/src/AggregateFunctions/IAggregateFunction.h +++ b/src/AggregateFunctions/IAggregateFunction.h @@ -106,7 +106,7 @@ public: /// Inserts results into a column. /// This method must be called once, from single thread. /// After this method was called for state, you can't do anything with state but destroy. - virtual void insertResultInto(AggregateDataPtr place, IColumn & to) const = 0; + virtual void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const = 0; /// Used for machine learning methods. Predict result from trained model. /// Will insert result into `to` column for rows in range [offset, offset + limit). diff --git a/src/Columns/ColumnAggregateFunction.cpp b/src/Columns/ColumnAggregateFunction.cpp index d4021b45f0e..3374d171059 100644 --- a/src/Columns/ColumnAggregateFunction.cpp +++ b/src/Columns/ColumnAggregateFunction.cpp @@ -135,7 +135,7 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues(MutableColumnPtr colum res->reserve(data.size()); for (auto * val : data) - func->insertResultInto(val, *res); + func->insertResultInto(val, *res, &column_aggregate_func.createOrGetArena()); return res; } diff --git a/src/Functions/array/arrayReduce.cpp b/src/Functions/array/arrayReduce.cpp index 8d44acc82f5..5c9c9472e98 100644 --- a/src/Functions/array/arrayReduce.cpp +++ b/src/Functions/array/arrayReduce.cpp @@ -187,7 +187,7 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum for (size_t i = 0; i < input_rows_count; ++i) if (!res_col_aggregate_function) - agg_func.insertResultInto(places[i], res_col); + agg_func.insertResultInto(places[i], res_col, arena.get()); else res_col_aggregate_function->insertFrom(places[i]); block.getByPosition(result).column = std::move(result_holder); diff --git a/src/Functions/array/arrayReduceInRanges.cpp b/src/Functions/array/arrayReduceInRanges.cpp index 2dd0cd56343..5b594fdb621 100644 --- a/src/Functions/array/arrayReduceInRanges.cpp +++ b/src/Functions/array/arrayReduceInRanges.cpp @@ -376,7 +376,7 @@ void FunctionArrayReduceInRanges::executeImpl(Block & block, const ColumnNumbers } if (!res_col_aggregate_function) - agg_func.insertResultInto(place, result_data); + agg_func.insertResultInto(place, result_data, arena.get()); else res_col_aggregate_function->insertFrom(place); } diff --git a/src/Functions/runningAccumulate.cpp b/src/Functions/runningAccumulate.cpp index 275259e1209..bf109654bc2 100644 --- a/src/Functions/runningAccumulate.cpp +++ b/src/Functions/runningAccumulate.cpp @@ -124,7 +124,7 @@ public: } agg_func.merge(place.data(), state_to_add, arena.get()); - agg_func.insertResultInto(place.data(), result_column); + agg_func.insertResultInto(place.data(), result_column, arena.get()); ++row_number; } diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 538a24fa997..5bd427b42cd 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -822,10 +822,11 @@ Block Aggregator::convertOneBucketToBlock( MutableColumns & key_columns, AggregateColumnsData & aggregate_columns, MutableColumns & final_aggregate_columns, + Arena * arena, bool final_) { convertToBlockImpl(method, method.data.impls[bucket], - key_columns, aggregate_columns, final_aggregate_columns, final_); + key_columns, aggregate_columns, final_aggregate_columns, arena, final_); }); block.info.bucket_num = bucket; @@ -983,6 +984,7 @@ void Aggregator::convertToBlockImpl( MutableColumns & key_columns, AggregateColumnsData & aggregate_columns, MutableColumns & final_aggregate_columns, + Arena * arena, bool final) const { if (data.empty()) @@ -992,7 +994,7 @@ void Aggregator::convertToBlockImpl( throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR}; if (final) - convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns); + convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns, arena); else convertToBlockImplNotFinal(method, data, key_columns, aggregate_columns); /// In order to release memory early. @@ -1003,7 +1005,8 @@ void Aggregator::convertToBlockImpl( template inline void Aggregator::insertAggregatesIntoColumns( Mapped & mapped, - MutableColumns & final_aggregate_columns) const + MutableColumns & final_aggregate_columns, + Arena * arena) const { /** Final values of aggregate functions are inserted to columns. * Then states of aggregate functions, that are not longer needed, are destroyed. @@ -1034,7 +1037,8 @@ inline void Aggregator::insertAggregatesIntoColumns( for (; insert_i < params.aggregates_size; ++insert_i) aggregate_functions[insert_i]->insertResultInto( mapped + offsets_of_aggregate_states[insert_i], - *final_aggregate_columns[insert_i]); + *final_aggregate_columns[insert_i], + arena); } catch (...) { @@ -1071,21 +1075,22 @@ void NO_INLINE Aggregator::convertToBlockImplFinal( Method & method, Table & data, MutableColumns & key_columns, - MutableColumns & final_aggregate_columns) const + MutableColumns & final_aggregate_columns, + Arena * arena) const { if constexpr (Method::low_cardinality_optimization) { if (data.hasNullKeyData()) { key_columns[0]->insertDefault(); - insertAggregatesIntoColumns(data.getNullKeyData(), final_aggregate_columns); + insertAggregatesIntoColumns(data.getNullKeyData(), final_aggregate_columns, arena); } } data.forEachValue([&](const auto & key, auto & mapped) { method.insertKeyIntoColumns(key, key_columns, key_sizes); - insertAggregatesIntoColumns(mapped, final_aggregate_columns); + insertAggregatesIntoColumns(mapped, final_aggregate_columns, arena); }); } @@ -1174,7 +1179,7 @@ Block Aggregator::prepareBlockAndFill( } } - filler(key_columns, aggregate_columns_data, final_aggregate_columns, final); + filler(key_columns, aggregate_columns_data, final_aggregate_columns, data_variants.aggregates_pool, final); Block res = header.cloneEmpty(); @@ -1198,6 +1203,7 @@ Block Aggregator::prepareBlockAndFill( return res; } + void Aggregator::fillAggregateColumnsWithSingleKey( AggregatedDataVariants & data_variants, MutableColumns & final_aggregate_columns) @@ -1240,6 +1246,7 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va MutableColumns & key_columns, AggregateColumnsData & aggregate_columns, MutableColumns & final_aggregate_columns, + Arena * arena, bool final_) { if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row) @@ -1254,7 +1261,7 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va } else { - insertAggregatesIntoColumns(data, final_aggregate_columns); + insertAggregatesIntoColumns(data, final_aggregate_columns, arena); } if (params.overflow_row) @@ -1282,12 +1289,13 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v MutableColumns & key_columns, AggregateColumnsData & aggregate_columns, MutableColumns & final_aggregate_columns, + Arena * arena, bool final_) { #define M(NAME) \ else if (data_variants.type == AggregatedDataVariants::Type::NAME) \ convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \ - key_columns, aggregate_columns, final_aggregate_columns, final_); + key_columns, aggregate_columns, final_aggregate_columns, arena, final_); if (false) {} // NOLINT APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 6d0eeee9014..6c55cb88781 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1164,19 +1164,22 @@ protected: MutableColumns & key_columns, AggregateColumnsData & aggregate_columns, MutableColumns & final_aggregate_columns, + Arena * arena, bool final) const; template void insertAggregatesIntoColumns( Mapped & mapped, - MutableColumns & final_aggregate_columns) const; + MutableColumns & final_aggregate_columns, + Arena * arena) const; template void convertToBlockImplFinal( Method & method, Table & data, MutableColumns & key_columns, - MutableColumns & final_aggregate_columns) const; + MutableColumns & final_aggregate_columns, + Arena * arena) const; template void convertToBlockImplNotFinal( diff --git a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp index be9bf3e354c..3214ca0b4cc 100644 --- a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp @@ -223,7 +223,7 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::finishGroup() /// Write the simple aggregation result for the current group. for (auto & desc : def.columns_to_simple_aggregate) { - desc.function->insertResultInto(desc.state.data(), *desc.column); + desc.function->insertResultInto(desc.state.data(), *desc.column, arena.get()); desc.destroyState(); } diff --git a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp index f26fe96876f..02e0746f09d 100644 --- a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp @@ -301,7 +301,7 @@ void GraphiteRollupSortedAlgorithm::GraphiteRollupMergedData::insertRow( const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule); if (aggregate_state_created) { - aggregation_pattern->function->insertResultInto(place_for_aggregate_state.data(), *value_column); + aggregation_pattern->function->insertResultInto(place_for_aggregate_state.data(), *value_column, nullptr); aggregation_pattern->function->destroy(place_for_aggregate_state.data()); aggregate_state_created = false; } diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp index 89154044ae5..7d58c22702e 100644 --- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp @@ -497,7 +497,7 @@ void SummingSortedAlgorithm::SummingMergedData::finishGroup() { try { - desc.function->insertResultInto(desc.state.data(), *desc.merged_column); + desc.function->insertResultInto(desc.state.data(), *desc.merged_column, nullptr); /// Update zero status of current row if (desc.column_numbers.size() == 1) diff --git a/tests/queries/0_stateless/01259_combinator_distinct.reference b/tests/queries/0_stateless/01259_combinator_distinct.reference index 83756ffdaa4..281250dedb6 100644 --- a/tests/queries/0_stateless/01259_combinator_distinct.reference +++ b/tests/queries/0_stateless/01259_combinator_distinct.reference @@ -1,6 +1,4 @@ -499500 +4999950000 78 [0,1,2,3,4,5,6,7,8,9,10,11,12] -[0,1,2,3,4,5,6,7,8,9,10,11,12] 20 -5.669227916063075e-17 diff --git a/tests/queries/0_stateless/01259_combinator_distinct.sql b/tests/queries/0_stateless/01259_combinator_distinct.sql index adfddeb34e4..1fef2f17008 100644 --- a/tests/queries/0_stateless/01259_combinator_distinct.sql +++ b/tests/queries/0_stateless/01259_combinator_distinct.sql @@ -1,6 +1,5 @@ -SELECT sum(DISTINCT x) FROM (SELECT number AS x FROM system.numbers_mt LIMIT 100000); -SELECT sum(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers_mt LIMIT 100000); -SELECT groupArray(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers_mt LIMIT 100000); -SELECT groupArray(DISTINCT x) FROM (SELECT number % 13 AS x FROM system.numbers_mt LIMIT 100000); -SELECT finalizeAggregation(countState(DISTINCT toString(number % 20))) FROM numbers_mt (100000); +SELECT sum(DISTINCT number) FROM numbers_mt(100000); +SELECT sum(DISTINCT number % 13) FROM numbers_mt(100000); +SELECT arraySort(groupArray(DISTINCT number % 13)) FROM numbers_mt(100000); +SELECT finalizeAggregation(countState(DISTINCT toString(number % 20))) FROM numbers_mt(100000); -- SELECT corrStableDistinct(DISTINCT x, y) FROM (SELECT number % 11 AS x, number % 13 AS y FROM system.numbers LIMIT 1000); From fb7f4c6369f52673378bd56cd3690ae256f03ea7 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 18 Jun 2020 02:42:40 +0300 Subject: [PATCH 14/19] fix build --- src/AggregateFunctions/AggregateFunctionDistinct.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.cpp b/src/AggregateFunctions/AggregateFunctionDistinct.cpp index 1661277d525..c77e977b0fa 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.cpp +++ b/src/AggregateFunctions/AggregateFunctionDistinct.cpp @@ -28,7 +28,10 @@ public: } AggregateFunctionPtr transformAggregateFunction( - const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const Array &) const override + const AggregateFunctionPtr & nested_function, + const AggregateFunctionProperties &, + const DataTypes & arguments, + const Array &) const override { AggregateFunctionPtr res; if (arguments.size() == 1) From 7a76abeb2a0c55bb50d6b3ee87ba1c8732d361af Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 19 Jun 2020 23:13:07 +0300 Subject: [PATCH 15/19] distinct combinator for function of multiuple arguments --- .../AggregateFunctionDistinct.cpp | 19 +- .../AggregateFunctionDistinct.h | 312 ++++++++++-------- src/AggregateFunctions/Helpers.h | 13 + src/AggregateFunctions/KeyHolderHelpers.h | 2 +- .../01259_combinator_distinct.reference | 7 + .../0_stateless/01259_combinator_distinct.sql | 6 +- 6 files changed, 214 insertions(+), 145 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.cpp b/src/AggregateFunctions/AggregateFunctionDistinct.cpp index c77e977b0fa..1b1e0b872cf 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.cpp +++ b/src/AggregateFunctions/AggregateFunctionDistinct.cpp @@ -36,21 +36,24 @@ public: AggregateFunctionPtr res; if (arguments.size() == 1) { - res = AggregateFunctionPtr(createWithNumericType(*arguments[0], nested_function, arguments)); + res.reset(createWithNumericType< + AggregateFunctionDistinct, + AggregateFunctionDistinctSingleNumericData>(*arguments[0], nested_function, arguments)); + if (res) return res; if (arguments[0]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion()) - return std::make_shared>(nested_function, arguments); + return std::make_shared< + AggregateFunctionDistinct< + AggregateFunctionDistinctSingleGenericData>>(nested_function, arguments); else - return std::make_shared>(nested_function, arguments); + return std::make_shared< + AggregateFunctionDistinct< + AggregateFunctionDistinctSingleGenericData>>(nested_function, arguments); } - if (!res) - throw Exception("Illegal type " /* + argument_type->getName() + */ - " of argument for aggregate function " + nested_function->getName() + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - - return res; + return std::make_shared>(nested_function, arguments); } }; diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h index 5c663bb6441..cb5fd526f6d 100644 --- a/src/AggregateFunctions/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -6,8 +6,11 @@ #include #include #include - #include +#include +#include + +#include namespace DB { @@ -17,21 +20,148 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } + template struct AggregateFunctionDistinctSingleNumericData { /// When creating, the hash table must be small. using Set = HashSetWithStackMemory, 4>; - Set value; + using Self = AggregateFunctionDistinctSingleNumericData; + Set set; + + void add(const IColumn ** columns, size_t /* columns_num */, size_t row_num, Arena *) + { + const auto & vec = assert_cast &>(*columns[0]).getData(); + set.insert(vec[row_num]); + } + + void merge(const Self & rhs, Arena *) + { + set.merge(rhs.set); + } + + void serialize(WriteBuffer & buf) const + { + set.write(buf); + } + + void deserialize(ReadBuffer & buf, Arena *) + { + set.read(buf); + } + + MutableColumns getArguments(const DataTypes & argument_types) const + { + MutableColumns argument_columns; + argument_columns.emplace_back(argument_types[0]->createColumn()); + for (const auto & elem : set) + argument_columns[0]->insert(elem.getValue()); + + return argument_columns; + } }; -template -class AggregateFunctionDistinctBase : public IAggregateFunctionDataHelper +struct AggregateFunctionDistinctGenericData { -protected: - static constexpr size_t prefix_size = sizeof(Data); + /// When creating, the hash table must be small. + using Set = HashSetWithSavedHashWithStackMemory; + using Self = AggregateFunctionDistinctGenericData; + Set set; + + void merge(const Self & rhs, Arena * arena) + { + Set::LookupResult it; + bool inserted; + for (const auto & elem : rhs.set) + set.emplace(ArenaKeyHolder{elem.getValue(), *arena}, it, inserted); + } + + void serialize(WriteBuffer & buf) const + { + writeVarUInt(set.size(), buf); + for (const auto & elem : set) + writeStringBinary(elem.getValue(), buf); + } + + void deserialize(ReadBuffer & buf, Arena * arena) + { + size_t size; + readVarUInt(size, buf); + for (size_t i = 0; i < size; ++i) + set.insert(readStringBinaryInto(*arena, buf)); + } +}; + +template +struct AggregateFunctionDistinctSingleGenericData : public AggregateFunctionDistinctGenericData +{ + void add(const IColumn ** columns, size_t /* columns_num */, size_t row_num, Arena * arena) + { + Set::LookupResult it; + bool inserted; + auto key_holder = getKeyHolder(*columns[0], row_num, *arena); + set.emplace(key_holder, it, inserted); + } + + MutableColumns getArguments(const DataTypes & argument_types) const + { + MutableColumns argument_columns; + argument_columns.emplace_back(argument_types[0]->createColumn()); + for (const auto & elem : set) + deserializeAndInsert(elem.getValue(), *argument_columns[0]); + + return argument_columns; + } +}; + +struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDistinctGenericData +{ + void add(const IColumn ** columns, size_t columns_num, size_t row_num, Arena * arena) + { + const char * begin = nullptr; + StringRef value(begin, 0); + SipHash hash; + for (size_t i = 0; i < columns_num; ++i) + { + columns[i]->updateHashWithValue(row_num, hash); + auto cur_ref = columns[i]->serializeValueIntoArena(row_num, *arena, begin); + value.data = cur_ref.data - value.size; + value.size += cur_ref.size; + } + + Set::LookupResult it; + bool inserted; + auto key_holder = SerializedKeyHolder{value, *arena}; + set.emplace(key_holder, it, inserted); + } + + MutableColumns getArguments(const DataTypes & argument_types) const + { + MutableColumns argument_columns(argument_types.size()); + for (size_t i = 0; i < argument_types.size(); ++i) + argument_columns[i] = argument_types[i]->createColumn(); + + for (const auto & elem : set) + { + const char * begin = elem.getValue().data; + for (auto & column : argument_columns) + begin = column->deserializeAndInsertFromArena(begin); + } + + return argument_columns; + } +}; + +/** Adaptor for aggregate functions. + * Adding -Distinct suffix to aggregate function +**/ +template +class AggregateFunctionDistinct : public IAggregateFunctionDataHelper> +{ +private: + static constexpr auto prefix_size = sizeof(Data); AggregateFunctionPtr nested_func; - size_t num_arguments; + size_t arguments_num; AggregateDataPtr getNestedPlace(AggregateDataPtr place) const noexcept { @@ -44,6 +174,46 @@ protected: } public: + AggregateFunctionDistinct(AggregateFunctionPtr nested_func_, const DataTypes & arguments) + : IAggregateFunctionDataHelper(arguments, nested_func_->getParameters()) + , nested_func(nested_func_) + , arguments_num(arguments.size()) + { + if (arguments.empty()) + throw Exception("Aggregate function " + getName() + " require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + } + + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + this->data(place).add(columns, arguments_num, row_num, arena); + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + this->data(place).merge(this->data(rhs), arena); + } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override + { + this->data(place).serialize(buf); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override + { + this->data(place).deserialize(buf, arena); + } + + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override + { + auto arguments = this->data(place).getArguments(this->argument_types); + ColumnRawPtrs arguments_raw(arguments.size()); + for (size_t i = 0; i < arguments.size(); ++i) + arguments_raw[i] = arguments[i].get(); + + assert(!arguments.empty()); + this->nested_func->addBatchSinglePlace(arguments[0]->size(), this->getNestedPlace(place), arguments_raw.data(), arena); + this->nested_func->insertResultInto(this->getNestedPlace(place), to, arena); + } size_t sizeOfData() const override { @@ -76,134 +246,6 @@ public: { return true; } - - AggregateFunctionDistinctBase(AggregateFunctionPtr nested, const DataTypes & arguments) - : IAggregateFunctionDataHelper(arguments, {}) - , nested_func(nested), num_arguments(arguments.size()) - { - if (arguments.empty()) - throw Exception("Aggregate function " + getName() + " require at least one argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - } -}; - - -/** Adaptor for aggregate functions. - * Adding -Distinct suffix to aggregate function -**/ -template -class AggregateFunctionDistinctSingleNumericImpl final - : public AggregateFunctionDistinctBase, - AggregateFunctionDistinctSingleNumericImpl> -{ -public: - - AggregateFunctionDistinctSingleNumericImpl(AggregateFunctionPtr nested, const DataTypes & arguments) - : AggregateFunctionDistinctBase< - AggregateFunctionDistinctSingleNumericData, - AggregateFunctionDistinctSingleNumericImpl>(nested, arguments) {} - - void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override - { - const auto & vec = assert_cast &>(*columns[0]).getData(); - this->data(place).value.insert(vec[row_num]); - } - - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override - { - this->data(place).value.merge(this->data(rhs).value); - } - - void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override - { - this->data(place).value.write(buf); - } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override - { - this->data(place).value.read(buf); - } - - void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override - { - const auto & set = this->data(place).value; - auto arguments = this->argument_types[0]->createColumn(); - for (const auto & elem : set) - arguments->insert(elem.getValue()); - - const auto * arguments_ptr = arguments.get(); - this->nested_func->addBatchSinglePlace(arguments->size(), this->getNestedPlace(place), &arguments_ptr, arena); - this->nested_func->insertResultInto(this->getNestedPlace(place), to, arena); - } -}; - -struct AggregateFunctionDistinctSingleGenericData -{ - using Set = HashSetWithSavedHashWithStackMemory; - Set value; -}; - -template -class AggregateFunctionDistinctSingleGenericImpl final - : public AggregateFunctionDistinctBase> -{ -public: - using Data = AggregateFunctionDistinctSingleGenericData; - - AggregateFunctionDistinctSingleGenericImpl(AggregateFunctionPtr nested, const DataTypes & arguments) - : AggregateFunctionDistinctBase< - AggregateFunctionDistinctSingleGenericData, - AggregateFunctionDistinctSingleGenericImpl>(nested, arguments) {} - - void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override - { - auto & set = this->data(place).value; - - Data::Set::LookupResult it; - bool inserted; - auto key_holder = getKeyHolder(*columns[0], row_num, *arena); - set.emplace(key_holder, it, inserted); - } - - void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override - { - auto & cur_set = this->data(place).value; - const auto & rhs_set = this->data(rhs).value; - - Data::Set::LookupResult it; - bool inserted; - for (const auto & elem : rhs_set) - cur_set.emplace(ArenaKeyHolder{elem.getValue(), *arena}, it, inserted); - } - - void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override - { - const auto & set = this->data(place).value; - writeVarUInt(set.size(), buf); - for (const auto & elem : set) - writeStringBinary(elem.getValue(), buf); - } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override - { - auto & set = this->data(place).value; - size_t size; - readVarUInt(size, buf); - for (size_t i = 0; i < size; ++i) - set.insert(readStringBinaryInto(*arena, buf)); - } - - void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override - { - const auto & set = this->data(place).value; - auto arguments = this->argument_types[0]->createColumn(); - for (const auto & elem : set) - deserializeAndInsert(elem.getValue(), *arguments); - - const auto * arguments_ptr = arguments.get(); - this->nested_func->addBatchSinglePlace(arguments->size(), this->getNestedPlace(place), &arguments_ptr, arena); - this->nested_func->insertResultInto(this->getNestedPlace(place), to, arena); - } }; } diff --git a/src/AggregateFunctions/Helpers.h b/src/AggregateFunctions/Helpers.h index 6c03d25e0b1..bc24e53a763 100644 --- a/src/AggregateFunctions/Helpers.h +++ b/src/AggregateFunctions/Helpers.h @@ -33,6 +33,19 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ return nullptr; } +template