diff --git a/docs/en/operations/utilities/clickhouse-keeper-client.md b/docs/en/operations/utilities/clickhouse-keeper-client.md index fbfdd66d1a3..6f026766750 100644 --- a/docs/en/operations/utilities/clickhouse-keeper-client.md +++ b/docs/en/operations/utilities/clickhouse-keeper-client.md @@ -55,7 +55,7 @@ keeper foo bar - `touch ''` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists - `get ''` -- Returns the node's value - `rm '' [version]` -- Removes the node only if version matches (default: -1) -- `rmr ''` -- Recursively deletes path. Confirmation required +- `rmr '' [limit]` -- Recursively deletes path if the subtree size is smaller than the limit. Confirmation required (default limit = 100) - `flwc ` -- Executes four-letter-word command - `help` -- Prints this message - `get_direct_children_number '[path]'` -- Get numbers of direct children nodes under a specific path diff --git a/programs/keeper-client/Commands.cpp b/programs/keeper-client/Commands.cpp index 4ad2eb31e6d..b4a5329b01f 100644 --- a/programs/keeper-client/Commands.cpp +++ b/programs/keeper-client/Commands.cpp @@ -506,14 +506,23 @@ bool RMRCommand::parse(IParser::Pos & pos, std::shared_ptr & nod return false; node->args.push_back(std::move(path)); + ASTPtr remove_nodes_limit; + if (ParserUnsignedInteger{}.parse(pos, remove_nodes_limit, expected)) + node->args.push_back(remove_nodes_limit->as().value); + else + node->args.push_back(UInt64(100)); + return true; } void RMRCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const { String path = client->getAbsolutePath(query->args[0].safeGet()); + UInt64 remove_nodes_limit = query->args[1].safeGet(); + client->askConfirmation( - "You are going to recursively delete path " + path, [client, path] { client->zookeeper->removeRecursive(path); }); + "You are going to recursively delete path " + path, + [client, path, remove_nodes_limit] { client->zookeeper->removeRecursive(path, static_cast(remove_nodes_limit)); }); } bool ReconfigCommand::parse(IParser::Pos & pos, std::shared_ptr & node, DB::Expected & expected) const diff --git a/programs/keeper-client/Commands.h b/programs/keeper-client/Commands.h index 686a752b6b6..da577ce1e65 100644 --- a/programs/keeper-client/Commands.h +++ b/programs/keeper-client/Commands.h @@ -184,7 +184,7 @@ class RMRCommand : public IKeeperClientCommand void execute(const ASTKeeperQuery * query, KeeperClient * client) const override; - String getHelpMessage() const override { return "{} -- Recursively deletes path. Confirmation required"; } + String getHelpMessage() const override { return "{} [limit] -- Recursively deletes path if the subtree size is smaller than the limit. Confirmation required (default limit = 100)"; } }; class ReconfigCommand : public IKeeperClientCommand diff --git a/src/Common/ZooKeeper/IKeeper.cpp b/src/Common/ZooKeeper/IKeeper.cpp index 7cca262baca..34c9d94fca5 100644 --- a/src/Common/ZooKeeper/IKeeper.cpp +++ b/src/Common/ZooKeeper/IKeeper.cpp @@ -171,6 +171,7 @@ bool isUserError(Error zk_return_code) void CreateRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void RemoveRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } +void RemoveRecursiveRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void ExistsRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void GetRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void SetRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } diff --git a/src/Common/ZooKeeper/IKeeper.h b/src/Common/ZooKeeper/IKeeper.h index ce7489a33e5..a0d6ae54f56 100644 --- a/src/Common/ZooKeeper/IKeeper.h +++ b/src/Common/ZooKeeper/IKeeper.h @@ -248,6 +248,23 @@ struct RemoveResponse : virtual Response { }; +struct RemoveRecursiveRequest : virtual Request +{ + String path; + + /// strict limit for number of deleted nodes + uint32_t remove_nodes_limit = 1; + + void addRootPath(const String & root_path) override; + String getPath() const override { return path; } + + size_t bytesSize() const override { return path.size() + sizeof(remove_nodes_limit); } +}; + +struct RemoveRecursiveResponse : virtual Response +{ +}; + struct ExistsRequest : virtual Request { String path; @@ -430,6 +447,7 @@ struct ErrorResponse : virtual Response using CreateCallback = std::function; using RemoveCallback = std::function; +using RemoveRecursiveCallback = std::function; using ExistsCallback = std::function; using GetCallback = std::function; using SetCallback = std::function; @@ -587,6 +605,11 @@ public: int32_t version, RemoveCallback callback) = 0; + virtual void removeRecursive( + const String & path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) = 0; + virtual void exists( const String & path, ExistsCallback callback, diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 16ea412eb77..2fbe9110b6b 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -90,6 +90,36 @@ struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest } }; +struct TestKeeperRemoveRecursiveRequest final : RemoveRecursiveRequest, TestKeeperRequest +{ + TestKeeperRemoveRecursiveRequest() = default; + explicit TestKeeperRemoveRecursiveRequest(const RemoveRecursiveRequest & base) : RemoveRecursiveRequest(base) {} + ResponsePtr createResponse() const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; + + void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override + { + std::vector> deleted; + + auto add_deleted_watches = [&](TestKeeper::Watches & w) + { + for (const auto & [watch_path, _] : w) + if (watch_path.starts_with(path)) + deleted.emplace_back(watch_path, std::count(watch_path.begin(), watch_path.end(), '/')); + }; + + add_deleted_watches(node_watches); + add_deleted_watches(list_watches); + std::sort(deleted.begin(), deleted.end(), [](const auto & lhs, const auto & rhs) + { + return lhs.second < rhs.second; + }); + + for (const auto & [watch_path, _] : deleted) + processWatchesImpl(watch_path, node_watches, list_watches); + } +}; + struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest { ResponsePtr createResponse() const override; @@ -175,6 +205,10 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest { requests.push_back(std::make_shared(*concrete_request_remove)); } + else if (const auto * concrete_request_remove_recursive = dynamic_cast(generic_request.get())) + { + requests.push_back(std::make_shared(*concrete_request_remove_recursive)); + } else if (const auto * concrete_request_set = dynamic_cast(generic_request.get())) { requests.push_back(std::make_shared(*concrete_request_set)); @@ -313,6 +347,62 @@ std::pair TestKeeperRemoveRequest::process(TestKeeper::Contai return { std::make_shared(response), undo }; } +std::pair TestKeeperRemoveRecursiveRequest::process(TestKeeper::Container & container, int64_t zxid) const +{ + RemoveRecursiveResponse response; + response.zxid = zxid; + Undo undo; + + auto root_it = container.find(path); + if (root_it == container.end()) + { + response.error = Error::ZNONODE; + return { std::make_shared(response), undo }; + } + + std::vector> children; + + for (auto it = std::next(root_it); it != container.end(); ++it) + { + const auto & [child_path, child_node] = *it; + + if (child_path.starts_with(path)) + children.emplace_back(child_path, child_node); + else + break; + } + + if (children.size() > remove_nodes_limit) + { + response.error = Error::ZNOTEMPTY; + return { std::make_shared(response), undo }; + } + + auto & parent = container.at(parentPath(path)); + --parent.stat.numChildren; + ++parent.stat.cversion; + + for (const auto & [child_path, child_node] : children) + { + auto child_it = container.find(child_path); + chassert(child_it != container.end()); + container.erase(child_it); + } + + response.error = Error::ZOK; + undo = [&container, dead = std::move(children), root_path = path]() + { + for (auto && [child_path, child_node] : dead) + container.emplace(child_path, child_node); + + auto & undo_parent = container.at(parentPath(root_path)); + ++undo_parent.stat.numChildren; + --undo_parent.stat.cversion; + }; + + return { std::make_shared(response), undo }; +} + std::pair TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t zxid) const { ExistsResponse response; @@ -530,6 +620,7 @@ std::pair TestKeeperMultiRequest::process(TestKeeper::Contain ResponsePtr TestKeeperCreateRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperRemoveRequest::createResponse() const { return std::make_shared(); } +ResponsePtr TestKeeperRemoveRecursiveRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperExistsRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared(); } @@ -771,6 +862,21 @@ void TestKeeper::remove( pushRequest(std::move(request_info)); } +void TestKeeper::removeRecursive( + const String & path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) +{ + TestKeeperRemoveRecursiveRequest request; + request.path = path; + request.remove_nodes_limit = remove_nodes_limit; + + RequestInfo request_info; + request_info.request = std::make_shared(std::move(request)); + request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; + pushRequest(std::move(request_info)); +} + void TestKeeper::exists( const String & path, ExistsCallback callback, diff --git a/src/Common/ZooKeeper/TestKeeper.h b/src/Common/ZooKeeper/TestKeeper.h index 562c313ac0e..c32f0064dec 100644 --- a/src/Common/ZooKeeper/TestKeeper.h +++ b/src/Common/ZooKeeper/TestKeeper.h @@ -58,6 +58,11 @@ public: int32_t version, RemoveCallback callback) override; + void removeRecursive( + const String & path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) override; + void exists( const String & path, ExistsCallback callback, diff --git a/src/Common/ZooKeeper/Types.h b/src/Common/ZooKeeper/Types.h index d2876adaabc..4a163c15838 100644 --- a/src/Common/ZooKeeper/Types.h +++ b/src/Common/ZooKeeper/Types.h @@ -31,6 +31,7 @@ using AsyncResponses = std::vector>>; Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode, bool ignore_if_exists = false); Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version); +Coordination::RequestPtr makeRemoveRecursiveRequest(const std::string & path, uint32_t remove_nodes_limit); Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version); Coordination::RequestPtr makeCheckRequest(const std::string & path, int version); diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 1a9ed4f1ee7..ae60520affb 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -979,18 +979,47 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab return removed_as_expected; } -void ZooKeeper::removeRecursive(const std::string & path) +void ZooKeeper::removeRecursive(const std::string & path, uint32_t remove_nodes_limit) { - removeChildrenRecursive(path); - remove(path); + if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE)) + { + removeChildrenRecursive(path); + remove(path); + return; + } + + check(tryRemoveRecursive(path, remove_nodes_limit), path); } -void ZooKeeper::tryRemoveRecursive(const std::string & path) +Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit) { - tryRemoveChildrenRecursive(path); - tryRemove(path); -} + if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE)) + { + tryRemoveChildrenRecursive(path); + return tryRemove(path); + } + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::RemoveRecursiveResponse & response) mutable + { + promise->set_value(response); + }; + + impl->removeRecursive(path, remove_nodes_limit, std::move(callback)); + + if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready) + { + impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::RemoveRecursive, path)); + return Coordination::Error::ZOPERATIONTIMEOUT; + } + else + { + auto response = future.get(); + return response.error; + } +} namespace { @@ -1619,6 +1648,14 @@ Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version return request; } +Coordination::RequestPtr makeRemoveRecursiveRequest(const std::string & path, uint32_t remove_nodes_limit) +{ + auto request = std::make_shared(); + request->path = path; + request->remove_nodes_limit = remove_nodes_limit; + return request; +} + Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version) { auto request = std::make_shared(); diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 7ccdc9d1b7f..29c4fbc9b74 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -479,15 +479,16 @@ public: Int64 getClientID(); - /// Remove the node with the subtree. If someone concurrently adds or removes a node - /// in the subtree, the result is undefined. - void removeRecursive(const std::string & path); + /// Remove the node with the subtree. + /// If Keeper supports RemoveRecursive operation then it will be performed atomically. + /// Otherwise if someone concurrently adds or removes a node in the subtree, the result is undefined. + void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 100); - /// Remove the node with the subtree. If someone concurrently removes a node in the subtree, - /// this will not cause errors. + /// Same as removeRecursive but in case if Keeper does not supports RemoveRecursive and + /// if someone concurrently removes a node in the subtree, this will not cause errors. /// For instance, you can call this method twice concurrently for the same node and the end /// result would be the same as for the single call. - void tryRemoveRecursive(const std::string & path); + Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 100); /// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself. /// Node defined as RemoveException will not be deleted. diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index dff14f74681..3f9225e84dd 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -1,5 +1,5 @@ -#include "Common/ZooKeeper/IKeeper.h" -#include "Common/ZooKeeper/ZooKeeperConstants.h" +#include +#include #include #include #include @@ -232,6 +232,27 @@ void ZooKeeperRemoveRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } +void ZooKeeperRemoveRecursiveRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(remove_nodes_limit, out); +} + +void ZooKeeperRemoveRecursiveRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(remove_nodes_limit, in); +} + +std::string ZooKeeperRemoveRecursiveRequest::toStringImpl(bool /*short_format*/) const +{ + return fmt::format( + "path = {}\n" + "remove_nodes_limit = {}", + path, + remove_nodes_limit); +} + void ZooKeeperExistsRequest::writeImpl(WriteBuffer & out) const { Coordination::write(path, out); @@ -510,6 +531,11 @@ ZooKeeperMultiRequest::ZooKeeperMultiRequest(std::span(*concrete_request_remove)); } + else if (const auto * concrete_request_remove_recursive = dynamic_cast(generic_request.get())) + { + checkOperationType(Write); + requests.push_back(std::make_shared(*concrete_request_remove_recursive)); + } else if (const auto * concrete_request_set = dynamic_cast(generic_request.get())) { checkOperationType(Write); @@ -707,6 +733,7 @@ ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return se ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return setTime(std::make_shared()); } +ZooKeeperResponsePtr ZooKeeperRemoveRecursiveRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared()); } @@ -1024,6 +1051,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory() registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); + registerZooKeeperRequest(*this); } PathMatchResult matchPath(std::string_view path, std::string_view match_to) diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index fd6ec3cd375..66c075b277b 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -285,6 +285,31 @@ struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse size_t bytesSize() const override { return RemoveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } }; +struct ZooKeeperRemoveRecursiveRequest final : RemoveRecursiveRequest, ZooKeeperRequest +{ + ZooKeeperRemoveRecursiveRequest() = default; + explicit ZooKeeperRemoveRecursiveRequest(const RemoveRecursiveRequest & base) : RemoveRecursiveRequest(base) {} + + OpNum getOpNum() const override { return OpNum::RemoveRecursive; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + std::string toStringImpl(bool short_format) const override; + + ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return false; } + + size_t bytesSize() const override { return RemoveRecursiveRequest::bytesSize() + sizeof(xid); } +}; + +struct ZooKeeperRemoveRecursiveResponse : RemoveRecursiveResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer &) override {} + void writeImpl(WriteBuffer &) const override {} + OpNum getOpNum() const override { return OpNum::RemoveRecursive; } + + size_t bytesSize() const override { return RemoveRecursiveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } +}; + struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest { ZooKeeperExistsRequest() = default; diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.cpp b/src/Common/ZooKeeper/ZooKeeperConstants.cpp index cf8ba35e992..a2780dfd5e2 100644 --- a/src/Common/ZooKeeper/ZooKeeperConstants.cpp +++ b/src/Common/ZooKeeper/ZooKeeperConstants.cpp @@ -29,6 +29,7 @@ static const std::unordered_set VALID_OPERATIONS = static_cast(OpNum::GetACL), static_cast(OpNum::FilteredList), static_cast(OpNum::CheckNotExists), + static_cast(OpNum::RemoveRecursive), }; OpNum getOpNum(int32_t raw_op_num) diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.h b/src/Common/ZooKeeper/ZooKeeperConstants.h index 1d9830505f8..9d8e2d4f857 100644 --- a/src/Common/ZooKeeper/ZooKeeperConstants.h +++ b/src/Common/ZooKeeper/ZooKeeperConstants.h @@ -40,6 +40,7 @@ enum class OpNum : int32_t FilteredList = 500, CheckNotExists = 501, CreateIfNotExists = 502, + RemoveRecursive = 503, SessionID = 997, /// Special internal request }; diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index ba622f30c91..a6dd9738e17 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1347,6 +1347,25 @@ void ZooKeeper::remove( ProfileEvents::increment(ProfileEvents::ZooKeeperRemove); } +void ZooKeeper::removeRecursive( + const String &path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) +{ + if (!isFeatureEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE)) + throw Exception::fromMessage(Error::ZBADARGUMENTS, "RemoveRecursive request type cannot be used because it's not supported by the server"); + + ZooKeeperRemoveRecursiveRequest request; + request.path = path; + request.remove_nodes_limit = remove_nodes_limit; + + RequestInfo request_info; + request_info.request = std::make_shared(std::move(request)); + request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; + + pushRequest(std::move(request_info)); + ProfileEvents::increment(ProfileEvents::ZooKeeperRemove); +} void ZooKeeper::exists( const String & path, diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 39082cd14c1..47d2ab8f401 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -146,6 +146,11 @@ public: int32_t version, RemoveCallback callback) override; + void removeRecursive( + const String &path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) override; + void exists( const String & path, ExistsCallback callback, diff --git a/src/Coordination/KeeperConstants.h b/src/Coordination/KeeperConstants.h index 08a7c85585a..d984d077872 100644 --- a/src/Coordination/KeeperConstants.h +++ b/src/Coordination/KeeperConstants.h @@ -11,6 +11,7 @@ enum class KeeperApiVersion : uint8_t WITH_FILTERED_LIST, WITH_MULTI_READ, WITH_CHECK_NOT_EXISTS, + WITH_REMOVE_RECURSIVE, }; const String keeper_system_path = "/keeper"; diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 893bb8e6082..142bd3b7c71 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -91,6 +91,12 @@ bool checkIfRequestIncreaseMem(const Coordination::ZooKeeperRequestPtr & request memory_delta -= remove_req.bytesSize(); break; } + case Coordination::OpNum::RemoveRecursive: + { + Coordination::ZooKeeperRemoveRecursiveRequest & remove_req = dynamic_cast(*sub_zk_request); + memory_delta -= remove_req.bytesSize(); + break; + } default: break; } diff --git a/src/Coordination/KeeperFeatureFlags.h b/src/Coordination/KeeperFeatureFlags.h index 4e26ca60736..e70bd50cc88 100644 --- a/src/Coordination/KeeperFeatureFlags.h +++ b/src/Coordination/KeeperFeatureFlags.h @@ -12,6 +12,7 @@ enum class KeeperFeatureFlag : size_t MULTI_READ, CHECK_NOT_EXISTS, CREATE_IF_NOT_EXISTS, + REMOVE_RECURSIVE, }; class KeeperFeatureFlags diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index acdf209baae..2eb9ab30efa 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -832,6 +832,15 @@ std::shared_ptr KeeperStorage::UncommittedS return tryGetNodeFromStorage(path); } +template +const typename Container::Node * KeeperStorage::UncommittedState::getActualNodeView(StringRef path, const Node & storage_node) const +{ + if (auto node_it = nodes.find(path.toView()); node_it != nodes.end()) + return node_it->second.node.get(); + + return &storage_node; +} + template Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const { @@ -1124,7 +1133,7 @@ struct KeeperStorageRequestProcessor } virtual KeeperStorageBase::ResponsesForSessions - processWatches(KeeperStorageBase::Watches & /*watches*/, KeeperStorageBase::Watches & /*list_watches*/) const + processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & /*watches*/, KeeperStorageBase::Watches & /*list_watches*/) const { return {}; } @@ -1241,7 +1250,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; KeeperStorageBase::ResponsesForSessions - processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override + processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override { return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED); } @@ -1462,16 +1471,41 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce } }; +namespace +{ + +template +void addUpdateParentPzxidDelta(Storage & storage, std::vector & deltas, int64_t zxid, StringRef path) +{ + auto parent_path = parentNodePath(path); + if (!storage.uncommitted_state.getNode(parent_path)) + return; + + deltas.emplace_back( + std::string{parent_path}, + zxid, + typename Storage::UpdateNodeDelta + { + [zxid](Storage::Node & parent) + { + parent.pzxid = std::max(parent.pzxid, zxid); + } + } + ); +} + +} + template struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor { + using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; + bool checkAuth(Storage & storage, int64_t session_id, bool is_local) const override { return storage.checkACL(parentNodePath(this->zk_request->getPath()), Coordination::ACL::Delete, session_id, is_local); } - using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::vector preprocess(Storage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override { @@ -1488,31 +1522,12 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr return {typename Storage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}}; } - const auto update_parent_pzxid = [&]() - { - auto parent_path = parentNodePath(request.path); - if (!storage.uncommitted_state.getNode(parent_path)) - return; - - new_deltas.emplace_back( - std::string{parent_path}, - zxid, - typename Storage::UpdateNodeDelta - { - [zxid](Storage::Node & parent) - { - parent.pzxid = std::max(parent.pzxid, zxid); - } - } - ); - }; - auto node = storage.uncommitted_state.getNode(request.path); if (!node) { if (request.restored_from_zookeeper_log) - update_parent_pzxid(); + addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path); return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}}; } else if (request.version != -1 && request.version != node->version) @@ -1521,7 +1536,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}}; if (request.restored_from_zookeeper_log) - update_parent_pzxid(); + addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path); new_deltas.emplace_back( std::string{parentNodePath(request.path)}, @@ -1552,12 +1567,318 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr } KeeperStorageBase::ResponsesForSessions - processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override + processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override { return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); } }; +template +struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorageRequestProcessor +{ + using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; + + bool checkAuth(Storage & storage, int64_t session_id, bool is_local) const override + { + return storage.checkACL(parentNodePath(this->zk_request->getPath()), Coordination::ACL::Delete, session_id, is_local); + } + + std::vector + preprocess(Storage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override + { + ProfileEvents::increment(ProfileEvents::KeeperRemoveRequest); + Coordination::ZooKeeperRemoveRecursiveRequest & request = dynamic_cast(*this->zk_request); + + std::vector new_deltas; + + if (Coordination::matchPath(request.path, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH) + { + auto error_msg = fmt::format("Trying to delete an internal Keeper path ({}) which is not allowed", request.path); + + handleSystemNodeModification(keeper_context, error_msg); + return {typename Storage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}}; + } + + auto node = storage.uncommitted_state.getNode(request.path); + + if (!node) + { + if (request.restored_from_zookeeper_log) + addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path); + + return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}}; + } + + ToDeleteTreeCollector collector(storage, zxid, session_id, request.remove_nodes_limit); + auto collect_status = collector.collect(request.path, *node); + + if (collect_status == ToDeleteTreeCollector::CollectStatus::NoAuth) + return {typename Storage::Delta{zxid, Coordination::Error::ZNOAUTH}}; + + if (collect_status == ToDeleteTreeCollector::CollectStatus::LimitExceeded) + return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}}; + + if (request.restored_from_zookeeper_log) + addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path); + + auto delete_deltas = collector.extractDeltas(); + + for (const auto & delta : delete_deltas) + { + const auto * remove_delta = std::get_if(&delta.operation); + if (remove_delta && remove_delta->ephemeral_owner) + storage.unregisterEphemeralPath(remove_delta->ephemeral_owner, delta.path); + } + + new_deltas.insert(new_deltas.end(), std::make_move_iterator(delete_deltas.begin()), std::make_move_iterator(delete_deltas.end())); + + digest = storage.calculateNodesDigest(digest, new_deltas); + + return new_deltas; + } + + Coordination::ZooKeeperResponsePtr process(Storage & storage, int64_t zxid) const override + { + Coordination::ZooKeeperResponsePtr response_ptr = this->zk_request->makeResponse(); + Coordination::ZooKeeperRemoveRecursiveResponse & response = dynamic_cast(*response_ptr); + + response.error = storage.commit(zxid); + return response_ptr; + } + + KeeperStorageBase::ResponsesForSessions + processWatches(const Storage & storage, int64_t zxid, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override + { + /// need to iterate over zxid deltas and update watches for deleted tree. + const auto & deltas = storage.uncommitted_state.deltas; + + KeeperStorageBase::ResponsesForSessions responses; + for (auto it = deltas.rbegin(); it != deltas.rend() && it->zxid == zxid; ++it) + { + const auto * remove_delta = std::get_if(&it->operation); + if (remove_delta) + { + auto new_responses = processWatchesImpl(it->path, watches, list_watches, Coordination::Event::DELETED); + responses.insert(responses.end(), std::make_move_iterator(new_responses.begin()), std::make_move_iterator(new_responses.end())); + } + } + + return responses; + } + +private: + using SNode = typename Storage::Node; + + class ToDeleteTreeCollector + { + Storage & storage; + int64_t zxid; + int64_t session_id; + uint32_t limit; + + uint32_t max_level = 0; + uint32_t nodes_observed = 1; /// root node + std::unordered_map> by_level_deltas; + + struct Step + { + String path; + std::variant node; + uint32_t level; + }; + + enum class CollectStatus + { + Ok, + NoAuth, + LimitExceeded, + }; + + friend struct KeeperStorageRemoveRecursiveRequestProcessor; + + public: + ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, int64_t session_id_, uint32_t limit_) + : storage(storage_) + , zxid(zxid_) + , session_id(session_id_) + , limit(limit_) + { + } + + CollectStatus collect(StringRef root_path, const SNode & root_node) + { + std::deque steps; + + if (checkLimits(&root_node)) + return CollectStatus::LimitExceeded; + + steps.push_back(Step{root_path.toString(), &root_node, 0}); + + while (!steps.empty()) + { + Step step = std::move(steps.front()); + steps.pop_front(); + + StringRef path = step.path; + uint32_t level = step.level; + const SNode * node_ptr = nullptr; + + if (auto * rdb = std::get_if(&step.node)) + node_ptr = rdb; + else + node_ptr = std::get(step.node); + + chassert(!path.empty()); + chassert(node_ptr != nullptr); + + const auto & node = *node_ptr; + auto actual_node_ptr = storage.uncommitted_state.getActualNodeView(path, node); + chassert(actual_node_ptr != nullptr); /// explicitly check that node is not deleted + + if (actual_node_ptr->numChildren() > 0 && !storage.checkACL(path, Coordination::ACL::Delete, session_id, /*is_local=*/false)) + return CollectStatus::NoAuth; + + if (auto status = visitRocksDBNode(steps, path, level); status != CollectStatus::Ok) + return status; + + if (auto status = visitMemNode(steps, path, level); status != CollectStatus::Ok) + return status; + + if (auto status = visitRootAndUncommitted(steps, path, node, level); status != CollectStatus::Ok) + return status; + } + + return CollectStatus::Ok; + } + + std::vector extractDeltas() + { + std::vector deltas; + + for (ssize_t level = max_level; level >= 0; --level) + { + auto & level_deltas = by_level_deltas[static_cast(level)]; + deltas.insert(deltas.end(), std::make_move_iterator(level_deltas.begin()), std::make_move_iterator(level_deltas.end())); + } + + return std::move(deltas); + } + + private: + CollectStatus visitRocksDBNode(std::deque & steps, StringRef root_path, uint32_t level) + { + if constexpr (Storage::use_rocksdb) + { + std::filesystem::path root_fs_path(root_path.toString()); + auto children = storage.container.getChildren(root_path.toString()); + + for (auto && [child_name, child_node] : children) + { + auto child_path = (root_fs_path / child_name).generic_string(); + const auto actual_child_node_ptr = storage.uncommitted_state.getActualNodeView(child_path, child_node); + + if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction + continue; + + if (checkLimits(actual_child_node_ptr)) + return CollectStatus::LimitExceeded; + + steps.push_back(Step{std::move(child_path), std::move(child_node), level + 1}); + } + } + + return CollectStatus::Ok; + } + + CollectStatus visitMemNode(std::deque & steps, StringRef root_path, uint32_t level) + { + if constexpr (!Storage::use_rocksdb) + { + auto node_it = storage.container.find(root_path); + if (node_it == storage.container.end()) + return CollectStatus::Ok; + + std::filesystem::path root_fs_path(root_path.toString()); + const auto & children = node_it->value.getChildren(); + + for (const auto & child_name : children) + { + auto child_path = (root_fs_path / child_name.toView()).generic_string(); + + auto child_it = storage.container.find(child_path); + chassert(child_it != storage.container.end()); + const auto & child_node = child_it->value; + + const auto actual_child_node_ptr = storage.uncommitted_state.getActualNodeView(child_path, child_node); + + if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction + continue; + + if (checkLimits(actual_child_node_ptr)) + return CollectStatus::LimitExceeded; + + steps.push_back(Step{std::move(child_path), &child_node, level + 1}); + } + } + + return CollectStatus::Ok; + } + + CollectStatus visitRootAndUncommitted(std::deque & steps, StringRef root_path, const SNode & root_node, uint32_t level) + { + const auto & nodes = storage.uncommitted_state.nodes; + + /// nodes are sorted by paths with level locality + auto it = nodes.upper_bound(root_path.toString() + "/"); + + for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it) + { + const auto actual_child_node_ptr = it->second.node.get(); + + if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction + continue; + + if (checkLimits(actual_child_node_ptr)) + return CollectStatus::LimitExceeded; + + const String & child_path = it->first; + const SNode & child_node = *it->second.node; + + steps.push_back(Step{child_path, &child_node, level + 1}); + } + + addDelta(root_path, root_node, level); + + return CollectStatus::Ok; + } + + void addDelta(StringRef root_path, const SNode & root_node, uint32_t level) + { + max_level = std::max(max_level, level); + + by_level_deltas[level].emplace_back( + parentNodePath(root_path).toString(), + zxid, + typename Storage::UpdateNodeDelta{ + [](SNode & parent) + { + ++parent.cversion; + parent.decreaseNumChildren(); + } + }); + + by_level_deltas[level].emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()}); + } + + bool checkLimits(const SNode * node) + { + chassert(node != nullptr); + nodes_observed += node->numChildren(); + return nodes_observed > limit; + } + }; +}; + template struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestProcessor { @@ -1709,7 +2030,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce } KeeperStorageBase::ResponsesForSessions - processWatches(typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override + processWatches(const Storage & /*storage*/, int64_t /*zxid*/, typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override { return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED); } @@ -2131,6 +2452,10 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro check_operation_type(OperationType::Write); concrete_requests.push_back(std::make_shared>(sub_zk_request)); break; + case Coordination::OpNum::RemoveRecursive: + check_operation_type(OperationType::Write); + concrete_requests.push_back(std::make_shared>(sub_zk_request)); + break; case Coordination::OpNum::Set: check_operation_type(OperationType::Write); concrete_requests.push_back(std::make_shared>(sub_zk_request)); @@ -2250,12 +2575,12 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro } KeeperStorageBase::ResponsesForSessions - processWatches(typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override + processWatches(const Storage & storage, int64_t zxid, typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override { typename Storage::ResponsesForSessions result; for (const auto & generic_request : concrete_requests) { - auto responses = generic_request->processWatches(watches, list_watches); + auto responses = generic_request->processWatches(storage, zxid, watches, list_watches); result.insert(result.end(), responses.begin(), responses.end()); } return result; @@ -2400,6 +2725,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFa registerKeeperRequestProcessor>(*this); registerKeeperRequestProcessor>(*this); registerKeeperRequestProcessor>(*this); + registerKeeperRequestProcessor>(*this); } @@ -2718,7 +3044,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::process /// If this requests processed successfully we need to check watches if (response->error == Coordination::Error::ZOK) { - auto watch_responses = request_processor->processWatches(watches, list_watches); + auto watch_responses = request_processor->processWatches(*this, zxid, watches, list_watches); results.insert(results.end(), watch_responses.begin(), watch_responses.end()); } diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 4a9286d4835..904af76ef37 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -566,6 +566,7 @@ public: void rollback(int64_t rollback_zxid); std::shared_ptr getNode(StringRef path) const; + const Node * getActualNodeView(StringRef path, const Node & storage_node) const; Coordination::ACLs getACLs(StringRef path) const; void applyDelta(const Delta & delta); @@ -609,7 +610,18 @@ public: using is_transparent = void; // required to make find() work with different type than key_type }; - mutable std::unordered_map nodes; + struct PathCmp + { + using is_transparent = std::true_type; + + auto operator()(const std::string_view a, + const std::string_view b) const + { + return a.size() < b.size() || (a.size() == b.size() && a < b); + } + }; + + mutable std::map nodes; std::unordered_map, Hash, Equal> deltas_for_path; std::list deltas; diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index d39031773cd..73402af5ec4 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3113,6 +3113,8 @@ TYPED_TEST(CoordinationTest, TestFeatureFlags) ASSERT_TRUE(feature_flags.isEnabled(KeeperFeatureFlag::FILTERED_LIST)); ASSERT_TRUE(feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ)); ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::CHECK_NOT_EXISTS)); + ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::CREATE_IF_NOT_EXISTS)); + ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE)); } TYPED_TEST(CoordinationTest, TestSystemNodeModify) @@ -3374,6 +3376,474 @@ TYPED_TEST(CoordinationTest, TestReapplyingDeltas) ASSERT_TRUE(children1_set == children2_set); } +TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) +{ + using namespace DB; + using namespace Coordination; + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; + + int32_t zxid = 0; + + const auto create = [&](const String & path, int create_mode) + { + int new_zxid = ++zxid; + + const auto create_request = std::make_shared(); + create_request->path = path; + create_request->is_ephemeral = create_mode == zkutil::CreateMode::Ephemeral || create_mode == zkutil::CreateMode::EphemeralSequential; + create_request->is_sequential = create_mode == zkutil::CreateMode::PersistentSequential || create_mode == zkutil::CreateMode::EphemeralSequential; + + storage.preprocessRequest(create_request, 1, 0, new_zxid); + auto responses = storage.processRequest(create_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path; + }; + + const auto remove = [&](const String & path, int32_t version = -1) + { + int new_zxid = ++zxid; + + auto remove_request = std::make_shared(); + remove_request->path = path; + remove_request->version = version; + + storage.preprocessRequest(remove_request, 1, 0, new_zxid); + return storage.processRequest(remove_request, 1, new_zxid); + }; + + const auto remove_recursive = [&](const String & path, uint32_t remove_nodes_limit = 1) + { + int new_zxid = ++zxid; + + auto remove_request = std::make_shared(); + remove_request->path = path; + remove_request->remove_nodes_limit = remove_nodes_limit; + + storage.preprocessRequest(remove_request, 1, 0, new_zxid); + return storage.processRequest(remove_request, 1, new_zxid); + }; + + const auto exists = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto exists_request = std::make_shared(); + exists_request->path = path; + + storage.preprocessRequest(exists_request, 1, 0, new_zxid); + auto responses = storage.processRequest(exists_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + return responses[0].response->error == Coordination::Error::ZOK; + }; + + { + SCOPED_TRACE("Single Remove Single Node"); + create("/T1", zkutil::CreateMode::Persistent); + + auto responses = remove("/T1"); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_FALSE(exists("/T1")); + } + + { + SCOPED_TRACE("Single Remove Tree"); + create("/T2", zkutil::CreateMode::Persistent); + create("/T2/A", zkutil::CreateMode::Persistent); + + auto responses = remove("/T2"); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY); + ASSERT_TRUE(exists("/T2")); + } + + { + SCOPED_TRACE("Recursive Remove Single Node"); + create("/T3", zkutil::CreateMode::Persistent); + + auto responses = remove_recursive("/T3", 100); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_FALSE(exists("/T3")); + } + + { + SCOPED_TRACE("Recursive Remove Tree Small Limit"); + create("/T5", zkutil::CreateMode::Persistent); + create("/T5/A", zkutil::CreateMode::Persistent); + create("/T5/B", zkutil::CreateMode::Persistent); + create("/T5/A/C", zkutil::CreateMode::Persistent); + + auto responses = remove_recursive("/T5", 2); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY); + ASSERT_TRUE(exists("/T5")); + ASSERT_TRUE(exists("/T5/A")); + ASSERT_TRUE(exists("/T5/B")); + ASSERT_TRUE(exists("/T5/A/C")); + } + + { + SCOPED_TRACE("Recursive Remove Tree Big Limit"); + create("/T6", zkutil::CreateMode::Persistent); + create("/T6/A", zkutil::CreateMode::Persistent); + create("/T6/B", zkutil::CreateMode::Persistent); + create("/T6/A/C", zkutil::CreateMode::Persistent); + + auto responses = remove_recursive("/T6", 4); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_FALSE(exists("/T6")); + ASSERT_FALSE(exists("/T6/A")); + ASSERT_FALSE(exists("/T6/B")); + ASSERT_FALSE(exists("/T6/A/C")); + } + + { + SCOPED_TRACE("Recursive Remove Ephemeral"); + create("/T7", zkutil::CreateMode::Ephemeral); + ASSERT_EQ(storage.ephemerals.size(), 1); + + auto responses = remove_recursive("/T7", 100); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_EQ(storage.ephemerals.size(), 0); + ASSERT_FALSE(exists("/T7")); + } + + { + SCOPED_TRACE("Recursive Remove Tree With Ephemeral"); + create("/T8", zkutil::CreateMode::Persistent); + create("/T8/A", zkutil::CreateMode::Persistent); + create("/T8/B", zkutil::CreateMode::Ephemeral); + create("/T8/A/C", zkutil::CreateMode::Ephemeral); + ASSERT_EQ(storage.ephemerals.size(), 1); + + auto responses = remove_recursive("/T8", 4); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_EQ(storage.ephemerals.size(), 0); + ASSERT_FALSE(exists("/T8")); + ASSERT_FALSE(exists("/T8/A")); + ASSERT_FALSE(exists("/T8/B")); + ASSERT_FALSE(exists("/T8/A/C")); + } +} + +TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) +{ + using namespace DB; + using namespace Coordination; + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; + int zxid = 0; + + auto prepare_create_tree = []() + { + return Coordination::Requests{ + zkutil::makeCreateRequest("/A", "A", zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest("/A/B", "B", zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest("/A/C", "C", zkutil::CreateMode::Ephemeral), + zkutil::makeCreateRequest("/A/B/D", "D", zkutil::CreateMode::Ephemeral), + }; + }; + + const auto exists = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto exists_request = std::make_shared(); + exists_request->path = path; + + storage.preprocessRequest(exists_request, 1, 0, new_zxid); + auto responses = storage.processRequest(exists_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + return responses[0].response->error == Coordination::Error::ZOK; + }; + + const auto is_multi_ok = [&](Coordination::ZooKeeperResponsePtr response) + { + const auto & multi_response = dynamic_cast(*response); + + for (const auto & op_response : multi_response.responses) + if (op_response->error != Coordination::Error::ZOK) + return false; + + return true; + }; + + { + SCOPED_TRACE("Remove In Multi Tx"); + int new_zxid = ++zxid; + auto ops = prepare_create_tree(); + + ops.push_back(zkutil::makeRemoveRequest("/A", -1)); + const auto request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(request, 1, 0, new_zxid); + auto responses = storage.processRequest(request, 1, new_zxid); + ops.pop_back(); + + ASSERT_EQ(responses.size(), 1); + ASSERT_FALSE(is_multi_ok(responses[0].response)); + } + + { + SCOPED_TRACE("Recursive Remove In Multi Tx"); + int new_zxid = ++zxid; + auto ops = prepare_create_tree(); + + ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4)); + const auto request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(request, 1, 0, new_zxid); + auto responses = storage.processRequest(request, 1, new_zxid); + ops.pop_back(); + + ASSERT_EQ(responses.size(), 1); + ASSERT_TRUE(is_multi_ok(responses[0].response)); + ASSERT_FALSE(exists("/A")); + ASSERT_FALSE(exists("/A/C")); + ASSERT_FALSE(exists("/A/B")); + ASSERT_FALSE(exists("/A/B/D")); + } + + { + SCOPED_TRACE("Recursive Remove With Regular In Multi Tx"); + int new_zxid = ++zxid; + auto ops = prepare_create_tree(); + + ops.push_back(zkutil::makeRemoveRequest("/A/C", -1)); + ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 3)); + const auto request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(request, 1, 0, new_zxid); + auto responses = storage.processRequest(request, 1, new_zxid); + ops.pop_back(); + ops.pop_back(); + + ASSERT_EQ(responses.size(), 1); + ASSERT_TRUE(is_multi_ok(responses[0].response)); + ASSERT_FALSE(exists("/A")); + ASSERT_FALSE(exists("/A/C")); + ASSERT_FALSE(exists("/A/B")); + ASSERT_FALSE(exists("/A/B/D")); + } + + { + SCOPED_TRACE("Recursive Remove From Committed and Uncommitted states"); + int create_zxid = ++zxid; + auto ops = prepare_create_tree(); + + /// First create nodes + const auto create_request = std::make_shared(ops, ACLs{}); + storage.preprocessRequest(create_request, 1, 0, create_zxid); + auto create_responses = storage.processRequest(create_request, 1, create_zxid); + ASSERT_EQ(create_responses.size(), 1); + ASSERT_TRUE(is_multi_ok(create_responses[0].response)); + ASSERT_TRUE(exists("/A")); + ASSERT_TRUE(exists("/A/C")); + ASSERT_TRUE(exists("/A/B")); + ASSERT_TRUE(exists("/A/B/D")); + + /// Remove node A/C as a single remove request. + /// Remove all other as remove recursive request. + /// In this case we should list storage to understand the tree topology + /// but ignore already deleted nodes in uncommitted state. + + int remove_zxid = ++zxid; + ops = { + zkutil::makeRemoveRequest("/A/C", -1), + zkutil::makeRemoveRecursiveRequest("/A", 3), + }; + const auto remove_request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(remove_request, 1, 0, remove_zxid); + auto remove_responses = storage.processRequest(remove_request, 1, remove_zxid); + + ASSERT_EQ(remove_responses.size(), 1); + ASSERT_TRUE(is_multi_ok(remove_responses[0].response)); + ASSERT_FALSE(exists("/A")); + ASSERT_FALSE(exists("/A/C")); + ASSERT_FALSE(exists("/A/B")); + ASSERT_FALSE(exists("/A/B/D")); + } +} + +TYPED_TEST(CoordinationTest, TestRemoveRecursiveWatches) +{ + using namespace DB; + using namespace Coordination; + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; + int zxid = 0; + + const auto create = [&](const String & path, int create_mode) + { + int new_zxid = ++zxid; + + const auto create_request = std::make_shared(); + create_request->path = path; + create_request->is_ephemeral = create_mode == zkutil::CreateMode::Ephemeral || create_mode == zkutil::CreateMode::EphemeralSequential; + create_request->is_sequential = create_mode == zkutil::CreateMode::PersistentSequential || create_mode == zkutil::CreateMode::EphemeralSequential; + + storage.preprocessRequest(create_request, 1, 0, new_zxid); + auto responses = storage.processRequest(create_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path; + }; + + const auto add_watch = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto exists_request = std::make_shared(); + exists_request->path = path; + exists_request->has_watch = true; + + storage.preprocessRequest(exists_request, 1, 0, new_zxid); + auto responses = storage.processRequest(exists_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK); + }; + + const auto add_list_watch = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto list_request = std::make_shared(); + list_request->path = path; + list_request->has_watch = true; + + storage.preprocessRequest(list_request, 1, 0, new_zxid); + auto responses = storage.processRequest(list_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK); + }; + + create("/A", zkutil::CreateMode::Persistent); + create("/A/B", zkutil::CreateMode::Persistent); + create("/A/C", zkutil::CreateMode::Ephemeral); + create("/A/B/D", zkutil::CreateMode::Ephemeral); + + add_watch("/A"); + add_watch("/A/B"); + add_watch("/A/C"); + add_watch("/A/B/D"); + add_list_watch("/A"); + add_list_watch("/A/B"); + ASSERT_EQ(storage.watches.size(), 4); + ASSERT_EQ(storage.list_watches.size(), 2); + + int new_zxid = ++zxid; + + auto remove_request = std::make_shared(); + remove_request->path = "/A"; + remove_request->remove_nodes_limit = 4; + + storage.preprocessRequest(remove_request, 1, 0, new_zxid); + auto responses = storage.processRequest(remove_request, 1, new_zxid); + + ASSERT_EQ(responses.size(), 7); + + for (size_t i = 0; i < 7; ++i) + { + ASSERT_EQ(responses[i].response->error, Coordination::Error::ZOK); + + if (const auto * watch_response = dynamic_cast(responses[i].response.get())) + ASSERT_EQ(watch_response->type, Coordination::Event::DELETED); + } + + ASSERT_EQ(storage.watches.size(), 0); + ASSERT_EQ(storage.list_watches.size(), 0); +} + +TYPED_TEST(CoordinationTest, TestRemoveRecursiveAcls) +{ + using namespace DB; + using namespace Coordination; + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; + int zxid = 0; + + { + int new_zxid = ++zxid; + String user_auth_data = "test_user:test_password"; + + const auto auth_request = std::make_shared(); + auth_request->scheme = "digest"; + auth_request->data = user_auth_data; + + storage.preprocessRequest(auth_request, 1, 0, new_zxid); + auto responses = storage.processRequest(auth_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to add auth to session"; + } + + const auto create = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto create_request = std::make_shared(); + create_request->path = path; + create_request->acls = {{.permissions = ACL::Create, .scheme = "auth", .id = ""}}; + + storage.preprocessRequest(create_request, 1, 0, new_zxid); + auto responses = storage.processRequest(create_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path; + }; + + /// Add nodes with only Create ACL + create("/A"); + create("/A/B"); + create("/A/C"); + create("/A/B/D"); + + { + int new_zxid = ++zxid; + + auto remove_request = std::make_shared(); + remove_request->path = "/A"; + remove_request->remove_nodes_limit = 4; + + storage.preprocessRequest(remove_request, 1, 0, new_zxid); + auto responses = storage.processRequest(remove_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZNOAUTH); + } +} + /// INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, /// CoordinationTest, /// ::testing::ValuesIn(std::initializer_list{CompressionParam{true, ".zstd"}, CompressionParam{false, ""}})); diff --git a/src/Interpreters/ZooKeeperLog.cpp b/src/Interpreters/ZooKeeperLog.cpp index 0d3063a569e..769757a5fba 100644 --- a/src/Interpreters/ZooKeeperLog.cpp +++ b/src/Interpreters/ZooKeeperLog.cpp @@ -93,6 +93,7 @@ ColumnsDescription ZooKeeperLogElement::getColumnsDescription() {"FilteredList", static_cast(Coordination::OpNum::FilteredList)}, {"CheckNotExists", static_cast(Coordination::OpNum::CheckNotExists)}, {"CreateIfNotExists", static_cast(Coordination::OpNum::CreateIfNotExists)}, + {"RemoveRecursive", static_cast(Coordination::OpNum::RemoveRecursive)}, }); auto error_enum = getCoordinationErrorCodesEnumType();