diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 613714263ef..9378b30ea52 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1642,14 +1642,17 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage } private: + using SNode = typename Storage::Node; + class ToDeleteTreeCollector { Storage & storage; int64_t zxid; uint32_t limit; - uint32_t nodes_observed = 0; - std::vector deltas; + uint32_t max_level = 0; + uint32_t nodes_observed = 1; /// root node + std::unordered_map> by_level_deltas; public: ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, uint32_t limit_) @@ -1659,31 +1662,60 @@ private: { } - bool collect(StringRef root_path, const typename Storage::Node & root_node) + bool collect(StringRef root_path, const SNode & root_node) { - if (updateStats(root_node)) + std::deque steps; + steps.push_back(Step{root_path.toString(), &root_node, 0}); + + if (observeNode(root_node)) return true; - if (visitRocksDBNode(root_path) || visitMemNode(root_path)) - return true; + while (!steps.empty()) + { + Step step = std::move(steps.front()); + steps.pop_front(); - /// After checking committed nodes there might be some in uncommitted state, - /// for example made on the previouse step of multi transaction. - return visitRootAndUncommitted(root_path, root_node); - } + StringRef path = step.path; + uint32_t level = step.level; + const SNode * node = nullptr; - uint32_t getNumObserved() const - { - return nodes_observed; + if (auto * rdb = std::get_if(&step.node)) + node = rdb; + else + node = std::get(step.node); + + chassert(!path.empty()); + chassert(node != nullptr); + + if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, *node, level)) + return true; + } + + return false; } std::vector extractDeltas() { + std::vector deltas; + + for (ssize_t level = max_level; level >= 0; --level) + { + auto & level_deltas = by_level_deltas[static_cast(level)]; + deltas.insert(deltas.end(), std::make_move_iterator(level_deltas.begin()), std::make_move_iterator(level_deltas.end())); + } + return std::move(deltas); } private: - bool visitRocksDBNode(StringRef root_path) + struct Step + { + String path; + std::variant node; + uint32_t level; + }; + + bool visitRocksDBNode(std::deque & steps, StringRef root_path, uint32_t level) { if constexpr (Storage::use_rocksdb) { @@ -1692,86 +1724,92 @@ private: for (auto && [child_name, node] : children) { - auto child_path = (root_fs_path / child_name).generic_string(); - - if (collect(child_path, node)) + if (observeNode(node)) return true; + + auto child_path = (root_fs_path / child_name).generic_string(); + steps.push_back(Step{std::move(child_path), std::move(node), level + 1}); } } return false; } - bool visitMemNode(StringRef root_path) + bool visitMemNode(std::deque & steps, StringRef root_path, uint32_t level) { if constexpr (!Storage::use_rocksdb) { std::filesystem::path root_fs_path(root_path.toString()); auto node_it = storage.container.find(root_path); - if (node_it != storage.container.end()) + if (node_it == storage.container.end()) + return false; + + + auto children = node_it->value.getChildren(); + for (auto && child_name : children) { - auto children = node_it->value.getChildren(); - for (auto && child_name : children) - { - auto child_path = (root_fs_path / child_name.toView()).generic_string(); + auto child_path = (root_fs_path / child_name.toView()).generic_string(); - auto child_it = storage.container.find(child_path); - chassert(child_it != storage.container.end()); + auto child_it = storage.container.find(child_path); + chassert(child_it != storage.container.end()); - if (collect(child_path, child_it->value)) - return true; - } + if (observeNode(child_it->value)) + return true; + + steps.push_back(Step{std::move(child_path), child_it->value, level + 1}); } } return false; } - bool visitRootAndUncommitted(StringRef root_path, const typename Storage::Node & root_node) + bool visitRootAndUncommitted(std::deque & steps, StringRef root_path, const SNode & root_node, uint32_t level) { const auto & nodes = storage.uncommitted_state.nodes; /// nodes are sorted by paths with level locality - auto it = nodes.upper_bound(root_path.toView()); + auto it = nodes.upper_bound(root_path.toString() + "/"); for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it) { chassert(it->second.node); const String & path = it->first; - const typename Storage::Node & node = *it->second.node; + const SNode & node = *it->second.node; - if (updateStats(node)) + if (observeNode(node)) return true; - if (visitRootAndUncommitted(path, node)) /// if child is uncommitted then all subtree is also uncommitted - return true; + steps.push_back(Step{path, &node, level + 1}); } - deltas.emplace_back( + addDelta(root_path, root_node, level); + + return false; + } + + void addDelta(StringRef root_path, const SNode & root_node, uint32_t level) + { + max_level = std::max(max_level, level); + + by_level_deltas[level].emplace_back( parentNodePath(root_path).toString(), zxid, typename Storage::UpdateNodeDelta{ - [](typename Storage::Node & parent) + [](SNode & parent) { ++parent.cversion; parent.decreaseNumChildren(); } }); - deltas.emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()}); - - return false; + by_level_deltas[level].emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()}); } - bool updateStats(const typename Storage::Node & root_node) + bool observeNode(const SNode & node) { - nodes_observed += 1; - - if (nodes_observed + root_node.numChildren() > limit) - return true; - - return false; + nodes_observed += node.numChildren(); + return nodes_observed > limit; } }; };