From 5570b56cc3d132952d9c86c1af275235ba050162 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 27 Apr 2021 12:01:58 +0300
Subject: [PATCH] A little bit faster merge of aggregating states.

Add a mergeBatch() method to the aggregate function interface
(IAggregateFunction.h) and use it in Aggregator::mergeStreamsImplCase:
the destination state of every row is collected first, then each
aggregate function merges the whole block with a single virtual call
instead of one merge() call per row.

A row whose key did not fit into the hash table and that has no overflow
row is passed as a null place; mergeBatch() implementations skip it.
---
 src/AggregateFunctions/AggregateFunctionIf.h  | 10 ++++++++++
 .../AggregateFunctionOrFill.h                 | 13 +++++++++++++
 src/AggregateFunctions/IAggregateFunction.h   | 19 +++++++++++++++++++
 src/Interpreters/Aggregator.cpp               | 19 ++++++++++---------
 4 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/src/AggregateFunctions/AggregateFunctionIf.h b/src/AggregateFunctions/AggregateFunctionIf.h
index 8144ae355ba..0670fa0e69a 100644
--- a/src/AggregateFunctions/AggregateFunctionIf.h
+++ b/src/AggregateFunctions/AggregateFunctionIf.h
@@ -113,6 +113,16 @@ public:
         nested_func->merge(place, rhs, arena);
     }
 
+    void mergeBatch(
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const AggregateDataPtr * rhs,
+        Arena * arena) const override
+    {
+        nested_func->mergeBatch(batch_size, places, place_offset, rhs, arena);
+    }
+
     void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf) const override
     {
         nested_func->serialize(place, buf);
diff --git a/src/AggregateFunctions/AggregateFunctionOrFill.h b/src/AggregateFunctions/AggregateFunctionOrFill.h
index 4bb25e0d4de..732e83e5a0c 100644
--- a/src/AggregateFunctions/AggregateFunctionOrFill.h
+++ b/src/AggregateFunctions/AggregateFunctionOrFill.h
@@ -196,6 +196,19 @@ public:
         place[size_of_data] |= rhs[size_of_data];
     }
 
+    void mergeBatch(
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const AggregateDataPtr * rhs,
+        Arena * arena) const override
+    {
+        nested_function->mergeBatch(batch_size, places, place_offset, rhs, arena);
+        for (size_t i = 0; i < batch_size; ++i)
+            if (places[i]) /// A null place has no state to update; same check as the default mergeBatch in IAggregateFunction.h.
+                (places[i] + place_offset)[size_of_data] |= rhs[i][size_of_data];
+    }
+
     void serialize(
         ConstAggregateDataPtr place,
         WriteBuffer & buf) const override
diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h
index a418cdb5523..68c176d14e9 100644
--- a/src/AggregateFunctions/IAggregateFunction.h
+++ b/src/AggregateFunctions/IAggregateFunction.h
@@ -153,6 +153,13 @@ public:
         Arena * arena,
         ssize_t if_argument_pos = -1) const = 0;
 
+    virtual void mergeBatch(
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const AggregateDataPtr * rhs,
+        Arena * arena) const = 0;
+
     /** The same for single place.
       */
     virtual void addBatchSinglePlace(
@@ -279,6 +286,18 @@ public:
         }
     }
 
+    void mergeBatch(
+        size_t batch_size,
+        AggregateDataPtr * places,
+        size_t place_offset,
+        const AggregateDataPtr * rhs,
+        Arena * arena) const override
+    {
+        for (size_t i = 0; i < batch_size; ++i)
+            if (places[i])
+                static_cast<const Derived *>(this)->merge(places[i] + place_offset, rhs[i], arena);
+    }
+
     void addBatchSinglePlace(
         size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos = -1) const override
     {
diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp
index 0e6a19d7b04..26babdae832 100644
--- a/src/Interpreters/Aggregator.cpp
+++ b/src/Interpreters/Aggregator.cpp
@@ -1769,6 +1769,8 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
 
     /// For all rows.
     size_t rows = block.rows();
+    std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
+
     for (size_t i = 0; i < rows; ++i)
     {
         AggregateDataPtr aggregate_data = nullptr;
@@ -1797,18 +1799,17 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
 
         /// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys.
 
-        /// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do.
-        if (!aggregate_data && !overflow_row)
-            continue;
-
         AggregateDataPtr value = aggregate_data ? aggregate_data : overflow_row;
+        places[i] = value; /// May be nullptr (key did not fit and there is no overflow row); mergeBatch must skip such rows.
+    }
 
+    for (size_t j = 0; j < params.aggregates_size; ++j)
+    {
         /// Merge state of aggregate functions.
-        for (size_t j = 0; j < params.aggregates_size; ++j)
-            aggregate_functions[j]->merge(
-                value + offsets_of_aggregate_states[j],
-                (*aggregate_columns[j])[i],
-                aggregates_pool);
+        aggregate_functions[j]->mergeBatch(
+            rows, places.get(), offsets_of_aggregate_states[j],
+            aggregate_columns[j]->data(),
+            aggregates_pool);
     }
 
     /// Early release memory.