Merge pull request #72818 from CurtizJ/aggregate-functions-deserialize-performance

Improve performance of deserialization of aggregate functions states and format `RowBinary`
This commit is contained in:
Anton Popov 2024-12-06 11:33:10 +00:00 committed by GitHub
commit aa2a74a565
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 80 additions and 21 deletions

View File

@ -14,6 +14,7 @@
#include <Common/Exception.h>
#include <Common/ThreadPool_fwd.h>
#include <IO/ReadBuffer.h>
#include "config.h"
#include <cstddef>
@ -176,11 +177,15 @@ public:
/// Serializes state (to transmit it over the network, for example).
virtual void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version = std::nullopt) const = 0; /// NOLINT
/// Devirtualize serialize call.
virtual void serializeBatch(const PaddedPODArray<AggregateDataPtr> & data, size_t start, size_t size, WriteBuffer & buf, std::optional<size_t> version = std::nullopt) const = 0; /// NOLINT
/// Deserializes state. This function is called only for empty (just created) states.
virtual void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> version = std::nullopt, Arena * arena = nullptr) const = 0; /// NOLINT
/// Devirtualize create and deserialize calls. Used in deserialization of ColumnAggregateFunction.
virtual void createAndDeserializeBatch(PaddedPODArray<AggregateDataPtr> & data, AggregateDataPtr __restrict place, size_t total_size_of_state, size_t limit, ReadBuffer & buf, std::optional<size_t> version, Arena * arena) const = 0;
/// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()).
virtual bool allocatesMemoryInArena() const = 0;
@ -479,6 +484,37 @@ public:
static_cast<const Derived *>(this)->serialize(data[i], buf, version);
}
void createAndDeserializeBatch(
PaddedPODArray<AggregateDataPtr> & data,
AggregateDataPtr __restrict place,
size_t total_size_of_state,
size_t limit,
ReadBuffer & buf,
std::optional<size_t> version,
Arena * arena) const override
{
for (size_t i = 0; i < limit; ++i)
{
if (buf.eof())
break;
static_cast<const Derived *>(this)->create(place);
try
{
static_cast<const Derived *>(this)->deserialize(place, buf, version, arena);
}
catch (...)
{
static_cast<const Derived *>(this)->destroy(place);
throw;
}
data.push_back(place);
place += total_size_of_state;
}
}
void addBatchSparse(
size_t row_begin,
size_t row_end,

View File

@ -79,27 +79,11 @@ void SerializationAggregateFunction::deserializeBinaryBulk(IColumn & column, Rea
size_t size_of_state = function->sizeOfData();
size_t align_of_state = function->alignOfData();
for (size_t i = 0; i < limit; ++i)
{
if (istr.eof())
break;
/// Adjust the size of state to make all states aligned in vector.
size_t total_size_of_state = (size_of_state + align_of_state - 1) / align_of_state * align_of_state;
char * place = arena.alignedAlloc(total_size_of_state * limit, align_of_state);
AggregateDataPtr place = arena.alignedAlloc(size_of_state, align_of_state);
function->create(place);
try
{
function->deserialize(place, istr, version, &arena);
}
catch (...)
{
function->destroy(place);
throw;
}
vec.push_back(place);
}
function->createAndDeserializeBatch(vec, place, total_size_of_state, limit, istr, version, &arena);
}
static String serializeToString(const AggregateFunctionPtr & function, const IColumn & column, size_t row_num, size_t version)

View File

@ -110,7 +110,19 @@ inline void readChar(char & x, ReadBuffer & buf)
template <typename T>
inline void readPODBinary(T & x, ReadBuffer & buf)
{
buf.readStrict(reinterpret_cast<char *>(&x), sizeof(x)); /// NOLINT
static constexpr size_t size = sizeof(T); /// NOLINT
/// If the whole value fits in buffer do not call readStrict and copy with
/// __builtin_memcpy since it is faster than generic memcpy for small copies.
if (buf.position() && buf.position() + size <= buf.buffer().end()) [[likely]]
{
__builtin_memcpy(reinterpret_cast<char *>(&x), buf.position(), size);
buf.position() += size;
}
else
{
buf.readStrict(reinterpret_cast<char *>(&x), size);
}
}
inline void readUUIDBinary(UUID & x, ReadBuffer & buf)

View File

@ -0,0 +1,27 @@
<test>
<create_query>
CREATE TABLE agg_deserialize
(
t DateTime,
v1 AggregateFunction(avgState, UInt64),
v2 AggregateFunction(argMax, UInt64, DateTime)
)
ENGINE = MergeTree() ORDER BY t
</create_query>
<fill_query>
INSERT INTO agg_deserialize SELECT
now() + number AS t,
initializeAggregation('avgState', number),
initializeAggregation('argMaxState', number, t)
FROM numbers(50000000)
</fill_query>
<query>SELECT v1 FROM agg_deserialize FORMAT Null</query>
<query>SELECT toStartOfHour(t) AS h, avgMerge(v1) FROM agg_deserialize GROUP BY h FORMAT Null</query>
<query>SELECT v2 FROM agg_deserialize FORMAT Null</query>
<query>SELECT toStartOfHour(t) AS h, argMaxMerge(v2) FROM agg_deserialize GROUP BY h FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS agg_deserialize</drop_query>
</test>