Better shutdown

This commit is contained in:
alesapin 2021-01-26 17:08:31 +03:00
parent 6441d3200b
commit 817eb100a1
4 changed files with 48 additions and 37 deletions

View File

@ -72,7 +72,17 @@ TestKeeperStorage::ResponsesForSessions NuKeeperServer::shutdown(const TestKeepe
{
TestKeeperStorage::ResponsesForSessions responses;
if (can_become_leader)
responses = putRequests(expired_requests);
{
try
{
responses = putRequests(expired_requests);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (!launcher.shutdown(5))
LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Failed to shutdown RAFT server in {} seconds", 5);
return responses;

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
void TestKeeperStorageDispatcher::processingThread()
{
setThreadName("TestKeeperSProc");
while (!shutdown)
while (!shutdown_called)
{
TestKeeperStorage::RequestForSession request;
@ -22,8 +22,9 @@ void TestKeeperStorageDispatcher::processingThread()
if (requests_queue.tryPop(request, max_wait))
{
if (shutdown)
if (shutdown_called)
break;
try
{
auto responses = server->putRequests({request});
@ -51,34 +52,6 @@ void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordina
session_to_response_callback.erase(session_writer);
}
void TestKeeperStorageDispatcher::finalize()
{
{
std::lock_guard lock(push_request_mutex);
if (shutdown)
return;
shutdown = true;
if (processing_thread.joinable())
processing_thread.join();
}
if (server)
{
TestKeeperStorage::RequestsForSessions expired_requests;
TestKeeperStorage::RequestForSession request;
while (requests_queue.tryPop(request))
expired_requests.push_back(TestKeeperStorage::RequestForSession{request});
auto expired_responses = server->shutdown(expired_requests);
for (const auto & response_for_session : expired_responses)
setResponse(response_for_session.session_id, response_for_session.response);
}
}
bool TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
@ -143,11 +116,34 @@ void TestKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigura
}
TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher()
void TestKeeperStorageDispatcher::shutdown()
{
try
{
finalize();
{
std::lock_guard lock(push_request_mutex);
if (shutdown_called)
return;
shutdown_called = true;
if (processing_thread.joinable())
processing_thread.join();
}
if (server)
{
TestKeeperStorage::RequestsForSessions expired_requests;
TestKeeperStorage::RequestForSession request;
while (requests_queue.tryPop(request))
expired_requests.push_back(TestKeeperStorage::RequestForSession{request});
auto expired_responses = server->shutdown(expired_requests);
for (const auto & response_for_session : expired_responses)
setResponse(response_for_session.session_id, response_for_session.response);
}
}
catch (...)
{
@ -155,6 +151,11 @@ TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher()
}
}
TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher()
{
shutdown();
}
void TestKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
{
std::lock_guard lock(session_to_response_callback_mutex);

View File

@ -16,13 +16,12 @@ class TestKeeperStorageDispatcher
private:
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
using clock = std::chrono::steady_clock;
std::mutex push_request_mutex;
using RequestsQueue = ConcurrentBoundedQueue<TestKeeperStorage::RequestForSession>;
RequestsQueue requests_queue{1};
std::atomic<bool> shutdown{false};
std::atomic<bool> shutdown_called{false};
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
std::mutex session_to_response_callback_mutex;
@ -35,7 +34,6 @@ private:
private:
void processingThread();
void finalize();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
public:
@ -43,6 +41,8 @@ public:
void initialize(const Poco::Util::AbstractConfiguration & config);
void shutdown();
~TestKeeperStorageDispatcher();
bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);

View File

@ -447,7 +447,7 @@ struct ContextShared
/// Stop zookeeper connection
zookeeper.reset();
/// Stop test_keeper storage
test_keeper_storage_dispatcher.reset();
test_keeper_storage_dispatcher->shutdown();
}
bool hasTraceCollector() const