diff --git a/src/Common/ZooKeeper/Types.h b/src/Common/ZooKeeper/Types.h index d2876adaabc..4a163c15838 100644 --- a/src/Common/ZooKeeper/Types.h +++ b/src/Common/ZooKeeper/Types.h @@ -31,6 +31,7 @@ using AsyncResponses = std::vector>>; Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode, bool ignore_if_exists = false); Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version); +Coordination::RequestPtr makeRemoveRecursiveRequest(const std::string & path, uint32_t remove_nodes_limit); Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version); Coordination::RequestPtr makeCheckRequest(const std::string & path, int version); diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 93e833e87c8..65c3fdba8d2 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -1637,6 +1637,14 @@ Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version return request; } +Coordination::RequestPtr makeRemoveRecursiveRequest(const std::string & path, uint32_t remove_nodes_limit) +{ + auto request = std::make_shared(); + request->path = path; + request->remove_nodes_limit = remove_nodes_limit; + return request; +} + Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version) { auto request = std::make_shared(); diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index f902a806b37..27061b15b90 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -3406,20 +3406,25 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path; }; - const auto remove = [&](const String & path, int32_t version = -1, std::optional remove_nodes_limit = std::nullopt) + const auto remove = [&](const String & path, int32_t version = -1) { int new_zxid = ++zxid; - std::shared_ptr remove_request; - - if (remove_nodes_limit.has_value()) - remove_request = std::make_shared(); - else - remove_request = std::make_shared(); - + auto remove_request = std::make_shared(); remove_request->path = path; remove_request->version = version; - remove_request->remove_nodes_limit = remove_nodes_limit.value_or(1); + + storage.preprocessRequest(remove_request, 1, 0, new_zxid); + return storage.processRequest(remove_request, 1, new_zxid); + }; + + const auto remove_recursive = [&](const String & path, uint32_t remove_nodes_limit = 1) + { + int new_zxid = ++zxid; + + auto remove_request = std::make_shared(); + remove_request->path = path; + remove_request->remove_nodes_limit = remove_nodes_limit; storage.preprocessRequest(remove_request, 1, 0, new_zxid); return storage.processRequest(remove_request, 1, new_zxid); @@ -3464,7 +3469,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) SCOPED_TRACE("Recursive Remove Single Node"); create("/T3", zkutil::CreateMode::Persistent); - auto responses = remove("/T3", 0, 100); + auto responses = remove_recursive("/T3", 100); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); ASSERT_FALSE(exists("/T3")); @@ -3475,7 +3480,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) create("/T4", zkutil::CreateMode::Persistent); create("/T4/A", zkutil::CreateMode::Persistent); - auto responses = remove("/T4", 0, 100); + auto responses = remove_recursive("/T4", 100); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); ASSERT_FALSE(exists("/T4")); @@ -3489,7 +3494,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) create("/T5/B", zkutil::CreateMode::Persistent); create("/T5/A/C", zkutil::CreateMode::Persistent); - auto responses = remove("/T5", 0, 2); + auto responses = remove_recursive("/T5", 2); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY); ASSERT_TRUE(exists("/T5")); @@ -3499,19 +3504,19 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) } { - SCOPED_TRACE("Recursive Remove Tree Small Limit"); + SCOPED_TRACE("Recursive Remove Tree Big Limit"); create("/T6", zkutil::CreateMode::Persistent); create("/T6/A", zkutil::CreateMode::Persistent); create("/T6/B", zkutil::CreateMode::Persistent); create("/T6/A/C", zkutil::CreateMode::Persistent); - auto responses = remove("/T6", 0, 2); + auto responses = remove_recursive("/T6", 4); ASSERT_EQ(responses.size(), 1); - ASSERT_EQ(responses[0].response->error, Coordination::Error::ZNOTEMPTY); - ASSERT_TRUE(exists("/T6")); - ASSERT_TRUE(exists("/T6/A")); - ASSERT_TRUE(exists("/T6/B")); - ASSERT_TRUE(exists("/T6/A/C")); + ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); + ASSERT_FALSE(exists("/T6")); + ASSERT_FALSE(exists("/T6/A")); + ASSERT_FALSE(exists("/T6/B")); + ASSERT_FALSE(exists("/T6/A/C")); } { @@ -3519,7 +3524,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) create("/T7", zkutil::CreateMode::Ephemeral); ASSERT_EQ(storage.ephemerals.size(), 1); - auto responses = remove("/T7", 0, 100); + auto responses = remove_recursive("/T7", 100); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); ASSERT_EQ(storage.ephemerals.size(), 0); @@ -3534,7 +3539,7 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) create("/T8/A/C", zkutil::CreateMode::Ephemeral); ASSERT_EQ(storage.ephemerals.size(), 1); - auto responses = remove("/T8", 0, 4); + auto responses = remove_recursive("/T8", 4); ASSERT_EQ(responses.size(), 1); ASSERT_EQ(responses[0].response->error, Coordination::Error::ZOK); ASSERT_EQ(storage.ephemerals.size(), 0); @@ -3545,6 +3550,85 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveRequest) } } +TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest) +{ + using namespace DB; + using namespace Coordination; + + using Storage = typename TestFixture::Storage; + + ChangelogDirTest rocks("./rocksdb"); + this->setRocksDBDirectory("./rocksdb"); + + Storage storage{500, "", this->keeper_context}; + int zxid = 0; + + Coordination::Requests ops; + ops.push_back(zkutil::makeCreateRequest("/A", "A", zkutil::CreateMode::Persistent)); + ops.push_back(zkutil::makeCreateRequest("/A/B", "B", zkutil::CreateMode::Persistent)); + ops.push_back(zkutil::makeCreateRequest("/A/C", "C", zkutil::CreateMode::Ephemeral)); + ops.push_back(zkutil::makeCreateRequest("/A/B/D", "D", zkutil::CreateMode::Ephemeral)); + + const auto exists = [&](const String & path) + { + int new_zxid = ++zxid; + + const auto exists_request = std::make_shared(); + exists_request->path = path; + + storage.preprocessRequest(exists_request, 1, 0, new_zxid); + auto responses = storage.processRequest(exists_request, 1, new_zxid); + + EXPECT_EQ(responses.size(), 1); + return responses[0].response->error == Coordination::Error::ZOK; + }; + + const auto is_multi_ok = [&](Coordination::ZooKeeperResponsePtr response) + { + const auto & multi_response = dynamic_cast(*response); + + for (const auto & op_response : multi_response.responses) + if (op_response->error != Coordination::Error::ZOK) + return false; + + return true; + }; + + { + SCOPED_TRACE("Remove In Multi Tx"); + int new_zxid = ++zxid; + + ops.push_back(zkutil::makeRemoveRequest("/A", -1)); + const auto request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(request, 1, 0, new_zxid); + auto responses = storage.processRequest(request, 1, new_zxid); + ops.pop_back(); + + ASSERT_EQ(responses.size(), 1); + ASSERT_FALSE(is_multi_ok(responses[0].response)); + } + + { + SCOPED_TRACE("Recursive Remove In Multi Tx"); + int new_zxid = ++zxid; + + ops.push_back(zkutil::makeRemoveRecursiveRequest("/A", 4)); + const auto request = std::make_shared(ops, ACLs{}); + + storage.preprocessRequest(request, 1, 0, new_zxid); + auto responses = storage.processRequest(request, 1, new_zxid); + ops.pop_back(); + + ASSERT_EQ(responses.size(), 1); + ASSERT_TRUE(is_multi_ok(responses[0].response)); + ASSERT_FALSE(exists("/A")); + ASSERT_FALSE(exists("/A/C")); + ASSERT_FALSE(exists("/A/B")); + ASSERT_FALSE(exists("/A/B/D")); + } +} + /// INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, /// CoordinationTest, /// ::testing::ValuesIn(std::initializer_list{CompressionParam{true, ".zstd"}, CompressionParam{false, ""}}));