mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 01:22:04 +00:00
add test for watches
This commit is contained in:
parent
2bbe933531
commit
cf0e0b766d
@ -3629,6 +3629,102 @@ TYPED_TEST(CoordinationTest, TestRemoveRecursiveInMultiRequest)
|
||||
}
|
||||
}
|
||||
|
||||
TYPED_TEST(CoordinationTest, TestRemoveRecursiveWatches)
|
||||
{
|
||||
using namespace DB;
|
||||
using namespace Coordination;
|
||||
|
||||
using Storage = typename TestFixture::Storage;
|
||||
|
||||
ChangelogDirTest rocks("./rocksdb");
|
||||
this->setRocksDBDirectory("./rocksdb");
|
||||
|
||||
Storage storage{500, "", this->keeper_context};
|
||||
int zxid = 0;
|
||||
|
||||
const auto create = [&](const String & path, int create_mode)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
|
||||
create_request->path = path;
|
||||
create_request->is_ephemeral = create_mode == zkutil::CreateMode::Ephemeral || create_mode == zkutil::CreateMode::EphemeralSequential;
|
||||
create_request->is_sequential = create_mode == zkutil::CreateMode::PersistentSequential || create_mode == zkutil::CreateMode::EphemeralSequential;
|
||||
|
||||
storage.preprocessRequest(create_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(create_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path;
|
||||
};
|
||||
|
||||
const auto add_watch = [&](const String & path)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
const auto exists_request = std::make_shared<ZooKeeperExistsRequest>();
|
||||
exists_request->path = path;
|
||||
exists_request->has_watch = true;
|
||||
|
||||
storage.preprocessRequest(exists_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(exists_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK);
|
||||
};
|
||||
|
||||
const auto add_list_watch = [&](const String & path)
|
||||
{
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
const auto list_request = std::make_shared<ZooKeeperListRequest>();
|
||||
list_request->path = path;
|
||||
list_request->has_watch = true;
|
||||
|
||||
storage.preprocessRequest(list_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(list_request, 1, new_zxid);
|
||||
|
||||
EXPECT_EQ(responses.size(), 1);
|
||||
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK);
|
||||
};
|
||||
|
||||
create("/A", zkutil::CreateMode::Persistent);
|
||||
create("/A/B", zkutil::CreateMode::Persistent);
|
||||
create("/A/C", zkutil::CreateMode::Ephemeral);
|
||||
create("/A/B/D", zkutil::CreateMode::Ephemeral);
|
||||
|
||||
add_watch("/A");
|
||||
add_watch("/A/B");
|
||||
add_watch("/A/C");
|
||||
add_watch("/A/B/D");
|
||||
add_list_watch("/A");
|
||||
add_list_watch("/A/B");
|
||||
ASSERT_EQ(storage.watches.size(), 4);
|
||||
ASSERT_EQ(storage.list_watches.size(), 2);
|
||||
|
||||
int new_zxid = ++zxid;
|
||||
|
||||
auto remove_request = std::make_shared<ZooKeeperRemoveRecursiveRequest>();
|
||||
remove_request->path = "/A";
|
||||
remove_request->remove_nodes_limit = 4;
|
||||
|
||||
storage.preprocessRequest(remove_request, 1, 0, new_zxid);
|
||||
auto responses = storage.processRequest(remove_request, 1, new_zxid);
|
||||
|
||||
ASSERT_EQ(responses.size(), 7);
|
||||
|
||||
for (size_t i = 0; i < 7; ++i)
|
||||
{
|
||||
ASSERT_EQ(responses[i].response->error, Coordination::Error::ZOK);
|
||||
|
||||
if (const auto * watch_response = dynamic_cast<Coordination::ZooKeeperWatchResponse *>(responses[i].response.get()))
|
||||
ASSERT_EQ(watch_response->type, Coordination::Event::DELETED);
|
||||
}
|
||||
|
||||
ASSERT_EQ(storage.watches.size(), 0);
|
||||
ASSERT_EQ(storage.list_watches.size(), 0);
|
||||
}
|
||||
|
||||
/// INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
|
||||
/// CoordinationTest,
|
||||
/// ::testing::ValuesIn(std::initializer_list<CompressionParam>{CompressionParam{true, ".zstd"}, CompressionParam{false, ""}}));
|
||||
|
Loading…
Reference in New Issue
Block a user