Merge pull request #69627 from ClickHouse/keeper-fix-multi-no-auth

Fix Keeper multi request preprocessing with NOAUTH
This commit is contained in:
Antonio Andelic 2024-09-17 06:48:07 +00:00 committed by GitHub
commit 52dc9a54a7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 71 additions and 1 deletions

View File

@ -2545,6 +2545,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
response.responses[i]->error = failed_multi->error_codes[i];
}
response.error = failed_multi->global_error;
storage.uncommitted_state.commit(zxid);
return response_ptr;
}
@ -2901,7 +2902,19 @@ void KeeperStorage<Container>::preprocessRequest(
if (check_acl && !request_processor->checkAuth(*this, session_id, false))
{
uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
/// Multi requests handle failures using FailedMultiDelta
if (zk_request->getOpNum() == Coordination::OpNum::Multi || zk_request->getOpNum() == Coordination::OpNum::MultiRead)
{
const auto & multi_request = dynamic_cast<const Coordination::ZooKeeperMultiRequest &>(*zk_request);
std::vector<Coordination::Error> response_errors;
response_errors.resize(multi_request.requests.size(), Coordination::Error::ZOK);
uncommitted_state.deltas.emplace_back(
new_last_zxid, KeeperStorage<Container>::FailedMultiDelta{std::move(response_errors), Coordination::Error::ZNOAUTH});
}
else
{
uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
}
return;
}

View File

@ -522,6 +522,7 @@ public:
struct FailedMultiDelta
{
std::vector<Coordination::Error> error_codes;
Coordination::Error global_error{Coordination::Error::ZOK};
};
// Denotes end of a subrequest in multi request

View File

@ -2280,6 +2280,62 @@ TYPED_TEST(CoordinationTest, TestPreprocessWhenCloseSessionIsPrecommitted)
}
}
TYPED_TEST(CoordinationTest, TestMultiRequestWithNoAuth)
{
using namespace Coordination;
using namespace DB;
ChangelogDirTest snapshots("./snapshots");
this->setSnapshotDirectory("./snapshots");
using Storage = typename TestFixture::Storage;
ChangelogDirTest rocks("./rocksdb");
this->setRocksDBDirectory("./rocksdb");
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
int64_t session_without_auth = 1;
int64_t session_with_auth = 2;
size_t term = 0;
auto state_machine = std::make_shared<KeeperStateMachine<Storage>>(queue, snapshots_queue, this->keeper_context, nullptr);
state_machine->init();
auto & storage = state_machine->getStorageUnsafe();
auto auth_req = std::make_shared<ZooKeeperAuthRequest>();
auth_req->scheme = "digest";
auth_req->data = "test_user:test_password";
// Add auth data to the session
auto auth_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), auth_req);
state_machine->pre_commit(1, auth_entry->get_buf());
state_machine->commit(1, auth_entry->get_buf());
std::string node_with_acl = "/node_with_acl";
{
auto create_req = std::make_shared<ZooKeeperCreateRequest>();
create_req->path = node_with_acl;
create_req->data = "notmodified";
create_req->acls = {{.permissions = ACL::Read, .scheme = "auth", .id = ""}};
auto create_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), create_req);
state_machine->pre_commit(3, create_entry->get_buf());
state_machine->commit(3, create_entry->get_buf());
ASSERT_TRUE(storage.container.contains(node_with_acl));
}
Requests ops;
ops.push_back(zkutil::makeSetRequest(node_with_acl, "modified", -1));
ops.push_back(zkutil::makeCheckRequest("/nonexistentnode", -1));
auto multi_req = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
auto multi_entry = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), multi_req);
state_machine->pre_commit(4, multi_entry->get_buf());
state_machine->commit(4, multi_entry->get_buf());
auto node_it = storage.container.find(node_with_acl);
ASSERT_FALSE(node_it == storage.container.end());
ASSERT_TRUE(node_it->value.getData() == "notmodified");
}
TYPED_TEST(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted)
{
using namespace Coordination;