First working snapshots

This commit is contained in:
alesapin 2021-01-21 17:34:34 +03:00
parent d6b8dd7525
commit 61fe49194b
4 changed files with 65 additions and 6 deletions

View File

@ -126,7 +126,6 @@ void NuKeeperStateMachine::create_snapshot(
nuraft::snapshot & s,
nuraft::async_result<bool>::handler_type & when_done)
{
LOG_DEBUG(log, "Creating snapshot {}", s.get_last_log_idx());
auto snapshot = createSnapshotInternal(s);
{
@ -156,6 +155,7 @@ void NuKeeperStateMachine::save_logical_snp_obj(
bool /*is_last_obj*/)
{
LOG_DEBUG(log, "Saving snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
if (obj_id == 0)
{
auto new_snapshot = createSnapshotInternal(s);
@ -165,8 +165,9 @@ void NuKeeperStateMachine::save_logical_snp_obj(
else
{
auto received_snapshot = readSnapshot(s, data);
std::lock_guard<std::mutex> lock(snapshots_lock);
snapshots.try_emplace(s.get_last_log_idx(), std::move(received_snapshot));
snapshots[s.get_last_log_idx()] = std::move(received_snapshot);
}
obj_id++;

View File

@ -74,6 +74,10 @@ public:
public:
TestKeeperStorage();
int64_t getSessionID()
{
return session_id_counter++;
}
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
ResponsesForSessions finalize(const RequestsForSessions & expired_requests);
};

View File

@ -13,8 +13,6 @@ using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeep
class TestKeeperStorageDispatcher
{
private:
std::atomic<int64_t> session_id_counter{0};
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
using clock = std::chrono::steady_clock;
@ -39,6 +37,7 @@ private:
ThreadFromGlobalPool processing_thread;
TestKeeperStorage storage;
std::mutex session_id_mutex;
private:
void processingThread();
@ -53,7 +52,8 @@ public:
int64_t getSessionID()
{
return session_id_counter.fetch_add(1);
std::lock_guard lock(session_id_mutex);
return storage.getSessionID();
}
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);

View File

@ -2,11 +2,14 @@
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/TestKeeperStorageSerializer.h>
#include <Coordination/SummingStateMachine.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h>
@ -71,7 +74,7 @@ struct SimpliestRaftServer
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
params.reserved_log_items_ = 5;
params.snapshot_distance_ = 5;
params.snapshot_distance_ = 1; /// forcefully send snapshots
params.client_req_timeout_ = 3000;
params.return_method_ = nuraft::raft_params::blocking;
@ -298,6 +301,35 @@ zkutil::TestKeeperStorage::ResponsesForSessions getZooKeeperResponses(nuraft::pt
return results;
}
TEST(CoordinationTest, TestStorageSerialization)
{
zkutil::TestKeeperStorage storage;
storage.container["/hello"] = zkutil::TestKeeperStorage::Node{.data="world"};
storage.container["/hello/somepath"] = zkutil::TestKeeperStorage::Node{.data="somedata"};
storage.session_id_counter = 5;
storage.zxid = 156;
storage.ephemerals[3] = {"/hello", "/"};
storage.ephemerals[1] = {"/hello/somepath"};
DB::WriteBufferFromOwnString buffer;
zkutil::TestKeeperStorageSerializer serializer;
serializer.serialize(storage, buffer);
std::string serialized = buffer.str();
EXPECT_NE(serialized.size(), 0);
DB::ReadBufferFromString read(serialized);
zkutil::TestKeeperStorage new_storage;
serializer.deserialize(new_storage, read);
EXPECT_EQ(new_storage.container.size(), 3);
EXPECT_EQ(new_storage.container["/hello"].data, "world");
EXPECT_EQ(new_storage.container["/hello/somepath"].data, "somedata");
EXPECT_EQ(new_storage.session_id_counter, 5);
EXPECT_EQ(new_storage.zxid, 156);
EXPECT_EQ(new_storage.ephemerals.size(), 2);
EXPECT_EQ(new_storage.ephemerals[3].size(), 2);
EXPECT_EQ(new_storage.ephemerals[1].size(), 1);
}
TEST(CoordinationTest, TestNuKeeperRaft)
{
NuKeeperRaftServer s1(1, "localhost", 44447);
@ -390,7 +422,29 @@ TEST(CoordinationTest, TestNuKeeperRaft)
EXPECT_EQ(get_responses[0].response->getOpNum(), Coordination::OpNum::Get);
EXPECT_EQ(dynamic_cast<Coordination::ZooKeeperGetResponse *>(get_responses[0].response.get())->data, "world");
NuKeeperRaftServer s4(4, "localhost", 44450);
nuraft::srv_config fourth_config(4, "localhost:44450");
auto ret4 = s2.raft_instance->add_srv(fourth_config);
while (s4.raft_instance->get_leader() != 2)
{
std::cout << "Waiting s1 to join to s2 quorum\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
/// Applied snapshot
EXPECT_EQ(s4.raft_instance->get_leader(), 2);
while (s4.state_machine->getStorage().container.count("/hello") == 0)
{
std::cout << "Waiting s4 to apply entry\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
EXPECT_EQ(s4.state_machine->getStorage().container["/hello"].data, "world");
s1.launcher.shutdown(5);
s2.launcher.shutdown(5);
s3.launcher.shutdown(5);
s4.launcher.shutdown(5);
}