From 132052fbf2404d8faa5e14fbc2cbd5f8e16bdc1f Mon Sep 17 00:00:00 2001 From: ltrk2 <107155950+ltrk2@users.noreply.github.com> Date: Thu, 29 Jun 2023 12:16:40 -0700 Subject: [PATCH 1/2] Implement partial support for endianness-independent serialization --- .../AggregateFunctionGroupArray.h | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionGroupArray.h b/src/AggregateFunctions/AggregateFunctionGroupArray.h index 7a5e6a8cb2d..b5905105457 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArray.h +++ b/src/AggregateFunctions/AggregateFunctionGroupArray.h @@ -266,19 +266,20 @@ public: void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override { const auto & value = this->data(place).value; - size_t size = value.size(); + const size_t size = value.size(); writeVarUInt(size, buf); - buf.write(reinterpret_cast(value.data()), size * sizeof(value[0])); + for (const auto & element : value) + writeBinaryLittleEndian(element, buf); if constexpr (Trait::last) - DB::writeIntBinary(this->data(place).total_values, buf); + writeBinaryLittleEndian(this->data(place).total_values, buf); if constexpr (Trait::sampler == Sampler::RNG) { - DB::writeIntBinary(this->data(place).total_values, buf); + writeBinaryLittleEndian(this->data(place).total_values, buf); WriteBufferFromOwnString rng_buf; rng_buf << this->data(place).rng; - DB::writeStringBinary(rng_buf.str(), buf); + writeStringBinary(rng_buf.str(), buf); } } @@ -297,16 +298,17 @@ public: auto & value = this->data(place).value; value.resize_exact(size, arena); - buf.readStrict(reinterpret_cast(value.data()), size * sizeof(value[0])); + for (auto & element : value) + readBinaryLittleEndian(element, buf); if constexpr (Trait::last) - DB::readIntBinary(this->data(place).total_values, buf); + readBinaryLittleEndian(this->data(place).total_values, buf); if constexpr (Trait::sampler == Sampler::RNG) { - DB::readIntBinary(this->data(place).total_values, buf); + readBinaryLittleEndian(this->data(place).total_values, buf); std::string rng_string; - DB::readStringBinary(rng_string, buf); + readStringBinary(rng_string, buf); ReadBufferFromString rng_buf(rng_string); rng_buf >> this->data(place).rng; } @@ -603,14 +605,14 @@ public: node->write(buf); if constexpr (Trait::last) - DB::writeIntBinary(data(place).total_values, buf); + writeBinaryLittleEndian(data(place).total_values, buf); if constexpr (Trait::sampler == Sampler::RNG) { - DB::writeIntBinary(data(place).total_values, buf); + writeBinaryLittleEndian(data(place).total_values, buf); WriteBufferFromOwnString rng_buf; rng_buf << data(place).rng; - DB::writeStringBinary(rng_buf.str(), buf); + writeStringBinary(rng_buf.str(), buf); } } @@ -636,13 +638,13 @@ public: value[i] = Node::read(buf, arena); if constexpr (Trait::last) - DB::readIntBinary(data(place).total_values, buf); + readBinaryLittleEndian(data(place).total_values, buf); if constexpr (Trait::sampler == Sampler::RNG) { - DB::readIntBinary(data(place).total_values, buf); + readBinaryLittleEndian(data(place).total_values, buf); std::string rng_string; - DB::readStringBinary(rng_string, buf); + readStringBinary(rng_string, buf); ReadBufferFromString rng_buf(rng_string); rng_buf >> data(place).rng; } From 0527a32282f7d9153eecbb9c740be18140f741c4 Mon Sep 17 00:00:00 2001 From: ltrk2 <107155950+ltrk2@users.noreply.github.com> Date: Thu, 6 Jul 2023 11:27:38 -0700 Subject: [PATCH 2/2] Implement platform-independent serialization for further aggregate functions --- .../AggregateFunctionBoundingRatio.h | 51 +++++++++++-------- .../AggregateFunctionDeltaSum.h | 16 +++--- .../AggregateFunctionDeltaSumTimestamp.h | 24 ++++----- src/AggregateFunctions/QuantileApprox.h | 28 +++++----- src/AggregateFunctions/ReservoirSampler.h | 12 ++--- src/Common/TransformEndianness.hpp | 6 +++ 6 files changed, 76 insertions(+), 61 deletions(-) diff --git a/src/AggregateFunctions/AggregateFunctionBoundingRatio.h b/src/AggregateFunctions/AggregateFunctionBoundingRatio.h index 935adbf2b7d..82e4f1122a8 100644 --- a/src/AggregateFunctions/AggregateFunctionBoundingRatio.h +++ b/src/AggregateFunctions/AggregateFunctionBoundingRatio.h @@ -67,29 +67,38 @@ struct AggregateFunctionBoundingRatioData } } - void serialize(WriteBuffer & buf) const - { - writeBinary(empty, buf); - - if (!empty) - { - writePODBinary(left, buf); - writePODBinary(right, buf); - } - } - - void deserialize(ReadBuffer & buf) - { - readBinary(empty, buf); - - if (!empty) - { - readPODBinary(left, buf); - readPODBinary(right, buf); - } - } + void serialize(WriteBuffer & buf) const; + void deserialize(ReadBuffer & buf); }; +template +inline void transformEndianness(AggregateFunctionBoundingRatioData::Point & p) +{ + transformEndianness(p.x); + transformEndianness(p.y); +} + +void AggregateFunctionBoundingRatioData::serialize(WriteBuffer & buf) const +{ + writeBinaryLittleEndian(empty, buf); + + if (!empty) + { + writeBinaryLittleEndian(left, buf); + writeBinaryLittleEndian(right, buf); + } +} + +void AggregateFunctionBoundingRatioData::deserialize(ReadBuffer & buf) +{ + readBinaryLittleEndian(empty, buf); + + if (!empty) + { + readBinaryLittleEndian(left, buf); + readBinaryLittleEndian(right, buf); + } +} class AggregateFunctionBoundingRatio final : public IAggregateFunctionDataHelper { diff --git a/src/AggregateFunctions/AggregateFunctionDeltaSum.h b/src/AggregateFunctions/AggregateFunctionDeltaSum.h index 199d2706d3a..d64f949825a 100644 --- a/src/AggregateFunctions/AggregateFunctionDeltaSum.h +++ b/src/AggregateFunctions/AggregateFunctionDeltaSum.h @@ -103,18 +103,18 @@ public: void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override { - writeIntBinary(this->data(place).sum, buf); - writeIntBinary(this->data(place).first, buf); - writeIntBinary(this->data(place).last, buf); - writePODBinary(this->data(place).seen, buf); + writeBinaryLittleEndian(this->data(place).sum, buf); + writeBinaryLittleEndian(this->data(place).first, buf); + writeBinaryLittleEndian(this->data(place).last, buf); + writeBinaryLittleEndian(this->data(place).seen, buf); } void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional /* version */, Arena *) const override { - readIntBinary(this->data(place).sum, buf); - readIntBinary(this->data(place).first, buf); - readIntBinary(this->data(place).last, buf); - readPODBinary(this->data(place).seen, buf); + readBinaryLittleEndian(this->data(place).sum, buf); + readBinaryLittleEndian(this->data(place).first, buf); + readBinaryLittleEndian(this->data(place).last, buf); + readBinaryLittleEndian(this->data(place).seen, buf); } void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override diff --git a/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h b/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h index 5ca07bb0bdf..5eeb1425afb 100644 --- a/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h +++ b/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.h @@ -144,22 +144,22 @@ public: void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override { - writeIntBinary(this->data(place).sum, buf); - writeIntBinary(this->data(place).first, buf); - writeIntBinary(this->data(place).first_ts, buf); - writeIntBinary(this->data(place).last, buf); - writeIntBinary(this->data(place).last_ts, buf); - writePODBinary(this->data(place).seen, buf); + writeBinaryLittleEndian(this->data(place).sum, buf); + writeBinaryLittleEndian(this->data(place).first, buf); + writeBinaryLittleEndian(this->data(place).first_ts, buf); + writeBinaryLittleEndian(this->data(place).last, buf); + writeBinaryLittleEndian(this->data(place).last_ts, buf); + writeBinaryLittleEndian(this->data(place).seen, buf); } void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional /* version */, Arena *) const override { - readIntBinary(this->data(place).sum, buf); - readIntBinary(this->data(place).first, buf); - readIntBinary(this->data(place).first_ts, buf); - readIntBinary(this->data(place).last, buf); - readIntBinary(this->data(place).last_ts, buf); - readPODBinary(this->data(place).seen, buf); + readBinaryLittleEndian(this->data(place).sum, buf); + readBinaryLittleEndian(this->data(place).first, buf); + readBinaryLittleEndian(this->data(place).first_ts, buf); + readBinaryLittleEndian(this->data(place).last, buf); + readBinaryLittleEndian(this->data(place).last_ts, buf); + readBinaryLittleEndian(this->data(place).seen, buf); } void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override diff --git a/src/AggregateFunctions/QuantileApprox.h b/src/AggregateFunctions/QuantileApprox.h index f58f1396fb4..6b2a6cf4398 100644 --- a/src/AggregateFunctions/QuantileApprox.h +++ b/src/AggregateFunctions/QuantileApprox.h @@ -233,35 +233,35 @@ public: void write(WriteBuffer & buf) const { - writeIntBinary(compress_threshold, buf); - writeFloatBinary(relative_error, buf); - writeIntBinary(count, buf); - writeIntBinary(sampled.size(), buf); + writeBinaryLittleEndian(compress_threshold, buf); + writeBinaryLittleEndian(relative_error, buf); + writeBinaryLittleEndian(count, buf); + writeBinaryLittleEndian(sampled.size(), buf); for (const auto & stats : sampled) { - writeFloatBinary(stats.value, buf); - writeIntBinary(stats.g, buf); - writeIntBinary(stats.delta, buf); + writeBinaryLittleEndian(stats.value, buf); + writeBinaryLittleEndian(stats.g, buf); + writeBinaryLittleEndian(stats.delta, buf); } } void read(ReadBuffer & buf) { - readIntBinary(compress_threshold, buf); - readFloatBinary(relative_error, buf); - readIntBinary(count, buf); + readBinaryLittleEndian(compress_threshold, buf); + readBinaryLittleEndian(relative_error, buf); + readBinaryLittleEndian(count, buf); size_t sampled_len = 0; - readIntBinary(sampled_len, buf); + readBinaryLittleEndian(sampled_len, buf); sampled.resize(sampled_len); for (size_t i = 0; i < sampled_len; ++i) { auto stats = sampled[i]; - readFloatBinary(stats.value, buf); - readIntBinary(stats.g, buf); - readIntBinary(stats.delta, buf); + readBinaryLittleEndian(stats.value, buf); + readBinaryLittleEndian(stats.g, buf); + readBinaryLittleEndian(stats.delta, buf); } } diff --git a/src/AggregateFunctions/ReservoirSampler.h b/src/AggregateFunctions/ReservoirSampler.h index 3d723d5aace..7409a3fa0dd 100644 --- a/src/AggregateFunctions/ReservoirSampler.h +++ b/src/AggregateFunctions/ReservoirSampler.h @@ -207,8 +207,8 @@ public: void read(DB::ReadBuffer & buf) { - DB::readIntBinary(sample_count, buf); - DB::readIntBinary(total_values, buf); + DB::readBinaryLittleEndian(sample_count, buf); + DB::readBinaryLittleEndian(total_values, buf); size_t size = std::min(total_values, sample_count); static constexpr size_t MAX_RESERVOIR_SIZE = 1_GiB; @@ -224,22 +224,22 @@ public: rng_buf >> rng; for (size_t i = 0; i < samples.size(); ++i) - DB::readBinary(samples[i], buf); + DB::readBinaryLittleEndian(samples[i], buf); sorted = false; } void write(DB::WriteBuffer & buf) const { - DB::writeIntBinary(sample_count, buf); - DB::writeIntBinary(total_values, buf); + DB::writeBinaryLittleEndian(sample_count, buf); + DB::writeBinaryLittleEndian(total_values, buf); DB::WriteBufferFromOwnString rng_buf; rng_buf << rng; DB::writeStringBinary(rng_buf.str(), buf); for (size_t i = 0; i < std::min(sample_count, total_values); ++i) - DB::writeBinary(samples[i], buf); + DB::writeBinaryLittleEndian(samples[i], buf); } private: diff --git a/src/Common/TransformEndianness.hpp b/src/Common/TransformEndianness.hpp index 4d690d75d9e..0a9055dde15 100644 --- a/src/Common/TransformEndianness.hpp +++ b/src/Common/TransformEndianness.hpp @@ -59,4 +59,10 @@ inline void transformEndianness(std::pair & pair) transformEndianness(pair.first); transformEndianness(pair.second); } + +template +inline void transformEndianness(StrongTypedef & x) +{ + transformEndianness(x.toUnderType()); +} }