Merge pull request #50405 from ClibMouse/feature/reservoir-sampler-big-endian-support

Implement big-endian support for the deterministic reservoir sampler
This commit is contained in:
Robert Schulze 2023-06-13 09:55:23 +02:00 committed by GitHub
commit 8358d29ac7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 75 additions and 48 deletions

View File

@ -74,6 +74,7 @@ ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4 ContinuationIndentWidth: 4
DerivePointerAlignment: false DerivePointerAlignment: false
DisableFormat: false DisableFormat: false
IndentRequiresClause: false
IndentWidth: 4 IndentWidth: 4
IndentWrappedFunctionNames: false IndentWrappedFunctionNames: false
MacroBlockBegin: '' MacroBlockBegin: ''

View File

@ -157,8 +157,8 @@ public:
void read(DB::ReadBuffer & buf) void read(DB::ReadBuffer & buf)
{ {
size_t size = 0; size_t size = 0;
DB::readIntBinary<size_t>(size, buf); readBinaryLittleEndian(size, buf);
DB::readIntBinary<size_t>(total_values, buf); readBinaryLittleEndian(total_values, buf);
/// Compatibility with old versions. /// Compatibility with old versions.
if (size > total_values) if (size > total_values)
@ -171,16 +171,16 @@ public:
samples.resize(size); samples.resize(size);
for (size_t i = 0; i < size; ++i) for (size_t i = 0; i < size; ++i)
DB::readPODBinary(samples[i], buf); readBinaryLittleEndian(samples[i], buf);
sorted = false; sorted = false;
} }
void write(DB::WriteBuffer & buf) const void write(DB::WriteBuffer & buf) const
{ {
size_t size = samples.size(); const size_t size = samples.size();
DB::writeIntBinary<size_t>(size, buf); writeBinaryLittleEndian(size, buf);
DB::writeIntBinary<size_t>(total_values, buf); writeBinaryLittleEndian(total_values, buf);
for (size_t i = 0; i < size; ++i) for (size_t i = 0; i < size; ++i)
{ {
@ -190,12 +190,12 @@ public:
/// Here we ensure that padding is zero without changing the protocol. /// Here we ensure that padding is zero without changing the protocol.
/// TODO: After implementation of "versioning aggregate function state", /// TODO: After implementation of "versioning aggregate function state",
/// change the serialization format. /// change the serialization format.
Element elem; Element elem;
memset(&elem, 0, sizeof(elem)); memset(&elem, 0, sizeof(elem));
elem = samples[i]; elem = samples[i];
DB::writePODBinary(elem, buf); DB::transformEndianness<std::endian::little>(elem);
DB::writeString(reinterpret_cast<const char*>(&elem), sizeof(elem), buf);
} }
} }

View File

@ -0,0 +1,62 @@
#pragma once
#include <base/Decimal_fwd.h>
#include <base/extended_types.h>
#include <utility>
namespace DB
{
template <std::endian endian, typename T>
requires std::is_integral_v<T>
inline void transformEndianness(T & value)
{
if constexpr (endian != std::endian::native)
value = std::byteswap(value);
}
template <std::endian endian, typename T>
requires is_big_int_v<T>
inline void transformEndianness(T & x)
{
if constexpr (std::endian::native != endian)
{
auto & items = x.items;
std::transform(std::begin(items), std::end(items), std::begin(items), [](auto & item) { return std::byteswap(item); });
std::reverse(std::begin(items), std::end(items));
}
}
template <std::endian endian, typename T>
requires is_decimal<T>
inline void transformEndianness(T & x)
{
transformEndianness<endian>(x.value);
}
template <std::endian endian, typename T>
requires std::is_floating_point_v<T>
inline void transformEndianness(T & value)
{
if constexpr (std::endian::native != endian)
{
auto * start = reinterpret_cast<std::byte *>(&value);
std::reverse(start, start + sizeof(T));
}
}
template <std::endian endian, typename T>
requires std::is_scoped_enum_v<T>
inline void transformEndianness(T & x)
{
using UnderlyingType = std::underlying_type_t<T>;
transformEndianness<endian>(reinterpret_cast<UnderlyingType &>(x));
}
template <std::endian endian, typename A, typename B>
inline void transformEndianness(std::pair<A, B> & pair)
{
transformEndianness<endian>(pair.first);
transformEndianness<endian>(pair.second);
}
}

