From bac8cc55d2c48404a4b6b85ca09d15114620ef52 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 29 Jan 2021 15:39:04 +0300 Subject: [PATCH] Now we answer from follower nodes --- src/Coordination/NuKeeperServer.cpp | 3 ++- .../TestKeeperStorageDispatcher.cpp | 9 +++------ src/Server/TestKeeperTCPHandler.cpp | 18 +++++++----------- src/Server/TestKeeperTCPHandler.h | 2 +- 4 files changed, 13 insertions(+), 19 deletions(-) diff --git a/src/Coordination/NuKeeperServer.cpp b/src/Coordination/NuKeeperServer.cpp index 8995b51a13b..bcc348d1be3 100644 --- a/src/Coordination/NuKeeperServer.cpp +++ b/src/Coordination/NuKeeperServer.cpp @@ -47,6 +47,7 @@ void NuKeeperServer::startup() params.reserved_log_items_ = 5000; params.snapshot_distance_ = 5000; params.client_req_timeout_ = 10000; + params.auto_forwarding_ = true; params.return_method_ = nuraft::raft_params::blocking; raft_instance = launcher.init( @@ -146,7 +147,7 @@ TestKeeperStorage::ResponsesForSessions NuKeeperServer::readZooKeeperResponses(n TestKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const TestKeeperStorage::RequestsForSessions & requests) { - if (isLeader() && requests.size() == 1 && requests[0].request->isReadRequest()) + if (raft_instance->is_leader_alive() && requests.size() == 1 && requests[0].request->isReadRequest()) { return state_machine->processReadRequest(requests[0]); } diff --git a/src/Coordination/TestKeeperStorageDispatcher.cpp b/src/Coordination/TestKeeperStorageDispatcher.cpp index 685fa58f8ad..d5682e1688b 100644 --- a/src/Coordination/TestKeeperStorageDispatcher.cpp +++ b/src/Coordination/TestKeeperStorageDispatcher.cpp @@ -175,12 +175,9 @@ void TestKeeperStorageDispatcher::shutdown() if (server) { TestKeeperStorage::RequestsForSessions expired_requests; - if (server->isLeader()) - { - TestKeeperStorage::RequestForSession request; - while (requests_queue.tryPop(request)) - expired_requests.push_back(TestKeeperStorage::RequestForSession{request}); - } + TestKeeperStorage::RequestForSession request; + while (requests_queue.tryPop(request)) + expired_requests.push_back(TestKeeperStorage::RequestForSession{request}); auto expired_responses = server->shutdown(expired_requests); diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp index 04e5c6ece1d..5e5ba19f1a6 100644 --- a/src/Server/TestKeeperTCPHandler.cpp +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -232,14 +232,10 @@ TestKeeperTCPHandler::TestKeeperTCPHandler(IServer & server_, const Poco::Net::S { } -void TestKeeperTCPHandler::sendHandshake(bool is_leader) +void TestKeeperTCPHandler::sendHandshake() { Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out); - if (is_leader) - Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *out); - else /// Specially ignore connections if we are not leader, client will throw exception - Coordination::write(42, *out); - + Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *out); Coordination::write(Coordination::DEFAULT_SESSION_TIMEOUT_MS, *out); Coordination::write(session_id, *out); std::array passwd{}; @@ -319,18 +315,18 @@ void TestKeeperTCPHandler::runImpl() return; } - if (test_keeper_storage_dispatcher->isLeader()) + try { session_id = test_keeper_storage_dispatcher->getSessionID(); - sendHandshake(true); } - else + catch (const Exception & e) { - sendHandshake(false); - LOG_WARNING(log, "Ignoring connection because we are not leader"); + LOG_WARNING(log, "Cannot receive session id {}", e.displayText()); return; } + sendHandshake(); + auto response_fd = poll_wrapper->getResponseFD(); auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response) { diff --git a/src/Server/TestKeeperTCPHandler.h b/src/Server/TestKeeperTCPHandler.h index bb74513afce..e7372e8dd82 100644 --- a/src/Server/TestKeeperTCPHandler.h +++ b/src/Server/TestKeeperTCPHandler.h @@ -45,7 +45,7 @@ private: void runImpl(); - void sendHandshake(bool is_leader); + void sendHandshake(); void receiveHandshake(); std::pair receiveRequest();