diff --git a/src/AggregateFunctions/IAggregateFunction.h b/src/AggregateFunctions/IAggregateFunction.h index 66330465a39..86e72dd09b0 100644 --- a/src/AggregateFunctions/IAggregateFunction.h +++ b/src/AggregateFunctions/IAggregateFunction.h @@ -14,6 +14,7 @@ #include #include +#include #include "config.h" #include @@ -176,11 +177,15 @@ public: /// Serializes state (to transmit it over the network, for example). virtual void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional version = std::nullopt) const = 0; /// NOLINT + /// Devirtualize serialize call. virtual void serializeBatch(const PaddedPODArray & data, size_t start, size_t size, WriteBuffer & buf, std::optional version = std::nullopt) const = 0; /// NOLINT /// Deserializes state. This function is called only for empty (just created) states. virtual void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional version = std::nullopt, Arena * arena = nullptr) const = 0; /// NOLINT + /// Devirtualize create and deserialize calls. Used in deserialization of ColumnAggregateFunction. + virtual void createAndDeserializeBatch(PaddedPODArray & data, AggregateDataPtr __restrict place, size_t total_size_of_state, size_t limit, ReadBuffer & buf, std::optional version, Arena * arena) const = 0; + /// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()). virtual bool allocatesMemoryInArena() const = 0; @@ -479,6 +484,37 @@ public: static_cast(this)->serialize(data[i], buf, version); } + void createAndDeserializeBatch( + PaddedPODArray & data, + AggregateDataPtr __restrict place, + size_t total_size_of_state, + size_t limit, + ReadBuffer & buf, + std::optional version, + Arena * arena) const override + { + for (size_t i = 0; i < limit; ++i) + { + if (buf.eof()) + break; + + static_cast(this)->create(place); + + try + { + static_cast(this)->deserialize(place, buf, version, arena); + } + catch (...) + { + static_cast(this)->destroy(place); + throw; + } + + data.push_back(place); + place += total_size_of_state; + } + } + void addBatchSparse( size_t row_begin, size_t row_end, diff --git a/src/DataTypes/Serializations/SerializationAggregateFunction.cpp b/src/DataTypes/Serializations/SerializationAggregateFunction.cpp index 04e61558852..f63094ef13f 100644 --- a/src/DataTypes/Serializations/SerializationAggregateFunction.cpp +++ b/src/DataTypes/Serializations/SerializationAggregateFunction.cpp @@ -79,27 +79,11 @@ void SerializationAggregateFunction::deserializeBinaryBulk(IColumn & column, Rea size_t size_of_state = function->sizeOfData(); size_t align_of_state = function->alignOfData(); - for (size_t i = 0; i < limit; ++i) - { - if (istr.eof()) - break; + /// Adjust the size of state to make all states aligned in vector. + size_t total_size_of_state = (size_of_state + align_of_state - 1) / align_of_state * align_of_state; + char * place = arena.alignedAlloc(total_size_of_state * limit, align_of_state); - AggregateDataPtr place = arena.alignedAlloc(size_of_state, align_of_state); - - function->create(place); - - try - { - function->deserialize(place, istr, version, &arena); - } - catch (...) - { - function->destroy(place); - throw; - } - - vec.push_back(place); - } + function->createAndDeserializeBatch(vec, place, total_size_of_state, limit, istr, version, &arena); } static String serializeToString(const AggregateFunctionPtr & function, const IColumn & column, size_t row_num, size_t version) diff --git a/src/IO/ReadHelpers.h b/src/IO/ReadHelpers.h index 867c985934b..4d54a1f0618 100644 --- a/src/IO/ReadHelpers.h +++ b/src/IO/ReadHelpers.h @@ -110,7 +110,19 @@ inline void readChar(char & x, ReadBuffer & buf) template inline void readPODBinary(T & x, ReadBuffer & buf) { - buf.readStrict(reinterpret_cast(&x), sizeof(x)); /// NOLINT + static constexpr size_t size = sizeof(T); /// NOLINT + + /// If the whole value fits in buffer do not call readStrict and copy with + /// __builtin_memcpy since it is faster than generic memcpy for small copies. + if (buf.position() && buf.position() + size <= buf.buffer().end()) [[likely]] + { + __builtin_memcpy(reinterpret_cast(&x), buf.position(), size); + buf.position() += size; + } + else + { + buf.readStrict(reinterpret_cast(&x), size); + } } inline void readUUIDBinary(UUID & x, ReadBuffer & buf) diff --git a/tests/performance/aggregate_functions_deserialization.xml b/tests/performance/aggregate_functions_deserialization.xml new file mode 100644 index 00000000000..218c19e4e49 --- /dev/null +++ b/tests/performance/aggregate_functions_deserialization.xml @@ -0,0 +1,27 @@ + + + CREATE TABLE agg_deserialize + ( + t DateTime, + v1 AggregateFunction(avgState, UInt64), + v2 AggregateFunction(argMax, UInt64, DateTime) + ) + ENGINE = MergeTree() ORDER BY t + + + + INSERT INTO agg_deserialize SELECT + now() + number AS t, + initializeAggregation('avgState', number), + initializeAggregation('argMaxState', number, t) + FROM numbers(50000000) + + + SELECT v1 FROM agg_deserialize FORMAT Null + SELECT toStartOfHour(t) AS h, avgMerge(v1) FROM agg_deserialize GROUP BY h FORMAT Null + + SELECT v2 FROM agg_deserialize FORMAT Null + SELECT toStartOfHour(t) AS h, argMaxMerge(v2) FROM agg_deserialize GROUP BY h FORMAT Null + + DROP TABLE IF EXISTS agg_deserialize +