From d5a3adffbd5159845dd522c1d3df2070e6a840e4 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 20 Jan 2021 19:25:30 +0300 Subject: [PATCH] Replicate something in test keeper storage with raft --- src/Common/ya.make | 2 - src/Coordination/InMemoryLogStore.cpp | 3 +- src/Coordination/ReadBufferFromNuraftBuffer.h | 3 + src/Coordination/SummingStateMachine.cpp | 6 +- .../TestKeeperStorageDispatcher.cpp | 2 +- .../TestKeeperStorageDispatcher.h | 2 +- .../WriteBufferFromNuraftBuffer.cpp | 2 +- src/Coordination/tests/gtest_for_build.cpp | 142 ++++++++++++++++-- 8 files changed, 139 insertions(+), 23 deletions(-) rename src/{Common/ZooKeeper => Coordination}/TestKeeperStorageDispatcher.cpp (98%) rename src/{Common/ZooKeeper => Coordination}/TestKeeperStorageDispatcher.h (96%) diff --git a/src/Common/ya.make b/src/Common/ya.make index 4f2f1892a88..a17b57ebb04 100644 --- a/src/Common/ya.make +++ b/src/Common/ya.make @@ -84,8 +84,6 @@ SRCS( WeakHash.cpp ZooKeeper/IKeeper.cpp ZooKeeper/TestKeeper.cpp - ZooKeeper/TestKeeperStorage.cpp - ZooKeeper/TestKeeperStorageDispatcher.cpp ZooKeeper/ZooKeeper.cpp ZooKeeper/ZooKeeperCommon.cpp ZooKeeper/ZooKeeperConstants.cpp diff --git a/src/Coordination/InMemoryLogStore.cpp b/src/Coordination/InMemoryLogStore.cpp index 9f8d398a110..b9e2e502fc7 100644 --- a/src/Coordination/InMemoryLogStore.cpp +++ b/src/Coordination/InMemoryLogStore.cpp @@ -6,7 +6,8 @@ namespace DB namespace { using namespace nuraft; -ptr makeClone(const ptr & entry) { +ptr makeClone(const ptr & entry) +{ ptr clone = cs_new(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type()); return clone; } diff --git a/src/Coordination/ReadBufferFromNuraftBuffer.h b/src/Coordination/ReadBufferFromNuraftBuffer.h index 392a97bdd8f..cc01d3c8f39 100644 --- a/src/Coordination/ReadBufferFromNuraftBuffer.h +++ b/src/Coordination/ReadBufferFromNuraftBuffer.h @@ -12,6 +12,9 @@ public: explicit ReadBufferFromNuraftBuffer(nuraft::ptr buffer) : ReadBufferFromMemory(buffer->data_begin(), buffer->size()) {} + explicit ReadBufferFromNuraftBuffer(nuraft::buffer & buffer) + : ReadBufferFromMemory(buffer.data_begin(), buffer.size()) + {} }; } diff --git a/src/Coordination/SummingStateMachine.cpp b/src/Coordination/SummingStateMachine.cpp index 16154ca8cd4..bf2a5bb818f 100644 --- a/src/Coordination/SummingStateMachine.cpp +++ b/src/Coordination/SummingStateMachine.cpp @@ -49,7 +49,8 @@ nuraft::ptr SummingStateMachine::last_snapshot() // Just return the latest snapshot. std::lock_guard ll(snapshots_lock); auto entry = snapshots.rbegin(); - if (entry == snapshots.rend()) return nullptr; + if (entry == snapshots.rend()) + return nullptr; auto ctx = entry->second; return ctx->snapshot; @@ -117,7 +118,8 @@ int SummingStateMachine::read_logical_snp_obj( { std::lock_guard ll(snapshots_lock); auto entry = snapshots.find(s.get_last_log_idx()); - if (entry == snapshots.end()) { + if (entry == snapshots.end()) + { // Snapshot doesn't exist. data_out = nullptr; is_last_obj = true; diff --git a/src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp b/src/Coordination/TestKeeperStorageDispatcher.cpp similarity index 98% rename from src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp rename to src/Coordination/TestKeeperStorageDispatcher.cpp index b1233fc47e3..1700fa76092 100644 --- a/src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp +++ b/src/Coordination/TestKeeperStorageDispatcher.cpp @@ -1,4 +1,4 @@ -#include +#include #include namespace DB diff --git a/src/Common/ZooKeeper/TestKeeperStorageDispatcher.h b/src/Coordination/TestKeeperStorageDispatcher.h similarity index 96% rename from src/Common/ZooKeeper/TestKeeperStorageDispatcher.h rename to src/Coordination/TestKeeperStorageDispatcher.h index 27abf17ac73..f8cb06c3ced 100644 --- a/src/Common/ZooKeeper/TestKeeperStorageDispatcher.h +++ b/src/Coordination/TestKeeperStorageDispatcher.h @@ -2,7 +2,7 @@ #include #include -#include +#include #include namespace zkutil diff --git a/src/Coordination/WriteBufferFromNuraftBuffer.cpp b/src/Coordination/WriteBufferFromNuraftBuffer.cpp index 09e1034ae8f..7d0a1dbcbb1 100644 --- a/src/Coordination/WriteBufferFromNuraftBuffer.cpp +++ b/src/Coordination/WriteBufferFromNuraftBuffer.cpp @@ -51,7 +51,7 @@ nuraft::ptr WriteBufferFromNuraftBuffer::getBuffer() return buffer; } - WriteBufferFromNuraftBuffer::~WriteBufferFromNuraftBuffer() +WriteBufferFromNuraftBuffer::~WriteBufferFromNuraftBuffer() { try { diff --git a/src/Coordination/tests/gtest_for_build.cpp b/src/Coordination/tests/gtest_for_build.cpp index 38602e48fae..fa330903ae2 100644 --- a/src/Coordination/tests/gtest_for_build.cpp +++ b/src/Coordination/tests/gtest_for_build.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -12,15 +13,6 @@ #include #include -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - -} TEST(CoordinationTest, BuildTest) { @@ -63,14 +55,15 @@ TEST(CoordinationTest, BufferSerde) EXPECT_EQ(dynamic_cast(request_read.get())->path, "/path/value"); } -struct SummingRaftServer +template +struct SimpliestRaftServer { - SummingRaftServer(int server_id_, const std::string & hostname_, int port_) + SimpliestRaftServer(int server_id_, const std::string & hostname_, int port_) : server_id(server_id_) , hostname(hostname_) , port(port_) , endpoint(hostname + ":" + std::to_string(port)) - , state_machine(nuraft::cs_new()) + , state_machine(nuraft::cs_new()) , state_manager(nuraft::cs_new(server_id, endpoint)) { nuraft::raft_params params; @@ -118,7 +111,7 @@ struct SummingRaftServer std::string endpoint; // State machine. - nuraft::ptr state_machine; + nuraft::ptr state_machine; // State manager. nuraft::ptr state_manager; @@ -130,6 +123,8 @@ struct SummingRaftServer nuraft::ptr raft_instance; }; +using SummingRaftServer = SimpliestRaftServer; + nuraft::ptr getLogEntry(int64_t number) { nuraft::ptr ret = nuraft::buffer::alloc(sizeof(number)); @@ -178,7 +173,7 @@ TEST(CoordinationTest, TestSummingRaft3) EXPECT_TRUE(false); } - while(s1.raft_instance->get_leader() != 2) + while (s1.raft_instance->get_leader() != 2) { std::cout << "Waiting s1 to join to s2 quorum\n"; std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -193,7 +188,7 @@ TEST(CoordinationTest, TestSummingRaft3) EXPECT_TRUE(false); } - while(s3.raft_instance->get_leader() != 2) + while (s3.raft_instance->get_leader() != 2) { std::cout << "Waiting s3 to join to s2 quorum\n"; std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -266,3 +261,120 @@ TEST(CoordinationTest, TestSummingRaft3) s2.launcher.shutdown(5); s3.launcher.shutdown(5); } + +using NuKeeperRaftServer = SimpliestRaftServer; + + +nuraft::ptr getZooKeeperLogEntry(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request) +{ + DB::WriteBufferFromNuraftBuffer buf; + DB::writeIntBinary(session_id, buf); + request->write(buf); + return buf.getBuffer(); +} + +zkutil::TestKeeperStorage::ResponsesForSessions getZooKeeperResponses(nuraft::ptr & buffer, const Coordination::ZooKeeperRequestPtr & request) +{ + zkutil::TestKeeperStorage::ResponsesForSessions results; + DB::ReadBufferFromNuraftBuffer buf(buffer); + while (!buf.eof()) + { + int64_t session_id; + DB::readIntBinary(session_id, buf); + + int32_t length; + Coordination::XID xid; + int64_t zxid; + Coordination::Error err; + + Coordination::read(length, buf); + Coordination::read(xid, buf); + Coordination::read(zxid, buf); + Coordination::read(err, buf); + auto response = request->makeResponse(); + response->readImpl(buf); + results.push_back(zkutil::TestKeeperStorage::ResponseForSession{session_id, response}); + } + return results; +} + +TEST(CoordinationTest, TestNuKeeperRaft) +{ + NuKeeperRaftServer s1(1, "localhost", 44447); + NuKeeperRaftServer s2(2, "localhost", 44448); + NuKeeperRaftServer s3(3, "localhost", 44449); + + nuraft::srv_config first_config(1, "localhost:44447"); + auto ret1 = s2.raft_instance->add_srv(first_config); + + EXPECT_TRUE(ret1->get_accepted()) << "failed to add server: " << ret1->get_result_str() << std::endl; + + while (s1.raft_instance->get_leader() != 2) + { + std::cout << "Waiting s1 to join to s2 quorum\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + nuraft::srv_config third_config(3, "localhost:44449"); + auto ret3 = s2.raft_instance->add_srv(third_config); + + EXPECT_TRUE(ret3->get_accepted()) << "failed to add server: " << ret3->get_result_str() << std::endl; + + while (s3.raft_instance->get_leader() != 2) + { + std::cout << "Waiting s3 to join to s2 quorum\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + /// S2 is leader + EXPECT_EQ(s1.raft_instance->get_leader(), 2); + EXPECT_EQ(s2.raft_instance->get_leader(), 2); + EXPECT_EQ(s3.raft_instance->get_leader(), 2); + + int64_t session_id = 34; + std::shared_ptr create_request = std::make_shared(); + create_request->path = "/hello"; + create_request->data = "world"; + + auto entry1 = getZooKeeperLogEntry(session_id, create_request); + auto ret_leader = s2.raft_instance->append_entries({entry1}); + + EXPECT_TRUE(ret_leader->get_accepted()) << "failed to replicate create entry:" << ret_leader->get_result_code(); + EXPECT_EQ(ret_leader->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate create entry:" << ret_leader->get_result_code(); + + auto result = ret_leader.get(); + + auto responses = getZooKeeperResponses(result->get(), create_request); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].session_id, 34); + EXPECT_EQ(responses[0].response->getOpNum(), Coordination::OpNum::Create); + EXPECT_EQ(dynamic_cast(responses[0].response.get())->path_created, "/hello"); + + + while (s1.state_machine->getStorage().container.count("/hello") == 0) + { + std::cout << "Waiting s1 to apply entry\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + while (s2.state_machine->getStorage().container.count("/hello") == 0) + { + std::cout << "Waiting s2 to apply entry\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + while (s3.state_machine->getStorage().container.count("/hello") == 0) + { + std::cout << "Waiting s3 to apply entry\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + EXPECT_EQ(s1.state_machine->getStorage().container["/hello"].data, "world"); + EXPECT_EQ(s2.state_machine->getStorage().container["/hello"].data, "world"); + EXPECT_EQ(s3.state_machine->getStorage().container["/hello"].data, "world"); + + s1.launcher.shutdown(5); + s2.launcher.shutdown(5); + s3.launcher.shutdown(5); +}