mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-19 21:03:51 +00:00
Backport #65735 to 24.6: Fix bug with session closing in Keeper
This commit is contained in:
parent
dcced7c847
commit
9d59f87edb
@ -808,7 +808,11 @@ void LogEntryStorage::startCommitLogsPrefetch(uint64_t last_committed_index) con
|
|||||||
|
|
||||||
for (; current_index <= max_index_for_prefetch; ++current_index)
|
for (; current_index <= max_index_for_prefetch; ++current_index)
|
||||||
{
|
{
|
||||||
const auto & [changelog_description, position, size] = logs_location.at(current_index);
|
auto location_it = logs_location.find(current_index);
|
||||||
|
if (location_it == logs_location.end())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Location of log entry with index {} is missing", current_index);
|
||||||
|
|
||||||
|
const auto & [changelog_description, position, size] = location_it->second;
|
||||||
if (total_size == 0)
|
if (total_size == 0)
|
||||||
current_file_info = &file_infos.emplace_back(changelog_description, position, /* count */ 1);
|
current_file_info = &file_infos.emplace_back(changelog_description, position, /* count */ 1);
|
||||||
else if (total_size + size > commit_logs_cache.size_threshold)
|
else if (total_size + size > commit_logs_cache.size_threshold)
|
||||||
@ -1416,7 +1420,11 @@ LogEntriesPtr LogEntryStorage::getLogEntriesBetween(uint64_t start, uint64_t end
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
const auto & log_location = logs_location.at(i);
|
auto location_it = logs_location.find(i);
|
||||||
|
if (location_it == logs_location.end())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Location of log entry with index {} is missing", i);
|
||||||
|
|
||||||
|
const auto & log_location = location_it->second;
|
||||||
|
|
||||||
if (!read_info)
|
if (!read_info)
|
||||||
set_new_file(log_location);
|
set_new_file(log_location);
|
||||||
|
@ -534,6 +534,10 @@ bool KeeperStorage::UncommittedState::hasACL(int64_t session_id, bool is_local,
|
|||||||
if (is_local)
|
if (is_local)
|
||||||
return check_auth(storage.session_and_auth[session_id]);
|
return check_auth(storage.session_and_auth[session_id]);
|
||||||
|
|
||||||
|
/// we want to close the session and with that we will remove all the auth related to the session
|
||||||
|
if (closed_sessions.contains(session_id))
|
||||||
|
return false;
|
||||||
|
|
||||||
if (check_auth(storage.session_and_auth[session_id]))
|
if (check_auth(storage.session_and_auth[session_id]))
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
@ -559,6 +563,10 @@ void KeeperStorage::UncommittedState::addDelta(Delta new_delta)
|
|||||||
auto & uncommitted_auth = session_and_auth[auth_delta->session_id];
|
auto & uncommitted_auth = session_and_auth[auth_delta->session_id];
|
||||||
uncommitted_auth.emplace_back(&auth_delta->auth_id);
|
uncommitted_auth.emplace_back(&auth_delta->auth_id);
|
||||||
}
|
}
|
||||||
|
else if (const auto * close_session_delta = std::get_if<CloseSessionDelta>(&added_delta.operation))
|
||||||
|
{
|
||||||
|
closed_sessions.insert(close_session_delta->session_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void KeeperStorage::UncommittedState::addDeltas(std::vector<Delta> new_deltas)
|
void KeeperStorage::UncommittedState::addDeltas(std::vector<Delta> new_deltas)
|
||||||
@ -1013,9 +1021,11 @@ struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageReques
|
|||||||
{
|
{
|
||||||
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
|
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
|
||||||
Coordination::ZooKeeperResponsePtr
|
Coordination::ZooKeeperResponsePtr
|
||||||
process(KeeperStorage & /* storage */, int64_t /* zxid */) const override
|
process(KeeperStorage & storage, int64_t zxid) const override
|
||||||
{
|
{
|
||||||
return zk_request->makeResponse();
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
||||||
|
response_ptr->error = storage.commit(zxid);
|
||||||
|
return response_ptr;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -2377,15 +2387,13 @@ void KeeperStorage::preprocessRequest(
|
|||||||
|
|
||||||
ephemerals.erase(session_ephemerals);
|
ephemerals.erase(session_ephemerals);
|
||||||
}
|
}
|
||||||
new_deltas.emplace_back(transaction.zxid, CloseSessionDelta{session_id});
|
|
||||||
uncommitted_state.closed_sessions.insert(session_id);
|
|
||||||
|
|
||||||
|
new_deltas.emplace_back(transaction.zxid, CloseSessionDelta{session_id});
|
||||||
new_digest = calculateNodesDigest(new_digest, new_deltas);
|
new_digest = calculateNodesDigest(new_digest, new_deltas);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((check_acl && !request_processor->checkAuth(*this, session_id, false)) ||
|
if (check_acl && !request_processor->checkAuth(*this, session_id, false))
|
||||||
uncommitted_state.closed_sessions.contains(session_id)) // Is session closed but not committed yet
|
|
||||||
{
|
{
|
||||||
uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
|
uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
|
||||||
return;
|
return;
|
||||||
@ -2442,8 +2450,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uncommitted_state.commit(zxid);
|
|
||||||
|
|
||||||
clearDeadWatches(session_id);
|
clearDeadWatches(session_id);
|
||||||
auto auth_it = session_and_auth.find(session_id);
|
auto auth_it = session_and_auth.find(session_id);
|
||||||
if (auth_it != session_and_auth.end())
|
if (auth_it != session_and_auth.end())
|
||||||
@ -2488,7 +2494,6 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
response = request_processor->process(*this, zxid);
|
response = request_processor->process(*this, zxid);
|
||||||
uncommitted_state.commit(zxid);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Watches for this requests are added to the watches lists
|
/// Watches for this requests are added to the watches lists
|
||||||
@ -2528,6 +2533,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
|
|||||||
results.push_back(ResponseForSession{session_id, response});
|
results.push_back(ResponseForSession{session_id, response});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uncommitted_state.commit(zxid);
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2028,56 +2028,175 @@ TEST_P(CoordinationTest, TestPreprocessWhenCloseSessionIsPrecommitted)
|
|||||||
setSnapshotDirectory("./snapshots");
|
setSnapshotDirectory("./snapshots");
|
||||||
ResponsesQueue queue(std::numeric_limits<size_t>::max());
|
ResponsesQueue queue(std::numeric_limits<size_t>::max());
|
||||||
SnapshotsQueue snapshots_queue{1};
|
SnapshotsQueue snapshots_queue{1};
|
||||||
int64_t session_id = 1;
|
int64_t session_without_auth = 1;
|
||||||
|
int64_t session_with_auth = 2;
|
||||||
size_t term = 0;
|
size_t term = 0;
|
||||||
|
|
||||||
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, keeper_context, nullptr);
|
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, keeper_context, nullptr);
|
||||||
state_machine->init();
|
state_machine->init();
|
||||||
|
|
||||||
auto & storage = state_machine->getStorageUnsafe();
|
auto & storage = state_machine->getStorageUnsafe();
|
||||||
const auto & uncommitted_state = storage.uncommitted_state;
|
const auto & uncommitted_state = storage.uncommitted_state;
|
||||||
|
|
||||||
// Create first node for the session
|
auto auth_req = std::make_shared<ZooKeeperAuthRequest>();
|
||||||
String node_path_1 = "/node_1";
|
auth_req->scheme = "digest";
|
||||||
std::shared_ptr<ZooKeeperCreateRequest> create_req_1 = std::make_shared<ZooKeeperCreateRequest>();
|
auth_req->data = "test_user:test_password";
|
||||||
create_req_1->path = node_path_1;
|
|
||||||
auto create_entry_1 = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), create_req_1);
|
|
||||||
|
|
||||||
state_machine->pre_commit(1, create_entry_1->get_buf());
|
// Add auth data to the session
|
||||||
EXPECT_TRUE(uncommitted_state.nodes.contains(node_path_1));
|
auto auth_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), auth_req);
|
||||||
|
state_machine->pre_commit(1, auth_entry->get_buf());
|
||||||
|
state_machine->commit(1, auth_entry->get_buf());
|
||||||
|
|
||||||
state_machine->commit(1, create_entry_1->get_buf());
|
std::string node_without_acl = "/node_without_acl";
|
||||||
EXPECT_TRUE(storage.container.contains(node_path_1));
|
{
|
||||||
|
auto create_req = std::make_shared<ZooKeeperCreateRequest>();
|
||||||
|
create_req->path = node_without_acl;
|
||||||
|
create_req->data = "notmodified";
|
||||||
|
auto create_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), create_req);
|
||||||
|
state_machine->pre_commit(2, create_entry->get_buf());
|
||||||
|
state_machine->commit(2, create_entry->get_buf());
|
||||||
|
ASSERT_TRUE(storage.container.contains(node_without_acl));
|
||||||
|
}
|
||||||
|
|
||||||
// Close session
|
std::string node_with_acl = "/node_with_acl";
|
||||||
std::shared_ptr<ZooKeeperCloseRequest> close_req = std::make_shared<ZooKeeperCloseRequest>();
|
{
|
||||||
auto close_entry = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), close_req);
|
auto create_req = std::make_shared<ZooKeeperCreateRequest>();
|
||||||
// Pre-commit close session
|
create_req->path = node_with_acl;
|
||||||
state_machine->pre_commit(2, close_entry->get_buf());
|
create_req->data = "notmodified";
|
||||||
|
create_req->acls = {{.permissions = ACL::All, .scheme = "auth", .id = ""}};
|
||||||
|
auto create_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), create_req);
|
||||||
|
state_machine->pre_commit(3, create_entry->get_buf());
|
||||||
|
state_machine->commit(3, create_entry->get_buf());
|
||||||
|
ASSERT_TRUE(storage.container.contains(node_with_acl));
|
||||||
|
}
|
||||||
|
|
||||||
// Try to create second node after close session is pre-committed
|
auto set_req_with_acl = std::make_shared<ZooKeeperSetRequest>();
|
||||||
String node_path_2 = "/node_2";
|
set_req_with_acl->path = node_with_acl;
|
||||||
std::shared_ptr<ZooKeeperCreateRequest> create_req_2 = std::make_shared<ZooKeeperCreateRequest>();
|
set_req_with_acl->data = "modified";
|
||||||
create_req_2->path = node_path_2;
|
|
||||||
auto create_entry_2 = getLogEntryFromZKRequest(term, session_id, state_machine->getNextZxid(), create_req_2);
|
|
||||||
|
|
||||||
// Pre-commit creating second node
|
|
||||||
state_machine->pre_commit(3, create_entry_2->get_buf());
|
|
||||||
// Second node wasn't created
|
|
||||||
EXPECT_FALSE(uncommitted_state.nodes.contains(node_path_2));
|
|
||||||
|
|
||||||
// Rollback pre-committed closing session
|
auto set_req_without_acl = std::make_shared<ZooKeeperSetRequest>();
|
||||||
state_machine->rollback(3, create_entry_2->get_buf());
|
set_req_without_acl->path = node_without_acl;
|
||||||
state_machine->rollback(2, close_entry->get_buf());
|
set_req_without_acl->data = "modified";
|
||||||
|
|
||||||
// Pre-commit creating second node
|
const auto reset_node_value
|
||||||
state_machine->pre_commit(2, create_entry_2->get_buf());
|
= [&](const auto & path) { storage.container.updateValue(path, [](auto & node) { node.setData("notmodified"); }); };
|
||||||
// Now second node was created
|
|
||||||
EXPECT_TRUE(uncommitted_state.nodes.contains(node_path_2));
|
|
||||||
|
|
||||||
state_machine->commit(2, create_entry_2->get_buf());
|
auto close_req = std::make_shared<ZooKeeperCloseRequest>();
|
||||||
EXPECT_TRUE(storage.container.contains(node_path_1));
|
|
||||||
EXPECT_TRUE(storage.container.contains(node_path_2));
|
{
|
||||||
|
SCOPED_TRACE("Session with Auth");
|
||||||
|
|
||||||
|
// test we can modify both nodes
|
||||||
|
auto set_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_with_acl);
|
||||||
|
state_machine->pre_commit(5, set_entry->get_buf());
|
||||||
|
state_machine->commit(5, set_entry->get_buf());
|
||||||
|
ASSERT_TRUE(storage.container.find(node_with_acl)->value.getData() == "modified");
|
||||||
|
reset_node_value(node_with_acl);
|
||||||
|
|
||||||
|
set_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_without_acl);
|
||||||
|
state_machine->pre_commit(6, set_entry->get_buf());
|
||||||
|
state_machine->commit(6, set_entry->get_buf());
|
||||||
|
ASSERT_TRUE(storage.container.find(node_without_acl)->value.getData() == "modified");
|
||||||
|
reset_node_value(node_without_acl);
|
||||||
|
|
||||||
|
auto close_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), close_req);
|
||||||
|
|
||||||
|
// Pre-commit close session
|
||||||
|
state_machine->pre_commit(7, close_entry->get_buf());
|
||||||
|
|
||||||
|
/// will be rejected because we don't have required auth
|
||||||
|
auto set_entry_with_acl = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_with_acl);
|
||||||
|
state_machine->pre_commit(8, set_entry_with_acl->get_buf());
|
||||||
|
|
||||||
|
/// will be accepted because no ACL
|
||||||
|
auto set_entry_without_acl = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_without_acl);
|
||||||
|
state_machine->pre_commit(9, set_entry_without_acl->get_buf());
|
||||||
|
|
||||||
|
ASSERT_TRUE(uncommitted_state.getNode(node_with_acl)->getData() == "notmodified");
|
||||||
|
ASSERT_TRUE(uncommitted_state.getNode(node_without_acl)->getData() == "modified");
|
||||||
|
|
||||||
|
state_machine->rollback(9, set_entry_without_acl->get_buf());
|
||||||
|
state_machine->rollback(8, set_entry_with_acl->get_buf());
|
||||||
|
|
||||||
|
// let's commit close and verify we get same outcome
|
||||||
|
state_machine->commit(7, close_entry->get_buf());
|
||||||
|
|
||||||
|
/// will be rejected because we don't have required auth
|
||||||
|
set_entry_with_acl = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_with_acl);
|
||||||
|
state_machine->pre_commit(8, set_entry_with_acl->get_buf());
|
||||||
|
|
||||||
|
/// will be accepted because no ACL
|
||||||
|
set_entry_without_acl = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), set_req_without_acl);
|
||||||
|
state_machine->pre_commit(9, set_entry_without_acl->get_buf());
|
||||||
|
|
||||||
|
ASSERT_TRUE(uncommitted_state.getNode(node_with_acl)->getData() == "notmodified");
|
||||||
|
ASSERT_TRUE(uncommitted_state.getNode(node_without_acl)->getData() == "modified");
|
||||||
|
|
||||||
|
state_machine->commit(8, set_entry_with_acl->get_buf());
|
||||||
|
state_machine->commit(9, set_entry_without_acl->get_buf());
|
||||||
|
|
||||||
|
ASSERT_TRUE(storage.container.find(node_with_acl)->value.getData() == "notmodified");
|
||||||
|
ASSERT_TRUE(storage.container.find(node_without_acl)->value.getData() == "modified");
|
||||||
|
|
||||||
|
reset_node_value(node_without_acl);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
SCOPED_TRACE("Session without Auth");
|
||||||
|
|
||||||
|
// test we can modify only node without acl
|
||||||
|
auto set_entry = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_with_acl);
|
||||||
|
state_machine->pre_commit(10, set_entry->get_buf());
|
||||||
|
state_machine->commit(10, set_entry->get_buf());
|
||||||
|
ASSERT_TRUE(storage.container.find(node_with_acl)->value.getData() == "notmodified");
|
||||||
|
|
||||||
|
set_entry = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_without_acl);
|
||||||
|
state_machine->pre_commit(11, set_entry->get_buf());
|
||||||
|
state_machine->commit(11, set_entry->get_buf());
|
||||||
|
ASSERT_TRUE(storage.container.find(node_without_acl)->value.getData() == "modified");
|
||||||
|
reset_node_value(node_without_acl);
|
||||||
|
|
||||||
|
auto close_entry = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), close_req);
|
||||||
|
|
||||||
|
// Pre-commit close session
|
||||||
|
state_machine->pre_commit(12, close_entry->get_buf());
|
||||||
|
|
||||||
|
/// will be rejected because we don't have required auth
|
||||||
|
auto set_entry_with_acl = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_with_acl);
|
||||||
|
state_machine->pre_commit(13, set_entry_with_acl->get_buf());
|
||||||
|
|
||||||
|
/// will be accepted because no ACL
|
||||||
|
auto set_entry_without_acl = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_without_acl);
|
||||||
|
state_machine->pre_commit(14, set_entry_without_acl->get_buf());
|
||||||
|
|
||||||
|
ASSERT_TRUE(uncommitted_state.getNode(node_with_acl)->getData() == "notmodified");
|
||||||
|
ASSERT_TRUE(uncommitted_state.getNode(node_without_acl)->getData() == "modified");
|
||||||
|
|
||||||
|
state_machine->rollback(14, set_entry_without_acl->get_buf());
|
||||||
|
state_machine->rollback(13, set_entry_with_acl->get_buf());
|
||||||
|
|
||||||
|
// let's commit close and verify we get same outcome
|
||||||
|
state_machine->commit(12, close_entry->get_buf());
|
||||||
|
|
||||||
|
/// will be rejected because we don't have required auth
|
||||||
|
set_entry_with_acl = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_with_acl);
|
||||||
|
state_machine->pre_commit(13, set_entry_with_acl->get_buf());
|
||||||
|
|
||||||
|
/// will be accepted because no ACL
|
||||||
|
set_entry_without_acl = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), set_req_without_acl);
|
||||||
|
state_machine->pre_commit(14, set_entry_without_acl->get_buf());
|
||||||
|
|
||||||
|
ASSERT_TRUE(uncommitted_state.getNode(node_with_acl)->getData() == "notmodified");
|
||||||
|
ASSERT_TRUE(uncommitted_state.getNode(node_without_acl)->getData() == "modified");
|
||||||
|
|
||||||
|
state_machine->commit(13, set_entry_with_acl->get_buf());
|
||||||
|
state_machine->commit(14, set_entry_without_acl->get_buf());
|
||||||
|
|
||||||
|
ASSERT_TRUE(storage.container.find(node_with_acl)->value.getData() == "notmodified");
|
||||||
|
ASSERT_TRUE(storage.container.find(node_without_acl)->value.getData() == "modified");
|
||||||
|
|
||||||
|
reset_node_value(node_without_acl);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted)
|
TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted)
|
||||||
|
Loading…
Reference in New Issue
Block a user