From 8c63f0f8e20a6996f7d0b18ea5e8026f359a63ab Mon Sep 17 00:00:00 2001 From: alesapin Date: Sat, 22 May 2021 10:46:12 +0300 Subject: [PATCH] Fix race condition on keeper shutdown --- src/Coordination/KeeperStorageDispatcher.cpp | 4 ++++ src/Server/KeeperTCPHandler.cpp | 16 ++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/Coordination/KeeperStorageDispatcher.cpp b/src/Coordination/KeeperStorageDispatcher.cpp index 14a44ee6f3f..7da7c22c8a3 100644 --- a/src/Coordination/KeeperStorageDispatcher.cpp +++ b/src/Coordination/KeeperStorageDispatcher.cpp @@ -226,6 +226,10 @@ bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr request_info.session_id = session_id; std::lock_guard lock(push_request_mutex); + + if (shutdown_called) + return false; + /// Put close requests without timeouts if (request->getOpNum() == Coordination::OpNum::Close) requests_queue->push(std::move(request_info)); diff --git a/src/Server/KeeperTCPHandler.cpp b/src/Server/KeeperTCPHandler.cpp index 1dadd3437f7..2d5f41fe666 100644 --- a/src/Server/KeeperTCPHandler.cpp +++ b/src/Server/KeeperTCPHandler.cpp @@ -194,7 +194,7 @@ KeeperTCPHandler::KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSoc , server(server_) , log(&Poco::Logger::get("NuKeeperTCPHandler")) , global_context(Context::createCopy(server.context())) - , nu_keeper_storage_dispatcher(global_context->getKeeperStorageDispatcher()) + , keeper_dispatcher(global_context->getKeeperStorageDispatcher()) , operation_timeout(0, global_context->getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000) , session_timeout(0, global_context->getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000) , poll_wrapper(std::make_unique(socket_)) @@ -286,12 +286,12 @@ void KeeperTCPHandler::runImpl() return; } - if (nu_keeper_storage_dispatcher->hasLeader()) + if (keeper_dispatcher->hasLeader()) { try { LOG_INFO(log, "Requesting session ID for the new client"); - session_id = nu_keeper_storage_dispatcher->getSessionID(session_timeout.totalMilliseconds()); + session_id = keeper_dispatcher->getSessionID(session_timeout.totalMilliseconds()); LOG_INFO(log, "Received session ID {}", session_id); } catch (const Exception & e) @@ -318,7 +318,7 @@ void KeeperTCPHandler::runImpl() UInt8 single_byte = 1; [[maybe_unused]] int result = write(response_fd, &single_byte, sizeof(single_byte)); }; - nu_keeper_storage_dispatcher->registerSession(session_id, response_callback); + keeper_dispatcher->registerSession(session_id, response_callback); session_stopwatch.start(); bool close_received = false; @@ -368,7 +368,7 @@ void KeeperTCPHandler::runImpl() if (response->error == Coordination::Error::ZSESSIONEXPIRED) { LOG_DEBUG(log, "Session #{} expired because server shutting down or quorum is not alive", session_id); - nu_keeper_storage_dispatcher->finishSession(session_id); + keeper_dispatcher->finishSession(session_id); return; } @@ -381,7 +381,7 @@ void KeeperTCPHandler::runImpl() if (session_stopwatch.elapsedMicroseconds() > static_cast(session_timeout.totalMicroseconds())) { LOG_DEBUG(log, "Session #{} expired", session_id); - nu_keeper_storage_dispatcher->finishSession(session_id); + keeper_dispatcher->finishSession(session_id); break; } } @@ -389,7 +389,7 @@ void KeeperTCPHandler::runImpl() catch (const Exception & ex) { LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true)); - nu_keeper_storage_dispatcher->finishSession(session_id); + keeper_dispatcher->finishSession(session_id); } } @@ -407,7 +407,7 @@ std::pair KeeperTCPHandler::receiveReque request->xid = xid; request->readImpl(*in); - if (!nu_keeper_storage_dispatcher->putRequest(request, session_id)) + if (!keeper_dispatcher->putRequest(request, session_id)) throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id); return std::make_pair(opnum, xid); }