mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 01:22:04 +00:00
Fix Keeper multi request preprocessing with NOAUTH
This commit is contained in:
parent
733c57dae7
commit
f401eccc64
@ -2545,6 +2545,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
|
||||
response.responses[i]->error = failed_multi->error_codes[i];
|
||||
}
|
||||
|
||||
response.error = failed_multi->global_error;
|
||||
storage.uncommitted_state.commit(zxid);
|
||||
return response_ptr;
|
||||
}
|
||||
@ -2901,7 +2902,19 @@ void KeeperStorage<Container>::preprocessRequest(
|
||||
|
||||
if (check_acl && !request_processor->checkAuth(*this, session_id, false))
|
||||
{
|
||||
uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
|
||||
/// Multi requests handle failures using FailedMultiDelta
|
||||
if (zk_request->getOpNum() == Coordination::OpNum::Multi || zk_request->getOpNum() == Coordination::OpNum::MultiRead)
|
||||
{
|
||||
const auto & multi_request = dynamic_cast<const Coordination::ZooKeeperMultiRequest &>(*zk_request);
|
||||
std::vector<Coordination::Error> response_errors;
|
||||
response_errors.resize(multi_request.requests.size(), Coordination::Error::ZOK);
|
||||
uncommitted_state.deltas.emplace_back(
|
||||
new_last_zxid, KeeperStorage<Container>::FailedMultiDelta{std::move(response_errors), Coordination::Error::ZNOAUTH});
|
||||
}
|
||||
else
|
||||
{
|
||||
uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -522,6 +522,7 @@ public:
|
||||
struct FailedMultiDelta
|
||||
{
|
||||
std::vector<Coordination::Error> error_codes;
|
||||
Coordination::Error global_error{Coordination::Error::ZOK};
|
||||
};
|
||||
|
||||
// Denotes end of a subrequest in multi request
|
||||
|
@ -2280,6 +2280,62 @@ TYPED_TEST(CoordinationTest, TestPreprocessWhenCloseSessionIsPrecommitted)
|
||||
}
|
||||
}
|
||||
|
||||
TYPED_TEST(CoordinationTest, TestMultiRequestWithNoAuth)
|
||||
{
|
||||
using namespace Coordination;
|
||||
using namespace DB;
|
||||
|
||||
ChangelogDirTest snapshots("./snapshots");
|
||||
this->setSnapshotDirectory("./snapshots");
|
||||
|
||||
using Storage = typename TestFixture::Storage;
|
||||
|
||||
ChangelogDirTest rocks("./rocksdb");
|
||||
this->setRocksDBDirectory("./rocksdb");
|
||||
ResponsesQueue queue(std::numeric_limits<size_t>::max());
|
||||
SnapshotsQueue snapshots_queue{1};
|
||||
int64_t session_without_auth = 1;
|
||||
int64_t session_with_auth = 2;
|
||||
size_t term = 0;
|
||||
|
||||
auto state_machine = std::make_shared<KeeperStateMachine<Storage>>(queue, snapshots_queue, this->keeper_context, nullptr);
|
||||
state_machine->init();
|
||||
|
||||
auto & storage = state_machine->getStorageUnsafe();
|
||||
|
||||
auto auth_req = std::make_shared<ZooKeeperAuthRequest>();
|
||||
auth_req->scheme = "digest";
|
||||
auth_req->data = "test_user:test_password";
|
||||
|
||||
// Add auth data to the session
|
||||
auto auth_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), auth_req);
|
||||
state_machine->pre_commit(1, auth_entry->get_buf());
|
||||
state_machine->commit(1, auth_entry->get_buf());
|
||||
|
||||
std::string node_with_acl = "/node_with_acl";
|
||||
{
|
||||
auto create_req = std::make_shared<ZooKeeperCreateRequest>();
|
||||
create_req->path = node_with_acl;
|
||||
create_req->data = "notmodified";
|
||||
create_req->acls = {{.permissions = ACL::Read, .scheme = "auth", .id = ""}};
|
||||
auto create_entry = getLogEntryFromZKRequest(term, session_with_auth, state_machine->getNextZxid(), create_req);
|
||||
state_machine->pre_commit(3, create_entry->get_buf());
|
||||
state_machine->commit(3, create_entry->get_buf());
|
||||
ASSERT_TRUE(storage.container.contains(node_with_acl));
|
||||
}
|
||||
Requests ops;
|
||||
ops.push_back(zkutil::makeSetRequest(node_with_acl, "modified", -1));
|
||||
ops.push_back(zkutil::makeCheckRequest("/nonexistentnode", -1));
|
||||
auto multi_req = std::make_shared<ZooKeeperMultiRequest>(ops, ACLs{});
|
||||
auto multi_entry = getLogEntryFromZKRequest(term, session_without_auth, state_machine->getNextZxid(), multi_req);
|
||||
state_machine->pre_commit(4, multi_entry->get_buf());
|
||||
state_machine->commit(4, multi_entry->get_buf());
|
||||
|
||||
auto node_it = storage.container.find(node_with_acl);
|
||||
ASSERT_FALSE(node_it == storage.container.end());
|
||||
ASSERT_TRUE(node_it->value.getData() == "notmodified");
|
||||
}
|
||||
|
||||
TYPED_TEST(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted)
|
||||
{
|
||||
using namespace Coordination;
|
||||
|
Loading…
Reference in New Issue
Block a user