diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 2e5e7214e3e..ffbac0656b9 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -106,31 +106,26 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n { const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast(*request_for_session.request); int64_t session_id; - { - std::lock_guard lock(storage_lock); - session_id = storage->getSessionID(session_id_request.session_timeout_ms); - } - LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms); - std::shared_ptr response = std::make_shared(); response->internal_id = session_id_request.internal_id; - response->session_id = session_id; response->server_id = session_id_request.server_id; - KeeperStorage::ResponseForSession response_for_session; response_for_session.session_id = -1; response_for_session.response = response; - responses_queue.push(response_for_session); + { + std::lock_guard lock(storage_and_responses_lock); + session_id = storage->getSessionID(session_id_request.session_timeout_ms); + LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms); + response->session_id = session_id; + responses_queue.push(response_for_session); + } } else { - KeeperStorage::ResponsesForSessions responses_for_sessions; - { - std::lock_guard lock(storage_lock); - responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx); - for (auto & response_for_session : responses_for_sessions) - responses_queue.push(response_for_session); - } + std::lock_guard lock(storage_and_responses_lock); + KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx); + for (auto & response_for_session : responses_for_sessions) + responses_queue.push(response_for_session); } last_committed_idx = log_idx; @@ -150,7 +145,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) } { /// deserialize and apply snapshot to storage - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr); } @@ -175,7 +170,7 @@ void KeeperStateMachine::create_snapshot( auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf); CreateSnapshotTask snapshot_task; { /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking. - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); snapshot_task.snapshot = std::make_shared(storage.get(), snapshot_meta_copy); } @@ -198,7 +193,7 @@ void KeeperStateMachine::create_snapshot( { /// Must do it with lock (clearing elements from list) - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); /// Turn off "snapshot mode" and clear outdate part of storage state storage->clearGarbageAfterSnapshot(); /// Destroy snapshot with lock @@ -236,7 +231,7 @@ void KeeperStateMachine::save_logical_snp_obj( nuraft::ptr cloned_meta; if (obj_id == 0) /// Fake snapshot required by NuRaft at startup { - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx()); cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot); } @@ -303,24 +298,21 @@ int KeeperStateMachine::read_logical_snp_obj( void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session) { /// Pure local request, just process it with storage - KeeperStorage::ResponsesForSessions responses; - { - std::lock_guard lock(storage_lock); - responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt); - } + std::lock_guard lock(storage_and_responses_lock); + auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt); for (const auto & response : responses) responses_queue.push(response); } std::unordered_set KeeperStateMachine::getDeadSessions() { - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); return storage->getDeadSessions(); } void KeeperStateMachine::shutdownStorage() { - std::lock_guard lock(storage_lock); + std::lock_guard lock(storage_and_responses_lock); storage->finalize(); } diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 06be270b66e..32beaaf69e6 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -99,8 +99,12 @@ private: /// Mutex for snapshots std::mutex snapshots_lock; - /// Lock for storage - std::mutex storage_lock; + /// Lock for storage and responses_queue. It's important to process requests + /// and push them to the responses queue while holding this lock. Otherwise + /// we can get strange cases when, for example client send read request with + /// watch and after that receive watch response and only receive response + /// for request. + std::mutex storage_and_responses_lock; /// Last committed Raft log number. std::atomic last_committed_idx;