Now we answer from follower nodes

This commit is contained in:
alesapin 2021-01-29 15:39:04 +03:00
parent 6781c9f61d
commit bac8cc55d2
4 changed files with 13 additions and 19 deletions

View File

@ -47,6 +47,7 @@ void NuKeeperServer::startup()
params.reserved_log_items_ = 5000; params.reserved_log_items_ = 5000;
params.snapshot_distance_ = 5000; params.snapshot_distance_ = 5000;
params.client_req_timeout_ = 10000; params.client_req_timeout_ = 10000;
params.auto_forwarding_ = true;
params.return_method_ = nuraft::raft_params::blocking; params.return_method_ = nuraft::raft_params::blocking;
raft_instance = launcher.init( raft_instance = launcher.init(
@ -146,7 +147,7 @@ TestKeeperStorage::ResponsesForSessions NuKeeperServer::readZooKeeperResponses(n
TestKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const TestKeeperStorage::RequestsForSessions & requests) TestKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const TestKeeperStorage::RequestsForSessions & requests)
{ {
if (isLeader() && requests.size() == 1 && requests[0].request->isReadRequest()) if (raft_instance->is_leader_alive() && requests.size() == 1 && requests[0].request->isReadRequest())
{ {
return state_machine->processReadRequest(requests[0]); return state_machine->processReadRequest(requests[0]);
} }

View File

@ -175,12 +175,9 @@ void TestKeeperStorageDispatcher::shutdown()
if (server) if (server)
{ {
TestKeeperStorage::RequestsForSessions expired_requests; TestKeeperStorage::RequestsForSessions expired_requests;
if (server->isLeader()) TestKeeperStorage::RequestForSession request;
{ while (requests_queue.tryPop(request))
TestKeeperStorage::RequestForSession request; expired_requests.push_back(TestKeeperStorage::RequestForSession{request});
while (requests_queue.tryPop(request))
expired_requests.push_back(TestKeeperStorage::RequestForSession{request});
}
auto expired_responses = server->shutdown(expired_requests); auto expired_responses = server->shutdown(expired_requests);

View File

@ -232,14 +232,10 @@ TestKeeperTCPHandler::TestKeeperTCPHandler(IServer & server_, const Poco::Net::S
{ {
} }
void TestKeeperTCPHandler::sendHandshake(bool is_leader) void TestKeeperTCPHandler::sendHandshake()
{ {
Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out); Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out);
if (is_leader) Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *out);
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *out);
else /// Specially ignore connections if we are not leader, client will throw exception
Coordination::write(42, *out);
Coordination::write(Coordination::DEFAULT_SESSION_TIMEOUT_MS, *out); Coordination::write(Coordination::DEFAULT_SESSION_TIMEOUT_MS, *out);
Coordination::write(session_id, *out); Coordination::write(session_id, *out);
std::array<char, Coordination::PASSWORD_LENGTH> passwd{}; std::array<char, Coordination::PASSWORD_LENGTH> passwd{};
@ -319,18 +315,18 @@ void TestKeeperTCPHandler::runImpl()
return; return;
} }
if (test_keeper_storage_dispatcher->isLeader()) try
{ {
session_id = test_keeper_storage_dispatcher->getSessionID(); session_id = test_keeper_storage_dispatcher->getSessionID();
sendHandshake(true);
} }
else catch (const Exception & e)
{ {
sendHandshake(false); LOG_WARNING(log, "Cannot receive session id {}", e.displayText());
LOG_WARNING(log, "Ignoring connection because we are not leader");
return; return;
} }
sendHandshake();
auto response_fd = poll_wrapper->getResponseFD(); auto response_fd = poll_wrapper->getResponseFD();
auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response) auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response)
{ {

View File

@ -45,7 +45,7 @@ private:
void runImpl(); void runImpl();
void sendHandshake(bool is_leader); void sendHandshake();
void receiveHandshake(); void receiveHandshake();
std::pair<Coordination::OpNum, Coordination::XID> receiveRequest(); std::pair<Coordination::OpNum, Coordination::XID> receiveRequest();