From 6c57adee7c7dd11d2c8aa2102d0e952414eeda61 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Wed, 4 Sep 2024 10:33:21 +0000 Subject: [PATCH 01/23] add remove recursive entry point --- src/Common/ZooKeeper/IKeeper.h | 6 +++- src/Common/ZooKeeper/TestKeeper.cpp | 1 + src/Common/ZooKeeper/TestKeeper.h | 1 + src/Common/ZooKeeper/ZooKeeper.cpp | 34 ++++++++++++++----- src/Common/ZooKeeper/ZooKeeper.h | 2 +- src/Common/ZooKeeper/ZooKeeperCommon.cpp | 32 +++++++++++++++-- src/Common/ZooKeeper/ZooKeeperCommon.h | 19 +++++++++-- src/Common/ZooKeeper/ZooKeeperConstants.cpp | 1 + src/Common/ZooKeeper/ZooKeeperConstants.h | 1 + src/Common/ZooKeeper/ZooKeeperImpl.cpp | 21 +++++++++--- src/Common/ZooKeeper/ZooKeeperImpl.h | 1 + .../ZooKeeper/ZooKeeperWithFaultInjection.cpp | 4 +-- src/Coordination/KeeperConstants.h | 1 + src/Coordination/KeeperDispatcher.cpp | 1 + src/Coordination/KeeperFeatureFlags.h | 1 + src/Coordination/KeeperStorage.cpp | 2 ++ src/Interpreters/ZooKeeperLog.cpp | 1 + 17 files changed, 109 insertions(+), 20 deletions(-) diff --git a/src/Common/ZooKeeper/IKeeper.h b/src/Common/ZooKeeper/IKeeper.h index ce7489a33e5..f887e67bd92 100644 --- a/src/Common/ZooKeeper/IKeeper.h +++ b/src/Common/ZooKeeper/IKeeper.h @@ -238,10 +238,13 @@ struct RemoveRequest : virtual Request String path; int32_t version = -1; + /// strict limit for number of deleted nodes + uint32_t remove_nodes_limit = 1; + void addRootPath(const String & root_path) override; String getPath() const override { return path; } - size_t bytesSize() const override { return path.size() + sizeof(version); } + size_t bytesSize() const override { return path.size() + sizeof(version) + sizeof(remove_nodes_limit); } }; struct RemoveResponse : virtual Response @@ -585,6 +588,7 @@ public: virtual void remove( const String & path, int32_t version, + uint32_t remove_nodes_limit, RemoveCallback callback) = 0; virtual void exists( diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 16ea412eb77..65a575549c4 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -759,6 +759,7 @@ void TestKeeper::create( void TestKeeper::remove( const String & path, int32_t version, + [[maybe_unused]] uint32_t remove_nodes_limit, // TODO(michicosun): enable RemoveCallback callback) { TestKeeperRemoveRequest request; diff --git a/src/Common/ZooKeeper/TestKeeper.h b/src/Common/ZooKeeper/TestKeeper.h index 562c313ac0e..14b37448a20 100644 --- a/src/Common/ZooKeeper/TestKeeper.h +++ b/src/Common/ZooKeeper/TestKeeper.h @@ -56,6 +56,7 @@ public: void remove( const String & path, int32_t version, + uint32_t remove_nodes_limit, RemoveCallback callback) override; void exists( diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 1a9ed4f1ee7..eb4f5e0725a 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -979,16 +979,34 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab return removed_as_expected; } -void ZooKeeper::removeRecursive(const std::string & path) +void ZooKeeper::removeRecursive(const std::string & path) // TODO(michicosun) rewrite { - removeChildrenRecursive(path); - remove(path); + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::RemoveResponse & response) mutable + { + promise->set_value(response); + }; + + impl->remove(path, -1, /*remove_nodes_limit=*/ 100, std::move(callback)); + + if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready) + { + impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::RemoveRecursive, path)); + check(Coordination::Error::ZOPERATIONTIMEOUT, path); + } + else + { + auto response = future.get(); + check(response.error, path); + } } -void ZooKeeper::tryRemoveRecursive(const std::string & path) +Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path) { tryRemoveChildrenRecursive(path); - tryRemove(path); + return tryRemove(path); } @@ -1367,7 +1385,7 @@ std::future ZooKeeper::asyncRemove(const std::stri promise->set_value(response); }; - impl->remove(path, version, std::move(callback)); + impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); return future; } @@ -1390,7 +1408,7 @@ std::future ZooKeeper::asyncTryRemove(const std::s promise->set_value(response); }; - impl->remove(path, version, std::move(callback)); + impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); return future; } @@ -1404,7 +1422,7 @@ std::future ZooKeeper::asyncTryRemoveNoThrow(const promise->set_value(response); }; - impl->remove(path, version, std::move(callback)); + impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); return future; } diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 7ccdc9d1b7f..3b94e6004cb 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -487,7 +487,7 @@ public: /// this will not cause errors. /// For instance, you can call this method twice concurrently for the same node and the end /// result would be the same as for the single call. - void tryRemoveRecursive(const std::string & path); + Coordination::Error tryRemoveRecursive(const std::string & path); /// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself. /// Node defined as RemoveException will not be deleted. diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index dff14f74681..fb37aa6ac22 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -1,5 +1,5 @@ -#include "Common/ZooKeeper/IKeeper.h" -#include "Common/ZooKeeper/ZooKeeperConstants.h" +#include +#include #include #include #include @@ -232,6 +232,27 @@ void ZooKeeperRemoveRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } +void ZooKeeperRemoveRecursiveRequest::writeImpl(WriteBuffer & out) const +{ + ZooKeeperRemoveRequest::writeImpl(out); + Coordination::write(remove_nodes_limit, out); +} + +void ZooKeeperRemoveRecursiveRequest::readImpl(ReadBuffer & in) +{ + ZooKeeperRemoveRequest::readImpl(in); + Coordination::read(remove_nodes_limit, in); +} + +std::string ZooKeeperRemoveRecursiveRequest::toStringImpl(bool short_format) const +{ + return fmt::format( + "{}\n" + "remove_nodes_limit = {}", + ZooKeeperRemoveRequest::toStringImpl(short_format), + remove_nodes_limit); +} + void ZooKeeperExistsRequest::writeImpl(WriteBuffer & out) const { Coordination::write(path, out); @@ -510,6 +531,11 @@ ZooKeeperMultiRequest::ZooKeeperMultiRequest(std::span(*concrete_request_remove)); } + else if (const auto * concrete_request_remove_recursive = dynamic_cast(generic_request.get())) + { + checkOperationType(Write); + requests.push_back(std::make_shared(*concrete_request_remove_recursive)); + } else if (const auto * concrete_request_set = dynamic_cast(generic_request.get())) { checkOperationType(Write); @@ -707,6 +733,7 @@ ZooKeeperResponsePtr ZooKeeperHeartbeatRequest::makeResponse() const { return se ZooKeeperResponsePtr ZooKeeperSyncRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperAuthRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return setTime(std::make_shared()); } +ZooKeeperResponsePtr ZooKeeperRemoveRecursiveRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared()); } ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared()); } @@ -1024,6 +1051,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory() registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); + registerZooKeeperRequest(*this); } PathMatchResult matchPath(std::string_view path, std::string_view match_to) diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index fd6ec3cd375..ef17d069b4c 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -258,7 +258,7 @@ struct ZooKeeperCreateIfNotExistsResponse : ZooKeeperCreateResponse using ZooKeeperCreateResponse::ZooKeeperCreateResponse; }; -struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest +struct ZooKeeperRemoveRequest : RemoveRequest, ZooKeeperRequest { ZooKeeperRemoveRequest() = default; explicit ZooKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {} @@ -276,7 +276,17 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest void createLogElements(LogElements & elems) const override; }; -struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse +struct ZooKeeperRemoveRecursiveRequest : ZooKeeperRemoveRequest +{ + OpNum getOpNum() const override { return OpNum::RemoveRecursive; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + std::string toStringImpl(bool short_format) const override; + + ZooKeeperResponsePtr makeResponse() const override; +}; + +struct ZooKeeperRemoveResponse : RemoveResponse, ZooKeeperResponse { void readImpl(ReadBuffer &) override {} void writeImpl(WriteBuffer &) const override {} @@ -285,6 +295,11 @@ struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse size_t bytesSize() const override { return RemoveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } }; +struct ZooKeeperRemoveRecursiveResponse : ZooKeeperRemoveResponse +{ + OpNum getOpNum() const override { return OpNum::RemoveRecursive; } +}; + struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest { ZooKeeperExistsRequest() = default; diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.cpp b/src/Common/ZooKeeper/ZooKeeperConstants.cpp index cf8ba35e992..a2780dfd5e2 100644 --- a/src/Common/ZooKeeper/ZooKeeperConstants.cpp +++ b/src/Common/ZooKeeper/ZooKeeperConstants.cpp @@ -29,6 +29,7 @@ static const std::unordered_set VALID_OPERATIONS = static_cast(OpNum::GetACL), static_cast(OpNum::FilteredList), static_cast(OpNum::CheckNotExists), + static_cast(OpNum::RemoveRecursive), }; OpNum getOpNum(int32_t raw_op_num) diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.h b/src/Common/ZooKeeper/ZooKeeperConstants.h index 1d9830505f8..9d8e2d4f857 100644 --- a/src/Common/ZooKeeper/ZooKeeperConstants.h +++ b/src/Common/ZooKeeper/ZooKeeperConstants.h @@ -40,6 +40,7 @@ enum class OpNum : int32_t FilteredList = 500, CheckNotExists = 501, CreateIfNotExists = 502, + RemoveRecursive = 503, SessionID = 997, /// Special internal request }; diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index ba622f30c91..5c4de98d5fc 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1333,14 +1333,27 @@ void ZooKeeper::create( void ZooKeeper::remove( const String & path, int32_t version, + uint32_t remove_nodes_limit, RemoveCallback callback) { - ZooKeeperRemoveRequest request; - request.path = path; - request.version = version; + std::shared_ptr request = nullptr; + + if (!isFeatureEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE)) + { + if (remove_nodes_limit > 1) + throw Exception::fromMessage(Error::ZBADARGUMENTS, "RemoveRecursive request type cannot be used because it's not supported by the server"); + + request = std::make_shared(); + } + else + request = std::make_shared(); + + request->path = path; + request->version = version; + request->remove_nodes_limit = remove_nodes_limit; RequestInfo request_info; - request_info.request = std::make_shared(std::move(request)); + request_info.request = std::move(request); request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; pushRequest(std::move(request_info)); diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 39082cd14c1..6e98df546d9 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -144,6 +144,7 @@ public: void remove( const String & path, int32_t version, + uint32_t remove_nodes_limit, RemoveCallback callback) override; void exists( diff --git a/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp b/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp index fc246e263d9..dfd3faf8d01 100644 --- a/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp +++ b/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp @@ -587,7 +587,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemove(std: promise->set_value(response); }; - keeper->impl->remove(path, version, std::move(callback)); + keeper->impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); return future; } @@ -630,7 +630,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemoveNoThr } }; - keeper->impl->remove(path, version, std::move(callback)); + keeper->impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); return future; } diff --git a/src/Coordination/KeeperConstants.h b/src/Coordination/KeeperConstants.h index 08a7c85585a..d984d077872 100644 --- a/src/Coordination/KeeperConstants.h +++ b/src/Coordination/KeeperConstants.h @@ -11,6 +11,7 @@ enum class KeeperApiVersion : uint8_t WITH_FILTERED_LIST, WITH_MULTI_READ, WITH_CHECK_NOT_EXISTS, + WITH_REMOVE_RECURSIVE, }; const String keeper_system_path = "/keeper"; diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 4a350077596..fd80ee0a7e2 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -86,6 +86,7 @@ bool checkIfRequestIncreaseMem(const Coordination::ZooKeeperRequestPtr & request break; } case Coordination::OpNum::Remove: + case Coordination::OpNum::RemoveRecursive: { Coordination::ZooKeeperRemoveRequest & remove_req = dynamic_cast(*sub_zk_request); memory_delta -= remove_req.bytesSize(); diff --git a/src/Coordination/KeeperFeatureFlags.h b/src/Coordination/KeeperFeatureFlags.h index 4e26ca60736..e70bd50cc88 100644 --- a/src/Coordination/KeeperFeatureFlags.h +++ b/src/Coordination/KeeperFeatureFlags.h @@ -12,6 +12,7 @@ enum class KeeperFeatureFlag : size_t MULTI_READ, CHECK_NOT_EXISTS, CREATE_IF_NOT_EXISTS, + REMOVE_RECURSIVE, }; class KeeperFeatureFlags diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index acdf209baae..54cdc8c11db 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -2128,6 +2128,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro concrete_requests.push_back(std::make_shared>(sub_zk_request)); break; case Coordination::OpNum::Remove: + case Coordination::OpNum::RemoveRecursive: check_operation_type(OperationType::Write); concrete_requests.push_back(std::make_shared>(sub_zk_request)); break; @@ -2400,6 +2401,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFa registerKeeperRequestProcessor>(*this); registerKeeperRequestProcessor>(*this); registerKeeperRequestProcessor>(*this); + registerKeeperRequestProcessor>(*this); } diff --git a/src/Interpreters/ZooKeeperLog.cpp b/src/Interpreters/ZooKeeperLog.cpp index 0d3063a569e..769757a5fba 100644 --- a/src/Interpreters/ZooKeeperLog.cpp +++ b/src/Interpreters/ZooKeeperLog.cpp @@ -93,6 +93,7 @@ ColumnsDescription ZooKeeperLogElement::getColumnsDescription() {"FilteredList", static_cast(Coordination::OpNum::FilteredList)}, {"CheckNotExists", static_cast(Coordination::OpNum::CheckNotExists)}, {"CreateIfNotExists", static_cast(Coordination::OpNum::CreateIfNotExists)}, + {"RemoveRecursive", static_cast(Coordination::OpNum::RemoveRecursive)}, }); auto error_enum = getCoordinationErrorCodesEnumType(); From c6777af485fcd8aef5b759de9c1c531c74df6b8a Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Wed, 4 Sep 2024 22:59:18 +0000 Subject: [PATCH 02/23] add remove recursive support --- src/Coordination/KeeperStorage.cpp | 162 ++++++++++++++++++++++++++--- src/Coordination/KeeperStorage.h | 13 ++- 2 files changed, 161 insertions(+), 14 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 54cdc8c11db..9d63a9e8691 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1513,29 +1513,37 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr { if (request.restored_from_zookeeper_log) update_parent_pzxid(); + return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}}; } - else if (request.version != -1 && request.version != node->version) + + if (request.version != -1 && request.version != node->version) return {typename Storage::Delta{zxid, Coordination::Error::ZBADVERSION}}; - else if (node->numChildren() != 0) + + ToDeleteTreeCollector collector(storage, zxid, request.remove_nodes_limit); + bool limit_exceeded = collector.collect(request.path, *node); + + if (limit_exceeded) return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}}; if (request.restored_from_zookeeper_log) update_parent_pzxid(); - new_deltas.emplace_back( - std::string{parentNodePath(request.path)}, - zxid, - typename Storage::UpdateNodeDelta{[](typename Storage::Node & parent) - { - ++parent.cversion; - parent.decreaseNumChildren(); - }}); + auto delete_deltas = collector.extractDeltas(); - new_deltas.emplace_back(request.path, zxid, typename Storage::RemoveNodeDelta{request.version, node->ephemeralOwner()}); + for (const auto & delta : delete_deltas) + std::visit( + Overloaded{ + [&](const typename Storage::RemoveNodeDelta & remove_delta) + { + if (remove_delta.ephemeral_owner) + storage.unregisterEphemeralPath(remove_delta.ephemeral_owner, delta.path); + }, + [](auto && /* delta */) {}, + }, + delta.operation); - if (node->isEphemeral()) - storage.unregisterEphemeralPath(node->ephemeralOwner(), request.path); + new_deltas.insert(new_deltas.end(), std::make_move_iterator(delete_deltas.begin()), std::make_move_iterator(delete_deltas.end())); digest = storage.calculateNodesDigest(digest, new_deltas); @@ -1556,6 +1564,134 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr { return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); } + +private: + class ToDeleteTreeCollector + { + Storage & storage; + int64_t zxid; + uint32_t limit; + + uint32_t nodes_observed = 0; + std::vector deltas; + + public: + ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, uint32_t limit_) + : storage(storage_) + , zxid(zxid_) + , limit(limit_) + { + } + + bool collect(StringRef root_path, const typename Storage::Node & root_node) + { + if (updateStats(root_node)) + return true; + + if (visitRocksDBNode(root_path) || visitMemNode(root_path)) + return true; + + /// After checking committed nodes there might be some in uncommitted state, + /// for example made on the previouse step of multi transaction. + return visitRootAndUncommitted(root_path, root_node); + } + + uint32_t getNumObserved() const + { + return nodes_observed; + } + + std::vector extractDeltas() + { + return std::move(deltas); + } + + private: + bool visitRocksDBNode(StringRef root_path) + { + if constexpr (Storage::use_rocksdb) + { + auto children = storage.container.getChildren(root_path.toString()); + + for (auto && [child_path, node] : children) + if (collect(child_path, node)) + return true; + } + + return false; + } + + bool visitMemNode(StringRef root_path) + { + if constexpr (!Storage::use_rocksdb) + { + std::filesystem::path root_fs_path(root_path.toString()); + + auto node_it = storage.container.find(root_path); + if (node_it != storage.container.end()) + { + auto children = node_it->value.getChildren(); + for (auto && child_name : children) + { + auto child_path = (root_fs_path / child_name.toView()).generic_string(); + + auto child_it = storage.container.find(child_path); + chassert(child_it != storage.container.end()); + + if (collect(child_path, child_it->value)) + return true; + } + } + } + + return false; + } + + bool visitRootAndUncommitted(StringRef root_path, const typename Storage::Node & root_node) + { + const auto & nodes = storage.uncommitted_state.nodes; + + /// nodes are sorted by paths with level locality + auto it = nodes.upper_bound(root_path.toView()); + + for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it) + { + chassert(it->second.node); + const typename Storage::Node & node = *it->second.node; + + if (updateStats(node)) + return true; + + if (visitRootAndUncommitted(it->first, node)) /// if child is uncommitted then all subtree is also uncommitted + return true; + } + + deltas.emplace_back( + parentNodePath(root_path).toString(), + zxid, + typename Storage::UpdateNodeDelta{ + [](typename Storage::Node & parent) + { + ++parent.cversion; + parent.decreaseNumChildren(); + } + }); + + deltas.emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()}); + + return false; + } + + bool updateStats(const typename Storage::Node & root_node) + { + nodes_observed += 1 + root_node.numChildren(); /// root + all known children + + if (nodes_observed > limit) + return true; + + return false; + } + }; }; template diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 4a9286d4835..c2f6e4c5a74 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -609,7 +609,18 @@ public: using is_transparent = void; // required to make find() work with different type than key_type }; - mutable std::unordered_map nodes; + struct PathCmp + { + using is_transparent = std::true_type; + + auto operator()(const std::string_view a, + const std::string_view b) const + { + return a.size() < b.size() || (a.size() == b.size() && a < b); + } + }; + + mutable std::map nodes; std::unordered_map, Hash, Equal> deltas_for_path; std::list deltas; From 5180e58dca42959f3c9985459f6c847165e9bd06 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Thu, 5 Sep 2024 11:01:37 +0000 Subject: [PATCH 03/23] fix collector --- src/Coordination/KeeperStorage.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 9d63a9e8691..aa5b9d539b2 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1611,11 +1611,16 @@ private: { if constexpr (Storage::use_rocksdb) { + std::filesystem::path root_fs_path(root_path.toString()); auto children = storage.container.getChildren(root_path.toString()); - for (auto && [child_path, node] : children) + for (auto && [child_name, node] : children) + { + auto child_path = (root_fs_path / child_name).generic_string(); + if (collect(child_path, node)) return true; + } } return false; @@ -1657,12 +1662,13 @@ private: for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it) { chassert(it->second.node); + const String & path = it->first; const typename Storage::Node & node = *it->second.node; if (updateStats(node)) return true; - if (visitRootAndUncommitted(it->first, node)) /// if child is uncommitted then all subtree is also uncommitted + if (visitRootAndUncommitted(path, node)) /// if child is uncommitted then all subtree is also uncommitted return true; } @@ -1684,9 +1690,9 @@ private: bool updateStats(const typename Storage::Node & root_node) { - nodes_observed += 1 + root_node.numChildren(); /// root + all known children + nodes_observed += 1; - if (nodes_observed > limit) + if (nodes_observed + root_node.numChildren() > limit) return true; return false; From ae512fe5337bb24b0ad7cc2bf841d4392cdd9cf2 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Thu, 5 Sep 2024 11:05:23 +0000 Subject: [PATCH 04/23] add test for single delete --- src/Coordination/tests/gtest_coordination.cpp | 167 ++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index d39031773cd..413b5f18e33 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3113,6 +3113,8 @@ TYPED_TEST(CoordinationTest, TestFeatureFlags) ASSERT_TRUE(feature_flags.isEnabled(KeeperFeatureFlag::FILTERED_LIST)); ASSERT_TRUE(feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ)); ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::CHECK_NOT_EXISTS)); + ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::CREATE_IF_NOT_EXISTS)); + ASSERT_FALSE(feature_flags.isEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE)); } TYPED_TEST(CoordinationTest, TestSystemNodeModify) @@ -3374,6 +3376,171 @@ TYPED_TEST(CoordinationTest, TestReapplyingDeltas) ASSERT_TRUE(children1_set == children2_set); } +TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) +{ + using namespace DB; + using namespace Coordination; + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; + + int32_t zxid = 0; + + const auto create = [&](const String & path, int create_mode) + { + int new_zxid = ++zxid; + + const auto create_request = std::make_shared(); + create_request->path = path; + create_request->is_ephemeral = create_mode == zkutil::CreateMode::Ephemeral || create_mode == zkutil::CreateMode::EphemeralSequential; + create_request->is_sequential = create_mode == zkutil::CreateMode::PersistentSequential || create_mode == zkutil::CreateMode::EphemeralSequential; + + storage.preprocessRequest(create_request, 1, 0, new_zxid); + auto responses = storage.processRequest(create_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path; + }; + + const auto remove = [&](const String & path, int32_t version = -1, std::optional remove_nodes_limit = std::nullopt) + { + int new_zxid = ++zxid; + + std::shared_ptr remove_request; + + if (remove_nodes_limit.has_value()) + remove_request = std::make_shared(); + else + remove_request = std::make_shared(); + + remove_request->path = path; + remove_request->version = version; + remove_request->remove_nodes_limit = remove_nodes_limit.value_or(1); + + storage.preprocessRequest(remove_request, 1, 0, new_zxid); + return storage.processRequest(remove_request, 1, new_zxid); + }; + + const auto exists = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto exists_request = std::make_shared(); + exists_request->path = path; + + storage.preprocessRequest(exists_request, 1, 0, new_zxid); + auto responses = storage.processRequest(exists_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + return responses[0].response->error == Coordination::Error::ZOK; + }; + + { + SCOPED_TRACE("Single Remove Single Node"); + create("/T1", zkutil::CreateMode::Persistent); + + auto responses = remove("/T1"); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_FALSE(exists("/T1")); + } + + { + SCOPED_TRACE("Single Remove Tree"); + create("/T2", zkutil::CreateMode::Persistent); + create("/T2/A", zkutil::CreateMode::Persistent); + + auto responses = remove("/T2"); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY); + ASSERT_TRUE(exists("/T2")); + } + + { + SCOPED_TRACE("Recursive Remove Single Node"); + create("/T3", zkutil::CreateMode::Persistent); + + auto responses = remove("/T3", 0, 100); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_FALSE(exists("/T3")); + } + + { + SCOPED_TRACE("Recursive Remove Tree"); + create("/T4", zkutil::CreateMode::Persistent); + create("/T4/A", zkutil::CreateMode::Persistent); + + auto responses = remove("/T4", 0, 100); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_FALSE(exists("/T4")); + ASSERT_FALSE(exists("/T4/A")); + } + + { + SCOPED_TRACE("Recursive Remove Tree Small Limit"); + create("/T5", zkutil::CreateMode::Persistent); + create("/T5/A", zkutil::CreateMode::Persistent); + create("/T5/B", zkutil::CreateMode::Persistent); + create("/T5/A/C", zkutil::CreateMode::Persistent); + + auto responses = remove("/T5", 0, 2); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY); + ASSERT_TRUE(exists("/T5")); + ASSERT_TRUE(exists("/T5/A")); + ASSERT_TRUE(exists("/T5/B")); + ASSERT_TRUE(exists("/T5/A/C")); + } + + { + SCOPED_TRACE("Recursive Remove Tree Small Limit"); + create("/T6", zkutil::CreateMode::Persistent); + create("/T6/A", zkutil::CreateMode::Persistent); + create("/T6/B", zkutil::CreateMode::Persistent); + create("/T6/A/C", zkutil::CreateMode::Persistent); + + auto responses = remove("/T6", 0, 2); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY); + ASSERT_TRUE(exists("/T6")); + ASSERT_TRUE(exists("/T6/A")); + ASSERT_TRUE(exists("/T6/B")); + ASSERT_TRUE(exists("/T6/A/C")); + } + + { + SCOPED_TRACE("Recursive Remove Ephemeral"); + create("/T7", zkutil::CreateMode::Ephemeral); + + auto responses = remove("/T7", 0, 100); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_TRUE(!exists("/T7")); + } + + { + SCOPED_TRACE("Recursive Remove Tree With Ephemeral"); + create("/T8", zkutil::CreateMode::Persistent); + create("/T8/A", zkutil::CreateMode::Persistent); + create("/T8/B", zkutil::CreateMode::Ephemeral); + create("/T8/A/C", zkutil::CreateMode::Ephemeral); + + auto responses = remove("/T8", 0, 4); + ASSERT_EQ(responses.size(), 1); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_TRUE(!exists("/T8")); + ASSERT_TRUE(!exists("/T8/A")); + ASSERT_TRUE(!exists("/T8/B")); + ASSERT_TRUE(!exists("/T8/A/C")); + } +} + /// INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, /// CoordinationTest, /// ::testing::ValuesIn(std::initializer_list{CompressionParam{true, ".zstd"}, CompressionParam{false, ""}})); From 6455e1dfa15572ab928ccb9907334d417cf69919 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Thu, 5 Sep 2024 11:14:43 +0000 Subject: [PATCH 05/23] add ephemerals check --- src/Coordination/tests/gtest_coordination.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 413b5f18e33..f902a806b37 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3517,10 +3517,12 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) { SCOPED_TRACE("Recursive Remove Ephemeral"); create("/T7", zkutil::CreateMode::Ephemeral); + ASSERT_EQ(storage.ephemerals.size(), 1); auto responses = remove("/T7", 0, 100); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_EQ(storage.ephemerals.size(), 0); ASSERT_TRUE(!exists("/T7")); } @@ -3530,10 +3532,12 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) create("/T8/A", zkutil::CreateMode::Persistent); create("/T8/B", zkutil::CreateMode::Ephemeral); create("/T8/A/C", zkutil::CreateMode::Ephemeral); + ASSERT_EQ(storage.ephemerals.size(), 1); auto responses = remove("/T8", 0, 4); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_EQ(storage.ephemerals.size(), 0); ASSERT_TRUE(!exists("/T8")); ASSERT_TRUE(!exists("/T8/A")); ASSERT_TRUE(!exists("/T8/B")); From c3cc2a3fb19438043c076300cf59d5c1c2fd3957 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Thu, 5 Sep 2024 16:14:50 +0000 Subject: [PATCH 06/23] intro new request type --- src/Common/ZooKeeper/IKeeper.cpp | 1 + src/Common/ZooKeeper/IKeeper.h | 25 ++- src/Common/ZooKeeper/TestKeeper.cpp | 9 +- src/Common/ZooKeeper/TestKeeper.h | 6 +- src/Common/ZooKeeper/ZooKeeper.cpp | 14 +- src/Common/ZooKeeper/ZooKeeperCommon.cpp | 16 +- src/Common/ZooKeeper/ZooKeeperCommon.h | 36 ++-- src/Common/ZooKeeper/ZooKeeperImpl.cpp | 40 ++-- src/Common/ZooKeeper/ZooKeeperImpl.h | 6 +- .../ZooKeeper/ZooKeeperWithFaultInjection.cpp | 4 +- src/Coordination/KeeperDispatcher.cpp | 7 +- src/Coordination/KeeperStorage.cpp | 171 +++++++++++++----- 12 files changed, 235 insertions(+), 100 deletions(-) diff --git a/src/Common/ZooKeeper/IKeeper.cpp b/src/Common/ZooKeeper/IKeeper.cpp index 7cca262baca..34c9d94fca5 100644 --- a/src/Common/ZooKeeper/IKeeper.cpp +++ b/src/Common/ZooKeeper/IKeeper.cpp @@ -171,6 +171,7 @@ bool isUserError(Error zk_return_code) void CreateRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void RemoveRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } +void RemoveRecursiveRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void ExistsRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void GetRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void SetRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } diff --git a/src/Common/ZooKeeper/IKeeper.h b/src/Common/ZooKeeper/IKeeper.h index f887e67bd92..a0d6ae54f56 100644 --- a/src/Common/ZooKeeper/IKeeper.h +++ b/src/Common/ZooKeeper/IKeeper.h @@ -238,16 +238,30 @@ struct RemoveRequest : virtual Request String path; int32_t version = -1; + void addRootPath(const String & root_path) override; + String getPath() const override { return path; } + + size_t bytesSize() const override { return path.size() + sizeof(version); } +}; + +struct RemoveResponse : virtual Response +{ +}; + +struct RemoveRecursiveRequest : virtual Request +{ + String path; + /// strict limit for number of deleted nodes uint32_t remove_nodes_limit = 1; void addRootPath(const String & root_path) override; String getPath() const override { return path; } - size_t bytesSize() const override { return path.size() + sizeof(version) + sizeof(remove_nodes_limit); } + size_t bytesSize() const override { return path.size() + sizeof(remove_nodes_limit); } }; -struct RemoveResponse : virtual Response +struct RemoveRecursiveResponse : virtual Response { }; @@ -433,6 +447,7 @@ struct ErrorResponse : virtual Response using CreateCallback = std::function; using RemoveCallback = std::function; +using RemoveRecursiveCallback = std::function; using ExistsCallback = std::function; using GetCallback = std::function; using SetCallback = std::function; @@ -588,9 +603,13 @@ public: virtual void remove( const String & path, int32_t version, - uint32_t remove_nodes_limit, RemoveCallback callback) = 0; + virtual void removeRecursive( + const String & path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) = 0; + virtual void exists( const String & path, ExistsCallback callback, diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 65a575549c4..1ac085d6050 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -759,7 +759,6 @@ void TestKeeper::create( void TestKeeper::remove( const String & path, int32_t version, - [[maybe_unused]] uint32_t remove_nodes_limit, // TODO(michicosun): enable RemoveCallback callback) { TestKeeperRemoveRequest request; @@ -772,6 +771,14 @@ void TestKeeper::remove( pushRequest(std::move(request_info)); } +void TestKeeper::removeRecursive( + [[maybe_unused]] const String & path, + [[maybe_unused]] uint32_t remove_nodes_limit, + [[maybe_unused]] RemoveRecursiveCallback callback) +{ + /// TODO(michicosun) implement +} + void TestKeeper::exists( const String & path, ExistsCallback callback, diff --git a/src/Common/ZooKeeper/TestKeeper.h b/src/Common/ZooKeeper/TestKeeper.h index 14b37448a20..c32f0064dec 100644 --- a/src/Common/ZooKeeper/TestKeeper.h +++ b/src/Common/ZooKeeper/TestKeeper.h @@ -56,9 +56,13 @@ public: void remove( const String & path, int32_t version, - uint32_t remove_nodes_limit, RemoveCallback callback) override; + void removeRecursive( + const String & path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) override; + void exists( const String & path, ExistsCallback callback, diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index eb4f5e0725a..93e833e87c8 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -981,16 +981,16 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab void ZooKeeper::removeRecursive(const std::string & path) // TODO(michicosun) rewrite { - auto promise = std::make_shared>(); + auto promise = std::make_shared>(); auto future = promise->get_future(); - auto callback = [promise](const Coordination::RemoveResponse & response) mutable + auto callback = [promise](const Coordination::RemoveRecursiveResponse & response) mutable { promise->set_value(response); }; - impl->remove(path, -1, /*remove_nodes_limit=*/ 100, std::move(callback)); - + impl->removeRecursive(path, 100, std::move(callback)); + if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready) { impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::RemoveRecursive, path)); @@ -1385,7 +1385,7 @@ std::future ZooKeeper::asyncRemove(const std::stri promise->set_value(response); }; - impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); + impl->remove(path, version, std::move(callback)); return future; } @@ -1408,7 +1408,7 @@ std::future ZooKeeper::asyncTryRemove(const std::s promise->set_value(response); }; - impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); + impl->remove(path, version, std::move(callback)); return future; } @@ -1422,7 +1422,7 @@ std::future ZooKeeper::asyncTryRemoveNoThrow(const promise->set_value(response); }; - impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); + impl->remove(path, version, std::move(callback)); return future; } diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index fb37aa6ac22..3f9225e84dd 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -232,24 +232,24 @@ void ZooKeeperRemoveRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } -void ZooKeeperRemoveRecursiveRequest::writeImpl(WriteBuffer & out) const +void ZooKeeperRemoveRecursiveRequest::writeImpl(WriteBuffer & out) const { - ZooKeeperRemoveRequest::writeImpl(out); + Coordination::write(path, out); Coordination::write(remove_nodes_limit, out); } -void ZooKeeperRemoveRecursiveRequest::readImpl(ReadBuffer & in) +void ZooKeeperRemoveRecursiveRequest::readImpl(ReadBuffer & in) { - ZooKeeperRemoveRequest::readImpl(in); + Coordination::read(path, in); Coordination::read(remove_nodes_limit, in); } -std::string ZooKeeperRemoveRecursiveRequest::toStringImpl(bool short_format) const +std::string ZooKeeperRemoveRecursiveRequest::toStringImpl(bool /*short_format*/) const { return fmt::format( - "{}\n" + "path = {}\n" "remove_nodes_limit = {}", - ZooKeeperRemoveRequest::toStringImpl(short_format), + path, remove_nodes_limit); } @@ -531,7 +531,7 @@ ZooKeeperMultiRequest::ZooKeeperMultiRequest(std::span(*concrete_request_remove)); } - else if (const auto * concrete_request_remove_recursive = dynamic_cast(generic_request.get())) + else if (const auto * concrete_request_remove_recursive = dynamic_cast(generic_request.get())) { checkOperationType(Write); requests.push_back(std::make_shared(*concrete_request_remove_recursive)); diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index ef17d069b4c..66c075b277b 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -258,7 +258,7 @@ struct ZooKeeperCreateIfNotExistsResponse : ZooKeeperCreateResponse using ZooKeeperCreateResponse::ZooKeeperCreateResponse; }; -struct ZooKeeperRemoveRequest : RemoveRequest, ZooKeeperRequest +struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest { ZooKeeperRemoveRequest() = default; explicit ZooKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {} @@ -276,17 +276,7 @@ struct ZooKeeperRemoveRequest : RemoveRequest, ZooKeeperRequest void createLogElements(LogElements & elems) const override; }; -struct ZooKeeperRemoveRecursiveRequest : ZooKeeperRemoveRequest -{ - OpNum getOpNum() const override { return OpNum::RemoveRecursive; } - void writeImpl(WriteBuffer & out) const override; - void readImpl(ReadBuffer & in) override; - std::string toStringImpl(bool short_format) const override; - - ZooKeeperResponsePtr makeResponse() const override; -}; - -struct ZooKeeperRemoveResponse : RemoveResponse, ZooKeeperResponse +struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse { void readImpl(ReadBuffer &) override {} void writeImpl(WriteBuffer &) const override {} @@ -295,9 +285,29 @@ struct ZooKeeperRemoveResponse : RemoveResponse, ZooKeeperResponse size_t bytesSize() const override { return RemoveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } }; -struct ZooKeeperRemoveRecursiveResponse : ZooKeeperRemoveResponse +struct ZooKeeperRemoveRecursiveRequest final : RemoveRecursiveRequest, ZooKeeperRequest { + ZooKeeperRemoveRecursiveRequest() = default; + explicit ZooKeeperRemoveRecursiveRequest(const RemoveRecursiveRequest & base) : RemoveRecursiveRequest(base) {} + OpNum getOpNum() const override { return OpNum::RemoveRecursive; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + std::string toStringImpl(bool short_format) const override; + + ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return false; } + + size_t bytesSize() const override { return RemoveRecursiveRequest::bytesSize() + sizeof(xid); } +}; + +struct ZooKeeperRemoveRecursiveResponse : RemoveRecursiveResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer &) override {} + void writeImpl(WriteBuffer &) const override {} + OpNum getOpNum() const override { return OpNum::RemoveRecursive; } + + size_t bytesSize() const override { return RemoveRecursiveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } }; struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 5c4de98d5fc..a6dd9738e17 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1333,33 +1333,39 @@ void ZooKeeper::create( void ZooKeeper::remove( const String & path, int32_t version, - uint32_t remove_nodes_limit, RemoveCallback callback) { - std::shared_ptr request = nullptr; - - if (!isFeatureEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE)) - { - if (remove_nodes_limit > 1) - throw Exception::fromMessage(Error::ZBADARGUMENTS, "RemoveRecursive request type cannot be used because it's not supported by the server"); - - request = std::make_shared(); - } - else - request = std::make_shared(); - - request->path = path; - request->version = version; - request->remove_nodes_limit = remove_nodes_limit; + ZooKeeperRemoveRequest request; + request.path = path; + request.version = version; RequestInfo request_info; - request_info.request = std::move(request); + request_info.request = std::make_shared(std::move(request)); request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; pushRequest(std::move(request_info)); ProfileEvents::increment(ProfileEvents::ZooKeeperRemove); } +void ZooKeeper::removeRecursive( + const String &path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) +{ + if (!isFeatureEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE)) + throw Exception::fromMessage(Error::ZBADARGUMENTS, "RemoveRecursive request type cannot be used because it's not supported by the server"); + + ZooKeeperRemoveRecursiveRequest request; + request.path = path; + request.remove_nodes_limit = remove_nodes_limit; + + RequestInfo request_info; + request_info.request = std::make_shared(std::move(request)); + request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; + + pushRequest(std::move(request_info)); + ProfileEvents::increment(ProfileEvents::ZooKeeperRemove); +} void ZooKeeper::exists( const String & path, diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 6e98df546d9..47d2ab8f401 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -144,9 +144,13 @@ public: void remove( const String & path, int32_t version, - uint32_t remove_nodes_limit, RemoveCallback callback) override; + void removeRecursive( + const String &path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) override; + void exists( const String & path, ExistsCallback callback, diff --git a/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp b/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp index dfd3faf8d01..fc246e263d9 100644 --- a/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp +++ b/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.cpp @@ -587,7 +587,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemove(std: promise->set_value(response); }; - keeper->impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); + keeper->impl->remove(path, version, std::move(callback)); return future; } @@ -630,7 +630,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemoveNoThr } }; - keeper->impl->remove(path, version, /*remove_nodes_limit=*/ 1, std::move(callback)); + keeper->impl->remove(path, version, std::move(callback)); return future; } diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index fd80ee0a7e2..c91cdb53222 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -86,12 +86,17 @@ bool checkIfRequestIncreaseMem(const Coordination::ZooKeeperRequestPtr & request break; } case Coordination::OpNum::Remove: - case Coordination::OpNum::RemoveRecursive: { Coordination::ZooKeeperRemoveRequest & remove_req = dynamic_cast(*sub_zk_request); memory_delta -= remove_req.bytesSize(); break; } + case Coordination::OpNum::RemoveRecursive: + { + Coordination::ZooKeeperRemoveRecursiveRequest & remove_req = dynamic_cast(*sub_zk_request); + memory_delta -= remove_req.bytesSize(); + break; + } default: break; } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index aa5b9d539b2..613714263ef 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1462,16 +1462,41 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce } }; +namespace +{ + +template +void addUpdateParentPzxidDelta(Storage & storage, std::vector & deltas, int64_t zxid, StringRef path) +{ + auto parent_path = parentNodePath(path); + if (!storage.uncommitted_state.getNode(parent_path)) + return; + + deltas.emplace_back( + std::string{parent_path}, + zxid, + typename Storage::UpdateNodeDelta + { + [zxid](Storage::Node & parent) + { + parent.pzxid = std::max(parent.pzxid, zxid); + } + } + ); +} + +} + template struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor { + using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; + bool checkAuth(Storage & storage, int64_t session_id, bool is_local) const override { return storage.checkACL(parentNodePath(this->zk_request->getPath()), Coordination::ACL::Delete, session_id, is_local); } - using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::vector preprocess(Storage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override { @@ -1488,62 +1513,35 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr return {typename Storage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}}; } - const auto update_parent_pzxid = [&]() - { - auto parent_path = parentNodePath(request.path); - if (!storage.uncommitted_state.getNode(parent_path)) - return; - - new_deltas.emplace_back( - std::string{parent_path}, - zxid, - typename Storage::UpdateNodeDelta - { - [zxid](Storage::Node & parent) - { - parent.pzxid = std::max(parent.pzxid, zxid); - } - } - ); - }; - auto node = storage.uncommitted_state.getNode(request.path); if (!node) { if (request.restored_from_zookeeper_log) - update_parent_pzxid(); - + addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path); return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}}; } - - if (request.version != -1 && request.version != node->version) + else if (request.version != -1 && request.version != node->version) return {typename Storage::Delta{zxid, Coordination::Error::ZBADVERSION}}; - - ToDeleteTreeCollector collector(storage, zxid, request.remove_nodes_limit); - bool limit_exceeded = collector.collect(request.path, *node); - - if (limit_exceeded) + else if (node->numChildren() != 0) return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}}; if (request.restored_from_zookeeper_log) - update_parent_pzxid(); + addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path); - auto delete_deltas = collector.extractDeltas(); + new_deltas.emplace_back( + std::string{parentNodePath(request.path)}, + zxid, + typename Storage::UpdateNodeDelta{[](typename Storage::Node & parent) + { + ++parent.cversion; + parent.decreaseNumChildren(); + }}); - for (const auto & delta : delete_deltas) - std::visit( - Overloaded{ - [&](const typename Storage::RemoveNodeDelta & remove_delta) - { - if (remove_delta.ephemeral_owner) - storage.unregisterEphemeralPath(remove_delta.ephemeral_owner, delta.path); - }, - [](auto && /* delta */) {}, - }, - delta.operation); + new_deltas.emplace_back(request.path, zxid, typename Storage::RemoveNodeDelta{request.version, node->ephemeralOwner()}); - new_deltas.insert(new_deltas.end(), std::make_move_iterator(delete_deltas.begin()), std::make_move_iterator(delete_deltas.end())); + if (node->isEphemeral()) + storage.unregisterEphemeralPath(node->ephemeralOwner(), request.path); digest = storage.calculateNodesDigest(digest, new_deltas); @@ -1564,6 +1562,84 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr { return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); } +}; + +template +struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorageRequestProcessor +{ + using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; + + bool checkAuth(Storage & storage, int64_t session_id, bool is_local) const override + { + return storage.checkACL(parentNodePath(this->zk_request->getPath()), Coordination::ACL::Delete, session_id, is_local); + } + + std::vector + preprocess(Storage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override + { + ProfileEvents::increment(ProfileEvents::KeeperRemoveRequest); + Coordination::ZooKeeperRemoveRecursiveRequest & request = dynamic_cast(*this->zk_request); + + std::vector new_deltas; + + if (Coordination::matchPath(request.path, keeper_system_path) != Coordination::PathMatchResult::NOT_MATCH) + { + auto error_msg = fmt::format("Trying to delete an internal Keeper path ({}) which is not allowed", request.path); + + handleSystemNodeModification(keeper_context, error_msg); + return {typename Storage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}}; + } + + auto node = storage.uncommitted_state.getNode(request.path); + + if (!node) + { + if (request.restored_from_zookeeper_log) + addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path); + + return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}}; + } + + ToDeleteTreeCollector collector(storage, zxid, request.remove_nodes_limit); + bool limit_exceeded = collector.collect(request.path, *node); + + if (limit_exceeded) + return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}}; + + if (request.restored_from_zookeeper_log) + addUpdateParentPzxidDelta(storage, new_deltas, zxid, request.path); + + auto delete_deltas = collector.extractDeltas(); + + for (const auto & delta : delete_deltas) + { + const auto * remove_delta = std::get_if(&delta.operation); + if (remove_delta && remove_delta->ephemeral_owner) + storage.unregisterEphemeralPath(remove_delta->ephemeral_owner, delta.path); + } + + new_deltas.insert(new_deltas.end(), std::make_move_iterator(delete_deltas.begin()), std::make_move_iterator(delete_deltas.end())); + + digest = storage.calculateNodesDigest(digest, new_deltas); + + return new_deltas; + } + + Coordination::ZooKeeperResponsePtr process(Storage & storage, int64_t zxid) const override + { + Coordination::ZooKeeperResponsePtr response_ptr = this->zk_request->makeResponse(); + Coordination::ZooKeeperRemoveRecursiveResponse & response = dynamic_cast(*response_ptr); + + response.error = storage.commit(zxid); + return response_ptr; + } + + KeeperStorageBase::ResponsesForSessions + processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override + { + /// TODO(michicosun) rewrite + return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); + } private: class ToDeleteTreeCollector @@ -2270,10 +2346,13 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro concrete_requests.push_back(std::make_shared>(sub_zk_request)); break; case Coordination::OpNum::Remove: - case Coordination::OpNum::RemoveRecursive: check_operation_type(OperationType::Write); concrete_requests.push_back(std::make_shared>(sub_zk_request)); break; + case Coordination::OpNum::RemoveRecursive: + check_operation_type(OperationType::Write); + concrete_requests.push_back(std::make_shared>(sub_zk_request)); + break; case Coordination::OpNum::Set: check_operation_type(OperationType::Write); concrete_requests.push_back(std::make_shared>(sub_zk_request)); @@ -2543,7 +2622,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFa registerKeeperRequestProcessor>(*this); registerKeeperRequestProcessor>(*this); registerKeeperRequestProcessor>(*this); - registerKeeperRequestProcessor>(*this); + registerKeeperRequestProcessor>(*this); } From cb1c11c74ae33d9de40ffc86dd7f2dc976be309b Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Fri, 6 Sep 2024 00:08:08 +0000 Subject: [PATCH 07/23] change traverse --- src/Coordination/KeeperStorage.cpp | 130 +++++++++++++++++++---------- 1 file changed, 84 insertions(+), 46 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 613714263ef..9378b30ea52 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1642,14 +1642,17 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage } private: + using SNode = typename Storage::Node; + class ToDeleteTreeCollector { Storage & storage; int64_t zxid; uint32_t limit; - uint32_t nodes_observed = 0; - std::vector deltas; + uint32_t max_level = 0; + uint32_t nodes_observed = 1; /// root node + std::unordered_map> by_level_deltas; public: ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, uint32_t limit_) @@ -1659,31 +1662,60 @@ private: { } - bool collect(StringRef root_path, const typename Storage::Node & root_node) + bool collect(StringRef root_path, const SNode & root_node) { - if (updateStats(root_node)) + std::deque steps; + steps.push_back(Step{root_path.toString(), &root_node, 0}); + + if (observeNode(root_node)) return true; - if (visitRocksDBNode(root_path) || visitMemNode(root_path)) - return true; + while (!steps.empty()) + { + Step step = std::move(steps.front()); + steps.pop_front(); - /// After checking committed nodes there might be some in uncommitted state, - /// for example made on the previouse step of multi transaction. - return visitRootAndUncommitted(root_path, root_node); - } + StringRef path = step.path; + uint32_t level = step.level; + const SNode * node = nullptr; - uint32_t getNumObserved() const - { - return nodes_observed; + if (auto * rdb = std::get_if(&step.node)) + node = rdb; + else + node = std::get(step.node); + + chassert(!path.empty()); + chassert(node != nullptr); + + if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, *node, level)) + return true; + } + + return false; } std::vector extractDeltas() { + std::vector deltas; + + for (ssize_t level = max_level; level >= 0; --level) + { + auto & level_deltas = by_level_deltas[static_cast(level)]; + deltas.insert(deltas.end(), std::make_move_iterator(level_deltas.begin()), std::make_move_iterator(level_deltas.end())); + } + return std::move(deltas); } private: - bool visitRocksDBNode(StringRef root_path) + struct Step + { + String path; + std::variant node; + uint32_t level; + }; + + bool visitRocksDBNode(std::deque & steps, StringRef root_path, uint32_t level) { if constexpr (Storage::use_rocksdb) { @@ -1692,86 +1724,92 @@ private: for (auto && [child_name, node] : children) { - auto child_path = (root_fs_path / child_name).generic_string(); - - if (collect(child_path, node)) + if (observeNode(node)) return true; + + auto child_path = (root_fs_path / child_name).generic_string(); + steps.push_back(Step{std::move(child_path), std::move(node), level + 1}); } } return false; } - bool visitMemNode(StringRef root_path) + bool visitMemNode(std::deque & steps, StringRef root_path, uint32_t level) { if constexpr (!Storage::use_rocksdb) { std::filesystem::path root_fs_path(root_path.toString()); auto node_it = storage.container.find(root_path); - if (node_it != storage.container.end()) + if (node_it == storage.container.end()) + return false; + + + auto children = node_it->value.getChildren(); + for (auto && child_name : children) { - auto children = node_it->value.getChildren(); - for (auto && child_name : children) - { - auto child_path = (root_fs_path / child_name.toView()).generic_string(); + auto child_path = (root_fs_path / child_name.toView()).generic_string(); - auto child_it = storage.container.find(child_path); - chassert(child_it != storage.container.end()); + auto child_it = storage.container.find(child_path); + chassert(child_it != storage.container.end()); - if (collect(child_path, child_it->value)) - return true; - } + if (observeNode(child_it->value)) + return true; + + steps.push_back(Step{std::move(child_path), child_it->value, level + 1}); } } return false; } - bool visitRootAndUncommitted(StringRef root_path, const typename Storage::Node & root_node) + bool visitRootAndUncommitted(std::deque & steps, StringRef root_path, const SNode & root_node, uint32_t level) { const auto & nodes = storage.uncommitted_state.nodes; /// nodes are sorted by paths with level locality - auto it = nodes.upper_bound(root_path.toView()); + auto it = nodes.upper_bound(root_path.toString() + "/"); for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it) { chassert(it->second.node); const String & path = it->first; - const typename Storage::Node & node = *it->second.node; + const SNode & node = *it->second.node; - if (updateStats(node)) + if (observeNode(node)) return true; - if (visitRootAndUncommitted(path, node)) /// if child is uncommitted then all subtree is also uncommitted - return true; + steps.push_back(Step{path, &node, level + 1}); } - deltas.emplace_back( + addDelta(root_path, root_node, level); + + return false; + } + + void addDelta(StringRef root_path, const SNode & root_node, uint32_t level) + { + max_level = std::max(max_level, level); + + by_level_deltas[level].emplace_back( parentNodePath(root_path).toString(), zxid, typename Storage::UpdateNodeDelta{ - [](typename Storage::Node & parent) + [](SNode & parent) { ++parent.cversion; parent.decreaseNumChildren(); } }); - deltas.emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()}); - - return false; + by_level_deltas[level].emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()}); } - bool updateStats(const typename Storage::Node & root_node) + bool observeNode(const SNode & node) { - nodes_observed += 1; - - if (nodes_observed + root_node.numChildren() > limit) - return true; - - return false; + nodes_observed += node.numChildren(); + return nodes_observed > limit; } }; }; From aa2721f7e49d54e97e177ee75ca9e5b39e1a2beb Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Fri, 6 Sep 2024 00:09:46 +0000 Subject: [PATCH 08/23] update tests --- src/Common/ZooKeeper/Types.h | 1 + src/Common/ZooKeeper/ZooKeeper.cpp | 8 ++ src/Coordination/tests/gtest_coordination.cpp | 126 +++++++++++++++--- 3 files changed, 114 insertions(+), 21 deletions(-) diff --git a/src/Common/ZooKeeper/Types.h b/src/Common/ZooKeeper/Types.h index d2876adaabc..4a163c15838 100644 --- a/src/Common/ZooKeeper/Types.h +++ b/src/Common/ZooKeeper/Types.h @@ -31,6 +31,7 @@ using AsyncResponses = std::vector>>; Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode, bool ignore_if_exists = false); Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version); +Coordination::RequestPtr makeRemoveRecursiveRequest(const std::string & path, uint32_t remove_nodes_limit); Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version); Coordination::RequestPtr makeCheckRequest(const std::string & path, int version); diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 93e833e87c8..65c3fdba8d2 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -1637,6 +1637,14 @@ Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version return request; } +Coordination::RequestPtr makeRemoveRecursiveRequest(const std::string & path, uint32_t remove_nodes_limit) +{ + auto request = std::make_shared(); + request->path = path; + request->remove_nodes_limit = remove_nodes_limit; + return request; +} + Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version) { auto request = std::make_shared(); diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index f902a806b37..27061b15b90 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3406,20 +3406,25 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path; }; - const auto remove = [&](const String & path, int32_t version = -1, std::optional remove_nodes_limit = std::nullopt) + const auto remove = [&](const String & path, int32_t version = -1) { int new_zxid = ++zxid; - std::shared_ptr remove_request; - - if (remove_nodes_limit.has_value()) - remove_request = std::make_shared(); - else - remove_request = std::make_shared(); - + auto remove_request = std::make_shared(); remove_request->path = path; remove_request->version = version; - remove_request->remove_nodes_limit = remove_nodes_limit.value_or(1); + + storage.preprocessRequest(remove_request, 1, 0, new_zxid); + return storage.processRequest(remove_request, 1, new_zxid); + }; + + const auto remove_recursive = [&](const String & path, uint32_t remove_nodes_limit = 1) + { + int new_zxid = ++zxid; + + auto remove_request = std::make_shared(); + remove_request->path = path; + remove_request->remove_nodes_limit = remove_nodes_limit; storage.preprocessRequest(remove_request, 1, 0, new_zxid); return storage.processRequest(remove_request, 1, new_zxid); @@ -3464,7 +3469,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) SCOPED_TRACE("Recursive Remove Single Node"); create("/T3", zkutil::CreateMode::Persistent); - auto responses = remove("/T3", 0, 100); + auto responses = remove_recursive("/T3", 100); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); ASSERT_FALSE(exists("/T3")); @@ -3475,7 +3480,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) create("/T4", zkutil::CreateMode::Persistent); create("/T4/A", zkutil::CreateMode::Persistent); - auto responses = remove("/T4", 0, 100); + auto responses = remove_recursive("/T4", 100); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); ASSERT_FALSE(exists("/T4")); @@ -3489,7 +3494,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) create("/T5/B", zkutil::CreateMode::Persistent); create("/T5/A/C", zkutil::CreateMode::Persistent); - auto responses = remove("/T5", 0, 2); + auto responses = remove_recursive("/T5", 2); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY); ASSERT_TRUE(exists("/T5")); @@ -3499,19 +3504,19 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) } { - SCOPED_TRACE("Recursive Remove Tree Small Limit"); + SCOPED_TRACE("Recursive Remove Tree Big Limit"); create("/T6", zkutil::CreateMode::Persistent); create("/T6/A", zkutil::CreateMode::Persistent); create("/T6/B", zkutil::CreateMode::Persistent); create("/T6/A/C", zkutil::CreateMode::Persistent); - auto responses = remove("/T6", 0, 2); + auto responses = remove_recursive("/T6", 4); ASSERT_EQ(responses.size(), 1); - ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY); - ASSERT_TRUE(exists("/T6")); - ASSERT_TRUE(exists("/T6/A")); - ASSERT_TRUE(exists("/T6/B")); - ASSERT_TRUE(exists("/T6/A/C")); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_FALSE(exists("/T6")); + ASSERT_FALSE(exists("/T6/A")); + ASSERT_FALSE(exists("/T6/B")); + ASSERT_FALSE(exists("/T6/A/C")); } { @@ -3519,7 +3524,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) create("/T7", zkutil::CreateMode::Ephemeral); ASSERT_EQ(storage.ephemerals.size(), 1); - auto responses = remove("/T7", 0, 100); + auto responses = remove_recursive("/T7", 100); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); ASSERT_EQ(storage.ephemerals.size(), 0); @@ -3534,7 +3539,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) create("/T8/A/C", zkutil::CreateMode::Ephemeral); ASSERT_EQ(storage.ephemerals.size(), 1); - auto responses = remove("/T8", 0, 4); + auto responses = remove_recursive("/T8", 4); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); ASSERT_EQ(storage.ephemerals.size(), 0); @@ -3545,6 +3550,85 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) } } +TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) +{ + using namespace DB; + using namespace Coordination; + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; + int zxid = 0; + + Coordination::Requests ops; + ops.push_back(zkutil::makeCreateRequest("/A", "A", zkutil::CreateMode::Persistent)); + ops.push_back(zkutil::makeCreateRequest("/A/B", "B", zkutil::CreateMode::Persistent)); + ops.push_back(zkutil::makeCreateRequest("/A/C", "C", zkutil::CreateMode::Ephemeral)); + ops.push_back(zkutil::makeCreateRequest("/A/B/D", "D", zkutil::CreateMode::Ephemeral)); + + const auto exists = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto exists_request = std::make_shared(); + exists_request->path = path; + + storage.preprocessRequest(exists_request, 1, 0, new_zxid); + auto responses = storage.processRequest(exists_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + return responses[0].response->error == Coordination::Error::ZOK; + }; + + const auto is_multi_ok = [&](Coordination::ZooKeeperResponsePtr response) + { + const auto & multi_response = dynamic_cast(*response); + + for (const auto & op_response : multi_response.responses) + if (op_response->error != Coordination::Error::ZOK) + return false; + + return true; + }; + + { + SCOPED_TRACE("Remove In Multi Tx"); + int new_zxid = ++zxid; + + ops.push_back(zkutil::makeRemoveRequest("/A", -1)); + const auto request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(request, 1, 0, new_zxid); + auto responses = storage.processRequest(request, 1, new_zxid); + ops.pop_back(); + + ASSERT_EQ(responses.size(), 1); + ASSERT_FALSE(is_multi_ok(responses[0].response)); + } + + { + SCOPED_TRACE("Recursive Remove In Multi Tx"); + int new_zxid = ++zxid; + + ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4)); + const auto request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(request, 1, 0, new_zxid); + auto responses = storage.processRequest(request, 1, new_zxid); + ops.pop_back(); + + ASSERT_EQ(responses.size(), 1); + ASSERT_TRUE(is_multi_ok(responses[0].response)); + ASSERT_FALSE(exists("/A")); + ASSERT_FALSE(exists("/A/C")); + ASSERT_FALSE(exists("/A/B")); + ASSERT_FALSE(exists("/A/B/D")); + } +} + /// INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, /// CoordinationTest, /// ::testing::ValuesIn(std::initializer_list{CompressionParam{true, ".zstd"}, CompressionParam{false, ""}})); From 231a7c97ccf61b4a3511d0def3abd49469d31f3f Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Fri, 6 Sep 2024 08:34:59 +0000 Subject: [PATCH 09/23] move code --- src/Coordination/KeeperStorage.cpp | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 9378b30ea52..bb6dd04eeec 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1654,6 +1654,13 @@ private: uint32_t nodes_observed = 1; /// root node std::unordered_map> by_level_deltas; + struct Step + { + String path; + std::variant node; + uint32_t level; + }; + public: ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, uint32_t limit_) : storage(storage_) @@ -1665,11 +1672,12 @@ private: bool collect(StringRef root_path, const SNode & root_node) { std::deque steps; - steps.push_back(Step{root_path.toString(), &root_node, 0}); - if (observeNode(root_node)) + if (checkLimits(root_node)) return true; + steps.push_back(Step{root_path.toString(), &root_node, 0}); + while (!steps.empty()) { Step step = std::move(steps.front()); @@ -1708,13 +1716,6 @@ private: } private: - struct Step - { - String path; - std::variant node; - uint32_t level; - }; - bool visitRocksDBNode(std::deque & steps, StringRef root_path, uint32_t level) { if constexpr (Storage::use_rocksdb) @@ -1724,7 +1725,7 @@ private: for (auto && [child_name, node] : children) { - if (observeNode(node)) + if (checkLimits(node)) return true; auto child_path = (root_fs_path / child_name).generic_string(); @@ -1754,7 +1755,7 @@ private: auto child_it = storage.container.find(child_path); chassert(child_it != storage.container.end()); - if (observeNode(child_it->value)) + if (checkLimits(child_it->value)) return true; steps.push_back(Step{std::move(child_path), child_it->value, level + 1}); @@ -1777,7 +1778,7 @@ private: const String & path = it->first; const SNode & node = *it->second.node; - if (observeNode(node)) + if (checkLimits(node)) return true; steps.push_back(Step{path, &node, level + 1}); @@ -1806,7 +1807,7 @@ private: by_level_deltas[level].emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()}); } - bool observeNode(const SNode & node) + bool checkLimits(const SNode & node) { nodes_observed += node.numChildren(); return nodes_observed > limit; From 2bbe93353187cd7aba95e6cb6d487fa553309b28 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Fri, 6 Sep 2024 09:33:47 +0000 Subject: [PATCH 10/23] fix watches logic --- src/Coordination/KeeperStorage.cpp | 31 ++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index bb6dd04eeec..24d95a89a4d 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1129,6 +1129,12 @@ struct KeeperStorageRequestProcessor return {}; } + virtual KeeperStorageBase::ResponsesForSessions + processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const + { + return processWatches(watches, list_watches); + } + virtual bool checkAuth(Storage & /*storage*/, int64_t /*session_id*/, bool /*is_local*/) const { return true; } virtual ~KeeperStorageRequestProcessor() = default; @@ -1635,10 +1641,23 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage } KeeperStorageBase::ResponsesForSessions - processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override + processWatches(const Storage & storage, int64_t zxid, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override { - /// TODO(michicosun) rewrite - return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); + /// need to iterate over zxid deltas and update watches for deleted tree. + const auto & deltas = storage.uncommitted_state.deltas; + + KeeperStorageBase::ResponsesForSessions responses; + for (auto it = deltas.rbegin(); it != deltas.rend() && it->zxid == zxid; ++it) + { + const auto * remove_delta = std::get_if(&it->operation); + if (remove_delta) + { + auto new_responses = processWatchesImpl(it->path, watches, list_watches, Coordination::Event::DELETED); + responses.insert(responses.end(), std::make_move_iterator(new_responses.begin()), std::make_move_iterator(new_responses.end())); + } + } + + return responses; } private: @@ -2511,12 +2530,12 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro } KeeperStorageBase::ResponsesForSessions - processWatches(typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override + processWatches(const Storage & storage, int64_t zxid, typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override { typename Storage::ResponsesForSessions result; for (const auto & generic_request : concrete_requests) { - auto responses = generic_request->processWatches(watches, list_watches); + auto responses = generic_request->processWatches(storage, zxid, watches, list_watches); result.insert(result.end(), responses.begin(), responses.end()); } return result; @@ -2980,7 +2999,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::process /// If this requests processed successfully we need to check watches if (response->error == Coordination::Error::ZOK) { - auto watch_responses = request_processor->processWatches(watches, list_watches); + auto watch_responses = request_processor->processWatches(*this, zxid, watches, list_watches); results.insert(results.end(), watch_responses.begin(), watch_responses.end()); } From cf0e0b766da015d7b13f2c43b669c135daf43c14 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Fri, 6 Sep 2024 09:34:26 +0000 Subject: [PATCH 11/23] add test for watches --- src/Coordination/tests/gtest_coordination.cpp | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 27061b15b90..037a102082c 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3629,6 +3629,102 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) } } +TYPED_TEST(CoordinationTest, TestRemoveRecursiveWatches) +{ + using namespace DB; + using namespace Coordination; + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; + int zxid = 0; + + const auto create = [&](const String & path, int create_mode) + { + int new_zxid = ++zxid; + + const auto create_request = std::make_shared(); + create_request->path = path; + create_request->is_ephemeral = create_mode == zkutil::CreateMode::Ephemeral || create_mode == zkutil::CreateMode::EphemeralSequential; + create_request->is_sequential = create_mode == zkutil::CreateMode::PersistentSequential || create_mode == zkutil::CreateMode::EphemeralSequential; + + storage.preprocessRequest(create_request, 1, 0, new_zxid); + auto responses = storage.processRequest(create_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path; + }; + + const auto add_watch = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto exists_request = std::make_shared(); + exists_request->path = path; + exists_request->has_watch = true; + + storage.preprocessRequest(exists_request, 1, 0, new_zxid); + auto responses = storage.processRequest(exists_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK); + }; + + const auto add_list_watch = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto list_request = std::make_shared(); + list_request->path = path; + list_request->has_watch = true; + + storage.preprocessRequest(list_request, 1, 0, new_zxid); + auto responses = storage.processRequest(list_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK); + }; + + create("/A", zkutil::CreateMode::Persistent); + create("/A/B", zkutil::CreateMode::Persistent); + create("/A/C", zkutil::CreateMode::Ephemeral); + create("/A/B/D", zkutil::CreateMode::Ephemeral); + + add_watch("/A"); + add_watch("/A/B"); + add_watch("/A/C"); + add_watch("/A/B/D"); + add_list_watch("/A"); + add_list_watch("/A/B"); + ASSERT_EQ(storage.watches.size(), 4); + ASSERT_EQ(storage.list_watches.size(), 2); + + int new_zxid = ++zxid; + + auto remove_request = std::make_shared(); + remove_request->path = "/A"; + remove_request->remove_nodes_limit = 4; + + storage.preprocessRequest(remove_request, 1, 0, new_zxid); + auto responses = storage.processRequest(remove_request, 1, new_zxid); + + ASSERT_EQ(responses.size(), 7); + + for (size_t i = 0; i < 7; ++i) + { + ASSERT_EQ(responses[i].response->error, Coordination::Error::ZOK); + + if (const auto * watch_response = dynamic_cast(responses[i].response.get())) + ASSERT_EQ(watch_response->type, Coordination::Event::DELETED); + } + + ASSERT_EQ(storage.watches.size(), 0); + ASSERT_EQ(storage.list_watches.size(), 0); +} + /// INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, /// CoordinationTest, /// ::testing::ValuesIn(std::initializer_list{CompressionParam{true, ".zstd"}, CompressionParam{false, ""}})); From 068ada57baace782d98923de2174d50aebc16bf0 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Fri, 6 Sep 2024 12:23:57 +0000 Subject: [PATCH 12/23] update api --- src/Common/ZooKeeper/ZooKeeper.cpp | 33 ++++++++++++++++++++---------- src/Common/ZooKeeper/ZooKeeper.h | 13 ++++++------ 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 65c3fdba8d2..ae60520affb 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -979,8 +979,26 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab return removed_as_expected; } -void ZooKeeper::removeRecursive(const std::string & path) // TODO(michicosun) rewrite +void ZooKeeper::removeRecursive(const std::string & path, uint32_t remove_nodes_limit) { + if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE)) + { + removeChildrenRecursive(path); + remove(path); + return; + } + + check(tryRemoveRecursive(path, remove_nodes_limit), path); +} + +Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit) +{ + if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE)) + { + tryRemoveChildrenRecursive(path); + return tryRemove(path); + } + auto promise = std::make_shared>(); auto future = promise->get_future(); @@ -989,27 +1007,20 @@ void ZooKeeper::removeRecursive(const std::string & path) // TODO(michicosun) re promise->set_value(response); }; - impl->removeRecursive(path, 100, std::move(callback)); + impl->removeRecursive(path, remove_nodes_limit, std::move(callback)); if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready) { impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::RemoveRecursive, path)); - check(Coordination::Error::ZOPERATIONTIMEOUT, path); + return Coordination::Error::ZOPERATIONTIMEOUT; } else { auto response = future.get(); - check(response.error, path); + return response.error; } } -Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path) -{ - tryRemoveChildrenRecursive(path); - return tryRemove(path); -} - - namespace { struct WaitForDisappearState diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index 3b94e6004cb..29c4fbc9b74 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -479,15 +479,16 @@ public: Int64 getClientID(); - /// Remove the node with the subtree. If someone concurrently adds or removes a node - /// in the subtree, the result is undefined. - void removeRecursive(const std::string & path); + /// Remove the node with the subtree. + /// If Keeper supports RemoveRecursive operation then it will be performed atomically. + /// Otherwise if someone concurrently adds or removes a node in the subtree, the result is undefined. + void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 100); - /// Remove the node with the subtree. If someone concurrently removes a node in the subtree, - /// this will not cause errors. + /// Same as removeRecursive but in case if Keeper does not supports RemoveRecursive and + /// if someone concurrently removes a node in the subtree, this will not cause errors. /// For instance, you can call this method twice concurrently for the same node and the end /// result would be the same as for the single call. - Coordination::Error tryRemoveRecursive(const std::string & path); + Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 100); /// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself. /// Node defined as RemoveException will not be deleted. From 81972b97e7bb555b4dc4672fdcb765dfb4b5bd31 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Fri, 6 Sep 2024 13:23:57 +0000 Subject: [PATCH 13/23] support test keeper --- src/Common/ZooKeeper/TestKeeper.cpp | 100 ++++++++++++++++++++++++++-- 1 file changed, 96 insertions(+), 4 deletions(-) diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 1ac085d6050..ae8d95b806a 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -90,6 +90,36 @@ struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest } }; +struct TestKeeperRemoveRecursiveRequest final : RemoveRecursiveRequest, TestKeeperRequest +{ + TestKeeperRemoveRecursiveRequest() = default; + explicit TestKeeperRemoveRecursiveRequest(const RemoveRecursiveRequest & base) : RemoveRecursiveRequest(base) {} + ResponsePtr createResponse() const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; + + void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override + { + std::vector> deleted; + + auto add_deleted_watches = [&](TestKeeper::Watches & w) + { + for (const auto & [watch_path, _] : w) + if (watch_path.starts_with(path)) + deleted.emplace_back(watch_path, std::count(watch_path.begin(), watch_path.end(), '/')); + }; + + add_deleted_watches(node_watches); + add_deleted_watches(list_watches); + std::sort(deleted.begin(), deleted.end(), [](const auto & lhs, const auto & rhs) + { + return lhs.second < rhs.second; + }); + + for (const auto & [watch_path, _] : deleted) + processWatchesImpl(watch_path, node_watches, list_watches); + } +}; + struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest { ResponsePtr createResponse() const override; @@ -175,6 +205,10 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest { requests.push_back(std::make_shared(*concrete_request_remove)); } + else if (const auto * concrete_request_remove_recursive = dynamic_cast(generic_request.get())) + { + requests.push_back(std::make_shared(*concrete_request_remove_recursive)); + } else if (const auto * concrete_request_set = dynamic_cast(generic_request.get())) { requests.push_back(std::make_shared(*concrete_request_set)); @@ -313,6 +347,56 @@ std::pair TestKeeperRemoveRequest::process(TestKeeper::Contai return { std::make_shared(response), undo }; } +std::pair TestKeeperRemoveRecursiveRequest::process(TestKeeper::Container & container, int64_t zxid) const +{ + RemoveRecursiveResponse response; + response.zxid = zxid; + Undo undo; + + auto root_it = container.find(path); + if (root_it == container.end()) + { + response.error = Error::ZNONODE; + return { std::make_shared(response), undo }; + } + + std::vector> children; + + for (const auto & [child_path, child_node] : container) + if (child_path.starts_with(path)) + children.emplace_back(child_path, child_node); + + if (children.size() > remove_nodes_limit) + { + response.error = Error::ZNOTEMPTY; + return { std::make_shared(response), undo }; + } + + auto & parent = container.at(parentPath(path)); + --parent.stat.numChildren; + ++parent.stat.cversion; + + for (const auto & [child_path, child_node] : children) + { + auto child_it = container.find(child_path); + chassert(child_it != container.end()); + container.erase(child_it); + } + + response.error = Error::ZOK; + undo = [&container, dead = std::move(children), root_path = path]() + { + for (auto && [child_path, child_node] : dead) + container.emplace(child_path, child_node); + + auto & undo_parent = container.at(parentPath(root_path)); + ++undo_parent.stat.numChildren; + --undo_parent.stat.cversion; + }; + + return { std::make_shared(response), undo }; +} + std::pair TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t zxid) const { ExistsResponse response; @@ -530,6 +614,7 @@ std::pair TestKeeperMultiRequest::process(TestKeeper::Contain ResponsePtr TestKeeperCreateRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperRemoveRequest::createResponse() const { return std::make_shared(); } +ResponsePtr TestKeeperRemoveRecursiveRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperExistsRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared(); } @@ -772,11 +857,18 @@ void TestKeeper::remove( } void TestKeeper::removeRecursive( - [[maybe_unused]] const String & path, - [[maybe_unused]] uint32_t remove_nodes_limit, - [[maybe_unused]] RemoveRecursiveCallback callback) + const String & path, + uint32_t remove_nodes_limit, + RemoveRecursiveCallback callback) { - /// TODO(michicosun) implement + TestKeeperRemoveRecursiveRequest request; + request.path = path; + request.remove_nodes_limit = remove_nodes_limit; + + RequestInfo request_info; + request_info.request = std::make_shared(std::move(request)); + request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; + pushRequest(std::move(request_info)); } void TestKeeper::exists( From 6309f36232a28e3455ac8d057513983caa499a61 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Mon, 9 Sep 2024 09:44:19 +0000 Subject: [PATCH 14/23] add limit option to rmr command --- programs/keeper-client/Commands.cpp | 11 ++++++++++- programs/keeper-client/Commands.h | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/programs/keeper-client/Commands.cpp b/programs/keeper-client/Commands.cpp index 4ad2eb31e6d..b4a5329b01f 100644 --- a/programs/keeper-client/Commands.cpp +++ b/programs/keeper-client/Commands.cpp @@ -506,14 +506,23 @@ bool RMRCommand::parse(IParser::Pos & pos, std::shared_ptr & nod return false; node->args.push_back(std::move(path)); + ASTPtr remove_nodes_limit; + if (ParserUnsignedInteger{}.parse(pos, remove_nodes_limit, expected)) + node->args.push_back(remove_nodes_limit->as().value); + else + node->args.push_back(UInt64(100)); + return true; } void RMRCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const { String path = client->getAbsolutePath(query->args[0].safeGet()); + UInt64 remove_nodes_limit = query->args[1].safeGet(); + client->askConfirmation( - "You are going to recursively delete path " + path, [client, path] { client->zookeeper->removeRecursive(path); }); + "You are going to recursively delete path " + path, + [client, path, remove_nodes_limit] { client->zookeeper->removeRecursive(path, static_cast(remove_nodes_limit)); }); } bool ReconfigCommand::parse(IParser::Pos & pos, std::shared_ptr & node, DB::Expected & expected) const diff --git a/programs/keeper-client/Commands.h b/programs/keeper-client/Commands.h index 686a752b6b6..da577ce1e65 100644 --- a/programs/keeper-client/Commands.h +++ b/programs/keeper-client/Commands.h @@ -184,7 +184,7 @@ class RMRCommand : public IKeeperClientCommand void execute(const ASTKeeperQuery * query, KeeperClient * client) const override; - String getHelpMessage() const override { return "{} -- Recursively deletes path. Confirmation required"; } + String getHelpMessage() const override { return "{} [limit] -- Recursively deletes path if the subtree size is smaller than the limit. Confirmation required (default limit = 100)"; } }; class ReconfigCommand : public IKeeperClientCommand From 08368d1d65a8fc5f6caff986d17c4de438d18114 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Mon, 9 Sep 2024 09:46:17 +0000 Subject: [PATCH 15/23] update keeper client docs --- docs/en/operations/utilities/clickhouse-keeper-client.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/operations/utilities/clickhouse-keeper-client.md b/docs/en/operations/utilities/clickhouse-keeper-client.md index a66ecbc1372..2cd7be99dfb 100644 --- a/docs/en/operations/utilities/clickhouse-keeper-client.md +++ b/docs/en/operations/utilities/clickhouse-keeper-client.md @@ -53,7 +53,7 @@ keeper foo bar - `touch ''` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists - `get ''` -- Returns the node's value - `rm '' [version]` -- Removes the node only if version matches (default: -1) -- `rmr ''` -- Recursively deletes path. Confirmation required +- `rmr '' [limit]` -- Recursively deletes path if the subtree size is smaller than the limit. Confirmation required (default limit = 100) - `flwc ` -- Executes four-letter-word command - `help` -- Prints this message - `get_direct_children_number '[path]'` -- Get numbers of direct children nodes under a specific path From 0453d5e91aadc73ad93b813677fe8e0d70c7fd13 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Tue, 10 Sep 2024 12:26:03 +0000 Subject: [PATCH 16/23] small fixes --- src/Common/ZooKeeper/TestKeeper.cpp | 6 +++++- src/Coordination/KeeperStorage.cpp | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index ae8d95b806a..3982971163c 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -362,9 +362,13 @@ std::pair TestKeeperRemoveRecursiveRequest::process(TestKeepe std::vector> children; - for (const auto & [child_path, child_node] : container) + for (auto it = std::next(root_it); it != container.end(); ++it) + { + const auto & [child_path, child_node] = *it; + if (child_path.starts_with(path)) children.emplace_back(child_path, child_node); + } if (children.size() > remove_nodes_limit) { diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 24d95a89a4d..1bd6c27fe63 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1766,7 +1766,7 @@ private: return false; - auto children = node_it->value.getChildren(); + const auto & children = node_it->value.getChildren(); for (auto && child_name : children) { auto child_path = (root_fs_path / child_name.toView()).generic_string(); From 0bd7ef39c41502a2d0cea7cb34fd12e9b9e8de30 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Tue, 10 Sep 2024 12:27:34 +0000 Subject: [PATCH 17/23] unify processWatches --- src/Coordination/KeeperStorage.cpp | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 1bd6c27fe63..503eddac16e 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1124,17 +1124,11 @@ struct KeeperStorageRequestProcessor } virtual KeeperStorageBase::ResponsesForSessions - processWatches(KeeperStorageBase::Watches & /*watches*/, KeeperStorageBase::Watches & /*list_watches*/) const + processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & /*watches*/, KeeperStorageBase::Watches & /*list_watches*/) const { return {}; } - virtual KeeperStorageBase::ResponsesForSessions - processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const - { - return processWatches(watches, list_watches); - } - virtual bool checkAuth(Storage & /*storage*/, int64_t /*session_id*/, bool /*is_local*/) const { return true; } virtual ~KeeperStorageRequestProcessor() = default; @@ -1247,7 +1241,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; KeeperStorageBase::ResponsesForSessions - processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override + processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override { return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED); } @@ -1564,7 +1558,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr } KeeperStorageBase::ResponsesForSessions - processWatches(KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override + processWatches(const Storage & /*storage*/, int64_t /*zxid*/, KeeperStorageBase::Watches & watches, KeeperStorageBase::Watches & list_watches) const override { return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); } @@ -1985,7 +1979,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce } KeeperStorageBase::ResponsesForSessions - processWatches(typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override + processWatches(const Storage & /*storage*/, int64_t /*zxid*/, typename Storage::Watches & watches, typename Storage::Watches & list_watches) const override { return processWatchesImpl(this->zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED); } From 1aacb48bcf7abddaaac9bdb2c723189b2329167b Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Tue, 10 Sep 2024 14:34:27 +0000 Subject: [PATCH 18/23] fix collector --- src/Coordination/KeeperStorage.cpp | 73 ++++++++++++------ src/Coordination/KeeperStorage.h | 1 + src/Coordination/tests/gtest_coordination.cpp | 77 +++++++++++++++++-- 3 files changed, 123 insertions(+), 28 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 503eddac16e..87d29e0ff65 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -832,6 +832,15 @@ std::shared_ptr KeeperStorage::UncommittedS return tryGetNodeFromStorage(path); } +template +const typename Container::Node * KeeperStorage::UncommittedState::getActualNodeView(StringRef path, const Node & storage_node) const +{ + if (auto node_it = nodes.find(path.toView()); node_it != nodes.end()) + return node_it->second.node.get(); + + return &storage_node; +} + template Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const { @@ -1686,7 +1695,7 @@ private: { std::deque steps; - if (checkLimits(root_node)) + if (checkLimits(&root_node)) return true; steps.push_back(Step{root_path.toString(), &root_node, 0}); @@ -1698,17 +1707,20 @@ private: StringRef path = step.path; uint32_t level = step.level; - const SNode * node = nullptr; + const SNode * node_ptr = nullptr; if (auto * rdb = std::get_if(&step.node)) - node = rdb; + node_ptr = rdb; else - node = std::get(step.node); + node_ptr = std::get(step.node); chassert(!path.empty()); - chassert(node != nullptr); + chassert(node_ptr != nullptr); - if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, *node, level)) + const auto & node = *node_ptr; + chassert(storage.uncommitted_state.getActualNodeView(path, node) != nullptr); /// explicitly check that node is not deleted + + if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, node, level)) return true; } @@ -1736,13 +1748,18 @@ private: std::filesystem::path root_fs_path(root_path.toString()); auto children = storage.container.getChildren(root_path.toString()); - for (auto && [child_name, node] : children) + for (auto && [child_name, child_node] : children) { - if (checkLimits(node)) + auto child_path = (root_fs_path / child_name).generic_string(); + const auto actual_child_node_ptr = storage.uncommitted_state.getActualNodeView(child_path, child_node); + + if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction + continue; + + if (checkLimits(actual_child_node_ptr)) return true; - auto child_path = (root_fs_path / child_name).generic_string(); - steps.push_back(Step{std::move(child_path), std::move(node), level + 1}); + steps.push_back(Step{std::move(child_path), std::move(child_node), level + 1}); } } @@ -1753,25 +1770,30 @@ private: { if constexpr (!Storage::use_rocksdb) { - std::filesystem::path root_fs_path(root_path.toString()); - auto node_it = storage.container.find(root_path); if (node_it == storage.container.end()) return false; - + std::filesystem::path root_fs_path(root_path.toString()); const auto & children = node_it->value.getChildren(); - for (auto && child_name : children) + + for (const auto & child_name : children) { auto child_path = (root_fs_path / child_name.toView()).generic_string(); auto child_it = storage.container.find(child_path); chassert(child_it != storage.container.end()); + const auto & child_node = child_it->value; - if (checkLimits(child_it->value)) + const auto actual_child_node_ptr = storage.uncommitted_state.getActualNodeView(child_path, child_node); + + if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction + continue; + + if (checkLimits(actual_child_node_ptr)) return true; - steps.push_back(Step{std::move(child_path), child_it->value, level + 1}); + steps.push_back(Step{std::move(child_path), &child_node, level + 1}); } } @@ -1787,14 +1809,18 @@ private: for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it) { - chassert(it->second.node); - const String & path = it->first; - const SNode & node = *it->second.node; + const auto actual_child_node_ptr = it->second.node.get(); - if (checkLimits(node)) + if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction + continue; + + if (checkLimits(actual_child_node_ptr)) return true; - steps.push_back(Step{path, &node, level + 1}); + const String & child_path = it->first; + const SNode & child_node = *it->second.node; + + steps.push_back(Step{child_path, &child_node, level + 1}); } addDelta(root_path, root_node, level); @@ -1820,9 +1846,10 @@ private: by_level_deltas[level].emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()}); } - bool checkLimits(const SNode & node) + bool checkLimits(const SNode * node) { - nodes_observed += node.numChildren(); + chassert(node != nullptr); + nodes_observed += node->numChildren(); return nodes_observed > limit; } }; diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index c2f6e4c5a74..904af76ef37 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -566,6 +566,7 @@ public: void rollback(int64_t rollback_zxid); std::shared_ptr getNode(StringRef path) const; + const Node * getActualNodeView(StringRef path, const Node & storage_node) const; Coordination::ACLs getACLs(StringRef path) const; void applyDelta(const Delta & delta); diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 037a102082c..deb6aaed8c9 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3563,11 +3563,15 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) Storage storage{500, "", this->keeper_context}; int zxid = 0; - Coordination::Requests ops; - ops.push_back(zkutil::makeCreateRequest("/A", "A", zkutil::CreateMode::Persistent)); - ops.push_back(zkutil::makeCreateRequest("/A/B", "B", zkutil::CreateMode::Persistent)); - ops.push_back(zkutil::makeCreateRequest("/A/C", "C", zkutil::CreateMode::Ephemeral)); - ops.push_back(zkutil::makeCreateRequest("/A/B/D", "D", zkutil::CreateMode::Ephemeral)); + auto prepare_create_tree = []() + { + return Coordination::Requests{ + zkutil::makeCreateRequest("/A", "A", zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest("/A/B", "B", zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest("/A/C", "C", zkutil::CreateMode::Ephemeral), + zkutil::makeCreateRequest("/A/B/D", "D", zkutil::CreateMode::Ephemeral), + }; + }; const auto exists = [&](const String & path) { @@ -3597,6 +3601,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) { SCOPED_TRACE("Remove In Multi Tx"); int new_zxid = ++zxid; + auto ops = prepare_create_tree(); ops.push_back(zkutil::makeRemoveRequest("/A", -1)); const auto request = std::make_shared(ops, ACLs{}); @@ -3612,6 +3617,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) { SCOPED_TRACE("Recursive Remove In Multi Tx"); int new_zxid = ++zxid; + auto ops = prepare_create_tree(); ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4)); const auto request = std::make_shared(ops, ACLs{}); @@ -3627,6 +3633,67 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) ASSERT_FALSE(exists("/A/B")); ASSERT_FALSE(exists("/A/B/D")); } + + { + SCOPED_TRACE("Recursive Remove With Regular In Multi Tx"); + int new_zxid = ++zxid; + auto ops = prepare_create_tree(); + + ops.push_back(zkutil::makeRemoveRequest("/A/C", -1)); + ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4)); + const auto request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(request, 1, 0, new_zxid); + auto responses = storage.processRequest(request, 1, new_zxid); + ops.pop_back(); + ops.pop_back(); + + ASSERT_EQ(responses.size(), 1); + ASSERT_TRUE(is_multi_ok(responses[0].response)); + ASSERT_FALSE(exists("/A")); + ASSERT_FALSE(exists("/A/C")); + ASSERT_FALSE(exists("/A/B")); + ASSERT_FALSE(exists("/A/B/D")); + } + + { + SCOPED_TRACE("Recursive Remove From Committed and Uncommitted states"); + int create_zxid = ++zxid; + auto ops = prepare_create_tree(); + + /// First create nodes + const auto create_request = std::make_shared(ops, ACLs{}); + storage.preprocessRequest(create_request, 1, 0, create_zxid); + auto create_responses = storage.processRequest(create_request, 1, create_zxid); + ASSERT_EQ(create_responses.size(), 1); + ASSERT_TRUE(is_multi_ok(create_responses[0].response)); + ASSERT_TRUE(exists("/A")); + ASSERT_TRUE(exists("/A/C")); + ASSERT_TRUE(exists("/A/B")); + ASSERT_TRUE(exists("/A/B/D")); + + /// Remove node A/C as a single remove request. + /// Remove all other as remove recursive request. + /// In this case we should list storage to understand the tree topology + /// but ignore already deleted nodes in uncommitted state. + + int remove_zxid = ++zxid; + ops = { + zkutil::makeRemoveRequest("/A/C", -1), + zkutil::makeRemoveRecursiveRequest("/A", 3), + }; + const auto remove_request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(remove_request, 1, 0, remove_zxid); + auto remove_responses = storage.processRequest(remove_request, 1, remove_zxid); + + ASSERT_EQ(remove_responses.size(), 1); + ASSERT_TRUE(is_multi_ok(remove_responses[0].response)); + ASSERT_FALSE(exists("/A")); + ASSERT_FALSE(exists("/A/C")); + ASSERT_FALSE(exists("/A/B")); + ASSERT_FALSE(exists("/A/B/D")); + } } TYPED_TEST(CoordinationTest, TestRemoveRecursiveWatches) From 8b7a5616a23d6d7a27961a891483557b754fab22 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Tue, 10 Sep 2024 15:51:31 +0000 Subject: [PATCH 19/23] update test --- src/Coordination/tests/gtest_coordination.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index deb6aaed8c9..b908140f1d7 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3640,7 +3640,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) auto ops = prepare_create_tree(); ops.push_back(zkutil::makeRemoveRequest("/A/C", -1)); - ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4)); + ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 3)); const auto request = std::make_shared(ops, ACLs{}); storage.preprocessRequest(request, 1, 0, new_zxid); From 3730582d0639bb2e0247b66aa9a17ce9bd6e00d7 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Tue, 10 Sep 2024 17:43:35 +0000 Subject: [PATCH 20/23] add full tree acl check --- src/Coordination/KeeperStorage.cpp | 66 ++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 21 deletions(-) diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 87d29e0ff65..2eb9ab30efa 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1584,7 +1584,7 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage } std::vector - preprocess(Storage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override + preprocess(Storage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override { ProfileEvents::increment(ProfileEvents::KeeperRemoveRequest); Coordination::ZooKeeperRemoveRecursiveRequest & request = dynamic_cast(*this->zk_request); @@ -1609,10 +1609,13 @@ struct KeeperStorageRemoveRecursiveRequestProcessor final : public KeeperStorage return {typename Storage::Delta{zxid, Coordination::Error::ZNONODE}}; } - ToDeleteTreeCollector collector(storage, zxid, request.remove_nodes_limit); - bool limit_exceeded = collector.collect(request.path, *node); + ToDeleteTreeCollector collector(storage, zxid, session_id, request.remove_nodes_limit); + auto collect_status = collector.collect(request.path, *node); - if (limit_exceeded) + if (collect_status == ToDeleteTreeCollector::CollectStatus::NoAuth) + return {typename Storage::Delta{zxid, Coordination::Error::ZNOAUTH}}; + + if (collect_status == ToDeleteTreeCollector::CollectStatus::LimitExceeded) return {typename Storage::Delta{zxid, Coordination::Error::ZNOTEMPTY}}; if (request.restored_from_zookeeper_log) @@ -1670,6 +1673,7 @@ private: { Storage & storage; int64_t zxid; + int64_t session_id; uint32_t limit; uint32_t max_level = 0; @@ -1683,20 +1687,30 @@ private: uint32_t level; }; + enum class CollectStatus + { + Ok, + NoAuth, + LimitExceeded, + }; + + friend struct KeeperStorageRemoveRecursiveRequestProcessor; + public: - ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, uint32_t limit_) + ToDeleteTreeCollector(Storage & storage_, int64_t zxid_, int64_t session_id_, uint32_t limit_) : storage(storage_) , zxid(zxid_) + , session_id(session_id_) , limit(limit_) { } - bool collect(StringRef root_path, const SNode & root_node) + CollectStatus collect(StringRef root_path, const SNode & root_node) { std::deque steps; if (checkLimits(&root_node)) - return true; + return CollectStatus::LimitExceeded; steps.push_back(Step{root_path.toString(), &root_node, 0}); @@ -1718,13 +1732,23 @@ private: chassert(node_ptr != nullptr); const auto & node = *node_ptr; - chassert(storage.uncommitted_state.getActualNodeView(path, node) != nullptr); /// explicitly check that node is not deleted + auto actual_node_ptr = storage.uncommitted_state.getActualNodeView(path, node); + chassert(actual_node_ptr != nullptr); /// explicitly check that node is not deleted - if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, node, level)) - return true; + if (actual_node_ptr->numChildren() > 0 && !storage.checkACL(path, Coordination::ACL::Delete, session_id, /*is_local=*/false)) + return CollectStatus::NoAuth; + + if (auto status = visitRocksDBNode(steps, path, level); status != CollectStatus::Ok) + return status; + + if (auto status = visitMemNode(steps, path, level); status != CollectStatus::Ok) + return status; + + if (auto status = visitRootAndUncommitted(steps, path, node, level); status != CollectStatus::Ok) + return status; } - return false; + return CollectStatus::Ok; } std::vector extractDeltas() @@ -1741,7 +1765,7 @@ private: } private: - bool visitRocksDBNode(std::deque & steps, StringRef root_path, uint32_t level) + CollectStatus visitRocksDBNode(std::deque & steps, StringRef root_path, uint32_t level) { if constexpr (Storage::use_rocksdb) { @@ -1757,22 +1781,22 @@ private: continue; if (checkLimits(actual_child_node_ptr)) - return true; + return CollectStatus::LimitExceeded; steps.push_back(Step{std::move(child_path), std::move(child_node), level + 1}); } } - return false; + return CollectStatus::Ok; } - bool visitMemNode(std::deque & steps, StringRef root_path, uint32_t level) + CollectStatus visitMemNode(std::deque & steps, StringRef root_path, uint32_t level) { if constexpr (!Storage::use_rocksdb) { auto node_it = storage.container.find(root_path); if (node_it == storage.container.end()) - return false; + return CollectStatus::Ok; std::filesystem::path root_fs_path(root_path.toString()); const auto & children = node_it->value.getChildren(); @@ -1791,16 +1815,16 @@ private: continue; if (checkLimits(actual_child_node_ptr)) - return true; + return CollectStatus::LimitExceeded; steps.push_back(Step{std::move(child_path), &child_node, level + 1}); } } - return false; + return CollectStatus::Ok; } - bool visitRootAndUncommitted(std::deque & steps, StringRef root_path, const SNode & root_node, uint32_t level) + CollectStatus visitRootAndUncommitted(std::deque & steps, StringRef root_path, const SNode & root_node, uint32_t level) { const auto & nodes = storage.uncommitted_state.nodes; @@ -1815,7 +1839,7 @@ private: continue; if (checkLimits(actual_child_node_ptr)) - return true; + return CollectStatus::LimitExceeded; const String & child_path = it->first; const SNode & child_node = *it->second.node; @@ -1825,7 +1849,7 @@ private: addDelta(root_path, root_node, level); - return false; + return CollectStatus::Ok; } void addDelta(StringRef root_path, const SNode & root_node, uint32_t level) From 00fe1d12eee2e7a7b328d582861ec37cd5606190 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Tue, 10 Sep 2024 17:45:39 +0000 Subject: [PATCH 21/23] add acl test for rmr --- src/Coordination/tests/gtest_coordination.cpp | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index b908140f1d7..db113f6dfbe 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3792,6 +3792,70 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveWatches) ASSERT_EQ(storage.list_watches.size(), 0); } +TYPED_TEST(CoordinationTest, TestRemoveRecursiveAcls) +{ + using namespace DB; + using namespace Coordination; + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; + int zxid = 0; + + { + int new_zxid = ++zxid; + String user_auth_data = "test_user:test_password"; + + const auto auth_request = std::make_shared(); + auth_request->scheme = "digest"; + auth_request->data = user_auth_data; + + storage.preprocessRequest(auth_request, 1, 0, new_zxid); + auto responses = storage.processRequest(auth_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to add auth to session"; + } + + const auto create = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto create_request = std::make_shared(); + create_request->path = path; + create_request->acls = {{.permissions = ACL::Create, .scheme = "auth", .id = ""}}; + + storage.preprocessRequest(create_request, 1, 0, new_zxid); + auto responses = storage.processRequest(create_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path; + }; + + /// Add nodes with only Create ACL + create("/A"); + create("/A/B"); + create("/A/C"); + create("/A/B/D"); + + { + int new_zxid = ++zxid; + + auto remove_request = std::make_shared(); + remove_request->path = "/A"; + remove_request->remove_nodes_limit = 4; + + storage.preprocessRequest(remove_request, 1, 0, new_zxid); + auto responses = storage.processRequest(remove_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + EXPECT_EQ(responses[0].response->error, Coordination::Error::ZNOAUTH); + } +} + /// INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, /// CoordinationTest, /// ::testing::ValuesIn(std::initializer_list{CompressionParam{true, ".zstd"}, CompressionParam{false, ""}})); From 3e3d6523207ac93e96c6f7434d56cd21fd230947 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Tue, 10 Sep 2024 17:51:35 +0000 Subject: [PATCH 22/23] refactor tests --- src/Coordination/tests/gtest_coordination.cpp | 22 +++++-------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index db113f6dfbe..73402af5ec4 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3475,18 +3475,6 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) ASSERT_FALSE(exists("/T3")); } - { - SCOPED_TRACE("Recursive Remove Tree"); - create("/T4", zkutil::CreateMode::Persistent); - create("/T4/A", zkutil::CreateMode::Persistent); - - auto responses = remove_recursive("/T4", 100); - ASSERT_EQ(responses.size(), 1); - ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); - ASSERT_FALSE(exists("/T4")); - ASSERT_FALSE(exists("/T4/A")); - } - { SCOPED_TRACE("Recursive Remove Tree Small Limit"); create("/T5", zkutil::CreateMode::Persistent); @@ -3528,7 +3516,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); ASSERT_EQ(storage.ephemerals.size(), 0); - ASSERT_TRUE(!exists("/T7")); + ASSERT_FALSE(exists("/T7")); } { @@ -3543,10 +3531,10 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); ASSERT_EQ(storage.ephemerals.size(), 0); - ASSERT_TRUE(!exists("/T8")); - ASSERT_TRUE(!exists("/T8/A")); - ASSERT_TRUE(!exists("/T8/B")); - ASSERT_TRUE(!exists("/T8/A/C")); + ASSERT_FALSE(exists("/T8")); + ASSERT_FALSE(exists("/T8/A")); + ASSERT_FALSE(exists("/T8/B")); + ASSERT_FALSE(exists("/T8/A/C")); } } From 55d6672bc43579dad3edaa1e8c5d595025d50fc8 Mon Sep 17 00:00:00 2001 From: Mikhail Artemenko Date: Wed, 11 Sep 2024 10:00:40 +0000 Subject: [PATCH 23/23] add early break for test keeper impl --- src/Common/ZooKeeper/TestKeeper.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 3982971163c..2fbe9110b6b 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -368,6 +368,8 @@ std::pair TestKeeperRemoveRecursiveRequest::process(TestKeepe if (child_path.starts_with(path)) children.emplace_back(child_path, child_node); + else + break; } if (children.size() > remove_nodes_limit)