Add storage simpliest serialization

This commit is contained in:
alesapin 2021-01-21 14:07:55 +03:00
parent a9df586c0e
commit f7175819d5
8 changed files with 391 additions and 4 deletions

View File

@ -3,6 +3,13 @@
namespace Coordination
{
void write(size_t x, WriteBuffer & out)
{
x = __builtin_bswap64(x);
writeBinary(x, out);
}
void write(int64_t x, WriteBuffer & out)
{
x = __builtin_bswap64(x);
@ -57,6 +64,12 @@ void write(const Error & x, WriteBuffer & out)
write(static_cast<int32_t>(x), out);
}
void read(size_t & x, ReadBuffer & in)
{
readBinary(x, in);
x = __builtin_bswap64(x);
}
void read(int64_t & x, ReadBuffer & in)
{
readBinary(x, in);

View File

@ -13,6 +13,7 @@ namespace Coordination
using namespace DB;
void write(size_t x, WriteBuffer & out);
void write(int64_t x, WriteBuffer & out);
void write(int32_t x, WriteBuffer & out);
void write(OpNum x, WriteBuffer & out);
@ -37,6 +38,7 @@ void write(const std::vector<T> & arr, WriteBuffer & out)
write(elem, out);
}
void read(size_t & x, ReadBuffer & in);
void read(int64_t & x, ReadBuffer & in);
void read(int32_t & x, ReadBuffer & in);
void read(OpNum & x, ReadBuffer & in);

View File

@ -0,0 +1,190 @@
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
namespace DB
{
zkutil::TestKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
{
ReadBufferFromNuraftBuffer buffer(data);
zkutil::TestKeeperStorage::RequestForSession request_for_session;
readIntBinary(request_for_session.session_id, buffer);
int32_t length;
Coordination::read(length, buffer);
int32_t xid;
Coordination::read(xid, buffer);
Coordination::OpNum opnum;
Coordination::read(opnum, buffer);
request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request_for_session.request->xid = xid;
request_for_session.request->readImpl(buffer);
return request_for_session;
}
nuraft::ptr<nuraft::buffer> writeResponses(zkutil::TestKeeperStorage::ResponsesForSessions & responses)
{
WriteBufferFromNuraftBuffer buffer;
for (const auto & response_and_session : responses)
{
writeIntBinary(response_and_session.session_id, buffer);
response_and_session.response->write(buffer);
}
return buffer.getBuffer();
}
NuKeeperStateMachine::NuKeeperStateMachine()
: last_committed_idx(0)
, log(&Poco::Logger::get("NuRaftStateMachine"))
{
LOG_DEBUG(log, "Created nukeeper state machine");
}
nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
{
LOG_DEBUG(log, "Commiting logidx {}", log_idx);
auto request_for_session = parseRequest(data);
auto responses_with_sessions = storage.processRequest(request_for_session.request, request_for_session.session_id);
last_committed_idx = log_idx;
return writeResponses(responses_with_sessions);
}
bool NuKeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{
LOG_DEBUG(log, "Applying snapshot {}", s.get_last_log_idx());
std::lock_guard<std::mutex> lock(snapshots_lock);
auto entry = snapshots.find(s.get_last_log_idx());
if (entry == snapshots.end())
{
return false;
}
/// TODO
return true;
}
nuraft::ptr<nuraft::snapshot> NuKeeperStateMachine::last_snapshot()
{
LOG_DEBUG(log, "Trying to get last snapshot");
// Just return the latest snapshot.
std::lock_guard<std::mutex> lock(snapshots_lock);
auto entry = snapshots.rbegin();
if (entry == snapshots.rend())
return nullptr;
return entry->second;
}
void NuKeeperStateMachine::create_snapshot(
nuraft::snapshot & s,
nuraft::async_result<bool>::handler_type & when_done)
{
LOG_DEBUG(log, "Creating snapshot {}", s.get_last_log_idx());
{
std::lock_guard<std::mutex> lock(snapshots_lock);
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
nuraft::ptr<nuraft::snapshot> ss = nuraft::snapshot::deserialize(*snp_buf);
snapshots[s.get_last_log_idx()] = ss;
const int MAX_SNAPSHOTS = 3;
int num = snapshots.size();
auto entry = snapshots.begin();
for (int i = 0; i < num - MAX_SNAPSHOTS; ++i)
{
if (entry == snapshots.end())
break;
entry = snapshots.erase(entry);
}
}
nuraft::ptr<std::exception> except(nullptr);
bool ret = true;
when_done(ret, except);
}
void NuKeeperStateMachine::save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
nuraft::buffer & /*data*/,
bool /*is_first_obj*/,
bool /*is_last_obj*/)
{
LOG_DEBUG(log, "Saving snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
if (obj_id == 0)
{
std::lock_guard<std::mutex> lock(snapshots_lock);
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
nuraft::ptr<nuraft::snapshot> ss = nuraft::snapshot::deserialize(*snp_buf);
snapshots[s.get_last_log_idx()] = ss;
const int MAX_SNAPSHOTS = 3;
int num = snapshots.size();
auto entry = snapshots.begin();
for (int i = 0; i < num - MAX_SNAPSHOTS; ++i)
{
if (entry == snapshots.end())
break;
entry = snapshots.erase(entry);
}
}
else
{
std::lock_guard<std::mutex> lock(snapshots_lock);
auto entry = snapshots.find(s.get_last_log_idx());
assert(entry != snapshots.end());
}
obj_id++;
}
int NuKeeperStateMachine::read_logical_snp_obj(
nuraft::snapshot & s,
void* & /*user_snp_ctx*/,
ulong obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj)
{
LOG_DEBUG(log, "Reading snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
{
std::lock_guard<std::mutex> ll(snapshots_lock);
auto entry = snapshots.find(s.get_last_log_idx());
if (entry == snapshots.end())
{
// Snapshot doesn't exist.
data_out = nullptr;
is_last_obj = true;
return 0;
}
}
if (obj_id == 0)
{
// Object ID == 0: first object, put dummy data.
data_out = nuraft::buffer::alloc(sizeof(size_t));
nuraft::buffer_serializer bs(data_out);
bs.put_i32(0);
is_last_obj = false;
}
else
{
// Object ID > 0: second object, put actual value.
data_out = nuraft::buffer::alloc(sizeof(size_t));
nuraft::buffer_serializer bs(data_out);
bs.put_u64(1);
is_last_obj = true;
}
return 0;
}
}

View File

@ -0,0 +1,63 @@
#pragma once
#include <Coordination/TestKeeperStorage.h>
#include <libnuraft/nuraft.hxx>
#include <common/logger_useful.h>
namespace DB
{
class NuKeeperStateMachine : public nuraft::state_machine
{
public:
NuKeeperStateMachine();
nuraft::ptr<nuraft::buffer> pre_commit(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> commit(const size_t log_idx, nuraft::buffer & data) override;
void rollback(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
size_t last_commit_index() override { return last_committed_idx; }
bool apply_snapshot(nuraft::snapshot & s) override;
nuraft::ptr<nuraft::snapshot> last_snapshot() override;
void create_snapshot(
nuraft::snapshot & s,
nuraft::async_result<bool>::handler_type & when_done) override;
void save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
nuraft::buffer & data,
bool is_first_obj,
bool is_last_obj) override;
int read_logical_snp_obj(
nuraft::snapshot & s,
void* & user_snp_ctx,
ulong obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj) override;
zkutil::TestKeeperStorage & getStorage()
{
return storage;
}
private:
zkutil::TestKeeperStorage storage;
// Mutex for `snapshots_`.
std::mutex snapshots_lock;
/// Fake snapshot storage
std::map<uint64_t, nuraft::ptr<nuraft::snapshot>> snapshots;
/// Last committed Raft log number.
std::atomic<size_t> last_committed_idx;
Poco::Logger * log;
};
}

View File

@ -46,7 +46,7 @@ static TestKeeperStorage::ResponsesForSessions processWatchesImpl(const String &
{
std::shared_ptr<Coordination::ZooKeeperWatchResponse> watch_response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
watch_response->path = path;
watch_response->xid = -1;
watch_response->xid = Coordination::WATCH_XID;
watch_response->zxid = -1;
watch_response->type = event_type;
watch_response->state = Coordination::State::CONNECTED;
@ -62,7 +62,7 @@ static TestKeeperStorage::ResponsesForSessions processWatchesImpl(const String &
{
std::shared_ptr<Coordination::ZooKeeperWatchResponse> watch_list_response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
watch_list_response->path = parent_path;
watch_list_response->xid = -1;
watch_list_response->xid = Coordination::WATCH_XID;
watch_list_response->zxid = -1;
watch_list_response->type = Coordination::Event::CHILD;
watch_list_response->state = Coordination::State::CONNECTED;
@ -103,7 +103,6 @@ struct TestKeeperStorageHeartbeatRequest final : public TestKeeperStorageRequest
}
};
struct TestKeeperStorageCreateRequest final : public TestKeeperStorageRequest
{
using TestKeeperStorageRequest::TestKeeperStorageRequest;

View File

@ -0,0 +1,87 @@
#include <Coordination/TestKeeperStorageSerializer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
namespace DB
{
namespace
{
void writeNode(const zkutil::TestKeeperStorage::Node & node, WriteBuffer & out)
{
Coordination::write(node.data, out);
Coordination::write(node.acls, out);
Coordination::write(node.is_ephemeral, out);
Coordination::write(node.is_sequental, out);
Coordination::write(node.stat, out);
Coordination::write(node.seq_num, out);
}
void readNode(zkutil::TestKeeperStorage::Node & node, ReadBuffer & in)
{
Coordination::read(node.data, in);
Coordination::read(node.acls, in);
Coordination::read(node.is_ephemeral, in);
Coordination::read(node.is_sequental, in);
Coordination::read(node.stat, in);
Coordination::read(node.seq_num, in);
}
}
void TestKeeperStorageSerializer::serialize(const zkutil::TestKeeperStorage & storage, WriteBuffer & out) const
{
Coordination::write(storage.zxid, out);
Coordination::write(storage.session_id_counter, out);
Coordination::write(storage.container.size(), out);
for (const auto & [path, node] : storage.container)
{
Coordination::write(path, out);
writeNode(node, out);
}
Coordination::write(storage.ephemerals.size(), out);
for (const auto & [session_id, paths] : storage.ephemerals)
{
Coordination::write(session_id, out);
Coordination::write(paths.size(), out);
for (const auto & path : paths)
Coordination::write(path, out);
}
}
void TestKeeperStorageSerializer::deserialize(zkutil::TestKeeperStorage & storage, ReadBuffer & in) const
{
int64_t session_id_counter, zxid;
Coordination::read(zxid, in);
Coordination::read(session_id_counter, in);
storage.zxid = zxid;
storage.session_id_counter = session_id_counter;
size_t container_size;
Coordination::read(container_size, in);
while (storage.container.size() < container_size)
{
std::string path;
Coordination::read(path, in);
zkutil::TestKeeperStorage::Node node;
readNode(node, in);
storage.container[path] = node;
}
size_t ephemerals_size;
Coordination::read(ephemerals_size, in);
while (storage.ephemerals.size() < ephemerals_size)
{
int64_t session_id;
size_t ephemerals_for_session;
Coordination::read(session_id, in);
Coordination::read(ephemerals_for_session, in);
while (storage.ephemerals[session_id].size() < ephemerals_for_session)
{
std::string ephemeral_path;
Coordination::read(ephemeral_path, in);
storage.ephemerals[session_id].emplace(ephemeral_path);
}
}
}
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <Coordination/TestKeeperStorage.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
namespace DB
{
class TestKeeperStorageSerializer
{
public:
void serialize(const zkutil::TestKeeperStorage & storage, WriteBuffer & out) const;
void deserialize(zkutil::TestKeeperStorage & storage, ReadBuffer & in) const;
};
}

View File

@ -351,7 +351,6 @@ TEST(CoordinationTest, TestNuKeeperRaft)
EXPECT_EQ(responses[0].response->getOpNum(), Coordination::OpNum::Create);
EXPECT_EQ(dynamic_cast<Coordination::ZooKeeperCreateResponse *>(responses[0].response.get())->path_created, "/hello");
while (s1.state_machine->getStorage().container.count("/hello") == 0)
{
std::cout << "Waiting s1 to apply entry\n";
@ -374,6 +373,23 @@ TEST(CoordinationTest, TestNuKeeperRaft)
EXPECT_EQ(s2.state_machine->getStorage().container["/hello"].data, "world");
EXPECT_EQ(s3.state_machine->getStorage().container["/hello"].data, "world");
std::shared_ptr<Coordination::ZooKeeperGetRequest> get_request = std::make_shared<Coordination::ZooKeeperGetRequest>();
get_request->path = "/hello";
auto entry2 = getZooKeeperLogEntry(session_id, get_request);
auto ret_leader_get = s2.raft_instance->append_entries({entry2});
EXPECT_TRUE(ret_leader_get->get_accepted()) << "failed to replicate create entry: " << ret_leader_get->get_result_code();
EXPECT_EQ(ret_leader_get->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate create entry: " << ret_leader_get->get_result_code();
auto result_get = ret_leader_get.get();
auto get_responses = getZooKeeperResponses(result_get->get(), get_request);
EXPECT_EQ(get_responses.size(), 1);
EXPECT_EQ(get_responses[0].session_id, 34);
EXPECT_EQ(get_responses[0].response->getOpNum(), Coordination::OpNum::Get);
EXPECT_EQ(dynamic_cast<Coordination::ZooKeeperGetResponse *>(get_responses[0].response.get())->data, "world");
s1.launcher.shutdown(5);
s2.launcher.shutdown(5);
s3.launcher.shutdown(5);