View File

@ -16,6 +16,7 @@
#include <Common/DateLUT.h> #include <Common/DateLUT.h>
#include <Common/LocalDate.h> #include <Common/LocalDate.h>
#include <Common/LocalDateTime.h> #include <Common/LocalDateTime.h>
#include <Common/TransformEndianness.hpp>
#include <base/StringRef.h> #include <base/StringRef.h>
#include <base/arithmeticOverflow.h> #include <base/arithmeticOverflow.h>
#include <base/sort.h> #include <base/sort.h>
@ -1092,30 +1093,11 @@ inline void readBinary(Decimal128 & x, ReadBuffer & buf) { readPODBinary(x, buf)
inline void readBinary(Decimal256 & x, ReadBuffer & buf) { readPODBinary(x.value, buf); } inline void readBinary(Decimal256 & x, ReadBuffer & buf) { readPODBinary(x.value, buf); }
inline void readBinary(LocalDate & x, ReadBuffer & buf) { readPODBinary(x, buf); } inline void readBinary(LocalDate & x, ReadBuffer & buf) { readPODBinary(x, buf); }
template <std::endian endian, typename T> template <std::endian endian, typename T>
requires is_arithmetic_v<T> && (sizeof(T) <= 8)
inline void readBinaryEndian(T & x, ReadBuffer & buf) inline void readBinaryEndian(T & x, ReadBuffer & buf)
{ {
readPODBinary(x, buf); readPODBinary(x, buf);
if constexpr (std::endian::native != endian) transformEndianness<endian>(x);
x = std::byteswap(x);
}
template <std::endian endian, typename T>
requires is_big_int_v<T>
inline void readBinaryEndian(T & x, ReadBuffer & buf)
{
if constexpr (std::endian::native == endian)
{
for (size_t i = 0; i != std::size(x.items); ++i)
readBinaryEndian<endian>(x.items[i], buf);
}
else
{
for (size_t i = 0; i != std::size(x.items); ++i)
readBinaryEndian<endian>(x.items[std::size(x.items) - i - 1], buf);
}
} }
template <typename T> template <typename T>

View File

@ -13,6 +13,7 @@
#include <Common/DateLUT.h> #include <Common/DateLUT.h>
#include <Common/LocalDate.h> #include <Common/LocalDate.h>
#include <Common/LocalDateTime.h> #include <Common/LocalDateTime.h>
#include <Common/TransformEndianness.hpp>
#include <base/find_symbols.h> #include <base/find_symbols.h>
#include <base/StringRef.h> #include <base/StringRef.h>
#include <base/DecomposedFloat.h> #include <base/DecomposedFloat.h>
@ -1174,32 +1175,13 @@ inline void writeNullTerminatedString(const String & s, WriteBuffer & buffer)
buffer.write(s.c_str(), s.size() + 1); buffer.write(s.c_str(), s.size() + 1);
} }
template <std::endian endian, typename T> template <std::endian endian, typename T>
requires is_arithmetic_v<T> && (sizeof(T) <= 8)
inline void writeBinaryEndian(T x, WriteBuffer & buf) inline void writeBinaryEndian(T x, WriteBuffer & buf)
{ {
if constexpr (std::endian::native != endian) transformEndianness<endian>(x);
x = std::byteswap(x);
writePODBinary(x, buf); writePODBinary(x, buf);
} }
template <std::endian endian, typename T>
requires is_big_int_v<T>
inline void writeBinaryEndian(const T & x, WriteBuffer & buf)
{
if constexpr (std::endian::native == endian)
{
for (size_t i = 0; i != std::size(x.items); ++i)
writeBinaryEndian<endian>(x.items[i], buf);
}
else
{
for (size_t i = 0; i != std::size(x.items); ++i)
writeBinaryEndian<endian>(x.items[std::size(x.items) - i - 1], buf);
}
}
template <typename T> template <typename T>
inline void writeBinaryLittleEndian(T x, WriteBuffer & buf) inline void writeBinaryLittleEndian(T x, WriteBuffer & buf)
{ {