Replicate something in test keeper storage with raft

This commit is contained in:
alesapin 2021-01-20 19:25:30 +03:00
parent a241793a14
commit d5a3adffbd
8 changed files with 139 additions and 23 deletions

View File

@ -84,8 +84,6 @@ SRCS(
WeakHash.cpp
ZooKeeper/IKeeper.cpp
ZooKeeper/TestKeeper.cpp
ZooKeeper/TestKeeperStorage.cpp
ZooKeeper/TestKeeperStorageDispatcher.cpp
ZooKeeper/ZooKeeper.cpp
ZooKeeper/ZooKeeperCommon.cpp
ZooKeeper/ZooKeeperConstants.cpp

View File

@ -6,7 +6,8 @@ namespace DB
namespace
{
using namespace nuraft;
ptr<log_entry> makeClone(const ptr<log_entry> & entry) {
ptr<log_entry> makeClone(const ptr<log_entry> & entry)
{
ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type());
return clone;
}

View File

@ -12,6 +12,9 @@ public:
explicit ReadBufferFromNuraftBuffer(nuraft::ptr<nuraft::buffer> buffer)
: ReadBufferFromMemory(buffer->data_begin(), buffer->size())
{}
explicit ReadBufferFromNuraftBuffer(nuraft::buffer & buffer)
: ReadBufferFromMemory(buffer.data_begin(), buffer.size())
{}
};
}

View File

