diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 503eddac16e..87d29e0ff65 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -832,6 +832,15 @@ std::shared_ptr KeeperStorage::UncommittedS return tryGetNodeFromStorage(path); } +template +const typename Container::Node * KeeperStorage::UncommittedState::getActualNodeView(StringRef path, const Node & storage_node) const +{ + if (auto node_it = nodes.find(path.toView()); node_it != nodes.end()) + return node_it->second.node.get(); + + return &storage_node; +} + template Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const { @@ -1686,7 +1695,7 @@ private: { std::deque steps; - if (checkLimits(root_node)) + if (checkLimits(&root_node)) return true; steps.push_back(Step{root_path.toString(), &root_node, 0}); @@ -1698,17 +1707,20 @@ private: StringRef path = step.path; uint32_t level = step.level; - const SNode * node = nullptr; + const SNode * node_ptr = nullptr; if (auto * rdb = std::get_if(&step.node)) - node = rdb; + node_ptr = rdb; else - node = std::get(step.node); + node_ptr = std::get(step.node); chassert(!path.empty()); - chassert(node != nullptr); + chassert(node_ptr != nullptr); - if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, *node, level)) + const auto & node = *node_ptr; + chassert(storage.uncommitted_state.getActualNodeView(path, node) != nullptr); /// explicitly check that node is not deleted + + if (visitRocksDBNode(steps, path, level) || visitMemNode(steps, path, level) || visitRootAndUncommitted(steps, path, node, level)) return true; } @@ -1736,13 +1748,18 @@ private: std::filesystem::path root_fs_path(root_path.toString()); auto children = storage.container.getChildren(root_path.toString()); - for (auto && [child_name, node] : children) + for (auto && [child_name, child_node] : children) { - if (checkLimits(node)) + auto child_path = (root_fs_path / child_name).generic_string(); + const auto actual_child_node_ptr = storage.uncommitted_state.getActualNodeView(child_path, child_node); + + if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction + continue; + + if (checkLimits(actual_child_node_ptr)) return true; - auto child_path = (root_fs_path / child_name).generic_string(); - steps.push_back(Step{std::move(child_path), std::move(node), level + 1}); + steps.push_back(Step{std::move(child_path), std::move(child_node), level + 1}); } } @@ -1753,25 +1770,30 @@ private: { if constexpr (!Storage::use_rocksdb) { - std::filesystem::path root_fs_path(root_path.toString()); - auto node_it = storage.container.find(root_path); if (node_it == storage.container.end()) return false; - + std::filesystem::path root_fs_path(root_path.toString()); const auto & children = node_it->value.getChildren(); - for (auto && child_name : children) + + for (const auto & child_name : children) { auto child_path = (root_fs_path / child_name.toView()).generic_string(); auto child_it = storage.container.find(child_path); chassert(child_it != storage.container.end()); + const auto & child_node = child_it->value; - if (checkLimits(child_it->value)) + const auto actual_child_node_ptr = storage.uncommitted_state.getActualNodeView(child_path, child_node); + + if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction + continue; + + if (checkLimits(actual_child_node_ptr)) return true; - steps.push_back(Step{std::move(child_path), child_it->value, level + 1}); + steps.push_back(Step{std::move(child_path), &child_node, level + 1}); } } @@ -1787,14 +1809,18 @@ private: for (; it != nodes.end() && parentNodePath(it->first) == root_path; ++it) { - chassert(it->second.node); - const String & path = it->first; - const SNode & node = *it->second.node; + const auto actual_child_node_ptr = it->second.node.get(); - if (checkLimits(node)) + if (actual_child_node_ptr == nullptr) /// node was deleted in previous step of multi transaction + continue; + + if (checkLimits(actual_child_node_ptr)) return true; - steps.push_back(Step{path, &node, level + 1}); + const String & child_path = it->first; + const SNode & child_node = *it->second.node; + + steps.push_back(Step{child_path, &child_node, level + 1}); } addDelta(root_path, root_node, level); @@ -1820,9 +1846,10 @@ private: by_level_deltas[level].emplace_back(root_path.toString(), zxid, typename Storage::RemoveNodeDelta{root_node.version, root_node.ephemeralOwner()}); } - bool checkLimits(const SNode & node) + bool checkLimits(const SNode * node) { - nodes_observed += node.numChildren(); + chassert(node != nullptr); + nodes_observed += node->numChildren(); return nodes_observed > limit; } }; diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index c2f6e4c5a74..904af76ef37 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -566,6 +566,7 @@ public: void rollback(int64_t rollback_zxid); std::shared_ptr getNode(StringRef path) const; + const Node * getActualNodeView(StringRef path, const Node & storage_node) const; Coordination::ACLs getACLs(StringRef path) const; void applyDelta(const Delta & delta); diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 037a102082c..deb6aaed8c9 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3563,11 +3563,15 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) Storage storage{500, "", this->keeper_context}; int zxid = 0; - Coordination::Requests ops; - ops.push_back(zkutil::makeCreateRequest("/A", "A", zkutil::CreateMode::Persistent)); - ops.push_back(zkutil::makeCreateRequest("/A/B", "B", zkutil::CreateMode::Persistent)); - ops.push_back(zkutil::makeCreateRequest("/A/C", "C", zkutil::CreateMode::Ephemeral)); - ops.push_back(zkutil::makeCreateRequest("/A/B/D", "D", zkutil::CreateMode::Ephemeral)); + auto prepare_create_tree = []() + { + return Coordination::Requests{ + zkutil::makeCreateRequest("/A", "A", zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest("/A/B", "B", zkutil::CreateMode::Persistent), + zkutil::makeCreateRequest("/A/C", "C", zkutil::CreateMode::Ephemeral), + zkutil::makeCreateRequest("/A/B/D", "D", zkutil::CreateMode::Ephemeral), + }; + }; const auto exists = [&](const String & path) { @@ -3597,6 +3601,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) { SCOPED_TRACE("Remove In Multi Tx"); int new_zxid = ++zxid; + auto ops = prepare_create_tree(); ops.push_back(zkutil::makeRemoveRequest("/A", -1)); const auto request = std::make_shared(ops, ACLs{}); @@ -3612,6 +3617,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) { SCOPED_TRACE("Recursive Remove In Multi Tx"); int new_zxid = ++zxid; + auto ops = prepare_create_tree(); ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4)); const auto request = std::make_shared(ops, ACLs{}); @@ -3627,6 +3633,67 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) ASSERT_FALSE(exists("/A/B")); ASSERT_FALSE(exists("/A/B/D")); } + + { + SCOPED_TRACE("Recursive Remove With Regular In Multi Tx"); + int new_zxid = ++zxid; + auto ops = prepare_create_tree(); + + ops.push_back(zkutil::makeRemoveRequest("/A/C", -1)); + ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4)); + const auto request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(request, 1, 0, new_zxid); + auto responses = storage.processRequest(request, 1, new_zxid); + ops.pop_back(); + ops.pop_back(); + + ASSERT_EQ(responses.size(), 1); + ASSERT_TRUE(is_multi_ok(responses[0].response)); + ASSERT_FALSE(exists("/A")); + ASSERT_FALSE(exists("/A/C")); + ASSERT_FALSE(exists("/A/B")); + ASSERT_FALSE(exists("/A/B/D")); + } + + { + SCOPED_TRACE("Recursive Remove From Committed and Uncommitted states"); + int create_zxid = ++zxid; + auto ops = prepare_create_tree(); + + /// First create nodes + const auto create_request = std::make_shared(ops, ACLs{}); + storage.preprocessRequest(create_request, 1, 0, create_zxid); + auto create_responses = storage.processRequest(create_request, 1, create_zxid); + ASSERT_EQ(create_responses.size(), 1); + ASSERT_TRUE(is_multi_ok(create_responses[0].response)); + ASSERT_TRUE(exists("/A")); + ASSERT_TRUE(exists("/A/C")); + ASSERT_TRUE(exists("/A/B")); + ASSERT_TRUE(exists("/A/B/D")); + + /// Remove node A/C as a single remove request. + /// Remove all other as remove recursive request. + /// In this case we should list storage to understand the tree topology + /// but ignore already deleted nodes in uncommitted state. + + int remove_zxid = ++zxid; + ops = { + zkutil::makeRemoveRequest("/A/C", -1), + zkutil::makeRemoveRecursiveRequest("/A", 3), + }; + const auto remove_request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(remove_request, 1, 0, remove_zxid); + auto remove_responses = storage.processRequest(remove_request, 1, remove_zxid); + + ASSERT_EQ(remove_responses.size(), 1); + ASSERT_TRUE(is_multi_ok(remove_responses[0].response)); + ASSERT_FALSE(exists("/A")); + ASSERT_FALSE(exists("/A/C")); + ASSERT_FALSE(exists("/A/B")); + ASSERT_FALSE(exists("/A/B/D")); + } } TYPED_TEST(CoordinationTest, TestRemoveRecursiveWatches)