diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 87d29e0ff65..2eb9ab30efa 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1584,7 +1584,7 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage } std::vector - preprocess(Storage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override + preprocess(Storage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override { ProfileEvents::increment(ProfileEvents::KeeperRemoveRequest); Coordination::ZooKeeperRemoveRecursiveRequest & request = dynamic_cast(*this->zk_request); @@ -1609,10 +1609,13 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}}; } - ToDeleteTreeCollector collector(storage, zxid, request.remove_nodes_limit); - bool limit_exceeded = collector.collect(request.path, *node); + ToDeleteTreeCollector collector(storage, zxid, session_id, request.remove_nodes_limit); + auto collect_status = collector.collect(request.path, *node); - if (limit_exceeded) + if (collect_status == ToDeleteTreeCollector::CollectStatus::NoAuth) + return {typename Storage::Delta{zxid, Coordination::Error::ZNOAUTH}}; + + if (collect_status == ToDeleteTreeCollector::CollectStatus::LimitExceeded) return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}}; if (request.restored_from_zookeeper_log) @@ -1670,6 +1673,7 @@ private: { Storage & storage; int64_t zxid; + int64_t session_id; uint32_t limit; uint32_t max_level = 0; @@ -1683,20 +1687,30 @@ private: uint32_t level; }; + enum class CollectStatus + { + Ok, + NoAuth, + LimitExceeded, + }; + + friend struct KeeperStorageRemoveRecursiveRequestProcessor; + public: - ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, uint32_t limit_) + ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, int64_t session_id_, uint32_t limit_) : storage(storage_) , zxid(zxid_) + , session_id(session_id_) , limit(limit_) { } - bool collect(StringRef root_path, const SNode & root_node) + CollectStatus collect(StringRef root_path, const SNode & root_node) { std::deque steps; if (checkLimits(&root_node)) - return true; + return CollectStatus::LimitExceeded; steps.push_back(Step{root_path.toString(), &root_node, 0}); @@ -1718,13 +1732,23 @@ private: chassert(node_ptr != nullptr); const auto & node = *node_ptr; - chassert(storage.uncommitted_state.getActualNodeView(path, node) != nullptr); /// explicitly check that node is not deleted + auto actual_node_ptr = storage.uncommitted_state.getActualNodeView(path, node); + chassert(actual_node_ptr != nullptr); /// explicitly check that node is not deleted - if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, node, level)) - return true; + if (actual_node_ptr->numChildren() > 0 && !storage.checkACL(path, Coordination::ACL::Delete, session_id, /*is_local=*/false)) + return CollectStatus::NoAuth; + + if (auto status = visitRocksDBNode(steps, path, level); status != CollectStatus::Ok) + return status; + + if (auto status = visitMemNode(steps, path, level); status != CollectStatus::Ok) + return status; + + if (auto status = visitRootAndUncommitted(steps, path, node, level); status != CollectStatus::Ok) + return status; } - return false; + return CollectStatus::Ok; } std::vector extractDeltas() @@ -1741,7 +1765,7 @@ private: } private: - bool visitRocksDBNode(std::deque & steps, StringRef root_path, uint32_t level) + CollectStatus visitRocksDBNode(std::deque & steps, StringRef root_path, uint32_t level) { if constexpr (Storage::use_rocksdb) { @@ -1757,22 +1781,22 @@ private: continue; if (checkLimits(actual_child_node_ptr)) - return true; + return CollectStatus::LimitExceeded; steps.push_back(Step{std::move(child_path), std::move(child_node), level + 1}); } } - return false; + return CollectStatus::Ok; } - bool visitMemNode(std::deque & steps, StringRef root_path, uint32_t level) + CollectStatus visitMemNode(std::deque & steps, StringRef root_path, uint32_t level) { if constexpr (!Storage::use_rocksdb) { auto node_it = storage.container.find(root_path); if (node_it == storage.container.end()) - return false; + return CollectStatus::Ok; std::filesystem::path root_fs_path(root_path.toString()); const auto & children = node_it->value.getChildren(); @@ -1791,16 +1815,16 @@ private: continue; if (checkLimits(actual_child_node_ptr)) - return true; + return CollectStatus::LimitExceeded; steps.push_back(Step{std::move(child_path), &child_node, level + 1}); } } - return false; + return CollectStatus::Ok; } - bool visitRootAndUncommitted(std::deque & steps, StringRef root_path, const SNode & root_node, uint32_t level) + CollectStatus visitRootAndUncommitted(std::deque & steps, StringRef root_path, const SNode & root_node, uint32_t level) { const auto & nodes = storage.uncommitted_state.nodes; @@ -1815,7 +1839,7 @@ private: continue; if (checkLimits(actual_child_node_ptr)) - return true; + return CollectStatus::LimitExceeded; const String & child_path = it->first; const SNode & child_node = *it->second.node; @@ -1825,7 +1849,7 @@ private: addDelta(root_path, root_node, level); - return false; + return CollectStatus::Ok; } void addDelta(StringRef root_path, const SNode & root_node, uint32_t level)