Fix race condition on keeper shutdown

This commit is contained in:
alesapin 2021-05-22 10:46:12 +03:00
parent 3cca5e8391
commit 8c63f0f8e2
2 changed files with 12 additions and 8 deletions

View File

@ -226,6 +226,10 @@ bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr
request_info.session_id = session_id;
std::lock_guard lock(push_request_mutex);
if (shutdown_called)
return false;
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
requests_queue->push(std::move(request_info));

View File

@ -194,7 +194,7 @@ KeeperTCPHandler::KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSoc
, server(server_)
, log(&Poco::Logger::get("NuKeeperTCPHandler"))
, global_context(Context::createCopy(server.context()))
, nu_keeper_storage_dispatcher(global_context->getKeeperStorageDispatcher())
, keeper_dispatcher(global_context->getKeeperStorageDispatcher())
, operation_timeout(0, global_context->getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context->getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, poll_wrapper(std::make_unique<SocketInterruptablePollWrapper>(socket_))
@ -286,12 +286,12 @@ void KeeperTCPHandler::runImpl()
return;
}
if (nu_keeper_storage_dispatcher->hasLeader())
if (keeper_dispatcher->hasLeader())
{
try
{
LOG_INFO(log, "Requesting session ID for the new client");
session_id = nu_keeper_storage_dispatcher->getSessionID(session_timeout.totalMilliseconds());
session_id = keeper_dispatcher->getSessionID(session_timeout.totalMilliseconds());
LOG_INFO(log, "Received session ID {}", session_id);
}
catch (const Exception & e)
@ -318,7 +318,7 @@ void KeeperTCPHandler::runImpl()
UInt8 single_byte = 1;
[[maybe_unused]] int result = write(response_fd, &single_byte, sizeof(single_byte));
};
nu_keeper_storage_dispatcher->registerSession(session_id, response_callback);
keeper_dispatcher->registerSession(session_id, response_callback);
session_stopwatch.start();
bool close_received = false;
@ -368,7 +368,7 @@ void KeeperTCPHandler::runImpl()
if (response->error == Coordination::Error::ZSESSIONEXPIRED)
{
LOG_DEBUG(log, "Session #{} expired because server shutting down or quorum is not alive", session_id);
nu_keeper_storage_dispatcher->finishSession(session_id);
keeper_dispatcher->finishSession(session_id);
return;
}
@ -381,7 +381,7 @@ void KeeperTCPHandler::runImpl()
if (session_stopwatch.elapsedMicroseconds() > static_cast<UInt64>(session_timeout.totalMicroseconds()))
{
LOG_DEBUG(log, "Session #{} expired", session_id);
nu_keeper_storage_dispatcher->finishSession(session_id);
keeper_dispatcher->finishSession(session_id);
break;
}
}
@ -389,7 +389,7 @@ void KeeperTCPHandler::runImpl()
catch (const Exception & ex)
{
LOG_INFO(log, "Got exception processing session #{}: {}", session_id, getExceptionMessage(ex, true));
nu_keeper_storage_dispatcher->finishSession(session_id);
keeper_dispatcher->finishSession(session_id);
}
}
@ -407,7 +407,7 @@ std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveReque
request->xid = xid;
request->readImpl(*in);
if (!nu_keeper_storage_dispatcher->putRequest(request, session_id))
if (!keeper_dispatcher->putRequest(request, session_id))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);
return std::make_pair(opnum, xid);
}