@ -49,7 +49,8 @@ nuraft::ptr<nuraft::snapshot> SummingStateMachine::last_snapshot()
// Just return the latest snapshot.
std::lock_guard<std::mutex> ll(snapshots_lock);
auto entry = snapshots.rbegin();
if (entry == snapshots.rend()) return nullptr;
if (entry == snapshots.rend())
return nullptr;
auto ctx = entry->second;
return ctx->snapshot;
@ -117,7 +118,8 @@ int SummingStateMachine::read_logical_snp_obj(
{
std::lock_guard<std::mutex> ll(snapshots_lock);
auto entry = snapshots.find(s.get_last_log_idx());
if (entry == snapshots.end()) {
if (entry == snapshots.end())
{
// Snapshot doesn't exist.
data_out = nullptr;
is_last_obj = true;

View File

@ -1,4 +1,4 @@
#include <Common/ZooKeeper/TestKeeperStorageDispatcher.h>
#include <Coordination/TestKeeperStorageDispatcher.h>
#include <Common/setThreadName.h>
namespace DB

View File

@ -2,7 +2,7 @@
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/TestKeeperStorage.h>
#include <Coordination/TestKeeperStorage.h>
#include <functional>
namespace zkutil

View File

@ -51,7 +51,7 @@ nuraft::ptr<nuraft::buffer> WriteBufferFromNuraftBuffer::getBuffer()
return buffer;
}
WriteBufferFromNuraftBuffer::~WriteBufferFromNuraftBuffer()
WriteBufferFromNuraftBuffer::~WriteBufferFromNuraftBuffer()
{
try
{

View File

@ -3,6 +3,7 @@
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/SummingStateMachine.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
@ -12,15 +13,6 @@
#include <libnuraft/nuraft.hxx>
#include <thread>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
TEST(CoordinationTest, BuildTest)
{
@ -63,14 +55,15 @@ TEST(CoordinationTest, BufferSerde)
EXPECT_EQ(dynamic_cast<Coordination::ZooKeeperGetRequest *>(request_read.get())->path, "/path/value");
}
struct SummingRaftServer
template <typename StateMachine>
struct SimpliestRaftServer
{
SummingRaftServer(int server_id_, const std::string & hostname_, int port_)
SimpliestRaftServer(int server_id_, const std::string & hostname_, int port_)
: server_id(server_id_)
, hostname(hostname_)
, port(port_)
, endpoint(hostname + ":" + std::to_string(port))
, state_machine(nuraft::cs_new<DB::SummingStateMachine>())
, state_machine(nuraft::cs_new<StateMachine>())
, state_manager(nuraft::cs_new<DB::InMemoryStateManager>(server_id, endpoint))
{
nuraft::raft_params params;
@ -118,7 +111,7 @@ struct SummingRaftServer
std::string endpoint;
// State machine.
nuraft::ptr<DB::SummingStateMachine> state_machine;
nuraft::ptr<StateMachine> state_machine;
// State manager.
nuraft::ptr<nuraft::state_mgr> state_manager;
@ -130,6 +123,8 @@ struct SummingRaftServer
nuraft::ptr<nuraft::raft_server> raft_instance;
};
using SummingRaftServer = SimpliestRaftServer<DB::SummingStateMachine>;
nuraft::ptr<nuraft::buffer> getLogEntry(int64_t number)
{
nuraft::ptr<nuraft::buffer> ret = nuraft::buffer::alloc(sizeof(number));
@ -178,7 +173,7 @@ TEST(CoordinationTest, TestSummingRaft3)
EXPECT_TRUE(false);
}
while(s1.raft_instance->get_leader() != 2)
while (s1.raft_instance->get_leader() != 2)
{
std::cout << "Waiting s1 to join to s2 quorum\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ -193,7 +188,7 @@ TEST(CoordinationTest, TestSummingRaft3)
EXPECT_TRUE(false);
}
while(s3.raft_instance->get_leader() != 2)
while (s3.raft_instance->get_leader() != 2)
{
std::cout << "Waiting s3 to join to s2 quorum\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ -266,3 +261,120 @@ TEST(CoordinationTest, TestSummingRaft3)
s2.launcher.shutdown(5);
s3.launcher.shutdown(5);
}
using NuKeeperRaftServer = SimpliestRaftServer<DB::NuKeeperStateMachine>;
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request)
{
DB::WriteBufferFromNuraftBuffer buf;
DB::writeIntBinary(session_id, buf);
request->write(buf);
return buf.getBuffer();
}
zkutil::TestKeeperStorage::ResponsesForSessions getZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer, const Coordination::ZooKeeperRequestPtr & request)
{
zkutil::TestKeeperStorage::ResponsesForSessions results;
DB::ReadBufferFromNuraftBuffer buf(buffer);
while (!buf.eof())
{
int64_t session_id;
DB::readIntBinary(session_id, buf);
int32_t length;
Coordination::XID xid;
int64_t zxid;
Coordination::Error err;
Coordination::read(length, buf);
Coordination::read(xid, buf);
Coordination::read(zxid, buf);
Coordination::read(err, buf);
auto response = request->makeResponse();
response->readImpl(buf);
results.push_back(zkutil::TestKeeperStorage::ResponseForSession{session_id, response});
}
return results;
}
TEST(CoordinationTest, TestNuKeeperRaft)
{
NuKeeperRaftServer s1(1, "localhost", 44447);
NuKeeperRaftServer s2(2, "localhost", 44448);
NuKeeperRaftServer s3(3, "localhost", 44449);
nuraft::srv_config first_config(1, "localhost:44447");
auto ret1 = s2.raft_instance->add_srv(first_config);
EXPECT_TRUE(ret1->get_accepted()) << "failed to add server: " << ret1->get_result_str() << std::endl;
while (s1.raft_instance->get_leader() != 2)
{
std::cout << "Waiting s1 to join to s2 quorum\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
nuraft::srv_config third_config(3, "localhost:44449");
auto ret3 = s2.raft_instance->add_srv(third_config);
EXPECT_TRUE(ret3->get_accepted()) << "failed to add server: " << ret3->get_result_str() << std::endl;
while (s3.raft_instance->get_leader() != 2)
{
std::cout << "Waiting s3 to join to s2 quorum\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
/// S2 is leader
EXPECT_EQ(s1.raft_instance->get_leader(), 2);
EXPECT_EQ(s2.raft_instance->get_leader(), 2);
EXPECT_EQ(s3.raft_instance->get_leader(), 2);
int64_t session_id = 34;
std::shared_ptr<Coordination::ZooKeeperCreateRequest> create_request = std::make_shared<Coordination::ZooKeeperCreateRequest>();
create_request->path = "/hello";
create_request->data = "world";
auto entry1 = getZooKeeperLogEntry(session_id, create_request);
auto ret_leader = s2.raft_instance->append_entries({entry1});
EXPECT_TRUE(ret_leader->get_accepted()) << "failed to replicate create entry:" << ret_leader->get_result_code();
EXPECT_EQ(ret_leader->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate create entry:" << ret_leader->get_result_code();
auto result = ret_leader.get();
auto responses = getZooKeeperResponses(result->get(), create_request);
EXPECT_EQ(responses.size(), 1);
EXPECT_EQ(responses[0].session_id, 34);
EXPECT_EQ(responses[0].response->getOpNum(), Coordination::OpNum::Create);
EXPECT_EQ(dynamic_cast<Coordination::ZooKeeperCreateResponse *>(responses[0].response.get())->path_created, "/hello");
while (s1.state_machine->getStorage().container.count("/hello") == 0)
{
std::cout << "Waiting s1 to apply entry\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
while (s2.state_machine->getStorage().container.count("/hello") == 0)
{
std::cout << "Waiting s2 to apply entry\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
while (s3.state_machine->getStorage().container.count("/hello") == 0)
{
std::cout << "Waiting s3 to apply entry\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
EXPECT_EQ(s1.state_machine->getStorage().container["/hello"].data, "world");
EXPECT_EQ(s2.state_machine->getStorage().container["/hello"].data, "world");
EXPECT_EQ(s3.state_machine->getStorage().container["/hello"].data, "world");
s1.launcher.shutdown(5);
s2.launcher.shutdown(5);
s3.launcher.shutdown(5);
}