From 817eb100a186e1244f51247d7b83956152c6c8da Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 26 Jan 2021 17:08:31 +0300 Subject: [PATCH] Better shutdown --- src/Coordination/NuKeeperServer.cpp | 12 +++- .../TestKeeperStorageDispatcher.cpp | 65 ++++++++++--------- .../TestKeeperStorageDispatcher.h | 6 +- src/Interpreters/Context.cpp | 2 +- 4 files changed, 48 insertions(+), 37 deletions(-) diff --git a/src/Coordination/NuKeeperServer.cpp b/src/Coordination/NuKeeperServer.cpp index 5b5aeb206c4..6d70eff1121 100644 --- a/src/Coordination/NuKeeperServer.cpp +++ b/src/Coordination/NuKeeperServer.cpp @@ -72,7 +72,17 @@ TestKeeperStorage::ResponsesForSessions NuKeeperServer::shutdown(const TestKeepe { TestKeeperStorage::ResponsesForSessions responses; if (can_become_leader) - responses = putRequests(expired_requests); + { + try + { + responses = putRequests(expired_requests); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + if (!launcher.shutdown(5)) LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Failed to shutdown RAFT server in {} seconds", 5); return responses; diff --git a/src/Coordination/TestKeeperStorageDispatcher.cpp b/src/Coordination/TestKeeperStorageDispatcher.cpp index 3aef5213adc..7ce81df0bfd 100644 --- a/src/Coordination/TestKeeperStorageDispatcher.cpp +++ b/src/Coordination/TestKeeperStorageDispatcher.cpp @@ -14,7 +14,7 @@ namespace ErrorCodes void TestKeeperStorageDispatcher::processingThread() { setThreadName("TestKeeperSProc"); - while (!shutdown) + while (!shutdown_called) { TestKeeperStorage::RequestForSession request; @@ -22,8 +22,9 @@ void TestKeeperStorageDispatcher::processingThread() if (requests_queue.tryPop(request, max_wait)) { - if (shutdown) + if (shutdown_called) break; + try { auto responses = server->putRequests({request}); @@ -51,34 +52,6 @@ void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordina session_to_response_callback.erase(session_writer); } -void TestKeeperStorageDispatcher::finalize() -{ - { - std::lock_guard lock(push_request_mutex); - - if (shutdown) - return; - - shutdown = true; - - if (processing_thread.joinable()) - processing_thread.join(); - } - - if (server) - { - TestKeeperStorage::RequestsForSessions expired_requests; - TestKeeperStorage::RequestForSession request; - while (requests_queue.tryPop(request)) - expired_requests.push_back(TestKeeperStorage::RequestForSession{request}); - - auto expired_responses = server->shutdown(expired_requests); - - for (const auto & response_for_session : expired_responses) - setResponse(response_for_session.session_id, response_for_session.response); - } -} - bool TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id) { @@ -143,11 +116,34 @@ void TestKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigura } -TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher() +void TestKeeperStorageDispatcher::shutdown() { try { - finalize(); + { + std::lock_guard lock(push_request_mutex); + + if (shutdown_called) + return; + + shutdown_called = true; + + if (processing_thread.joinable()) + processing_thread.join(); + } + + if (server) + { + TestKeeperStorage::RequestsForSessions expired_requests; + TestKeeperStorage::RequestForSession request; + while (requests_queue.tryPop(request)) + expired_requests.push_back(TestKeeperStorage::RequestForSession{request}); + + auto expired_responses = server->shutdown(expired_requests); + + for (const auto & response_for_session : expired_responses) + setResponse(response_for_session.session_id, response_for_session.response); + } } catch (...) { @@ -155,6 +151,11 @@ TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher() } } +TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher() +{ + shutdown(); +} + void TestKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback) { std::lock_guard lock(session_to_response_callback_mutex); diff --git a/src/Coordination/TestKeeperStorageDispatcher.h b/src/Coordination/TestKeeperStorageDispatcher.h index aa220beecf2..5107f2f9cba 100644 --- a/src/Coordination/TestKeeperStorageDispatcher.h +++ b/src/Coordination/TestKeeperStorageDispatcher.h @@ -16,13 +16,12 @@ class TestKeeperStorageDispatcher private: Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000}; - using clock = std::chrono::steady_clock; std::mutex push_request_mutex; using RequestsQueue = ConcurrentBoundedQueue; RequestsQueue requests_queue{1}; - std::atomic shutdown{false}; + std::atomic shutdown_called{false}; using SessionToResponseCallback = std::unordered_map; std::mutex session_to_response_callback_mutex; @@ -35,7 +34,6 @@ private: private: void processingThread(); - void finalize(); void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response); public: @@ -43,6 +41,8 @@ public: void initialize(const Poco::Util::AbstractConfiguration & config); + void shutdown(); + ~TestKeeperStorageDispatcher(); bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 0b381cf3fae..033f4b54a64 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -447,7 +447,7 @@ struct ContextShared /// Stop zookeeper connection zookeeper.reset(); /// Stop test_keeper storage - test_keeper_storage_dispatcher.reset(); + test_keeper_storage_dispatcher->shutdown(); } bool hasTraceCollector() const