diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 1ac085d6050..ae8d95b806a 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -90,6 +90,36 @@ struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest } }; +struct TestKeeperRemoveRecursiveRequest final : RemoveRecursiveRequest, TestKeeperRequest +{ + TestKeeperRemoveRecursiveRequest() = default; + explicit TestKeeperRemoveRecursiveRequest(const RemoveRecursiveRequest & base) : RemoveRecursiveRequest(base) {} + ResponsePtr createResponse() const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; + + void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override + { + std::vector> deleted; + + auto add_deleted_watches = [&](TestKeeper::Watches & w) + { + for (const auto & [watch_path, _] : w) + if (watch_path.starts_with(path)) + deleted.emplace_back(watch_path, std::count(watch_path.begin(), watch_path.end(), '/')); + }; + + add_deleted_watches(node_watches); + add_deleted_watches(list_watches); + std::sort(deleted.begin(), deleted.end(), [](const auto & lhs, const auto & rhs) + { + return lhs.second < rhs.second; + }); + + for (const auto & [watch_path, _] : deleted) + processWatchesImpl(watch_path, node_watches, list_watches); + } +}; + struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest { ResponsePtr createResponse() const override; @@ -175,6 +205,10 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest { requests.push_back(std::make_shared(*concrete_request_remove)); } + else if (const auto * concrete_request_remove_recursive = dynamic_cast(generic_request.get())) + { + requests.push_back(std::make_shared(*concrete_request_remove_recursive)); + } else if (const auto * concrete_request_set = dynamic_cast(generic_request.get())) { requests.push_back(std::make_shared(*concrete_request_set)); @@ -313,6 +347,56 @@ std::pair TestKeeperRemoveRequest::process(TestKeeper::Contai return { std::make_shared(response), undo }; } +std::pair TestKeeperRemoveRecursiveRequest::process(TestKeeper::Container & container, int64_t zxid) const +{ + RemoveRecursiveResponse response; + response.zxid = zxid; + Undo undo; + + auto root_it = container.find(path); + if (root_it == container.end()) + { + response.error = Error::ZNONODE; + return { std::make_shared(response), undo }; + } + + std::vector> children; + + for (const auto & [child_path, child_node] : container) + if (child_path.starts_with(path)) + children.emplace_back(child_path, child_node); + + if (children.size() > remove_nodes_limit) + { + response.error = Error::ZNOTEMPTY; + return { std::make_shared(response), undo }; + } + + auto & parent = container.at(parentPath(path)); + --parent.stat.numChildren; + ++parent.stat.cversion; + + for (const auto & [child_path, child_node] : children) + { + auto child_it = container.find(child_path); + chassert(child_it != container.end()); + container.erase(child_it); + } + + response.error = Error::ZOK; + undo = [&container, dead = std::move(children), root_path = path]() + { + for (auto && [child_path, child_node] : dead) + container.emplace(child_path, child_node); + + auto & undo_parent = container.at(parentPath(root_path)); + ++undo_parent.stat.numChildren; + --undo_parent.stat.cversion; + }; + + return { std::make_shared(response), undo }; +} + std::pair TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t zxid) const { ExistsResponse response; @@ -530,6 +614,7 @@ std::pair TestKeeperMultiRequest::process(TestKeeper::Contain ResponsePtr TestKeeperCreateRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperRemoveRequest::createResponse() const { return std::make_shared(); } +ResponsePtr TestKeeperRemoveRecursiveRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperExistsRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared(); } @@ -772,11 +857,18 @@ void TestKeeper::remove( } void TestKeeper::removeRecursive( - [[maybe_unused]] const String & path, - [[maybe_unused]] uint32_t remove_nodes_limit, - [[maybe_unused]] RemoveRecursiveCallback callback) + const String & path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) { - /// TODO(michicosun) implement + TestKeeperRemoveRecursiveRequest request; + request.path = path; + request.remove_nodes_limit = remove_nodes_limit; + + RequestInfo request_info; + request_info.request = std::make_shared(std::move(request)); + request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; + pushRequest(std::move(request_info)); } void TestKeeper::exists(