diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 54cdc8c11db..9d63a9e8691 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1513,29 +1513,37 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr { if (request.restored_from_zookeeper_log) update_parent_pzxid(); + return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}}; } - else if (request.version != -1 && request.version != node->version) + + if (request.version != -1 && request.version != node->version) return {typename Storage::Delta{zxid, Coordination::Error::ZBADVERSION}}; - else if (node->numChildren() != 0) + + ToDeleteTreeCollector collector(storage, zxid, request.remove_nodes_limit); + bool limit_exceeded = collector.collect(request.path, *node); + + if (limit_exceeded) return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}}; if (request.restored_from_zookeeper_log) update_parent_pzxid(); - new_deltas.emplace_back( - std::string{parentNodePath(request.path)}, - zxid, - typename Storage::UpdateNodeDelta{[](typename Storage::Node & parent) - { - ++parent.cversion; - parent.decreaseNumChildren(); - }}); + auto delete_deltas = collector.extractDeltas(); - new_deltas.emplace_back(request.path, zxid, typename Storage::RemoveNodeDelta{request.version, node->ephemeralOwner()}); + for (const auto & delta : delete_deltas) + std::visit( + Overloaded{ + [&](const typename Storage::RemoveNodeDelta & remove_delta) + { + if (remove_delta.ephemeral_owner) + storage.unregisterEphemeralPath(remove_delta.ephemeral_owner, delta.path); + }, + [](auto && /* delta */) {}, + }, + delta.operation); - if (node->isEphemeral()) - storage.unregisterEphemeralPath(node->ephemeralOwner(), request.path); + new_deltas.insert(new_deltas.end(), std::make_move_iterator(delete_deltas.begin()), std::make_move_iterator(delete_deltas.end())); digest = storage.calculateNodesDigest(digest, new_deltas); @@ -1556,6 +1564,134 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr { return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); } + +private: + class ToDeleteTreeCollector + { + Storage & storage; + int64_t zxid; + uint32_t limit; + + uint32_t nodes_observed = 0; + std::vector deltas; + + public: + ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, uint32_t limit_) + : storage(storage_) + , zxid(zxid_) + , limit(limit_) + { + } + + bool collect(StringRef root_path, const typename Storage::Node & root_node) + { + if (updateStats(root_node)) + return true; + + if (visitRocksDBNode(root_path) || visitMemNode(root_path)) + return true; + + /// After checking committed nodes there might be some in uncommitted state, + /// for example made on the previouse step of multi transaction. + return visitRootAndUncommitted(root_path, root_node); + } + + uint32_t getNumObserved() const + { + return nodes_observed; + } + + std::vector extractDeltas() + { + return std::move(deltas); + } + + private: + bool visitRocksDBNode(StringRef root_path) + { + if constexpr (Storage::use_rocksdb) + { + auto children = storage.container.getChildren(root_path.toString()); + + for (auto && [child_path, node] : children) + if (collect(child_path, node)) + return true; + } + + return false; + } + + bool visitMemNode(StringRef root_path) + { + if constexpr (!Storage::use_rocksdb) + { + std::filesystem::path root_fs_path(root_path.toString()); + + auto node_it = storage.container.find(root_path); + if (node_it != storage.container.end()) + { + auto children = node_it->value.getChildren(); + for (auto && child_name : children) + { + auto child_path = (root_fs_path / child_name.toView()).generic_string(); + + auto child_it = storage.container.find(child_path); + chassert(child_it != storage.container.end()); + + if (collect(child_path, child_it->value)) + return true; + } + } + } + + return false; + } + + bool visitRootAndUncommitted(StringRef root_path, const typename Storage::Node & root_node) + { + const auto & nodes = storage.uncommitted_state.nodes; + + /// nodes are sorted by paths with level locality + auto it = nodes.upper_bound(root_path.toView()); + + for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it) + { + chassert(it->second.node); + const typename Storage::Node & node = *it->second.node; + + if (updateStats(node)) + return true; + + if (visitRootAndUncommitted(it->first, node)) /// if child is uncommitted then all subtree is also uncommitted + return true; + } + + deltas.emplace_back( + parentNodePath(root_path).toString(), + zxid, + typename Storage::UpdateNodeDelta{ + [](typename Storage::Node & parent) + { + ++parent.cversion; + parent.decreaseNumChildren(); + } + }); + + deltas.emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()}); + + return false; + } + + bool updateStats(const typename Storage::Node & root_node) + { + nodes_observed += 1 + root_node.numChildren(); /// root + all known children + + if (nodes_observed > limit) + return true; + + return false; + } + }; }; template diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 4a9286d4835..c2f6e4c5a74 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -609,7 +609,18 @@ public: using is_transparent = void; // required to make find() work with different type than key_type }; - mutable std::unordered_map nodes; + struct PathCmp + { + using is_transparent = std::true_type; + + auto operator()(const std::string_view a, + const std::string_view b) const + { + return a.size() < b.size() || (a.size() == b.size() && a < b); + } + }; + + mutable std::map nodes; std::unordered_map, Hash, Equal> deltas_for_path; std::list deltas;