From 7f32926a218282891cbc6c5082267b7b5db3b867 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 21 Jan 2021 14:37:20 +0300 Subject: [PATCH] Fix race condition in TestKeeperHandler on session finish --- src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp | 10 +++++++++- src/Common/ZooKeeper/TestKeeperStorageDispatcher.h | 2 ++ src/Server/TestKeeperTCPHandler.cpp | 4 ++++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp b/src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp index b1233fc47e3..35378e4ff09 100644 --- a/src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp +++ b/src/Common/ZooKeeper/TestKeeperStorageDispatcher.cpp @@ -49,7 +49,7 @@ void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordina std::lock_guard lock(session_to_response_callback_mutex); auto session_writer = session_to_response_callback.find(session_id); if (session_writer == session_to_response_callback.end()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown session id {}", session_id); + return; session_writer->second(response); /// Session closed, no more writes @@ -128,4 +128,12 @@ void TestKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperR throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id); } +void TestKeeperStorageDispatcher::finishSession(int64_t session_id) +{ + std::lock_guard lock(session_to_response_callback_mutex); + auto session_it = session_to_response_callback.find(session_id); + if (session_it != session_to_response_callback.end()) + session_to_response_callback.erase(session_it); +} + } diff --git a/src/Common/ZooKeeper/TestKeeperStorageDispatcher.h b/src/Common/ZooKeeper/TestKeeperStorageDispatcher.h index 27abf17ac73..a86895b5be1 100644 --- a/src/Common/ZooKeeper/TestKeeperStorageDispatcher.h +++ b/src/Common/ZooKeeper/TestKeeperStorageDispatcher.h @@ -53,6 +53,8 @@ public: return storage.getSessionID(); } void registerSession(int64_t session_id, ZooKeeperResponseCallback callback); + /// Call if we don't need any responses for this session no more (session was expired) + void finishSession(int64_t session_id); }; } diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp index 7b02996019e..bf407ba96b7 100644 --- a/src/Server/TestKeeperTCPHandler.cpp +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -390,7 +390,11 @@ void TestKeeperTCPHandler::finish() { Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); request->xid = close_xid; + /// Put close request (so storage will remove all info about session) test_keeper_storage_dispatcher->putRequest(request, session_id); + /// We don't need any callbacks because session can be already dead and + /// nobody wait for response + test_keeper_storage_dispatcher->finishSession(session_id); } std::pair TestKeeperTCPHandler::receiveRequest()