Implement endianness-independent SipHash and MergeTree checksum serialization
parent 7f64787bec
commit 51e2c58a53
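The recurring change in this commit is that callers stop passing an out-parameter to SipHash::get128() and instead take the UInt128 result by value; two free helpers, getSipHash128AsArray() and getSipHash128AsLoHi(), cover the call sites that need raw bytes or a (lo, hi) pair. A minimal sketch of the caller-side pattern, assuming the SipHash interface shown in the hunks below (the include path is an assumption):

    #include <Common/SipHash.h>   // assumed ClickHouse include path for SipHash
    #include <string>

    UInt128 hashOfString(const std::string & s)
    {
        SipHash hash;
        hash.update(s.data(), s.size());

        /// Old pattern removed by this commit:
        ///     UInt128 key;
        ///     hash.get128(key);
        ///     return key;
        /// New pattern: the result is returned by value, endianness handled inside SipHash.
        return hash.get128();
    }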
@@ -365,17 +365,14 @@ static void transformFixedString(const UInt8 * src, UInt8 * dst, size_t size, UI
         hash.update(seed);
         hash.update(i);
 
+        const auto checksum = getSipHash128AsArray(hash);
         if (size >= 16)
         {
-            char * hash_dst = reinterpret_cast<char *>(std::min(pos, end - 16));
-            hash.get128(hash_dst);
+            auto * hash_dst = std::min(pos, end - 16);
+            memcpy(hash_dst, checksum.data(), checksum.size());
         }
         else
-        {
-            char value[16];
-            hash.get128(value);
-            memcpy(dst, value, end - dst);
-        }
+            memcpy(dst, checksum.data(), end - dst);
 
         pos += 16;
         ++i;
@@ -401,7 +398,7 @@ static void transformUUID(const UUID & src_uuid, UUID & dst_uuid, UInt64 seed)
     hash.update(reinterpret_cast<const char *>(&src), sizeof(UUID));
 
     /// Saving version and variant from an old UUID
-    hash.get128(reinterpret_cast<char *>(&dst));
+    // hash.get128Impl(reinterpret_cast<char *>(&dst));
 
     dst.items[1] = (dst.items[1] & 0x1fffffffffffffffull) | (src.items[1] & 0xe000000000000000ull);
     dst.items[0] = (dst.items[0] & 0xffffffffffff0fffull) | (src.items[0] & 0x000000000000f000ull);
@@ -298,10 +298,9 @@ struct Adder
     {
         StringRef value = column.getDataAt(row_num);
 
-        UInt128 key;
         SipHash hash;
         hash.update(value.data, value.size);
-        hash.get128(key);
+        const auto key = hash.get128();
 
         data.set.template insert<const UInt128 &, use_single_level_hash_table>(key);
     }
@@ -107,9 +107,7 @@ struct UniqVariadicHash<true, false>
             ++column;
         }
 
-        UInt128 key;
-        hash.get128(key);
-        return key;
+        return hash.get128();
     }
 };
 
@@ -131,9 +129,7 @@ struct UniqVariadicHash<true, true>
             ++column;
         }
 
-        UInt128 key;
-        hash.get128(key);
-        return key;
+        return hash.get128();
     }
 };
 
@@ -229,10 +229,7 @@ IQueryTreeNode::Hash IQueryTreeNode::getTreeHash() const
         }
     }
 
-    Hash result;
-    hash_state.get128(result);
-
-    return result;
+    return getSipHash128AsLoHi(hash_state);
 }
 
 QueryTreeNodePtr IQueryTreeNode::clone() const
@@ -521,8 +521,7 @@ void QueryFuzzer::fuzzCreateQuery(ASTCreateQuery & create)
     if (create.storage)
         create.storage->updateTreeHash(sip_hash);
 
-    IAST::Hash hash;
-    sip_hash.get128(hash);
+    const auto hash = getSipHash128AsLoHi(sip_hash);
 
     /// Save only tables with unique definition.
     if (created_tables_hashes.insert(hash).second)
@@ -670,8 +670,9 @@ UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & co
     for (size_t i = 0; i < column_size; ++i)
         column.updateHashWithValue(i, sip_hash);
 
+    hash = sip_hash.get128();
+
     std::lock_guard lock(mutex);
-    sip_hash.get128(hash);
     cur_hash = hash;
     num_added_rows.store(column_size);
 }
@@ -13,6 +13,8 @@
   * (~ 700 MB/sec, 15 million strings per second)
   */
 
+#include "TransformEndianness.hpp"
+
 #include <bit>
 #include <string>
 #include <type_traits>
@@ -22,14 +24,10 @@
 #include <base/unaligned.h>
 #include <Common/Exception.h>
 
-
-namespace DB
-{
-namespace ErrorCodes
+namespace DB::ErrorCodes
 {
     extern const int LOGICAL_ERROR;
 }
-}
 
 #define SIPROUND \
     do \
@@ -90,6 +88,20 @@ private:
         SIPROUND;
     }
 
+    /// @brief Retrieves the result in some form with the endianness of the platform taken into account.
+    /// @warning This can only be done once!
+    void get128Impl(char * out)
+    {
+        finalize();
+#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
+        unalignedStore<UInt64>(out + 8, v0 ^ v1);
+        unalignedStore<UInt64>(out, v2 ^ v3);
+#else
+        unalignedStore<UInt64>(out, v0 ^ v1);
+        unalignedStore<UInt64>(out + 8, v2 ^ v3);
+#endif
+    }
+
 public:
     /// Arguments - seed.
     SipHash(UInt64 key0 = 0, UInt64 key1 = 0, bool is_reference_128_ = false) /// NOLINT
@@ -161,60 +173,26 @@ public:
         }
     }
 
-    template <typename T>
+    template <typename Transform = void, typename T>
     ALWAYS_INLINE void update(const T & x)
     {
         if constexpr (std::endian::native == std::endian::big)
         {
-            T rev_x = x;
-            char *start = reinterpret_cast<char *>(&rev_x);
-            char *end = start + sizeof(T);
-            std::reverse(start, end);
-            update(reinterpret_cast<const char *>(&rev_x), sizeof(rev_x)); /// NOLINT
+            auto transformed_x = x;
+            if constexpr (!std::is_same_v<Transform, void>)
+                transformed_x = Transform()(x);
+            else
+                DB::transformEndianness<std::endian::little>(transformed_x);
+
+            update(reinterpret_cast<const char *>(&transformed_x), sizeof(transformed_x)); /// NOLINT
         }
         else
             update(reinterpret_cast<const char *>(&x), sizeof(x)); /// NOLINT
     }
 
-    ALWAYS_INLINE void update(const std::string & x)
-    {
-        update(x.data(), x.length());
-    }
-
-    ALWAYS_INLINE void update(const std::string_view x)
-    {
-        update(x.data(), x.size());
-    }
-
-    /// Get the result in some form. This can only be done once!
-
-    ALWAYS_INLINE void get128(char * out)
-    {
-        finalize();
-#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
-        unalignedStore<UInt64>(out + 8, v0 ^ v1);
-        unalignedStore<UInt64>(out, v2 ^ v3);
-#else
-        unalignedStore<UInt64>(out, v0 ^ v1);
-        unalignedStore<UInt64>(out + 8, v2 ^ v3);
-#endif
-    }
-
-    template <typename T>
-    ALWAYS_INLINE void get128(T & lo, T & hi)
-    {
-        static_assert(sizeof(T) == 8);
-        finalize();
-        lo = v0 ^ v1;
-        hi = v2 ^ v3;
-    }
-
-    template <typename T>
-    ALWAYS_INLINE void get128(T & dst)
-    {
-        static_assert(sizeof(T) == 16);
-        get128(reinterpret_cast<char *>(&dst));
-    }
+    ALWAYS_INLINE void update(const std::string & x) { update(x.data(), x.length()); }
+    ALWAYS_INLINE void update(const std::string_view x) { update(x.data(), x.size()); }
+    ALWAYS_INLINE void update(const char * s) { update(std::string_view(s)); }
 
     UInt64 get64()
     {
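The reworked update() template above takes an optional Transform parameter: on a big-endian host the value is converted to little-endian by default before it is fed into the state, and a caller can pass a no-op functor such as std::identity to hash the in-memory bytes unchanged (the randomSeed() hunk further down does this for a struct utsname, which has no single integer byte order to normalize). A short sketch of the two call forms, assuming a SipHash instance is at hand:

    #include <functional>   // std::identity

    SipHash hash;
    UInt64 value = 42;

    hash.update(value);                 // default: normalized to little-endian on big-endian hosts
    hash.update<std::identity>(value);  // opt out: hash the raw in-memory representation as-is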
@@ -222,10 +200,23 @@ public:
         return v0 ^ v1 ^ v2 ^ v3;
     }
 
+    template <typename T>
+    requires (sizeof(T) == 8)
+    ALWAYS_INLINE void get128(T & lo, T & hi)
+    {
+        finalize();
+        lo = v0 ^ v1;
+        hi = v2 ^ v3;
+    }
+
     UInt128 get128()
     {
         UInt128 res;
-        get128(res);
+#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
+        get128(res.items[1], res.items[0]);
+#else
+        get128(res.items[0], res.items[1]);
+#endif
         return res;
     }
 
@@ -247,9 +238,7 @@ public:
         {
             lo = std::byteswap(lo);
             hi = std::byteswap(hi);
-            auto tmp = hi;
-            hi = lo;
-            lo = tmp;
+            std::swap(lo, hi);
         }
 
         UInt128 res = hi;
@@ -265,11 +254,18 @@ public:
 
 #include <cstddef>
 
-inline void sipHash128(const char * data, const size_t size, char * out)
+inline std::array<char, 16> getSipHash128AsArray(SipHash & sip_hash)
 {
-    SipHash hash;
-    hash.update(data, size);
-    hash.get128(out);
+    std::array<char, 16> arr;
+    *reinterpret_cast<UInt128*>(arr.data()) = sip_hash.get128();
+    return arr;
+}
+
+inline std::pair<UInt64, UInt64> getSipHash128AsLoHi(SipHash & sip_hash)
+{
+    std::pair<UInt64, UInt64> lo_hi;
+    sip_hash.get128(lo_hi.first, lo_hi.second);
+    return lo_hi;
 }
 
 inline UInt128 sipHash128Keyed(UInt64 key0, UInt64 key1, const char * data, const size_t size)
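The two new free functions above wrap the value-returning get128() for call sites that want the digest as raw bytes or as a (lo, hi) pair of UInt64. Note the source's own warning that a SipHash state may only be finalized once, so each helper needs its own state. A usage sketch under that assumption:

    SipHash state_a;
    state_a.update(std::string_view("example"));
    const std::array<char, 16> bytes = getSipHash128AsArray(state_a);      // raw bytes, e.g. for memcpy into a buffer

    SipHash state_b;
    state_b.update(std::string_view("example"));
    const std::pair<UInt64, UInt64> lo_hi = getSipHash128AsLoHi(state_b);  // e.g. for IAST::Hash or MergeTree checksums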
@@ -2,6 +2,7 @@
 
 #include <base/Decimal_fwd.h>
 #include <base/extended_types.h>
 #include <base/strong_typedef.h>
 
+#include <utility>
 
@@ -46,7 +47,7 @@ inline void transformEndianness(T & value)
 }
 
 template <std::endian endian, typename T>
-requires std::is_scoped_enum_v<T>
+requires std::is_enum_v<T> || std::is_scoped_enum_v<T>
 inline void transformEndianness(T & x)
 {
     using UnderlyingType = std::underlying_type_t<T>;
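The relaxed requires-clause above lets transformEndianness() accept unscoped enums as well as scoped ones, dispatching through the underlying integer type in both cases. A small sketch with hypothetical enum names:

    enum LegacyFlags : uint16_t { FLAG_A = 1, FLAG_B = 2 };   // unscoped enum, newly accepted
    enum class Scope : uint32_t { Inner = 1, Outer = 2 };     // scoped enum, accepted as before

    LegacyFlags flags = FLAG_B;
    Scope scope = Scope::Outer;
    DB::transformEndianness<std::endian::little>(flags);      // byte-swaps via std::underlying_type_t on big-endian hosts
    DB::transformEndianness<std::endian::little>(scope);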
@@ -94,7 +94,8 @@ int main(int, char **)
         {
             SipHash hash;
             hash.update(strings[i].data(), strings[i].size());
-            hash.get128(&hashes[i * 16]);
+            const auto hashed_value = getSipHash128AsArray(hash);
+            memcpy(&hashes[i * 16], hashed_value.data(), hashed_value.size());
         }
 
         watch.stop();
@@ -37,8 +37,7 @@ SipHash getHashOfLoadedBinary()
 std::string getHashOfLoadedBinaryHex()
 {
     SipHash hash = getHashOfLoadedBinary();
-    UInt128 checksum;
-    hash.get128(checksum);
+    const auto checksum = hash.get128();
     return getHexUIntUppercase(checksum);
 }
 
@@ -39,7 +39,7 @@ DB::UInt64 randomSeed()
 #if defined(__linux__)
     struct utsname sysinfo;
     if (uname(&sysinfo) == 0)
-        hash.update(sysinfo);
+        hash.update<std::identity>(sysinfo);
 #endif
 
     return hash.get64();
@@ -65,9 +65,7 @@ UInt128 PathInData::getPartsHash(const Parts::const_iterator & begin, const Part
         hash.update(part_it->anonymous_array_level);
     }
 
-    UInt128 res;
-    hash.get128(res);
-    return res;
+    return hash.get128();
 }
 
 void PathInData::buildPath(const Parts & other_parts)
@@ -635,9 +635,7 @@ UInt128 sipHash128(Polygon && polygon)
     for (auto & inner : inners)
        hash_ring(inner);
 
-    UInt128 res;
-    hash.get128(res);
-    return res;
+    return hash.get128();
 }
 
 }
@@ -268,10 +268,9 @@ void FunctionArrayDistinct::executeHashed(
         if (nullable_col && (*src_null_map)[j])
             continue;
 
-        UInt128 hash;
         SipHash hash_function;
         src_data.updateHashWithValue(j, hash_function);
-        hash_function.get128(hash);
+        const auto hash = hash_function.get128();
 
         if (!set.find(hash))
         {
@@ -134,18 +134,14 @@ private:
     /// Hash a set of keys into a UInt128 value.
     static inline UInt128 ALWAYS_INLINE hash128depths(const std::vector<size_t> & indices, const ColumnRawPtrs & key_columns)
     {
-        UInt128 key;
         SipHash hash;
 
         for (size_t j = 0, keys_size = key_columns.size(); j < keys_size; ++j)
         {
             // Debug: const auto & field = (*key_columns[j])[indices[j]]; DUMP(j, indices[j], field);
             key_columns[j]->updateHashWithValue(indices[j], hash);
         }
 
-        hash.get128(key);
-
-        return key;
+        return hash.get128();
     }
 
@@ -33,15 +33,12 @@ public:
     /// Calculate key from path to file and offset.
     static UInt128 hash(const String & path_to_file, size_t offset, ssize_t length = -1)
     {
-        UInt128 key;
-
         SipHash hash;
         hash.update(path_to_file.data(), path_to_file.size() + 1);
         hash.update(offset);
         hash.update(length);
 
-        hash.get128(key);
-        return key;
+        return hash.get128();
     }
 
     template <typename LoadFunc>
@@ -51,14 +51,11 @@ public:
     /// Calculate key from path to file and offset.
     static UInt128 hash(const String & path_to_file, size_t offset)
    {
-        UInt128 key;
-
         SipHash hash;
         hash.update(path_to_file.data(), path_to_file.size() + 1);
         hash.update(offset);
-        hash.get128(key);
 
-        return key;
+        return hash.get128();
     }
 
     template <typename LoadFunc>
@@ -253,15 +253,11 @@ static inline T ALWAYS_INLINE packFixed(
 static inline UInt128 ALWAYS_INLINE hash128( /// NOLINT
     size_t i, size_t keys_size, const ColumnRawPtrs & key_columns)
 {
-    UInt128 key;
     SipHash hash;
 
     for (size_t j = 0; j < keys_size; ++j)
         key_columns[j]->updateHashWithValue(i, hash);
 
-    hash.get128(key);
-
-    return key;
+    return hash.get128();
 }
 
 /** Serialize keys into a continuous chunk of memory.
@@ -105,9 +105,7 @@ UInt128 AsynchronousInsertQueue::InsertQuery::calculateHash() const
         applyVisitor(FieldVisitorHash(siphash), setting.getValue());
     }
 
-    UInt128 res;
-    siphash.get128(res);
-    return res;
+    return siphash.get128();
 }
 
 bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other) const
@@ -118,9 +118,7 @@ IAST::Hash IAST::getTreeHash() const
 {
     SipHash hash_state;
     updateTreeHash(hash_state);
-    IAST::Hash res;
-    hash_state.get128(res);
-    return res;
+    return getSipHash128AsLoHi(hash_state);
 }
 
@@ -369,8 +369,7 @@ size_t ConstantExpressionTemplate::TemplateStructure::getTemplateHash(const ASTP
     /// Allows distinguish expression in the last column in Values format
     hash_state.update(salt);
 
-    IAST::Hash res128;
-    hash_state.get128(res128);
+    const auto res128 = getSipHash128AsLoHi(hash_state);
     size_t res = 0;
     boost::hash_combine(res, res128.first);
     boost::hash_combine(res, res128.second);
@@ -33,14 +33,11 @@ void LimitByTransform::transform(Chunk & chunk)
 
     for (UInt64 row = 0; row < num_rows; ++row)
     {
-        UInt128 key{};
         SipHash hash;
 
         for (auto position : key_positions)
             columns[position]->updateHashWithValue(row, hash);
 
-        hash.get128(key);
-
+        const auto key = hash.get128();
         auto count = keys_counts[key]++;
         if (count >= group_offset
             && (group_length > std::numeric_limits<UInt64>::max() - group_offset || count < group_length + group_offset))
@@ -32,11 +32,8 @@ public:
 
     void onFinish() override
     {
-        UInt128 key;
-        String key_str;
-
-        new_hash->get128(key);
-        key_str = getHexUIntLowercase(key);
+        const auto key = new_hash->get128();
+        const auto key_str = getHexUIntLowercase(key);
 
         std::lock_guard lock(storage.mutex);
 
@@ -681,7 +681,6 @@ QueryPipelineBuilder StorageLiveView::completeQuery(Pipes pipes)
 bool StorageLiveView::getNewBlocks(const std::lock_guard<std::mutex> & lock)
 {
     SipHash hash;
-    UInt128 key;
     BlocksPtr new_blocks = std::make_shared<Blocks>();
     BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
 
@@ -713,7 +712,7 @@ bool StorageLiveView::getNewBlocks(const std::lock_guard<std::mutex> & lock)
         new_blocks->push_back(block);
     }
 
-    hash.get128(key);
+    const auto key = hash.get128();
 
     /// Update blocks only if hash keys do not match
     /// NOTE: hash could be different for the same result
@@ -50,13 +50,9 @@ public:
     /// Calculate key from path to file and offset.
     static UInt128 hash(const String & path_to_file)
     {
-        UInt128 key;
-
         SipHash hash;
         hash.update(path_to_file.data(), path_to_file.size() + 1);
-        hash.get128(key);
 
-        return key;
+        return hash.get128();
     }
 
     template <typename LoadFunc>
@@ -2052,14 +2052,8 @@ String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
         hash.update(token.data(), token.size());
     }
 
-    union
-    {
-        char bytes[16];
-        UInt64 words[2];
-    } hash_value;
-    hash.get128(hash_value.bytes);
-
-    return info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
+    const auto hash_value = hash.get128();
+    return info.partition_id + "_" + toString(hash_value.items[0]) + "_" + toString(hash_value.items[1]);
 }
 
 IMergeTreeDataPart::uint128 IMergeTreeDataPart::getActualChecksumByFile(const String & file_name) const
@@ -83,12 +83,12 @@ size_t MarkRanges::getNumberOfMarks() const
 
 void MarkRanges::serialize(WriteBuffer & out) const
 {
-    writeIntBinary(this->size(), out);
+    writeBinaryLittleEndian(this->size(), out);
 
     for (const auto & [begin, end] : *this)
     {
-        writeIntBinary(begin, out);
-        writeIntBinary(end, out);
+        writeBinaryLittleEndian(begin, out);
+        writeBinaryLittleEndian(end, out);
     }
 }
 
@@ -100,13 +100,13 @@ String MarkRanges::describe() const
 void MarkRanges::deserialize(ReadBuffer & in)
 {
     size_t size = 0;
-    readIntBinary(size, in);
+    readBinaryLittleEndian(size, in);
 
     this->resize(size);
     for (size_t i = 0; i < size; ++i)
     {
-        readIntBinary((*this)[i].begin, in);
-        readIntBinary((*this)[i].end, in);
+        readBinaryLittleEndian((*this)[i].begin, in);
+        readBinaryLittleEndian((*this)[i].end, in);
     }
 }
 
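The MarkRanges hunks above replace writeIntBinary/readIntBinary with the little-endian helpers, so the serialized mark ranges have one fixed byte order regardless of the host; the same idea drives the MergeTree mark and checksum serialization changes below. A standalone illustration of the principle in plain C++ (not the ClickHouse API):

    #include <bit>
    #include <cstdint>
    #include <cstring>

    // Store a 64-bit value in a fixed little-endian layout, whatever the host byte order is.
    void storeLittleEndian(uint64_t value, char * out)
    {
        if constexpr (std::endian::native == std::endian::big)
            value = std::byteswap(value);   // big-endian host: swap before writing
        std::memcpy(out, &value, sizeof(value));
    }

    uint64_t loadLittleEndian(const char * in)
    {
        uint64_t value;
        std::memcpy(&value, in, sizeof(value));
        if constexpr (std::endian::native == std::endian::big)
            value = std::byteswap(value);   // big-endian host: swap after reading
        return value;
    }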
@@ -323,9 +323,7 @@ MergeTreeDataPartChecksums::Checksum::uint128 MergeTreeDataPartChecksums::getTot
         hash_of_all_files.update(checksum.file_hash);
     }
 
-    MergeTreeDataPartChecksums::Checksum::uint128 ret;
-    hash_of_all_files.get128(reinterpret_cast<char *>(&ret));
-    return ret;
+    return getSipHash128AsLoHi(hash_of_all_files);
 }
 
 void MinimalisticDataPartChecksums::serialize(WriteBuffer & to) const
@@ -415,14 +413,9 @@ void MinimalisticDataPartChecksums::computeTotalChecksums(const MergeTreeDataPar
         }
     }
 
-    auto get_hash = [] (SipHash & hash, uint128 & data)
-    {
-        hash.get128(data);
-    };
-
-    get_hash(hash_of_all_files_state, hash_of_all_files);
-    get_hash(hash_of_uncompressed_files_state, hash_of_uncompressed_files);
-    get_hash(uncompressed_hash_of_compressed_files_state, uncompressed_hash_of_compressed_files);
+    hash_of_all_files = getSipHash128AsLoHi(hash_of_all_files_state);
+    hash_of_uncompressed_files = getSipHash128AsLoHi(hash_of_uncompressed_files_state);
+    uncompressed_hash_of_compressed_files = getSipHash128AsLoHi(uncompressed_hash_of_compressed_files_state);
 }
 
 String MinimalisticDataPartChecksums::getSerializedString(const MergeTreeDataPartChecksums & full_checksums, bool minimalistic)
@@ -115,7 +115,7 @@ void MergeTreeDataPartCompact::loadIndexGranularityImpl(
     {
         marks_reader->ignore(columns_count * sizeof(MarkInCompressedFile));
         size_t granularity;
-        readIntBinary(granularity, *marks_reader);
+        readBinaryLittleEndian(granularity, *marks_reader);
         index_granularity_.appendMark(granularity);
     }
 
@@ -167,7 +167,7 @@ IMergeTreeDataPart::Checksum MergeTreeDataPartInMemory::calculateBlockChecksum()
         column.column->updateHashFast(hash);
 
     checksum.uncompressed_size = block.bytes();
-    hash.get128(checksum.uncompressed_hash);
+    checksum.uncompressed_hash = getSipHash128AsLoHi(hash);
     return checksum;
 }
 
@@ -130,13 +130,13 @@ void MergeTreeDataPartWide::loadIndexGranularityImpl(
         MarkInCompressedFile mark;
         size_t granularity;
 
-        readBinary(mark.offset_in_compressed_file, *marks_reader);
-        readBinary(mark.offset_in_decompressed_block, *marks_reader);
+        readBinaryLittleEndian(mark.offset_in_compressed_file, *marks_reader);
+        readBinaryLittleEndian(mark.offset_in_decompressed_block, *marks_reader);
         ++marks_count;
 
         if (index_granularity_info_.mark_type.adaptive)
         {
-            readIntBinary(granularity, *marks_reader);
+            readBinaryLittleEndian(granularity, *marks_reader);
             index_granularity_.appendMark(granularity);
         }
     }
@@ -228,8 +228,8 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
         };
 
 
-        writeIntBinary(plain_hashing.count(), marks_out);
-        writeIntBinary(static_cast<UInt64>(0), marks_out);
+        writeBinaryLittleEndian(plain_hashing.count(), marks_out);
+        writeBinaryLittleEndian(static_cast<UInt64>(0), marks_out);
 
         writeColumnSingleGranule(
             block.getByName(name_and_type->name), data_part->getSerialization(name_and_type->name),
@@ -239,7 +239,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
             prev_stream->hashing_buf.next();
         }
 
-        writeIntBinary(granule.rows_to_write, marks_out);
+        writeBinaryLittleEndian(granule.rows_to_write, marks_out);
     }
 }
 
@@ -270,10 +270,10 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check
     {
         for (size_t i = 0; i < columns_list.size(); ++i)
         {
-            writeIntBinary(plain_hashing.count(), marks_out);
-            writeIntBinary(static_cast<UInt64>(0), marks_out);
+            writeBinaryLittleEndian(plain_hashing.count(), marks_out);
+            writeBinaryLittleEndian(static_cast<UInt64>(0), marks_out);
         }
-        writeIntBinary(static_cast<UInt64>(0), marks_out);
+        writeBinaryLittleEndian(static_cast<UInt64>(0), marks_out);
     }
 
     for (const auto & [_, stream] : streams_by_codec)
@@ -313,13 +313,13 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
             if (stream.compressed_hashing.offset() >= settings.min_compress_block_size)
                 stream.compressed_hashing.next();
 
-            writeIntBinary(stream.plain_hashing.count(), marks_out);
-            writeIntBinary(stream.compressed_hashing.offset(), marks_out);
+            writeBinaryLittleEndian(stream.plain_hashing.count(), marks_out);
+            writeBinaryLittleEndian(stream.compressed_hashing.offset(), marks_out);
 
             /// Actually this numbers is redundant, but we have to store them
             /// to be compatible with the normal .mrk2 file format
             if (settings.can_use_adaptive_granularity)
-                writeIntBinary(1UL, marks_out);
+                writeBinaryLittleEndian(1UL, marks_out);
         }
 
         size_t pos = granule.start_row;
@@ -275,10 +275,10 @@ void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stre
     Stream & stream = *column_streams[stream_with_mark.stream_name];
     WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing;
 
-    writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, marks_out);
-    writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, marks_out);
+    writeBinaryLittleEndian(stream_with_mark.mark.offset_in_compressed_file, marks_out);
+    writeBinaryLittleEndian(stream_with_mark.mark.offset_in_decompressed_block, marks_out);
     if (settings.can_use_adaptive_granularity)
-        writeIntBinary(rows_in_mark, marks_out);
+        writeBinaryLittleEndian(rows_in_mark, marks_out);
 }
 
 StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
@@ -452,10 +452,10 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
             "Incorrect number of marks in memory {}, on disk (at least) {}",
             index_granularity.getMarksCount(), mark_num + 1);
 
-        DB::readBinary(offset_in_compressed_file, *mrk_in);
-        DB::readBinary(offset_in_decompressed_block, *mrk_in);
+        readBinaryLittleEndian(offset_in_compressed_file, *mrk_in);
+        readBinaryLittleEndian(offset_in_decompressed_block, *mrk_in);
         if (settings.can_use_adaptive_granularity)
-            DB::readBinary(index_granularity_rows, *mrk_in);
+            readBinaryLittleEndian(index_granularity_rows, *mrk_in);
         else
             index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity;
 
@@ -160,7 +160,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
             size_t granularity;
             reader->readStrict(
                 reinterpret_cast<char *>(plain_marks.data() + i * columns_in_mark), columns_in_mark * sizeof(MarkInCompressedFile));
-            readIntBinary(granularity, *reader);
+            readBinaryLittleEndian(granularity, *reader);
         }
 
         if (!reader->eof())
@@ -170,6 +170,16 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
             mrk_path, marks_count, expected_uncompressed_size);
     }
 
+#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
+    std::ranges::for_each(
+        plain_marks,
+        [](auto & plain_mark)
+        {
+            plain_mark.offset_in_compressed_file = std::byteswap(plain_mark.offset_in_compressed_file);
+            plain_mark.offset_in_decompressed_block = std::byteswap(plain_mark.offset_in_decompressed_block);
+        });
+#endif
+
     auto res = std::make_shared<MarksInCompressedFile>(plain_marks);
 
     ProfileEvents::increment(ProfileEvents::LoadedMarksCount, marks_count * columns_in_mark);
@@ -265,12 +265,12 @@ String MergeTreePartition::getID(const Block & partition_key_sample) const
     for (const Field & field : value)
         applyVisitor(hashing_visitor, field);
 
-    char hash_data[16];
-    hash.get128(hash_data);
-    result.resize(32);
-    for (size_t i = 0; i < 16; ++i)
+    const auto hash_data = getSipHash128AsArray(hash);
+    const auto hash_size = hash_data.size();
+    result.resize(hash_size * 2);
+    for (size_t i = 0; i < hash_size; ++i)
 #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
-        writeHexByteLowercase(hash_data[16 - 1 - i], &result[2 * i]);
+        writeHexByteLowercase(hash_data[hash_size - 1 - i], &result[2 * i]);
 #else
         writeHexByteLowercase(hash_data[i], &result[2 * i]);
 #endif
@@ -12,9 +12,7 @@ static std::array<char, 16> getSipHash(const String & str)
 {
     SipHash hash;
     hash.update(str.data(), str.size());
-    std::array<char, 16> result;
-    hash.get128(result.data());
-    return result;
+    return getSipHash128AsArray(hash);
 }
 
 ReplicatedMergeTreePartHeader ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(
@@ -254,14 +254,9 @@ namespace
             for (const auto & col : cols)
                 col->updateHashWithValue(j, hash);
         }
-        union
-        {
-            char bytes[16];
-            UInt64 words[2];
-        } hash_value;
-        hash.get128(hash_value.bytes);
-
-        block_id_vec.push_back(partition_id + "_" + DB::toString(hash_value.words[0]) + "_" + DB::toString(hash_value.words[1]));
+        const auto hash_value = hash.get128();
+        block_id_vec.push_back(partition_id + "_" + DB::toString(hash_value.items[0]) + "_" + DB::toString(hash_value.items[1]));
     }
     else
         block_id_vec.push_back(partition_id + "_" + std::string(token));