diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 2eb9ab30efa..63dc39092cf 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -2545,6 +2545,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro response.responses[i]->error = failed_multi->error_codes[i]; } + response.error = failed_multi->global_error; storage.uncommitted_state.commit(zxid); return response_ptr; } @@ -2901,7 +2902,19 @@ void KeeperStorage::preprocessRequest( if (check_acl && !request_processor->checkAuth(*this, session_id, false)) { - uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH); + /// Multi requests handle failures using FailedMultiDelta + if (zk_request->getOpNum() == Coordination::OpNum::Multi || zk_request->getOpNum() == Coordination::OpNum::MultiRead) + { + const auto & multi_request = dynamic_cast(*zk_request); + std::vector response_errors; + response_errors.resize(multi_request.requests.size(), Coordination::Error::ZOK); + uncommitted_state.deltas.emplace_back( + new_last_zxid, KeeperStorage::FailedMultiDelta{std::move(response_errors), Coordination::Error::ZNOAUTH}); + } + else + { + uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH); + } return; } diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 904af76ef37..6fbc4c2b168 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -522,6 +522,7 @@ public: struct FailedMultiDelta { std::vector error_codes; + Coordination::Error global_error{Coordination::Error::ZOK}; }; // Denotes end of a subrequest in multi request diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 73402af5ec4..46f36fe0039 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -2280,6 +2280,62 @@ TYPED_TEST(CoordinationTest, TestPreprocessWhenCloseSessionIsPrecommitted) } } +TYPED_TEST(CoordinationTest, TestMultiRequestWithNoAuth) +{ + using namespace Coordination; + using namespace DB; + + ChangelogDirTest snapshots("./snapshots"); + this->setSnapshotDirectory("./snapshots"); + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + ResponsesQueue queue(std::numeric_limits::max()); + SnapshotsQueue snapshots_queue{1}; + int64_t session_without_auth = 1; + int64_t session_with_auth = 2; + size_t term = 0; + + auto state_machine = std::make_shared>(queue, snapshots_queue, this->keeper_context, nullptr); + state_machine->init(); + + auto & storage = state_machine->getStorageUnsafe(); + + auto auth_req = std::make_shared(); + auth_req->scheme = "digest"; + auth_req->data = "test_user:test_password"; + + // Add auth data to the session + auto auth_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), auth_req); + state_machine->pre_commit(1, auth_entry->get_buf()); + state_machine->commit(1, auth_entry->get_buf()); + + std::string node_with_acl = "/node_with_acl"; + { + auto create_req = std::make_shared(); + create_req->path = node_with_acl; + create_req->data = "notmodified"; + create_req->acls = {{.permissions = ACL::Read, .scheme = "auth", .id = ""}}; + auto create_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), create_req); + state_machine->pre_commit(3, create_entry->get_buf()); + state_machine->commit(3, create_entry->get_buf()); + ASSERT_TRUE(storage.container.contains(node_with_acl)); + } + Requests ops; + ops.push_back(zkutil::makeSetRequest(node_with_acl, "modified", -1)); + ops.push_back(zkutil::makeCheckRequest("/nonexistentnode", -1)); + auto multi_req = std::make_shared(ops, ACLs{}); + auto multi_entry = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), multi_req); + state_machine->pre_commit(4, multi_entry->get_buf()); + state_machine->commit(4, multi_entry->get_buf()); + + auto node_it = storage.container.find(node_with_acl); + ASSERT_FALSE(node_it == storage.container.end()); + ASSERT_TRUE(node_it->value.getData() == "notmodified"); +} + TYPED_TEST(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted) { using namespace Coordination;