diff --git a/src/AggregateFunctions/AggregateFunctionAggThrow.cpp b/src/AggregateFunctions/AggregateFunctionAggThrow.cpp index ea3eb9b1a20..fada039e20a 100644 --- a/src/AggregateFunctions/AggregateFunctionAggThrow.cpp +++ b/src/AggregateFunctions/AggregateFunctionAggThrow.cpp @@ -93,7 +93,7 @@ public: buf.read(c); } - void insertResultInto(AggregateDataPtr, IColumn & to) const override + void insertResultInto(AggregateDataPtr, IColumn & to, Arena *) const override { to.insertDefault(); } diff --git a/src/AggregateFunctions/AggregateFunctionArgMinMax.h b/src/AggregateFunctions/AggregateFunctionArgMinMax.h index 9a0c428d75b..9470b1b8692 100644 --- a/src/AggregateFunctions/AggregateFunctionArgMinMax.h +++ b/src/AggregateFunctions/AggregateFunctionArgMinMax.h @@ -85,7 +85,7 @@ public: return Data::allocatesMemoryInArena(); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).result.insertResultInto(to); } diff --git a/src/AggregateFunctions/AggregateFunctionArray.h b/src/AggregateFunctions/AggregateFunctionArray.h index 4fe5e459ae1..24b07010707 100644 --- a/src/AggregateFunctions/AggregateFunctionArray.h +++ b/src/AggregateFunctions/AggregateFunctionArray.h @@ -119,9 +119,9 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { - nested_func->insertResultInto(place, to); + nested_func->insertResultInto(place, to, arena); } bool allocatesMemoryInArena() const override diff --git a/src/AggregateFunctions/AggregateFunctionAvg.h b/src/AggregateFunctions/AggregateFunctionAvg.h index d9ef8647b82..1f3426160cb 100644 --- a/src/AggregateFunctions/AggregateFunctionAvg.h +++ b/src/AggregateFunctions/AggregateFunctionAvg.h @@ -80,7 +80,7 @@ public: readBinary(this->data(place).denominator, buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & column = static_cast(to); column.getData().push_back(this->data(place).template result()); diff --git a/src/AggregateFunctions/AggregateFunctionBitwise.h b/src/AggregateFunctions/AggregateFunctionBitwise.h index a4e5f7ddafa..6d9eb3c36e1 100644 --- a/src/AggregateFunctions/AggregateFunctionBitwise.h +++ b/src/AggregateFunctions/AggregateFunctionBitwise.h @@ -74,7 +74,7 @@ public: readBinary(this->data(place).value, buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast &>(to).getData().push_back(this->data(place).value); } diff --git a/src/AggregateFunctions/AggregateFunctionBoundingRatio.h b/src/AggregateFunctions/AggregateFunctionBoundingRatio.h index 81846db4bac..9ceb7976f4a 100644 --- a/src/AggregateFunctions/AggregateFunctionBoundingRatio.h +++ b/src/AggregateFunctions/AggregateFunctionBoundingRatio.h @@ -150,7 +150,7 @@ public: data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(getBoundingRatio(data(place))); } diff --git a/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h b/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h index 1c397c26631..aa205a71c97 100644 --- a/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h +++ b/src/AggregateFunctions/AggregateFunctionCategoricalInformationValue.h @@ -119,8 +119,8 @@ public: void insertResultInto( AggregateDataPtr place, - IColumn & to - ) const override + IColumn & to, + Arena *) const override { auto & col = static_cast(to); auto & data_col = static_cast(col.getData()); diff --git a/src/AggregateFunctions/AggregateFunctionCount.h b/src/AggregateFunctions/AggregateFunctionCount.h index feb5725d9f1..29c5de0021c 100644 --- a/src/AggregateFunctions/AggregateFunctionCount.h +++ b/src/AggregateFunctions/AggregateFunctionCount.h @@ -57,7 +57,7 @@ public: readVarUInt(data(place).count, buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(data(place).count); } @@ -112,7 +112,7 @@ public: readVarUInt(data(place).count, buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(data(place).count); } diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.cpp b/src/AggregateFunctions/AggregateFunctionDistinct.cpp new file mode 100644 index 00000000000..4d89e8fb199 --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionDistinct.cpp @@ -0,0 +1,64 @@ +#include +#include +#include +#include +#include "registerAggregateFunctions.h" + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +class AggregateFunctionCombinatorDistinct final : public IAggregateFunctionCombinator +{ +public: + String getName() const override { return "Distinct"; } + + DataTypes transformArguments(const DataTypes & arguments) const override + { + if (arguments.empty()) + throw Exception("Incorrect number of arguments for aggregate function with " + getName() + " suffix", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + return arguments; + } + + AggregateFunctionPtr transformAggregateFunction( + const AggregateFunctionPtr & nested_function, + const AggregateFunctionProperties &, + const DataTypes & arguments, + const Array &) const override + { + AggregateFunctionPtr res; + if (arguments.size() == 1) + { + res.reset(createWithNumericType< + AggregateFunctionDistinct, + AggregateFunctionDistinctSingleNumericData>(*arguments[0], nested_function, arguments)); + + if (res) + return res; + + if (arguments[0]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion()) + return std::make_shared< + AggregateFunctionDistinct< + AggregateFunctionDistinctSingleGenericData>>(nested_function, arguments); + else + return std::make_shared< + AggregateFunctionDistinct< + AggregateFunctionDistinctSingleGenericData>>(nested_function, arguments); + } + + return std::make_shared>(nested_function, arguments); + } +}; + +void registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFactory & factory) +{ + factory.registerCombinator(std::make_shared()); +} + +} diff --git a/src/AggregateFunctions/AggregateFunctionDistinct.h b/src/AggregateFunctions/AggregateFunctionDistinct.h new file mode 100644 index 00000000000..01a9c71d94f --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionDistinct.h @@ -0,0 +1,240 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + + +template +struct AggregateFunctionDistinctSingleNumericData +{ + /// When creating, the hash table must be small. + using Set = HashSetWithStackMemory, 4>; + using Self = AggregateFunctionDistinctSingleNumericData; + Set set; + + void add(const IColumn ** columns, size_t /* columns_num */, size_t row_num, Arena *) + { + const auto & vec = assert_cast &>(*columns[0]).getData(); + set.insert(vec[row_num]); + } + + void merge(const Self & rhs, Arena *) + { + set.merge(rhs.set); + } + + void serialize(WriteBuffer & buf) const + { + set.write(buf); + } + + void deserialize(ReadBuffer & buf, Arena *) + { + set.read(buf); + } + + MutableColumns getArguments(const DataTypes & argument_types) const + { + MutableColumns argument_columns; + argument_columns.emplace_back(argument_types[0]->createColumn()); + for (const auto & elem : set) + argument_columns[0]->insert(elem.getValue()); + + return argument_columns; + } +}; + +struct AggregateFunctionDistinctGenericData +{ + /// When creating, the hash table must be small. + using Set = HashSetWithSavedHashWithStackMemory; + using Self = AggregateFunctionDistinctGenericData; + Set set; + + void merge(const Self & rhs, Arena * arena) + { + Set::LookupResult it; + bool inserted; + for (const auto & elem : rhs.set) + set.emplace(ArenaKeyHolder{elem.getValue(), *arena}, it, inserted); + } + + void serialize(WriteBuffer & buf) const + { + writeVarUInt(set.size(), buf); + for (const auto & elem : set) + writeStringBinary(elem.getValue(), buf); + } + + void deserialize(ReadBuffer & buf, Arena * arena) + { + size_t size; + readVarUInt(size, buf); + for (size_t i = 0; i < size; ++i) + set.insert(readStringBinaryInto(*arena, buf)); + } +}; + +template +struct AggregateFunctionDistinctSingleGenericData : public AggregateFunctionDistinctGenericData +{ + void add(const IColumn ** columns, size_t /* columns_num */, size_t row_num, Arena * arena) + { + Set::LookupResult it; + bool inserted; + auto key_holder = getKeyHolder(*columns[0], row_num, *arena); + set.emplace(key_holder, it, inserted); + } + + MutableColumns getArguments(const DataTypes & argument_types) const + { + MutableColumns argument_columns; + argument_columns.emplace_back(argument_types[0]->createColumn()); + for (const auto & elem : set) + deserializeAndInsert(elem.getValue(), *argument_columns[0]); + + return argument_columns; + } +}; + +struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDistinctGenericData +{ + void add(const IColumn ** columns, size_t columns_num, size_t row_num, Arena * arena) + { + const char * begin = nullptr; + StringRef value(begin, 0); + for (size_t i = 0; i < columns_num; ++i) + { + auto cur_ref = columns[i]->serializeValueIntoArena(row_num, *arena, begin); + value.data = cur_ref.data - value.size; + value.size += cur_ref.size; + } + + Set::LookupResult it; + bool inserted; + auto key_holder = SerializedKeyHolder{value, *arena}; + set.emplace(key_holder, it, inserted); + } + + MutableColumns getArguments(const DataTypes & argument_types) const + { + MutableColumns argument_columns(argument_types.size()); + for (size_t i = 0; i < argument_types.size(); ++i) + argument_columns[i] = argument_types[i]->createColumn(); + + for (const auto & elem : set) + { + const char * begin = elem.getValue().data; + for (auto & column : argument_columns) + begin = column->deserializeAndInsertFromArena(begin); + } + + return argument_columns; + } +}; + +/** Adaptor for aggregate functions. + * Adding -Distinct suffix to aggregate function +**/ +template +class AggregateFunctionDistinct : public IAggregateFunctionDataHelper> +{ +private: + static constexpr auto prefix_size = sizeof(Data); + AggregateFunctionPtr nested_func; + size_t arguments_num; + + AggregateDataPtr getNestedPlace(AggregateDataPtr place) const noexcept + { + return place + prefix_size; + } + + ConstAggregateDataPtr getNestedPlace(ConstAggregateDataPtr place) const noexcept + { + return place + prefix_size; + } + +public: + AggregateFunctionDistinct(AggregateFunctionPtr nested_func_, const DataTypes & arguments) + : IAggregateFunctionDataHelper(arguments, nested_func_->getParameters()) + , nested_func(nested_func_) + , arguments_num(arguments.size()) {} + + void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + this->data(place).add(columns, arguments_num, row_num, arena); + } + + void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + this->data(place).merge(this->data(rhs), arena); + } + + void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override + { + this->data(place).serialize(buf); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override + { + this->data(place).deserialize(buf, arena); + } + + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override + { + auto arguments = this->data(place).getArguments(this->argument_types); + ColumnRawPtrs arguments_raw(arguments.size()); + for (size_t i = 0; i < arguments.size(); ++i) + arguments_raw[i] = arguments[i].get(); + + assert(!arguments.empty()); + nested_func->addBatchSinglePlace(arguments[0]->size(), getNestedPlace(place), arguments_raw.data(), arena); + nested_func->insertResultInto(getNestedPlace(place), to, arena); + } + + size_t sizeOfData() const override + { + return prefix_size + nested_func->sizeOfData(); + } + + void create(AggregateDataPtr place) const override + { + new (place) Data; + nested_func->create(getNestedPlace(place)); + } + + void destroy(AggregateDataPtr place) const noexcept override + { + this->data(place).~Data(); + nested_func->destroy(getNestedPlace(place)); + } + + String getName() const override + { + return nested_func->getName() + "Distinct"; + } + + DataTypePtr getReturnType() const override + { + return nested_func->getReturnType(); + } + + bool allocatesMemoryInArena() const override + { + return true; + } +}; + +} diff --git a/src/AggregateFunctions/AggregateFunctionEntropy.h b/src/AggregateFunctions/AggregateFunctionEntropy.h index ff233a5ac93..656aca43f60 100644 --- a/src/AggregateFunctions/AggregateFunctionEntropy.h +++ b/src/AggregateFunctions/AggregateFunctionEntropy.h @@ -132,7 +132,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & column = assert_cast &>(to); column.getData().push_back(this->data(place).get()); diff --git a/src/AggregateFunctions/AggregateFunctionForEach.h b/src/AggregateFunctions/AggregateFunctionForEach.h index 23a3487de47..19f2994d3f1 100644 --- a/src/AggregateFunctions/AggregateFunctionForEach.h +++ b/src/AggregateFunctions/AggregateFunctionForEach.h @@ -225,7 +225,7 @@ public: } } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { AggregateFunctionForEachData & state = data(place); @@ -236,7 +236,7 @@ public: char * nested_state = state.array_of_aggregate_datas; for (size_t i = 0; i < state.dynamic_array_size; ++i) { - nested_func->insertResultInto(nested_state, elems_to); + nested_func->insertResultInto(nested_state, elems_to, arena); nested_state += nested_size_of_data; } diff --git a/src/AggregateFunctions/AggregateFunctionGroupArray.h b/src/AggregateFunctions/AggregateFunctionGroupArray.h index b76efd9f6c2..f3d31eb599b 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArray.h +++ b/src/AggregateFunctions/AggregateFunctionGroupArray.h @@ -282,7 +282,7 @@ public: // if constexpr (Trait::sampler == Sampler::DETERMINATOR) } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { const auto & value = this->data(place).value; size_t size = value.size(); @@ -600,7 +600,7 @@ public: // if constexpr (Trait::sampler == Sampler::DETERMINATOR) } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & column_array = assert_cast(to); @@ -815,7 +815,7 @@ public: data(place).last = prev; } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & column_array = assert_cast(to); diff --git a/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h b/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h index 0eec38c51a7..d84c99aec57 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h +++ b/src/AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h @@ -179,7 +179,7 @@ public: } } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { ColumnArray & to_array = assert_cast(to); IColumn & to_data = to_array.getData(); diff --git a/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h b/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h index 8f93a7eb25a..19562b37a12 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h +++ b/src/AggregateFunctions/AggregateFunctionGroupArrayMoving.h @@ -158,7 +158,7 @@ public: this->data(place).sum = value.back(); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { const auto & data = this->data(place); size_t size = data.value.size(); diff --git a/src/AggregateFunctions/AggregateFunctionGroupBitmap.h b/src/AggregateFunctions/AggregateFunctionGroupBitmap.h index 766479cc08d..a6470aa6943 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupBitmap.h +++ b/src/AggregateFunctions/AggregateFunctionGroupBitmap.h @@ -48,7 +48,7 @@ public: this->data(place).rbs.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast &>(to).getData().push_back(this->data(place).rbs.size()); } @@ -113,7 +113,7 @@ public: this->data(place).rbs.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast &>(to).getData().push_back(this->data(place).rbs.size()); } diff --git a/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h b/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h index 88b1c87f526..2ee9d0f6e1c 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h +++ b/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h @@ -16,6 +16,7 @@ #include #include +#include #define AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE 0xFFFFFF @@ -97,7 +98,7 @@ public: this->data(place).value.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); @@ -147,26 +148,6 @@ class AggregateFunctionGroupUniqArrayGeneric using State = AggregateFunctionGroupUniqArrayGenericData; - static auto getKeyHolder(const IColumn & column, size_t row_num, Arena & arena) - { - if constexpr (is_plain_column) - { - return ArenaKeyHolder{column.getDataAt(row_num), arena}; - } - else - { - const char * begin = nullptr; - StringRef serialized = column.serializeValueIntoArena(row_num, arena, begin); - assert(serialized.data != nullptr); - return SerializedKeyHolder{serialized, arena}; - } - } - - static void deserializeAndInsert(StringRef str, IColumn & data_to) - { - return deserializeAndInsertImpl(str, data_to); - } - public: AggregateFunctionGroupUniqArrayGeneric(const DataTypePtr & input_data_type_, UInt64 max_elems_ = std::numeric_limits::max()) : IAggregateFunctionDataHelper>({input_data_type_}, {}) @@ -215,7 +196,7 @@ public: bool inserted; State::Set::LookupResult it; - auto key_holder = getKeyHolder(*columns[0], row_num, *arena); + auto key_holder = getKeyHolder(*columns[0], row_num, *arena); set.emplace(key_holder, it, inserted); } @@ -237,7 +218,7 @@ public: } } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); @@ -247,22 +228,10 @@ public: offsets_to.push_back(offsets_to.back() + set.size()); for (auto & elem : set) - deserializeAndInsert(elem.getValue(), data_to); + deserializeAndInsert(elem.getValue(), data_to); } }; -template <> -inline void deserializeAndInsertImpl(StringRef str, IColumn & data_to) -{ - data_to.deserializeAndInsertFromArena(str.data); -} - -template <> -inline void deserializeAndInsertImpl(StringRef str, IColumn & data_to) -{ - data_to.insertData(str.data, str.size); -} - #undef AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE } diff --git a/src/AggregateFunctions/AggregateFunctionHistogram.h b/src/AggregateFunctions/AggregateFunctionHistogram.h index 8eaa42fdac4..bc9c95ecf2a 100644 --- a/src/AggregateFunctions/AggregateFunctionHistogram.h +++ b/src/AggregateFunctions/AggregateFunctionHistogram.h @@ -353,7 +353,7 @@ public: this->data(place).read(buf, max_bins); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & data = this->data(place); diff --git a/src/AggregateFunctions/AggregateFunctionIf.h b/src/AggregateFunctions/AggregateFunctionIf.h index bf4f0b24de3..f04450c9142 100644 --- a/src/AggregateFunctions/AggregateFunctionIf.h +++ b/src/AggregateFunctions/AggregateFunctionIf.h @@ -95,9 +95,9 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { - nested_func->insertResultInto(place, to); + nested_func->insertResultInto(place, to, arena); } bool allocatesMemoryInArena() const override diff --git a/src/AggregateFunctions/AggregateFunctionMLMethod.h b/src/AggregateFunctions/AggregateFunctionMLMethod.h index a11ca9032a5..8a93b66ab3b 100644 --- a/src/AggregateFunctions/AggregateFunctionMLMethod.h +++ b/src/AggregateFunctions/AggregateFunctionMLMethod.h @@ -388,7 +388,7 @@ public: /** This function is called if aggregate function without State modifier is selected in a query. * Inserts all weights of the model into the column 'to', so user may use such information if needed */ - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).returnWeights(to); } diff --git a/src/AggregateFunctions/AggregateFunctionMaxIntersections.h b/src/AggregateFunctions/AggregateFunctionMaxIntersections.h index 050c5fd78ea..b8a4dd63eea 100644 --- a/src/AggregateFunctions/AggregateFunctionMaxIntersections.h +++ b/src/AggregateFunctions/AggregateFunctionMaxIntersections.h @@ -129,7 +129,7 @@ public: buf.read(reinterpret_cast(value.data()), size * sizeof(value[0])); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { Int64 current_intersections = 0; Int64 max_intersections = 0; diff --git a/src/AggregateFunctions/AggregateFunctionMerge.h b/src/AggregateFunctions/AggregateFunctionMerge.h index 51a3c11118f..066f7a762f8 100644 --- a/src/AggregateFunctions/AggregateFunctionMerge.h +++ b/src/AggregateFunctions/AggregateFunctionMerge.h @@ -93,9 +93,9 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { - nested_func->insertResultInto(place, to); + nested_func->insertResultInto(place, to, arena); } bool allocatesMemoryInArena() const override diff --git a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h index 69504f7b249..a21a64af9a4 100644 --- a/src/AggregateFunctions/AggregateFunctionMinMaxAny.h +++ b/src/AggregateFunctions/AggregateFunctionMinMaxAny.h @@ -746,7 +746,7 @@ public: return Data::allocatesMemoryInArena(); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).insertResultInto(to); } diff --git a/src/AggregateFunctions/AggregateFunctionNothing.h b/src/AggregateFunctions/AggregateFunctionNothing.h index b3206f6db6e..f373b3b55b0 100644 --- a/src/AggregateFunctions/AggregateFunctionNothing.h +++ b/src/AggregateFunctions/AggregateFunctionNothing.h @@ -67,7 +67,7 @@ public: { } - void insertResultInto(AggregateDataPtr, IColumn & to) const override + void insertResultInto(AggregateDataPtr, IColumn & to, Arena *) const override { to.insertDefault(); } diff --git a/src/AggregateFunctions/AggregateFunctionNull.h b/src/AggregateFunctions/AggregateFunctionNull.h index d6f0079232c..2f2c23fdc8b 100644 --- a/src/AggregateFunctions/AggregateFunctionNull.h +++ b/src/AggregateFunctions/AggregateFunctionNull.h @@ -150,14 +150,14 @@ public: } } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override { if constexpr (result_is_nullable) { ColumnNullable & to_concrete = assert_cast(to); if (getFlag(place)) { - nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn()); + nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena); to_concrete.getNullMapData().push_back(0); } else @@ -167,7 +167,7 @@ public: } else { - nested_function->insertResultInto(nestedPlace(place), to); + nested_function->insertResultInto(nestedPlace(place), to, arena); } } diff --git a/src/AggregateFunctions/AggregateFunctionOrFill.h b/src/AggregateFunctions/AggregateFunctionOrFill.h index 1bbf2ea3135..333f07d5e33 100644 --- a/src/AggregateFunctions/AggregateFunctionOrFill.h +++ b/src/AggregateFunctions/AggregateFunctionOrFill.h @@ -148,7 +148,8 @@ public: void insertResultInto( AggregateDataPtr place, - IColumn & to) const override + IColumn & to, + Arena * arena) const override { if (place[size_of_data]) { @@ -157,20 +158,20 @@ public: // -OrNull if (inner_nullable) - nested_function->insertResultInto(place, to); + nested_function->insertResultInto(place, to, arena); else { ColumnNullable & col = typeid_cast(to); col.getNullMapColumn().insertDefault(); - nested_function->insertResultInto(place, col.getNestedColumn()); + nested_function->insertResultInto(place, col.getNestedColumn(), arena); } } else { // -OrDefault - nested_function->insertResultInto(place, to); + nested_function->insertResultInto(place, to, arena); } } else diff --git a/src/AggregateFunctions/AggregateFunctionQuantile.h b/src/AggregateFunctions/AggregateFunctionQuantile.h index 7bdfc13295c..536d9d5683f 100644 --- a/src/AggregateFunctions/AggregateFunctionQuantile.h +++ b/src/AggregateFunctions/AggregateFunctionQuantile.h @@ -138,7 +138,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { /// const_cast is required because some data structures apply finalizaton (like sorting) for obtain a result. auto & data = this->data(place); diff --git a/src/AggregateFunctions/AggregateFunctionResample.h b/src/AggregateFunctions/AggregateFunctionResample.h index 49cc312287e..043e094a688 100644 --- a/src/AggregateFunctions/AggregateFunctionResample.h +++ b/src/AggregateFunctions/AggregateFunctionResample.h @@ -174,13 +174,14 @@ public: void insertResultInto( AggregateDataPtr place, - IColumn & to) const override + IColumn & to, + Arena * arena) const override { auto & col = assert_cast(to); auto & col_offsets = assert_cast(col.getOffsetsColumn()); for (size_t i = 0; i < total; ++i) - nested_function->insertResultInto(place + i * size_of_data, col.getData()); + nested_function->insertResultInto(place + i * size_of_data, col.getData(), arena); col_offsets.getData().push_back(col.getData().size()); } diff --git a/src/AggregateFunctions/AggregateFunctionRetention.h b/src/AggregateFunctions/AggregateFunctionRetention.h index 3a76ba9f055..b742dcdf77f 100644 --- a/src/AggregateFunctions/AggregateFunctionRetention.h +++ b/src/AggregateFunctions/AggregateFunctionRetention.h @@ -123,7 +123,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & data_to = assert_cast(assert_cast(to).getData()).getData(); auto & offsets_to = assert_cast(to).getOffsets(); diff --git a/src/AggregateFunctions/AggregateFunctionSequenceMatch.h b/src/AggregateFunctions/AggregateFunctionSequenceMatch.h index 416786f8fcb..79463e890e4 100644 --- a/src/AggregateFunctions/AggregateFunctionSequenceMatch.h +++ b/src/AggregateFunctions/AggregateFunctionSequenceMatch.h @@ -560,7 +560,7 @@ public: DataTypePtr getReturnType() const override { return std::make_shared(); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).sort(); @@ -588,7 +588,7 @@ public: DataTypePtr getReturnType() const override { return std::make_shared(); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { const_cast(this->data(place)).sort(); assert_cast(to).getData().push_back(count(place)); diff --git a/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h b/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h index d1405172e27..8c029855a26 100644 --- a/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h +++ b/src/AggregateFunctions/AggregateFunctionSimpleLinearRegression.h @@ -170,8 +170,8 @@ public: void insertResultInto( AggregateDataPtr place, - IColumn & to - ) const override + IColumn & to, + Arena *) const override { Ret k = this->data(place).getK(); Ret b = this->data(place).getB(k); diff --git a/src/AggregateFunctions/AggregateFunctionState.h b/src/AggregateFunctions/AggregateFunctionState.h index 126d63573af..51a31677723 100644 --- a/src/AggregateFunctions/AggregateFunctionState.h +++ b/src/AggregateFunctions/AggregateFunctionState.h @@ -80,7 +80,7 @@ public: nested_func->deserialize(place, buf, arena); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(place); } diff --git a/src/AggregateFunctions/AggregateFunctionStatistics.h b/src/AggregateFunctions/AggregateFunctionStatistics.h index 7f6de43f5e1..b0ff57665da 100644 --- a/src/AggregateFunctions/AggregateFunctionStatistics.h +++ b/src/AggregateFunctions/AggregateFunctionStatistics.h @@ -143,7 +143,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).publish(to); } @@ -395,7 +395,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { this->data(place).publish(to); } diff --git a/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h b/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h index 96c07cc3d41..7962453cb35 100644 --- a/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h +++ b/src/AggregateFunctions/AggregateFunctionStatisticsSimple.h @@ -455,7 +455,7 @@ public: this->data(place).read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { const auto & data = this->data(place); auto & dst = static_cast(to).getData(); diff --git a/src/AggregateFunctions/AggregateFunctionSum.h b/src/AggregateFunctions/AggregateFunctionSum.h index 9d3d559ecee..6f921dbb78b 100644 --- a/src/AggregateFunctions/AggregateFunctionSum.h +++ b/src/AggregateFunctions/AggregateFunctionSum.h @@ -305,7 +305,7 @@ public: this->data(place).read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { auto & column = static_cast(to); column.getData().push_back(this->data(place).get()); diff --git a/src/AggregateFunctions/AggregateFunctionSumMap.h b/src/AggregateFunctions/AggregateFunctionSumMap.h index 1f4be4e806e..ab17da1b490 100644 --- a/src/AggregateFunctions/AggregateFunctionSumMap.h +++ b/src/AggregateFunctions/AggregateFunctionSumMap.h @@ -246,7 +246,7 @@ public: } } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { // Final step does compaction of keys that have zero values, this mutates the state auto & merged_maps = this->data(place).merged_maps; diff --git a/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h b/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h index ad83324e483..3ec40455cf3 100644 --- a/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h +++ b/src/AggregateFunctions/AggregateFunctionTimeSeriesGroupSum.h @@ -253,7 +253,7 @@ public: void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { const auto & value = this->data(place).result; size_t size = value.size(); diff --git a/src/AggregateFunctions/AggregateFunctionTopK.h b/src/AggregateFunctions/AggregateFunctionTopK.h index 23eb0e7ff09..68317d0bdf0 100644 --- a/src/AggregateFunctions/AggregateFunctionTopK.h +++ b/src/AggregateFunctions/AggregateFunctionTopK.h @@ -79,7 +79,7 @@ public: set.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); @@ -200,7 +200,7 @@ public: this->data(place).value.merge(this->data(rhs).value); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { ColumnArray & arr_to = assert_cast(to); ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); diff --git a/src/AggregateFunctions/AggregateFunctionUniq.h b/src/AggregateFunctions/AggregateFunctionUniq.h index 334e809ebe7..920232ee92c 100644 --- a/src/AggregateFunctions/AggregateFunctionUniq.h +++ b/src/AggregateFunctions/AggregateFunctionUniq.h @@ -240,7 +240,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } @@ -294,7 +294,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } diff --git a/src/AggregateFunctions/AggregateFunctionUniqCombined.h b/src/AggregateFunctions/AggregateFunctionUniqCombined.h index a92caa4a551..e34cc602ccd 100644 --- a/src/AggregateFunctions/AggregateFunctionUniqCombined.h +++ b/src/AggregateFunctions/AggregateFunctionUniqCombined.h @@ -167,7 +167,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } @@ -229,7 +229,7 @@ public: this->data(place).set.read(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).set.size()); } diff --git a/src/AggregateFunctions/AggregateFunctionUniqUpTo.h b/src/AggregateFunctions/AggregateFunctionUniqUpTo.h index 4c71215141c..2a48e0fb182 100644 --- a/src/AggregateFunctions/AggregateFunctionUniqUpTo.h +++ b/src/AggregateFunctions/AggregateFunctionUniqUpTo.h @@ -180,7 +180,7 @@ public: this->data(place).read(buf, threshold); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).size()); } @@ -242,7 +242,7 @@ public: this->data(place).read(buf, threshold); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(this->data(place).size()); } diff --git a/src/AggregateFunctions/AggregateFunctionWindowFunnel.h b/src/AggregateFunctions/AggregateFunctionWindowFunnel.h index b5704203ade..3f41046c20e 100644 --- a/src/AggregateFunctions/AggregateFunctionWindowFunnel.h +++ b/src/AggregateFunctions/AggregateFunctionWindowFunnel.h @@ -280,7 +280,7 @@ public: this->data(place).deserialize(buf); } - void insertResultInto(AggregateDataPtr place, IColumn & to) const override + void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { assert_cast(to).getData().push_back(getEventLevel(this->data(place))); } diff --git a/src/AggregateFunctions/Helpers.h b/src/AggregateFunctions/Helpers.h index 6c03d25e0b1..bc24e53a763 100644 --- a/src/AggregateFunctions/Helpers.h +++ b/src/AggregateFunctions/Helpers.h @@ -33,6 +33,19 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ return nullptr; } +template