From 6c57adee7c7dd11d2c8aa2102d0e952414eeda61 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Wed, 4 Sep 2024 10:33:21 +0000 Subject: [PATCH] add remove recursive entry point --- src/Common/ZooKeeper/IKeeper.h | 6 +++- src/Common/ZooKeeper/TestKeeper.cpp | 1 + src/Common/ZooKeeper/TestKeeper.h | 1 + src/Common/ZooKeeper/ZooKeeper.cpp | 34 ++++++++++++++----- src/Common/ZooKeeper/ZooKeeper.h | 2 +- src/Common/ZooKeeper/ZooKeeperCommon.cpp | 32 +++++++++++++++-- src/Common/ZooKeeper/ZooKeeperCommon.h | 19 +++++++++-- src/Common/ZooKeeper/ZooKeeperConstants.cpp | 1 + src/Common/ZooKeeper/ZooKeeperConstants.h | 1 + src/Common/ZooKeeper/ZooKeeperImpl.cpp | 21 +++++++++--- src/Common/ZooKeeper/ZooKeeperImpl.h | 1 + .../ZooKeeper/ZooKeeperWithFaultInjection.cpp | 4 +-- src/Coordination/KeeperConstants.h | 1 + src/Coordination/KeeperDispatcher.cpp | 1 + src/Coordination/KeeperFeatureFlags.h | 1 + src/Coordination/KeeperStorage.cpp | 2 ++ src/Interpreters/ZooKeeperLog.cpp | 1 + 17 files changed, 109 insertions(+), 20 deletions(-) diff --git a/src/Common/ZooKeeper/IKeeper.h b/src/Common/ZooKeeper/IKeeper.h index ce7489a33e5..f887e67bd92 100644 --- a/src/Common/ZooKeeper/IKeeper.h +++ b/src/Common/ZooKeeper/IKeeper.h @@ -238,10 +238,13 @@ struct RemoveRequest : virtual Request String path; int32_t version = -1; + /// strict limit for number of deleted nodes + uint32_t remove_nodes_limit = 1; + void addRootPath(const String & root_path) override; String getPath() const override { return path; } - size_t bytesSize() const override { return path.size() + sizeof(version); } + size_t bytesSize() const override { return path.size() + sizeof(version) + sizeof(remove_nodes_limit); } }; struct RemoveResponse : virtual Response @@ -585,6 +588,7 @@ public: virtual void remove( const String & path, int32_t version, + uint32_t remove_nodes_limit, RemoveCallback callback) = 0; virtual void exists( diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 16ea412eb77..65a575549c4 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -759,6 +759,7 @@ void TestKeeper::create( void TestKeeper::remove( const String & path, int32_t version, + [[maybe_unused]] uint32_t remove_nodes_limit, // TODO(michicosun): enable RemoveCallback callback) { TestKeeperRemoveRequest request; diff --git a/src/Common/ZooKeeper/TestKeeper.h b/src/Common/ZooKeeper/TestKeeper.h index 562c313ac0e..14b37448a20 100644 --- a/src/Common/ZooKeeper/TestKeeper.h +++ b/src/Common/ZooKeeper/TestKeeper.h @@ -56,6 +56,7 @@ public: void remove( const String & path, int32_t version, + uint32_t remove_nodes_limit, RemoveCallback callback) override; void exists( diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 1a9ed4f1ee7..eb4f5e0725a 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -979,16 +979,34 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab return removed_as_expected; } -void ZooKeeper::removeRecursive(const std::string & path) +void ZooKeeper::removeRecursive(const std::string & path) // TODO(michicosun) rewrite { - removeChildrenRecursive(path); - remove(path); + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::RemoveResponse & response) mutable + { + promise->set_value(response); + }; + + impl->remove(path, -1, /*remove_nodes_limit=*/ 100, std::move(callback)); + + if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready) + { + impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::RemoveRecursive, path)); + check(Coordination::Error::ZOPERATIONTIMEOUT, path); + } + else + { + auto response = future.get(); + check(response.error, path); + } } -void ZooKeeper::tryRemoveRecursive(const std::string & path) +Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path) { tryRemoveChildrenRecursive(path); - tryRemove(path); + return tryRemove(path); } @@ -1367,7 +1385,7 @@ std::future ZooKeeper::asyncRemove(const std::stri promise->set_value(response); }; - impl->remove(path, version, std::move(callback)); + impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); return future; } @@ -1390,7 +1408,7 @@ std::future ZooKeeper::asyncTryRemove(const std::s promise->set_value(response); }; - impl->remove(path, version, std::move(callback)); + impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); return future; } @@ -1404,7 +1422,7 @@ std::future ZooKeeper::asyncTryRemoveNoThrow(const promise->set_value(response); }; - impl->remove(path, version, std::move(callback)); + impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); return future; } diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 7ccdc9d1b7f..3b94e6004cb 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -487,7 +487,7 @@ public: /// this will not cause errors. /// For instance, you can call this method twice concurrently for the same node and the end /// result would be the same as for the single call. - void tryRemoveRecursive(const std::string & path); + Coordination::Error tryRemoveRecursive(const std::string & path); /// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself. /// Node defined as RemoveException will not be deleted. diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index dff14f74681..fb37aa6ac22 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -1,5 +1,5 @@ -#include "Common/ZooKeeper/IKeeper.h" -#include "Common/ZooKeeper/ZooKeeperConstants.h" +#include +#include #include #include #include @@ -232,6 +232,27 @@ void ZooKeeperRemoveRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } +void ZooKeeperRemoveRecursiveRequest::writeImpl(WriteBuffer & out) const +{ + ZooKeeperRemoveRequest::writeImpl(out); + Coordination::write(remove_nodes_limit, out); +} + +void ZooKeeperRemoveRecursiveRequest::readImpl(ReadBuffer & in) +{ + ZooKeeperRemoveRequest::readImpl(in); + Coordination::read(remove_nodes_limit, in); +} + +std::string ZooKeeperRemoveRecursiveRequest::toStringImpl(bool short_format) const +{ + return fmt::format( + "{}\n" + "remove_nodes_limit = {}", + ZooKeeperRemoveRequest::toStringImpl(short_format), + remove_nodes_limit); +} + void ZooKeeperExistsRequest::writeImpl(WriteBuffer & out) const { Coordination::write(path, out); @@ -510,6 +531,11 @@ ZooKeeperMultiRequest::ZooKeeperMultiRequest(std::span(*concrete_request_remove)); } + else if (const auto * concrete_request_remove_recursive = dynamic_cast(generic_request.get())) + { + checkOperationType(Write); + requests.push_back(std::make_shared(*concrete_request_remove_recursive)); + } else if (const auto * concrete_request_set = dynamic_cast(generic_request.get())) { checkOperationType(Write); @@ -707,6 +733,7 @@ ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return se ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return setTime(std::make_shared()); } +ZooKeeperResponsePtr ZooKeeperRemoveRecursiveRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared()); } @@ -1024,6 +1051,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory() registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); + registerZooKeeperRequest(*this); } PathMatchResult matchPath(std::string_view path, std::string_view match_to) diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index fd6ec3cd375..ef17d069b4c 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -258,7 +258,7 @@ struct ZooKeeperCreateIfNotExistsResponse : ZooKeeperCreateResponse using ZooKeeperCreateResponse::ZooKeeperCreateResponse; }; -struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest +struct ZooKeeperRemoveRequest : RemoveRequest, ZooKeeperRequest { ZooKeeperRemoveRequest() = default; explicit ZooKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {} @@ -276,7 +276,17 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest void createLogElements(LogElements & elems) const override; }; -struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse +struct ZooKeeperRemoveRecursiveRequest : ZooKeeperRemoveRequest +{ + OpNum getOpNum() const override { return OpNum::RemoveRecursive; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + std::string toStringImpl(bool short_format) const override; + + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperRemoveResponse : RemoveResponse, ZooKeeperResponse { void readImpl(ReadBuffer &) override {} void writeImpl(WriteBuffer &) const override {} @@ -285,6 +295,11 @@ struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse size_t bytesSize() const override { return RemoveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } }; +struct ZooKeeperRemoveRecursiveResponse : ZooKeeperRemoveResponse +{ + OpNum getOpNum() const override { return OpNum::RemoveRecursive; } +}; + struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest { ZooKeeperExistsRequest() = default; diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.cpp b/src/Common/ZooKeeper/ZooKeeperConstants.cpp index cf8ba35e992..a2780dfd5e2 100644 --- a/src/Common/ZooKeeper/ZooKeeperConstants.cpp +++ b/src/Common/ZooKeeper/ZooKeeperConstants.cpp @@ -29,6 +29,7 @@ static const std::unordered_set VALID_OPERATIONS = static_cast(OpNum::GetACL), static_cast(OpNum::FilteredList), static_cast(OpNum::CheckNotExists), + static_cast(OpNum::RemoveRecursive), }; OpNum getOpNum(int32_t raw_op_num) diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.h b/src/Common/ZooKeeper/ZooKeeperConstants.h index 1d9830505f8..9d8e2d4f857 100644 --- a/src/Common/ZooKeeper/ZooKeeperConstants.h +++ b/src/Common/ZooKeeper/ZooKeeperConstants.h @@ -40,6 +40,7 @@ enum class OpNum : int32_t FilteredList = 500, CheckNotExists = 501, CreateIfNotExists = 502, + RemoveRecursive = 503, SessionID = 997, /// Special internal request }; diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index ba622f30c91..5c4de98d5fc 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1333,14 +1333,27 @@ void ZooKeeper::create( void ZooKeeper::remove( const String & path, int32_t version, + uint32_t remove_nodes_limit, RemoveCallback callback) { - ZooKeeperRemoveRequest request; - request.path = path; - request.version = version; + std::shared_ptr request = nullptr; + + if (!isFeatureEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE)) + { + if (remove_nodes_limit > 1) + throw Exception::fromMessage(Error::ZBADARGUMENTS, "RemoveRecursive request type cannot be used because it's not supported by the server"); + + request = std::make_shared(); + } + else + request = std::make_shared(); + + request->path = path; + request->version = version; + request->remove_nodes_limit = remove_nodes_limit; RequestInfo request_info; - request_info.request = std::make_shared(std::move(request)); + request_info.request = std::move(request); request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; pushRequest(std::move(request_info)); diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 39082cd14c1..6e98df546d9 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -144,6 +144,7 @@ public: void remove( const String & path, int32_t version, + uint32_t remove_nodes_limit, RemoveCallback callback) override; void exists( diff --git a/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp b/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp index fc246e263d9..dfd3faf8d01 100644 --- a/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp +++ b/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp @@ -587,7 +587,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemove(std: promise->set_value(response); }; - keeper->impl->remove(path, version, std::move(callback)); + keeper->impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); return future; } @@ -630,7 +630,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemoveNoThr } }; - keeper->impl->remove(path, version, std::move(callback)); + keeper->impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); return future; } diff --git a/src/Coordination/KeeperConstants.h b/src/Coordination/KeeperConstants.h index 08a7c85585a..d984d077872 100644 --- a/src/Coordination/KeeperConstants.h +++ b/src/Coordination/KeeperConstants.h @@ -11,6 +11,7 @@ enum class KeeperApiVersion : uint8_t WITH_FILTERED_LIST, WITH_MULTI_READ, WITH_CHECK_NOT_EXISTS, + WITH_REMOVE_RECURSIVE, }; const String keeper_system_path = "/keeper"; diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 4a350077596..fd80ee0a7e2 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -86,6 +86,7 @@ bool checkIfRequestIncreaseMem(const Coordination::ZooKeeperRequestPtr & request break; } case Coordination::OpNum::Remove: + case Coordination::OpNum::RemoveRecursive: { Coordination::ZooKeeperRemoveRequest & remove_req = dynamic_cast(*sub_zk_request); memory_delta -= remove_req.bytesSize(); diff --git a/src/Coordination/KeeperFeatureFlags.h b/src/Coordination/KeeperFeatureFlags.h index 4e26ca60736..e70bd50cc88 100644 --- a/src/Coordination/KeeperFeatureFlags.h +++ b/src/Coordination/KeeperFeatureFlags.h @@ -12,6 +12,7 @@ enum class KeeperFeatureFlag : size_t MULTI_READ, CHECK_NOT_EXISTS, CREATE_IF_NOT_EXISTS, + REMOVE_RECURSIVE, }; class KeeperFeatureFlags diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index acdf209baae..54cdc8c11db 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -2128,6 +2128,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro concrete_requests.push_back(std::make_shared>(sub_zk_request)); break; case Coordination::OpNum::Remove: + case Coordination::OpNum::RemoveRecursive: check_operation_type(OperationType::Write); concrete_requests.push_back(std::make_shared>(sub_zk_request)); break; @@ -2400,6 +2401,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFa registerKeeperRequestProcessor>(*this); registerKeeperRequestProcessor>(*this); registerKeeperRequestProcessor>(*this); + registerKeeperRequestProcessor>(*this); } diff --git a/src/Interpreters/ZooKeeperLog.cpp b/src/Interpreters/ZooKeeperLog.cpp index 0d3063a569e..769757a5fba 100644 --- a/src/Interpreters/ZooKeeperLog.cpp +++ b/src/Interpreters/ZooKeeperLog.cpp @@ -93,6 +93,7 @@ ColumnsDescription ZooKeeperLogElement::getColumnsDescription() {"FilteredList", static_cast(Coordination::OpNum::FilteredList)}, {"CheckNotExists", static_cast(Coordination::OpNum::CheckNotExists)}, {"CreateIfNotExists", static_cast(Coordination::OpNum::CreateIfNotExists)}, + {"RemoveRecursive", static_cast(Coordination::OpNum::RemoveRecursive)}, }); auto error_enum = getCoordinationErrorCodesEnumType();