diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 9bcd0608bf7..5920e098470 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -609,7 +609,10 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid) uncommitted_auth.pop_front(); if (uncommitted_auth.empty()) session_and_auth.erase(add_auth->session_id); - + } + else if (auto * close_session = std::get_if(&front_delta.operation)) + { + closed_sessions.erase(close_session->session_id); } deltas.pop_front(); @@ -682,6 +685,10 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid) session_and_auth.erase(add_auth->session_id); } } + else if (auto * close_session = std::get_if(&delta_it->operation)) + { + closed_sessions.erase(close_session->session_id); + } } if (delta_it == deltas.rend()) @@ -2366,12 +2373,15 @@ void KeeperStorage::preprocessRequest( ephemerals.erase(session_ephemerals); } + new_deltas.emplace_back(transaction.zxid, CloseSessionDelta{session_id}); + uncommitted_state.closed_sessions.insert(session_id); new_digest = calculateNodesDigest(new_digest, new_deltas); return; } - if (check_acl && !request_processor->checkAuth(*this, session_id, false)) + if ((check_acl && !request_processor->checkAuth(*this, session_id, false)) || + uncommitted_state.closed_sessions.contains(session_id)) // Is session closed but not committed yet { uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH); return; diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index d9e67f799f8..d5e9a64e69c 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -314,8 +314,13 @@ public: AuthID auth_id; }; + struct CloseSessionDelta + { + int64_t session_id; + }; + using Operation = std:: - variant; + variant; struct Delta { @@ -351,6 +356,7 @@ public: std::shared_ptr tryGetNodeFromStorage(StringRef path) const; std::unordered_map> session_and_auth; + std::unordered_set closed_sessions; struct UncommittedNode { diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index d314757efc9..30a0eea3040 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -2019,6 +2019,66 @@ TEST_P(CoordinationTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitte EXPECT_EQ(acls[0].permissions, 31); } +TEST_P(CoordinationTest, TestPreprocessWhenCloseSessionIsPrecommitted) +{ + using namespace Coordination; + using namespace DB; + + ChangelogDirTest snapshots("./snapshots"); + setSnapshotDirectory("./snapshots"); + ResponsesQueue queue(std::numeric_limits::max()); + SnapshotsQueue snapshots_queue{1}; + int64_t session_id = 1; + size_t term = 0; + + auto state_machine = std::make_shared(queue, snapshots_queue, keeper_context, nullptr); + state_machine->init(); + + auto & storage = state_machine->getStorageUnsafe(); + const auto & uncommitted_state = storage.uncommitted_state; + + // Create first node for the session + String node_path_1 = "/node_1"; + std::shared_ptr create_req_1 = std::make_shared(); + create_req_1->path = node_path_1; + auto create_entry_1 = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), create_req_1); + + state_machine->pre_commit(1, create_entry_1->get_buf()); + EXPECT_TRUE(uncommitted_state.nodes.contains(node_path_1)); + state_machine->commit(1, create_entry_1->get_buf()); + EXPECT_TRUE(storage.container.contains(node_path_1)); + + // Close session + std::shared_ptr close_req = std::make_shared(); + auto close_entry = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), close_req); + // Pre-commit close session + state_machine->pre_commit(2, close_entry->get_buf()); + + // Try to create second node after close session is pre-committed + String node_path_2 = "/node_2"; + std::shared_ptr create_req_2 = std::make_shared(); + create_req_2->path = node_path_2; + auto create_entry_2 = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), create_req_2); + + // Pre-commit creating second node + state_machine->pre_commit(3, create_entry_2->get_buf()); + // Second node wasn't created + EXPECT_FALSE(uncommitted_state.nodes.contains(node_path_2)); + + // Rollback pre-committed closing session + state_machine->rollback(3, create_entry_2->get_buf()); + state_machine->rollback(2, close_entry->get_buf()); + + // Pre-commit creating second node + state_machine->pre_commit(2, create_entry_2->get_buf()); + // Now second node was created + EXPECT_TRUE(uncommitted_state.nodes.contains(node_path_2)); + + state_machine->commit(2, create_entry_2->get_buf()); + EXPECT_TRUE(storage.container.contains(node_path_1)); + EXPECT_TRUE(storage.container.contains(node_path_2)); +} + TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted) { using namespace Coordination;