Calculate checksum with siphash

This commit is contained in:
alesapin 2021-02-20 18:36:56 +03:00
parent e7f792c94d
commit 0c2cf3cf30
2 changed files with 38 additions and 22 deletions

View File

@ -7,6 +7,7 @@
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Common/Exception.h>
#include <Common/SipHash.h>
#include <common/logger_useful.h>
namespace DB
@ -56,13 +57,15 @@ LogEntryPtr makeClone(const LogEntryPtr & entry)
Checksum computeRecordChecksum(const ChangelogRecord & record)
{
const auto * header_start = reinterpret_cast<const char *>(&record.header);
auto sum = CityHash_v1_0_2::CityHash128(header_start, sizeof(record.header));
SipHash hash;
hash.update(record.header.version);
hash.update(record.header.index);
hash.update(record.header.term);
hash.update(record.header.value_type);
hash.update(record.header.blob_size);
if (record.header.blob_size != 0)
sum = CityHash_v1_0_2::CityHash128WithSeed(reinterpret_cast<const char *>(record.blob->data_begin()), record.header.blob_size, sum);
return sum;
hash.update(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
return hash.get64();
}
}
@ -82,7 +85,11 @@ public:
off_t result = plain_buf.count();
writeIntBinary(computeRecordChecksum(record), plain_buf);
writePODBinary(record.header, plain_buf);
writeIntBinary(record.header.version, plain_buf);
writeIntBinary(record.header.index, plain_buf);
writeIntBinary(record.header.term, plain_buf);
writeIntBinary(record.header.value_type, plain_buf);
writeIntBinary(record.header.blob_size, plain_buf);
if (record.header.blob_size != 0)
plain_buf.write(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
@ -160,8 +167,14 @@ public:
Checksum record_checksum;
readIntBinary(record_checksum, read_buf);
/// Initialization is required, otherwise checksums may fail
ChangelogRecord record;
readPODBinary(record.header, read_buf);
readIntBinary(record.header.version, read_buf);
readIntBinary(record.header.index, read_buf);
readIntBinary(record.header.term, read_buf);
readIntBinary(record.header.value_type, read_buf);
readIntBinary(record.header.blob_size, read_buf);
if (record.header.version > CURRENT_CHANGELOG_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported changelog version {} on path {}", record.header.version, filepath);
@ -248,7 +261,7 @@ void Changelog::readChangelogAndInitWriter(size_t from_log_index)
size_t incomplete_log_index = 0;
ChangelogReadResult result{};
for (const auto & [start_index, changelog_description] : existing_changelogs)
for (const auto & [changelog_start_index, changelog_description] : existing_changelogs)
{
entries_in_last = changelog_description.to_log_index - changelog_description.from_log_index + 1;
@ -261,7 +274,7 @@ void Changelog::readChangelogAndInitWriter(size_t from_log_index)
/// May happen after truncate, crash or simply unfinished log
if (result.entries_read < entries_in_last)
{
incomplete_log_index = start_index;
incomplete_log_index = changelog_start_index;
break;
}
}
@ -319,18 +332,20 @@ void Changelog::rotate(size_t new_start_log_index)
ChangelogRecord Changelog::buildRecord(size_t index, const LogEntryPtr & log_entry)
{
ChangelogRecordHeader header;
header.version = ChangelogVersion::V0;
header.index = index;
header.term = log_entry->get_term();
header.value_type = log_entry->get_val_type();
ChangelogRecord record;
record.header.version = ChangelogVersion::V0;
record.header.index = index;
record.header.term = log_entry->get_term();
record.header.value_type = log_entry->get_val_type();
auto buffer = log_entry->get_buf_ptr();
if (buffer)
header.blob_size = buffer->size();
record.header.blob_size = buffer->size();
else
header.blob_size = 0;
record.header.blob_size = 0;
return ChangelogRecord{header, buffer};
record.blob = buffer;
return record;
}
void Changelog::appendEntry(size_t index, const LogEntryPtr & log_entry, bool force_sync)

View File

@ -10,7 +10,7 @@
namespace DB
{
using Checksum = CityHash_v1_0_2::uint128;
using Checksum = UInt64;
using LogEntryPtr = nuraft::ptr<nuraft::log_entry>;
using LogEntries = std::vector<LogEntryPtr>;
@ -27,7 +27,7 @@ enum class ChangelogVersion : uint8_t
static constexpr auto CURRENT_CHANGELOG_VERSION = ChangelogVersion::V0;
struct __attribute__((__packed__)) ChangelogRecordHeader
struct ChangelogRecordHeader
{
ChangelogVersion version = CURRENT_CHANGELOG_VERSION;
size_t index; /// entry log number
@ -115,12 +115,13 @@ public:
~Changelog();
private:
/// Pack log_entry into changelog record
static ChangelogRecord buildRecord(size_t index, const LogEntryPtr & log_entry);
/// Starts new file [new_start_log_index, new_start_log_index + rotate_interval]
void rotate(size_t new_start_log_index);
/// Pack log_entry into changelog record
static ChangelogRecord buildRecord(size_t index, const LogEntryPtr & log_entry);
private:
const std::string changelogs_dir;