Initial commit

This commit is contained in:
Aleksei Filatov 2024-06-13 12:43:25 +03:00
parent ee59036a83
commit 2d37f7b530
3 changed files with 79 additions and 3 deletions

View File

@ -609,7 +609,10 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
uncommitted_auth.pop_front();
if (uncommitted_auth.empty())
session_and_auth.erase(add_auth->session_id);
}
else if (auto * close_session = std::get_if<CloseSessionDelta>(&front_delta.operation))
{
closed_sessions.erase(close_session->session_id);
}
deltas.pop_front();
@ -682,6 +685,10 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
session_and_auth.erase(add_auth->session_id);
}
}
else if (auto * close_session = std::get_if<CloseSessionDelta>(&delta_it->operation))
{
closed_sessions.erase(close_session->session_id);
}
}
if (delta_it == deltas.rend())
@ -2366,12 +2373,15 @@ void KeeperStorage::preprocessRequest(
ephemerals.erase(session_ephemerals);
}
new_deltas.emplace_back(transaction.zxid, CloseSessionDelta{session_id});
uncommitted_state.closed_sessions.insert(session_id);
new_digest = calculateNodesDigest(new_digest, new_deltas);
return;
}
if (check_acl && !request_processor->checkAuth(*this, session_id, false))
if ((check_acl && !request_processor->checkAuth(*this, session_id, false)) ||
uncommitted_state.closed_sessions.contains(session_id)) // Is session closed but not committed yet
{
uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
return;

View File

@ -314,8 +314,13 @@ public:
AuthID auth_id;
};
struct CloseSessionDelta
{
int64_t session_id;
};
using Operation = std::
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>;
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta, CloseSessionDelta>;
struct Delta
{
@ -351,6 +356,7 @@ public:
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
std::unordered_map<int64_t, std::list<const AuthID *>> session_and_auth;
std::unordered_set<int64_t> closed_sessions;
struct UncommittedNode
{

View File

@ -2019,6 +2019,66 @@ TEST_P(CoordinationTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitte
EXPECT_EQ(acls[0].permissions, 31);
}
TEST_P(CoordinationTest, TestPreprocessWhenCloseSessionIsPrecommitted)
{
using namespace Coordination;
using namespace DB;
ChangelogDirTest snapshots("./snapshots");
setSnapshotDirectory("./snapshots");
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
int64_t session_id = 1;
size_t term = 0;
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, keeper_context, nullptr);
state_machine->init();
auto & storage = state_machine->getStorageUnsafe();
const auto & uncommitted_state = storage.uncommitted_state;
// Create first node for the session
String node_path_1 = "/node_1";
std::shared_ptr<ZooKeeperCreateRequest> create_req_1 = std::make_shared<ZooKeeperCreateRequest>();
create_req_1->path = node_path_1;
auto create_entry_1 = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), create_req_1);
state_machine->pre_commit(1, create_entry_1->get_buf());
EXPECT_TRUE(uncommitted_state.nodes.contains(node_path_1));
state_machine->commit(1, create_entry_1->get_buf());
EXPECT_TRUE(storage.container.contains(node_path_1));
// Close session
std::shared_ptr<ZooKeeperCloseRequest> close_req = std::make_shared<ZooKeeperCloseRequest>();
auto close_entry = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), close_req);
// Pre-commit close session
state_machine->pre_commit(2, close_entry->get_buf());
// Try to create second node after close session is pre-committed
String node_path_2 = "/node_2";
std::shared_ptr<ZooKeeperCreateRequest> create_req_2 = std::make_shared<ZooKeeperCreateRequest>();
create_req_2->path = node_path_2;
auto create_entry_2 = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), create_req_2);
// Pre-commit creating second node
state_machine->pre_commit(3, create_entry_2->get_buf());
// Second node wasn't created
EXPECT_FALSE(uncommitted_state.nodes.contains(node_path_2));
// Rollback pre-committed closing session
state_machine->rollback(3, create_entry_2->get_buf());
state_machine->rollback(2, close_entry->get_buf());
// Pre-commit creating second node
state_machine->pre_commit(2, create_entry_2->get_buf());
// Now second node was created
EXPECT_TRUE(uncommitted_state.nodes.contains(node_path_2));
state_machine->commit(2, create_entry_2->get_buf());
EXPECT_TRUE(storage.container.contains(node_path_1));
EXPECT_TRUE(storage.container.contains(node_path_2));
}
TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted)
{
using namespace Coordination;