add full tree acl check

This commit is contained in:
Mikhail Artemenko 2024-09-10 17:43:35 +00:00
parent 8b7a5616a2
commit 3730582d06

View File

@ -1584,7 +1584,7 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage
}
std::vector<typename Storage::Delta>
preprocess(Storage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override
preprocess(Storage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override
{
ProfileEvents::increment(ProfileEvents::KeeperRemoveRequest);
Coordination::ZooKeeperRemoveRecursiveRequest & request = dynamic_cast<Coordination::ZooKeeperRemoveRecursiveRequest &>(*this->zk_request);
@ -1609,10 +1609,13 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage
return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}};
}
ToDeleteTreeCollector collector(storage, zxid, request.remove_nodes_limit);
bool limit_exceeded = collector.collect(request.path, *node);
ToDeleteTreeCollector collector(storage, zxid, session_id, request.remove_nodes_limit);
auto collect_status = collector.collect(request.path, *node);
if (limit_exceeded)
if (collect_status == ToDeleteTreeCollector::CollectStatus::NoAuth)
return {typename Storage::Delta{zxid, Coordination::Error::ZNOAUTH}};
if (collect_status == ToDeleteTreeCollector::CollectStatus::LimitExceeded)
return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}};
if (request.restored_from_zookeeper_log)
@ -1670,6 +1673,7 @@ private:
{
Storage & storage;
int64_t zxid;
int64_t session_id;
uint32_t limit;
uint32_t max_level = 0;
@ -1683,20 +1687,30 @@ private:
uint32_t level;
};
enum class CollectStatus
{
Ok,
NoAuth,
LimitExceeded,
};
friend struct KeeperStorageRemoveRecursiveRequestProcessor;
public:
ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, uint32_t limit_)
ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, int64_t session_id_, uint32_t limit_)
: storage(storage_)
, zxid(zxid_)
, session_id(session_id_)
, limit(limit_)
{
}
bool collect(StringRef root_path, const SNode & root_node)
CollectStatus collect(StringRef root_path, const SNode & root_node)
{
std::deque<Step> steps;
if (checkLimits(&root_node))
return true;
return CollectStatus::LimitExceeded;
steps.push_back(Step{root_path.toString(), &root_node, 0});
@ -1718,13 +1732,23 @@ private:
chassert(node_ptr != nullptr);
const auto & node = *node_ptr;
chassert(storage.uncommitted_state.getActualNodeView(path, node) != nullptr); /// explicitly check that node is not deleted
auto actual_node_ptr = storage.uncommitted_state.getActualNodeView(path, node);
chassert(actual_node_ptr != nullptr); /// explicitly check that node is not deleted
if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, node, level))
return true;
if (actual_node_ptr->numChildren() > 0 && !storage.checkACL(path, Coordination::ACL::Delete, session_id, /*is_local=*/false))
return CollectStatus::NoAuth;
if (auto status = visitRocksDBNode(steps, path, level); status != CollectStatus::Ok)
return status;
if (auto status = visitMemNode(steps, path, level); status != CollectStatus::Ok)
return status;
if (auto status = visitRootAndUncommitted(steps, path, node, level); status != CollectStatus::Ok)
return status;
}
return false;
return CollectStatus::Ok;
}
std::vector<typename Storage::Delta> extractDeltas()
@ -1741,7 +1765,7 @@ private:
}
private:
bool visitRocksDBNode(std::deque<Step> & steps, StringRef root_path, uint32_t level)
CollectStatus visitRocksDBNode(std::deque<Step> & steps, StringRef root_path, uint32_t level)
{
if constexpr (Storage::use_rocksdb)
{
@ -1757,22 +1781,22 @@ private:
continue;
if (checkLimits(actual_child_node_ptr))
return true;
return CollectStatus::LimitExceeded;
steps.push_back(Step{std::move(child_path), std::move(child_node), level + 1});
}
}
return false;
return CollectStatus::Ok;
}
bool visitMemNode(std::deque<Step> & steps, StringRef root_path, uint32_t level)
CollectStatus visitMemNode(std::deque<Step> & steps, StringRef root_path, uint32_t level)
{
if constexpr (!Storage::use_rocksdb)
{
auto node_it = storage.container.find(root_path);
if (node_it == storage.container.end())
return false;
return CollectStatus::Ok;
std::filesystem::path root_fs_path(root_path.toString());
const auto & children = node_it->value.getChildren();
@ -1791,16 +1815,16 @@ private:
continue;
if (checkLimits(actual_child_node_ptr))
return true;
return CollectStatus::LimitExceeded;
steps.push_back(Step{std::move(child_path), &child_node, level + 1});
}
}
return false;
return CollectStatus::Ok;
}
bool visitRootAndUncommitted(std::deque<Step> & steps, StringRef root_path, const SNode & root_node, uint32_t level)
CollectStatus visitRootAndUncommitted(std::deque<Step> & steps, StringRef root_path, const SNode & root_node, uint32_t level)
{
const auto & nodes = storage.uncommitted_state.nodes;
@ -1815,7 +1839,7 @@ private:
continue;
if (checkLimits(actual_child_node_ptr))
return true;
return CollectStatus::LimitExceeded;
const String & child_path = it->first;
const SNode & child_node = *it->second.node;
@ -1825,7 +1849,7 @@ private:
addDelta(root_path, root_node, level);
return false;
return CollectStatus::Ok;
}
void addDelta(StringRef root_path, const SNode & root_node, uint32_t level)