From 1088bfffb1dc404da608586306715357f43b6b6a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 29 Jul 2020 21:35:52 +0300 Subject: [PATCH] Special case for aggregation by 8bit field --- src/AggregateFunctions/IAggregateFunction.h | 43 +++++++++++++++++++-- src/Common/ColumnsHashing.h | 2 + src/Common/HashTable/FixedHashTable.h | 3 ++ src/Interpreters/Aggregator.cpp | 20 ++++++++++ 4 files changed, 65 insertions(+), 3 deletions(-) diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h index 25d8580a923..a0876d457b8 100644 --- a/src/AggregateFunctions/IAggregateFunction.h +++ b/src/AggregateFunctions/IAggregateFunction.h @@ -151,7 +151,8 @@ public: virtual void addBatchSinglePlaceNotNull( size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const = 0; - virtual void addBatchSinglePlaceFromInterval(size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0; + virtual void addBatchSinglePlaceFromInterval( + size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0; /** In addition to addBatch, this method collects multiple rows of arguments into array "places" * as long as they are between offsets[i-1] and offsets[i]. This is used for arrayReduce and @@ -159,7 +160,24 @@ public: * "places" contains a large number of same values consecutively. */ virtual void addBatchArray( - size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena) const = 0; + size_t batch_size, + AggregateDataPtr * places, + size_t place_offset, + const IColumn ** columns, + const UInt64 * offsets, + Arena * arena) const = 0; + + /** The case when the aggregation key is UInt8 + * and pointers to aggregation states are stored in AggregateDataPtr[256] lookup table. 
+ */ + virtual void addBatchLookupTable8( + size_t batch_size, + AggregateDataPtr * places, + size_t place_offset, + std::function<void(AggregateDataPtr &)> init, + const UInt8 * key, + const IColumn ** columns, + Arena * arena) const = 0; /** By default all NULLs are skipped during aggregation. * If it returns nullptr, the default one will be used. @@ -204,6 +222,24 @@ public: static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena); } + void addBatchLookupTable8( + size_t batch_size, + AggregateDataPtr * places, + size_t place_offset, + std::function<void(AggregateDataPtr &)> init, + const UInt8 * key, + const IColumn ** columns, + Arena * arena) const override + { + for (size_t i = 0; i < batch_size; ++i) + { + AggregateDataPtr & place = places[key[i]]; + if (unlikely(!place)) + init(place); + static_cast<const Derived *>(this)->add(place + place_offset, columns, i, arena); + } + } + void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override { for (size_t i = 0; i < batch_size; ++i) @@ -218,7 +254,8 @@ public: static_cast<const Derived *>(this)->add(place, columns, i, arena); } - void addBatchSinglePlaceFromInterval(size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override + void addBatchSinglePlaceFromInterval( + size_t batch_begin, size_t batch_end, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override { for (size_t i = batch_begin; i < batch_end; ++i) static_cast<const Derived *>(this)->add(place, columns, i, arena); diff --git a/src/Common/ColumnsHashing.h b/src/Common/ColumnsHashing.h index 10d28078d58..a7fcfd4f8c0 100644 --- a/src/Common/ColumnsHashing.h +++ b/src/Common/ColumnsHashing.h @@ -64,6 +64,8 @@ struct HashMethodOneNumber /// Is used for default implementation in HashMethodBase. 
FieldType getKeyHolder(size_t row, Arena &) const { return unalignedLoad<FieldType>(vec + row * sizeof(FieldType)); } + + const FieldType * getKeyData() const { return reinterpret_cast<const FieldType *>(vec); } }; diff --git a/src/Common/HashTable/FixedHashTable.h b/src/Common/HashTable/FixedHashTable.h index ab6ec74774f..b8bd757d08b 100644 --- a/src/Common/HashTable/FixedHashTable.h +++ b/src/Common/HashTable/FixedHashTable.h @@ -468,6 +468,9 @@ public: size_t getBufferSizeInCells() const { return NUM_CELLS; } + const Cell * data() const { return buf; } + Cell * data() { return buf; } + #ifdef DBMS_HASH_MAP_COUNT_COLLISIONS size_t getCollisions() const { return 0; } #endif diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 2a2c66341dc..7e7db42e77b 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -521,6 +521,26 @@ void NO_INLINE Aggregator::executeImplBatch( size_t rows, AggregateFunctionInstruction * aggregate_instructions) const { + if constexpr (std::is_same_v<Method, typename decltype(AggregatedDataVariants::key8)::element_type>) + { + for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) + { + inst->batch_that->addBatchLookupTable8( + rows, + reinterpret_cast<AggregateDataPtr *>(method.data.data()), + inst->state_offset, + [&](AggregateDataPtr & aggregate_data) + { + aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); + createAggregateStates(aggregate_data); + }, + state.getKeyData(), + inst->batch_arguments, + aggregates_pool); + } + return; + } + PODArray<AggregateDataPtr> places(rows); /// For all rows.