Use snapshot object in serialization

This commit is contained in:
alesapin 2021-03-02 17:30:56 +03:00
parent 10e16e39ea
commit 8e6252b25f
8 changed files with 85 additions and 19 deletions

View File

@ -36,10 +36,15 @@ void write(bool x, WriteBuffer & out)
writeBinary(x, out);
}
void write(const char * s, size_t size, WriteBuffer & out)
{
write(int32_t(size), out);
out.write(s, size);
}
void write(const std::string & s, WriteBuffer & out)
{
write(int32_t(s.size()), out);
out.write(s.data(), s.size());
write(s.data(), s.size(), out);
}
void write(const ACL & acl, WriteBuffer & out)

View File

@ -20,6 +20,7 @@ void write(int32_t x, WriteBuffer & out);
void write(OpNum x, WriteBuffer & out);
void write(bool x, WriteBuffer & out);
void write(const char * s, size_t size, WriteBuffer & out);
void write(const std::string & s, WriteBuffer & out);
void write(const ACL & acl, WriteBuffer & out);
void write(const Stat & stat, WriteBuffer & out);

View File

@ -34,6 +34,8 @@ NuKeeperServer::NuKeeperServer(
void NuKeeperServer::startup()
{
state_machine->init();
state_manager->loadLogStore(state_machine->last_commit_index());
bool single_server = state_manager->getTotalServers() == 1;

View File

@ -42,7 +42,17 @@ NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, co
void NuKeeperStateMachine::init()
{
LOG_DEBUG(log, "Trying to load state machine");
last_committed_idx = snapshot_manager.restoreFromLatestSnapshot(&storage);
latest_snapshot_buf = snapshot_manager.deserializeLatestSnapshotBufferFromDisk();
if (latest_snapshot_buf)
{
latest_snapshot_meta = snapshot_manager.deserializeSnapshotFromBuffer(&storage, latest_snapshot_buf);
last_committed_idx = latest_snapshot_meta->get_last_log_idx();
}
else
{
latest_snapshot_meta = nullptr;
last_committed_idx = 0;
}
LOG_DEBUG(log, "Loaded snapshot with last commited log index {}", last_committed_idx);
}

View File

@ -62,7 +62,7 @@ public:
private:
nuraft::ptr<nuraft::snapshot> latest_snapshot_meta;
SnapshotMetadataPtr latest_snapshot_meta;
nuraft::ptr<nuraft::buffer> latest_snapshot_buf;
CoordinationSettingsPtr coordination_settings;

View File

@ -62,12 +62,30 @@ namespace
Coordination::read(node.stat, in);
Coordination::read(node.seq_num, in);
}
void serializeSnapshotMetadata(const SnapshotMetadataPtr & snapshot_meta, WriteBuffer & out)
{
auto buffer = snapshot_meta->serialize();
Coordination::write(reinterpret_cast<const char *>(buffer->data_begin()), buffer->size(), out);
}
SnapshotMetadataPtr deserializeSnapshotMetadata(ReadBuffer & in)
{
/// FIXME (alesap)
std::string data;
Coordination::read(data, in);
auto buffer = nuraft::buffer::alloc(data.size());
buffer->put_raw(reinterpret_cast<const nuraft::byte *>(data.c_str()), data.size());
buffer->pos(0);
return SnapshotMetadata::deserialize(*buffer);
}
}
void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot, WriteBuffer & out)
{
Coordination::write(static_cast<uint8_t>(snapshot.version), out);
serializeSnapshotMetadata(snapshot.snapshot_meta, out);
Coordination::write(snapshot.zxid, out);
Coordination::write(snapshot.session_id, out);
Coordination::write(snapshot.snapshot_container_size, out);
@ -89,13 +107,14 @@ void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot
}
}
void NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & storage, ReadBuffer & in)
SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & storage, ReadBuffer & in)
{
uint8_t version;
Coordination::read(version, in);
if (static_cast<SnapshotVersion>(version) > SnapshotVersion::V0)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version);
SnapshotMetadataPtr result = deserializeSnapshotMetadata(in);
int64_t session_id, zxid;
Coordination::read(zxid, in);
Coordination::read(session_id, in);
@ -137,11 +156,25 @@ void NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & storage, ReadBuffer
storage.addSessionID(active_session_id, timeout);
current_session_size++;
}
return result;
}
NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, size_t up_to_log_idx_)
: storage(storage_)
, up_to_log_idx(up_to_log_idx_)
, snapshot_meta(std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>()))
, zxid(storage->getZXID())
, session_id(storage->session_id_counter)
{
storage->enableSnapshotMode();
snapshot_container_size = storage->container.snapshotSize();
begin = storage->getSnapshotIteratorBegin();
session_and_timeout = storage->getActiveSessions();
}
NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_)
: storage(storage_)
, snapshot_meta(snapshot_meta_)
, zxid(storage->getZXID())
, session_id(storage->session_id_counter)
{
@ -191,6 +224,16 @@ std::string NuKeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffe
return new_snapshot_path;
}
nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk() const
{
if (!existing_snapshots.empty())
{
auto last_log_id = existing_snapshots.rbegin()->first;
return deserializeSnapshotBufferFromDisk(last_log_id);
}
return nullptr;
}
nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const
{
const std::string & snapshot_path = existing_snapshots.at(up_to_log_idx);
@ -210,23 +253,21 @@ nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::serializeSnapshotToBuffer(c
return writer.getBuffer();
}
void NuKeeperSnapshotManager::deserializeSnapshotFromBuffer(NuKeeperStorage * storage, nuraft::ptr<nuraft::buffer> buffer)
SnapshotMetadataPtr NuKeeperSnapshotManager::deserializeSnapshotFromBuffer(NuKeeperStorage * storage, nuraft::ptr<nuraft::buffer> buffer)
{
ReadBufferFromNuraftBuffer reader(buffer);
CompressedReadBuffer compressed_reader(reader);
NuKeeperStorageSnapshot::deserialize(*storage, compressed_reader);
return NuKeeperStorageSnapshot::deserialize(*storage, compressed_reader);
}
size_t NuKeeperSnapshotManager::restoreFromLatestSnapshot(NuKeeperStorage * storage) const
SnapshotMetadataPtr NuKeeperSnapshotManager::restoreFromLatestSnapshot(NuKeeperStorage * storage) const
{
if (existing_snapshots.empty())
return 0 ;
return nullptr;
auto log_id = existing_snapshots.rbegin()->first;
auto buffer = deserializeSnapshotBufferFromDisk(log_id);
deserializeSnapshotFromBuffer(storage, buffer);
return log_id;
return deserializeSnapshotFromBuffer(storage, buffer);
}
void NuKeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()

View File

@ -7,6 +7,9 @@
namespace DB
{
using SnapshotMetadata = nuraft::snapshot;
using SnapshotMetadataPtr = std::shared_ptr<SnapshotMetadata>;
enum SnapshotVersion : uint8_t
{
V0 = 0,
@ -16,16 +19,18 @@ struct NuKeeperStorageSnapshot
{
public:
NuKeeperStorageSnapshot(NuKeeperStorage * storage_, size_t up_to_log_idx_);
NuKeeperStorageSnapshot(NuKeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_);
~NuKeeperStorageSnapshot();
static void serialize(const NuKeeperStorageSnapshot & snapshot, WriteBuffer & out);
static void deserialize(NuKeeperStorage & storage, ReadBuffer & in);
static SnapshotMetadataPtr deserialize(NuKeeperStorage & storage, ReadBuffer & in);
NuKeeperStorage * storage;
SnapshotVersion version = SnapshotVersion::V0;
size_t up_to_log_idx;
SnapshotMetadataPtr snapshot_meta;
int64_t zxid;
int64_t session_id;
size_t snapshot_container_size;
@ -38,13 +43,15 @@ class NuKeeperSnapshotManager
public:
NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_);
size_t restoreFromLatestSnapshot(NuKeeperStorage * storage) const;
SnapshotMetadataPtr restoreFromLatestSnapshot(NuKeeperStorage * storage) const;
static nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const NuKeeperStorageSnapshot & snapshot);
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx);
static void deserializeSnapshotFromBuffer(NuKeeperStorage * storage, nuraft::ptr<nuraft::buffer> buffer);
static SnapshotMetadataPtr deserializeSnapshotFromBuffer(NuKeeperStorage * storage, nuraft::ptr<nuraft::buffer> buffer);
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const;
nuraft::ptr<nuraft::buffer> deserializeLatestSnapshotBufferFromDisk() const;
private:
void removeOutdatedSnapshotsIfNeeded();

View File

@ -918,7 +918,7 @@ TEST(CoordinationTest, TestStorageSnapshotSimple)
DB::NuKeeperStorageSnapshot snapshot(&storage, 2);
EXPECT_EQ(snapshot.up_to_log_idx, 2);
EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 2);
EXPECT_EQ(snapshot.zxid, 2);
EXPECT_EQ(snapshot.session_id, 7);
EXPECT_EQ(snapshot.snapshot_container_size, 3);
@ -963,7 +963,7 @@ TEST(CoordinationTest, TestStorageSnapshotMoreWrites)
}
DB::NuKeeperStorageSnapshot snapshot(&storage, 50);
EXPECT_EQ(snapshot.up_to_log_idx, 50);
EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 50);
EXPECT_EQ(snapshot.snapshot_container_size, 51);
for (size_t i = 50; i < 100; ++i)