Merge pull request #51637 from ClibMouse/feature/grouparray-aggregate-function-big-endian

Implement endianness-independent serialization
This commit is contained in:
Robert Schulze 2023-07-17 14:07:13 +02:00 committed by GitHub
commit 67e9de14de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 93 additions and 76 deletions

View File

@ -67,29 +67,38 @@ struct AggregateFunctionBoundingRatioData
}
}
void serialize(WriteBuffer & buf) const
{
writeBinary(empty, buf);
if (!empty)
{
writePODBinary(left, buf);
writePODBinary(right, buf);
}
}
void deserialize(ReadBuffer & buf)
{
readBinary(empty, buf);
if (!empty)
{
readPODBinary(left, buf);
readPODBinary(right, buf);
}
}
void serialize(WriteBuffer & buf) const;
void deserialize(ReadBuffer & buf);
};
template <std::endian endian>
inline void transformEndianness(AggregateFunctionBoundingRatioData::Point & p)
{
transformEndianness<endian>(p.x);
transformEndianness<endian>(p.y);
}
void AggregateFunctionBoundingRatioData::serialize(WriteBuffer & buf) const
{
writeBinaryLittleEndian(empty, buf);
if (!empty)
{
writeBinaryLittleEndian(left, buf);
writeBinaryLittleEndian(right, buf);
}
}
void AggregateFunctionBoundingRatioData::deserialize(ReadBuffer & buf)
{
readBinaryLittleEndian(empty, buf);
if (!empty)
{
readBinaryLittleEndian(left, buf);
readBinaryLittleEndian(right, buf);
}
}
class AggregateFunctionBoundingRatio final : public IAggregateFunctionDataHelper<AggregateFunctionBoundingRatioData, AggregateFunctionBoundingRatio>
{

View File

@ -103,18 +103,18 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
writeIntBinary(this->data(place).sum, buf);
writeIntBinary(this->data(place).first, buf);
writeIntBinary(this->data(place).last, buf);
writePODBinary<bool>(this->data(place).seen, buf);
writeBinaryLittleEndian(this->data(place).sum, buf);
writeBinaryLittleEndian(this->data(place).first, buf);
writeBinaryLittleEndian(this->data(place).last, buf);
writeBinaryLittleEndian(this->data(place).seen, buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
{
readIntBinary(this->data(place).sum, buf);
readIntBinary(this->data(place).first, buf);
readIntBinary(this->data(place).last, buf);
readPODBinary<bool>(this->data(place).seen, buf);
readBinaryLittleEndian(this->data(place).sum, buf);
readBinaryLittleEndian(this->data(place).first, buf);
readBinaryLittleEndian(this->data(place).last, buf);
readBinaryLittleEndian(this->data(place).seen, buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override

View File

@ -144,22 +144,22 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
writeIntBinary(this->data(place).sum, buf);
writeIntBinary(this->data(place).first, buf);
writeIntBinary(this->data(place).first_ts, buf);
writeIntBinary(this->data(place).last, buf);
writeIntBinary(this->data(place).last_ts, buf);
writePODBinary<bool>(this->data(place).seen, buf);
writeBinaryLittleEndian(this->data(place).sum, buf);
writeBinaryLittleEndian(this->data(place).first, buf);
writeBinaryLittleEndian(this->data(place).first_ts, buf);
writeBinaryLittleEndian(this->data(place).last, buf);
writeBinaryLittleEndian(this->data(place).last_ts, buf);
writeBinaryLittleEndian(this->data(place).seen, buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
{
readIntBinary(this->data(place).sum, buf);
readIntBinary(this->data(place).first, buf);
readIntBinary(this->data(place).first_ts, buf);
readIntBinary(this->data(place).last, buf);
readIntBinary(this->data(place).last_ts, buf);
readPODBinary<bool>(this->data(place).seen, buf);
readBinaryLittleEndian(this->data(place).sum, buf);
readBinaryLittleEndian(this->data(place).first, buf);
readBinaryLittleEndian(this->data(place).first_ts, buf);
readBinaryLittleEndian(this->data(place).last, buf);
readBinaryLittleEndian(this->data(place).last_ts, buf);
readBinaryLittleEndian(this->data(place).seen, buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override

View File

@ -266,19 +266,20 @@ public:
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
const auto & value = this->data(place).value;
size_t size = value.size();
const size_t size = value.size();
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(value.data()), size * sizeof(value[0]));
for (const auto & element : value)
writeBinaryLittleEndian(element, buf);
if constexpr (Trait::last)
DB::writeIntBinary<size_t>(this->data(place).total_values, buf);
writeBinaryLittleEndian(this->data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::writeIntBinary<size_t>(this->data(place).total_values, buf);
writeBinaryLittleEndian(this->data(place).total_values, buf);
WriteBufferFromOwnString rng_buf;
rng_buf << this->data(place).rng;
DB::writeStringBinary(rng_buf.str(), buf);
writeStringBinary(rng_buf.str(), buf);
}
}
@ -297,16 +298,17 @@ public:
auto & value = this->data(place).value;
value.resize_exact(size, arena);
buf.readStrict(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
for (auto & element : value)
readBinaryLittleEndian(element, buf);
if constexpr (Trait::last)
DB::readIntBinary<size_t>(this->data(place).total_values, buf);
readBinaryLittleEndian(this->data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::readIntBinary<size_t>(this->data(place).total_values, buf);
readBinaryLittleEndian(this->data(place).total_values, buf);
std::string rng_string;
DB::readStringBinary(rng_string, buf);
readStringBinary(rng_string, buf);
ReadBufferFromString rng_buf(rng_string);
rng_buf >> this->data(place).rng;
}
@ -603,14 +605,14 @@ public:
node->write(buf);
if constexpr (Trait::last)
DB::writeIntBinary<size_t>(data(place).total_values, buf);
writeBinaryLittleEndian(data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::writeIntBinary<size_t>(data(place).total_values, buf);
writeBinaryLittleEndian(data(place).total_values, buf);
WriteBufferFromOwnString rng_buf;
rng_buf << data(place).rng;
DB::writeStringBinary(rng_buf.str(), buf);
writeStringBinary(rng_buf.str(), buf);
}
}
@ -636,13 +638,13 @@ public:
value[i] = Node::read(buf, arena);
if constexpr (Trait::last)
DB::readIntBinary<size_t>(data(place).total_values, buf);
readBinaryLittleEndian(data(place).total_values, buf);
if constexpr (Trait::sampler == Sampler::RNG)
{
DB::readIntBinary<size_t>(data(place).total_values, buf);
readBinaryLittleEndian(data(place).total_values, buf);
std::string rng_string;
DB::readStringBinary(rng_string, buf);
readStringBinary(rng_string, buf);
ReadBufferFromString rng_buf(rng_string);
rng_buf >> data(place).rng;
}

View File

@ -233,35 +233,35 @@ public:
void write(WriteBuffer & buf) const
{
writeIntBinary<size_t>(compress_threshold, buf);
writeFloatBinary<double>(relative_error, buf);
writeIntBinary<size_t>(count, buf);
writeIntBinary<size_t>(sampled.size(), buf);
writeBinaryLittleEndian(compress_threshold, buf);
writeBinaryLittleEndian(relative_error, buf);
writeBinaryLittleEndian(count, buf);
writeBinaryLittleEndian(sampled.size(), buf);
for (const auto & stats : sampled)
{
writeFloatBinary<T>(stats.value, buf);
writeIntBinary<Int64>(stats.g, buf);
writeIntBinary<Int64>(stats.delta, buf);
writeBinaryLittleEndian(stats.value, buf);
writeBinaryLittleEndian(stats.g, buf);
writeBinaryLittleEndian(stats.delta, buf);
}
}
void read(ReadBuffer & buf)
{
readIntBinary<size_t>(compress_threshold, buf);
readFloatBinary<double>(relative_error, buf);
readIntBinary<size_t>(count, buf);
readBinaryLittleEndian(compress_threshold, buf);
readBinaryLittleEndian(relative_error, buf);
readBinaryLittleEndian(count, buf);
size_t sampled_len = 0;
readIntBinary<size_t>(sampled_len, buf);
readBinaryLittleEndian(sampled_len, buf);
sampled.resize(sampled_len);
for (size_t i = 0; i < sampled_len; ++i)
{
auto stats = sampled[i];
readFloatBinary<T>(stats.value, buf);
readIntBinary<Int64>(stats.g, buf);
readIntBinary<Int64>(stats.delta, buf);
readBinaryLittleEndian(stats.value, buf);
readBinaryLittleEndian(stats.g, buf);
readBinaryLittleEndian(stats.delta, buf);
}
}

View File

@ -207,8 +207,8 @@ public:
void read(DB::ReadBuffer & buf)
{
DB::readIntBinary<size_t>(sample_count, buf);
DB::readIntBinary<size_t>(total_values, buf);
DB::readBinaryLittleEndian(sample_count, buf);
DB::readBinaryLittleEndian(total_values, buf);
size_t size = std::min(total_values, sample_count);
static constexpr size_t MAX_RESERVOIR_SIZE = 1_GiB;
@ -224,22 +224,22 @@ public:
rng_buf >> rng;
for (size_t i = 0; i < samples.size(); ++i)
DB::readBinary(samples[i], buf);
DB::readBinaryLittleEndian(samples[i], buf);
sorted = false;
}
void write(DB::WriteBuffer & buf) const
{
DB::writeIntBinary<size_t>(sample_count, buf);
DB::writeIntBinary<size_t>(total_values, buf);
DB::writeBinaryLittleEndian(sample_count, buf);
DB::writeBinaryLittleEndian(total_values, buf);
DB::WriteBufferFromOwnString rng_buf;
rng_buf << rng;
DB::writeStringBinary(rng_buf.str(), buf);
for (size_t i = 0; i < std::min(sample_count, total_values); ++i)
DB::writeBinary(samples[i], buf);
DB::writeBinaryLittleEndian(samples[i], buf);
}
private:

View File

@ -59,4 +59,10 @@ inline void transformEndianness(std::pair<A, B> & pair)
transformEndianness<endian>(pair.first);
transformEndianness<endian>(pair.second);
}
template <std::endian endian, typename T, typename Tag>
inline void transformEndianness(StrongTypedef<T, Tag> & x)
{
transformEndianness<endian>(x.toUnderType());
}
}