diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 94cd6854f78..df1513e6b65 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -830,6 +830,9 @@ int Server::main(const std::vector & /*args*/) listen_try = true; } + /// Initialize test keeper raft + global_context->getTestKeeperStorageDispatcher(); + for (const auto & listen_host : listen_hosts) { /// TCP TestKeeper diff --git a/src/Coordination/NuKeeperServer.cpp b/src/Coordination/NuKeeperServer.cpp index 162e521f1c8..2aefc215451 100644 --- a/src/Coordination/NuKeeperServer.cpp +++ b/src/Coordination/NuKeeperServer.cpp @@ -1,13 +1,165 @@ #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace DB { -void NuKeeperServer::addServer(int server_id_, const std::string & server_uri) + +NuKeeperServer::NuKeeperServer(int server_id_, const std::string & hostname_, int port_) + : server_id(server_id_) + , hostname(hostname_) + , port(port_) + , endpoint(hostname + ":" + std::to_string(port)) + , state_machine(nuraft::cs_new()) + , state_manager(nuraft::cs_new(server_id, endpoint)) { - if (raft_instance->is_leader()) +} + +NuraftError NuKeeperServer::addServer(int server_id_, const std::string & server_uri_) +{ + nuraft::srv_config config(server_id_, server_uri_); + auto ret1 = raft_instance->add_srv(config); + return NuraftError{ret1->get_result_code(), ret1->get_result_str()}; +} + + +NuraftError NuKeeperServer::startup() +{ + nuraft::raft_params params; + params.heart_beat_interval_ = 100; + params.election_timeout_lower_bound_ = 200; + params.election_timeout_upper_bound_ = 400; + params.reserved_log_items_ = 5; + params.snapshot_distance_ = 5; + params.client_req_timeout_ = 3000; + params.return_method_ = nuraft::raft_params::blocking; + + raft_instance = launcher.init( + state_machine, state_manager, nuraft::cs_new("RaftInstance"), port, + nuraft::asio_service::options{}, params); + + if (!raft_instance) + return NuraftError{nuraft::cmd_result_code::TIMEOUT, "Cannot create RAFT instance"}; + + static constexpr auto MAX_RETRY = 30; + for (size_t i = 0; i < MAX_RETRY; ++i) { - nuraft::srv_config first_config(server_id, server_uri); + if (raft_instance->is_initialized()) + return NuraftError{nuraft::cmd_result_code::OK, ""}; + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } + return NuraftError{nuraft::cmd_result_code::TIMEOUT, "Cannot start RAFT instance"}; +} + +NuraftError NuKeeperServer::shutdown() +{ + if (!launcher.shutdown(5)) + return NuraftError{nuraft::cmd_result_code::TIMEOUT, "Temout waiting RAFT instance to shutdown"}; + return NuraftError{nuraft::cmd_result_code::OK, ""}; +} + +namespace +{ + +nuraft::ptr getZooKeeperLogEntry(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request) +{ + DB::WriteBufferFromNuraftBuffer buf; + DB::writeIntBinary(session_id, buf); + request->write(buf); + return buf.getBuffer(); +} + +} + +TestKeeperStorage::ResponsesForSessions NuKeeperServer::readZooKeeperResponses(nuraft::ptr & buffer) +{ + DB::TestKeeperStorage::ResponsesForSessions results; + DB::ReadBufferFromNuraftBuffer buf(buffer); + + while (!buf.eof()) + { + int64_t session_id; + DB::readIntBinary(session_id, buf); + int32_t length; + Coordination::XID xid; + int64_t zxid; + Coordination::Error err; + + Coordination::read(length, buf); + Coordination::read(xid, buf); + Coordination::read(zxid, buf); + Coordination::read(err, buf); + Coordination::ZooKeeperResponsePtr response; + + if (xid == Coordination::WATCH_XID) + response = std::make_shared(); + else + { + response = ops_mapping[session_id][xid]; + ops_mapping[session_id].erase(xid); + if (ops_mapping[session_id].empty()) + ops_mapping.erase(session_id); + } + + if (err == Coordination::Error::ZOK && (xid == Coordination::WATCH_XID || response->getOpNum() != Coordination::OpNum::Close)) + response->readImpl(buf); + + response->xid = xid; + response->zxid = zxid; + response->error = err; + + results.push_back(DB::TestKeeperStorage::ResponseForSession{session_id, response}); + } + return results; +} + +TestKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const TestKeeperStorage::RequestsForSessions & requests) +{ + std::vector> entries; + for (auto & [session_id, request] : requests) + { + ops_mapping[session_id][request->xid] = request->makeResponse(); + entries.push_back(getZooKeeperLogEntry(session_id, request)); + } + + auto result = raft_instance->append_entries(entries); + if (!result->get_accepted()) + return {}; + + if (result->get_result_code() != nuraft::cmd_result_code::OK) + return {}; + + return readZooKeeperResponses(result->get()); +} + + +int64_t NuKeeperServer::getSessionID() +{ + auto entry = nuraft::buffer::alloc(sizeof(size_t)); + nuraft::buffer_serializer bs(entry); + bs.put_i64(0); + + auto result = raft_instance->append_entries({entry}); + if (!result->get_accepted()) + return -1; + + if (result->get_result_code() != nuraft::cmd_result_code::OK) + return -1; + + auto resp = result->get(); + nuraft::buffer_serializer bs_resp(resp); + return bs_resp.get_i64(); +} + } diff --git a/src/Coordination/NuKeeperServer.h b/src/Coordination/NuKeeperServer.h index 0dc536b1593..c77a7a8be0a 100644 --- a/src/Coordination/NuKeeperServer.h +++ b/src/Coordination/NuKeeperServer.h @@ -5,10 +5,17 @@ #include #include #include +#include namespace DB { +struct NuraftError +{ + nuraft::cmd_result_code code; + std::string message; +}; + class NuKeeperServer { private: @@ -20,7 +27,7 @@ private: std::string endpoint; - nuraft::ptr state_machine; + nuraft::ptr state_machine; nuraft::ptr state_manager; @@ -28,16 +35,26 @@ private: nuraft::ptr raft_instance; -public: - NuKeeperServer(int server_id, const std::string & hostname, int port); + using XIDToOp = std::unordered_map; - void startup(); + using SessionIDOps = std::unordered_map; + + SessionIDOps ops_mapping; + + TestKeeperStorage::ResponsesForSessions readZooKeeperResponses(nuraft::ptr & buffer); + +public: + NuKeeperServer(int server_id_, const std::string & hostname_, int port_); + + NuraftError startup(); TestKeeperStorage::ResponsesForSessions putRequests(const TestKeeperStorage::RequestsForSessions & requests); - void addServer(int server_id_, const std::string & server_uri); + int64_t getSessionID(); - void shutdown(); + NuraftError addServer(int server_id_, const std::string & server_uri); + + NuraftError shutdown(); }; } diff --git a/src/Coordination/NuKeeperStateMachine.cpp b/src/Coordination/NuKeeperStateMachine.cpp index 136ead44596..79324c91cd3 100644 --- a/src/Coordination/NuKeeperStateMachine.cpp +++ b/src/Coordination/NuKeeperStateMachine.cpp @@ -51,15 +51,32 @@ NuKeeperStateMachine::NuKeeperStateMachine() nuraft::ptr NuKeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data) { LOG_DEBUG(log, "Commiting logidx {}", log_idx); - auto request_for_session = parseRequest(data); - TestKeeperStorage::ResponsesForSessions responses_for_sessions; + if (data.size() == sizeof(size_t)) { - std::lock_guard lock(storage_lock); - responses_for_sessions = storage.processRequest(request_for_session.request, request_for_session.session_id); + LOG_DEBUG(log, "Session ID response {}", log_idx); + auto response = nuraft::buffer::alloc(sizeof(size_t)); + nuraft::buffer_serializer bs(response); + { + std::lock_guard lock(storage_lock); + bs.put_i64(storage.getSessionID()); + } + last_committed_idx = log_idx; + return response; } + else + { + auto request_for_session = parseRequest(data); + //LOG_DEBUG(log, "GOT REQUEST {}", Coordination::toString(request_for_session.request->getOpNum())); + TestKeeperStorage::ResponsesForSessions responses_for_sessions; + { + std::lock_guard lock(storage_lock); + responses_for_sessions = storage.processRequest(request_for_session.request, request_for_session.session_id); + } + //LOG_DEBUG(log, "TOTAL RESPONSES {} FIRST XID {}", responses_for_sessions.size(), responses_for_sessions[0].response->xid); - last_committed_idx = log_idx; - return writeResponses(responses_for_sessions); + last_committed_idx = log_idx; + return writeResponses(responses_for_sessions); + } } bool NuKeeperStateMachine::apply_snapshot(nuraft::snapshot & s) diff --git a/src/Coordination/TestKeeperStorage.cpp b/src/Coordination/TestKeeperStorage.cpp index ef3ae1dfd16..ef72f5d4eaa 100644 --- a/src/Coordination/TestKeeperStorage.cpp +++ b/src/Coordination/TestKeeperStorage.cpp @@ -519,6 +519,7 @@ TestKeeperStorage::ResponsesForSessions TestKeeperStorage::finalize(const Reques finalized = true; + /// TODO delete ephemerals ResponsesForSessions finalize_results; auto finish_watch = [] (const auto & watch_pair) -> ResponsesForSessions { diff --git a/src/Coordination/TestKeeperStorageDispatcher.cpp b/src/Coordination/TestKeeperStorageDispatcher.cpp index 63cb5920f9b..9cc40f6e5c3 100644 --- a/src/Coordination/TestKeeperStorageDispatcher.cpp +++ b/src/Coordination/TestKeeperStorageDispatcher.cpp @@ -18,16 +18,16 @@ void TestKeeperStorageDispatcher::processingThread() { while (!shutdown) { - RequestInfo info; + TestKeeperStorage::RequestForSession request; UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds()); - if (requests_queue.tryPop(info, max_wait)) + if (requests_queue.tryPop(request, max_wait)) { if (shutdown) break; - auto responses = storage.processRequest(info.request, info.session_id); + auto responses = server.putRequests({request}); for (const auto & response_for_session : responses) setResponse(response_for_session.session_id, response_for_session.response); } @@ -67,15 +67,17 @@ void TestKeeperStorageDispatcher::finalize() processing_thread.join(); } - RequestInfo info; - TestKeeperStorage::RequestsForSessions expired_requests; - while (requests_queue.tryPop(info)) - expired_requests.push_back(TestKeeperStorage::RequestForSession{info.session_id, info.request}); + //TestKeeperStorage::RequestsForSessions expired_requests; + //TestKeeperStorage::RequestForSession request; + //while (requests_queue.tryPop(request)) + // expired_requests.push_back(TestKeeperStorage::RequestForSession{request}); - auto expired_responses = storage.finalize(expired_requests); + //auto expired_responses = storage.finalize(expired_requests); - for (const auto & response_for_session : expired_responses) - setResponse(response_for_session.session_id, response_for_session.response); + //for (const auto & response_for_session : expired_responses) + // setResponse(response_for_session.session_id, response_for_session.response); + /// TODO FIXME + server.shutdown(); } void TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id) @@ -87,8 +89,7 @@ void TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperReques throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown session id {}", session_id); } - RequestInfo request_info; - request_info.time = clock::now(); + TestKeeperStorage::RequestForSession request_info; request_info.request = request; request_info.session_id = session_id; @@ -101,7 +102,9 @@ void TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperReques } TestKeeperStorageDispatcher::TestKeeperStorageDispatcher() + : server(1, "localhost", 44444) { + server.startup(); processing_thread = ThreadFromGlobalPool([this] { processingThread(); }); } diff --git a/src/Coordination/TestKeeperStorageDispatcher.h b/src/Coordination/TestKeeperStorageDispatcher.h index c1c739db87d..ef788a16369 100644 --- a/src/Coordination/TestKeeperStorageDispatcher.h +++ b/src/Coordination/TestKeeperStorageDispatcher.h @@ -2,8 +2,9 @@ #include #include -#include #include +#include +#include namespace DB { @@ -17,16 +18,9 @@ private: using clock = std::chrono::steady_clock; - struct RequestInfo - { - Coordination::ZooKeeperRequestPtr request; - clock::time_point time; - int64_t session_id; - }; - std::mutex push_request_mutex; - using RequestsQueue = ConcurrentBoundedQueue; + using RequestsQueue = ConcurrentBoundedQueue; RequestsQueue requests_queue{1}; std::atomic shutdown{false}; using SessionToResponseCallback = std::unordered_map; @@ -36,7 +30,7 @@ private: ThreadFromGlobalPool processing_thread; - TestKeeperStorage storage; + NuKeeperServer server; std::mutex session_id_mutex; private: @@ -46,6 +40,7 @@ private: public: TestKeeperStorageDispatcher(); + ~TestKeeperStorageDispatcher(); void putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id); @@ -53,7 +48,7 @@ public: int64_t getSessionID() { std::lock_guard lock(session_id_mutex); - return storage.getSessionID(); + return server.getSessionID(); } void registerSession(int64_t session_id, ZooKeeperResponseCallback callback); diff --git a/utils/zookeeper-test/main.cpp b/utils/zookeeper-test/main.cpp index 8f8aac00866..bfd7df26726 100644 --- a/utils/zookeeper-test/main.cpp +++ b/utils/zookeeper-test/main.cpp @@ -127,18 +127,22 @@ void testCreateListWatchEvent(zkutil::ZooKeeper & zk) void testMultiRequest(zkutil::ZooKeeper & zk) { + std::cerr << "Testing multi request\n"; Coordination::Requests requests; requests.push_back(zkutil::makeCreateRequest("/data/multirequest", "aaa", zkutil::CreateMode::Persistent)); requests.push_back(zkutil::makeSetRequest("/data/multirequest", "bbb", -1)); zk.multi(requests); + std::cerr << "Multi executed\n"; try { requests.clear(); + std::cerr << "Testing bad multi\n"; requests.push_back(zkutil::makeCreateRequest("/data/multirequest", "qweqwe", zkutil::CreateMode::Persistent)); requests.push_back(zkutil::makeSetRequest("/data/multirequest", "bbb", -1)); requests.push_back(zkutil::makeSetRequest("/data/multirequest", "ccc", -1)); zk.multi(requests); + std::cerr << "Bad multi executed\n"; std::terminate(); } catch (...) @@ -147,6 +151,7 @@ void testMultiRequest(zkutil::ZooKeeper & zk) } checkEq(zk, "/data/multirequest", "bbb"); + std::cerr << "Multi request finished\n"; } std::mutex elements_mutex;