diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index d311dfbd679..8186ddd0c00 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -442,9 +442,9 @@ void KeeperServer::shutdownRaftServer() void KeeperServer::shutdown() { - state_machine->shutdownStorage(); state_manager->flushAndShutDownLogStore(); shutdownRaftServer(); + state_machine->shutdownStorage(); } namespace @@ -617,7 +617,9 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ auto & entry_buf = entry->get_buf(); auto request_for_session = state_machine->parseRequest(entry_buf); request_for_session.zxid = next_zxid; - state_machine->preprocess(request_for_session); + if (!state_machine->preprocess(request_for_session)) + return nuraft::cb_func::ReturnCode::ReturnNull; + request_for_session.digest = state_machine->getNodesDigest(); entry = nuraft::cs_new(entry->get_term(), getZooKeeperLogEntry(request_for_session), entry->get_val_type()); break; diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 3d6c80b5e41..c5a66ce29ca 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -191,12 +191,16 @@ KeeperStorage::RequestForSession KeeperStateMachine::parseRequest(nuraft::buffer return request_for_session; } -void KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & request_for_session) +bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & request_for_session) { if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) - return; + return true; std::lock_guard lock(storage_and_responses_lock); + + if (storage->isFinalized()) + return false; + try { storage->preprocessRequest( @@ -215,6 +219,8 @@ void KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req if (keeper_context->digest_enabled && request_for_session.digest) assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, false); + + return true; } nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data) diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 9ddc4372d3b..fbd4fdc5ac2 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -33,7 +33,7 @@ public: static KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data); - void preprocess(const KeeperStorage::RequestForSession & request_for_session); + bool preprocess(const KeeperStorage::RequestForSession & request_for_session); nuraft::ptr pre_commit(uint64_t log_idx, nuraft::buffer & data) override; diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index cfe614e1287..9b5d7dc5db3 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -516,7 +516,7 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid) std::shared_ptr KeeperStorage::UncommittedState::getNode(StringRef path) const { - if (auto node_it = nodes.find(std::string{path}); node_it != nodes.end()) + if (auto node_it = nodes.find(path.toView()); node_it != nodes.end()) return node_it->second.node; return tryGetNodeFromStorage(path); @@ -524,7 +524,7 @@ std::shared_ptr KeeperStorage::UncommittedState::getNode(St Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const { - if (auto node_it = nodes.find(std::string{path}); node_it != nodes.end()) + if (auto node_it = nodes.find(path.toView()); node_it != nodes.end()) return node_it->second.acls; auto node_it = storage.container.find(path); @@ -830,7 +830,9 @@ bool KeeperStorage::checkACL(StringRef path, int32_t permission, int64_t session void KeeperStorage::unregisterEphemeralPath(int64_t session_id, const std::string & path) { auto ephemerals_it = ephemerals.find(session_id); - assert(ephemerals_it != ephemerals.end()); + if (ephemerals_it == ephemerals.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Session {} is missing ephemeral path"); + ephemerals_it->second.erase(path); if (ephemerals_it->second.empty()) ephemerals.erase(ephemerals_it); @@ -1825,6 +1827,11 @@ void KeeperStorage::finalize() session_expiry_queue.clear(); } +bool KeeperStorage::isFinalized() const +{ + return finalized; +} + class KeeperStorageRequestProcessorsFactory final : private boost::noncopyable { @@ -1892,7 +1899,7 @@ UInt64 KeeperStorage::calculateNodesDigest(UInt64 current_digest, const std::vec if (!keeper_context->digest_enabled) return current_digest; - std::unordered_map> updated_nodes; + std::unordered_map> updated_nodes; for (const auto & delta : new_deltas) { diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 73714771bf3..a40cca8e778 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -425,6 +425,8 @@ public: void finalize(); + bool isFinalized() const; + /// Set of methods for creating snapshots /// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.