mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
support test keeper
This commit is contained in:
parent
068ada57ba
commit
81972b97e7
@ -90,6 +90,36 @@ struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest
|
||||
}
|
||||
};
|
||||
|
||||
struct TestKeeperRemoveRecursiveRequest final : RemoveRecursiveRequest, TestKeeperRequest
|
||||
{
|
||||
TestKeeperRemoveRecursiveRequest() = default;
|
||||
explicit TestKeeperRemoveRecursiveRequest(const RemoveRecursiveRequest & base) : RemoveRecursiveRequest(base) {}
|
||||
ResponsePtr createResponse() const override;
|
||||
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||
|
||||
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
|
||||
{
|
||||
std::vector<std::pair<String, size_t>> deleted;
|
||||
|
||||
auto add_deleted_watches = [&](TestKeeper::Watches & w)
|
||||
{
|
||||
for (const auto & [watch_path, _] : w)
|
||||
if (watch_path.starts_with(path))
|
||||
deleted.emplace_back(watch_path, std::count(watch_path.begin(), watch_path.end(), '/'));
|
||||
};
|
||||
|
||||
add_deleted_watches(node_watches);
|
||||
add_deleted_watches(list_watches);
|
||||
std::sort(deleted.begin(), deleted.end(), [](const auto & lhs, const auto & rhs)
|
||||
{
|
||||
return lhs.second < rhs.second;
|
||||
});
|
||||
|
||||
for (const auto & [watch_path, _] : deleted)
|
||||
processWatchesImpl(watch_path, node_watches, list_watches);
|
||||
}
|
||||
};
|
||||
|
||||
struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest
|
||||
{
|
||||
ResponsePtr createResponse() const override;
|
||||
@ -175,6 +205,10 @@ struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
|
||||
{
|
||||
requests.push_back(std::make_shared<TestKeeperRemoveRequest>(*concrete_request_remove));
|
||||
}
|
||||
else if (const auto * concrete_request_remove_recursive = dynamic_cast<const RemoveRecursiveRequest *>(generic_request.get()))
|
||||
{
|
||||
requests.push_back(std::make_shared<TestKeeperRemoveRecursiveRequest>(*concrete_request_remove_recursive));
|
||||
}
|
||||
else if (const auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get()))
|
||||
{
|
||||
requests.push_back(std::make_shared<TestKeeperSetRequest>(*concrete_request_set));
|
||||
@ -313,6 +347,56 @@ std::pair<ResponsePtr, Undo> TestKeeperRemoveRequest::process(TestKeeper::Contai
|
||||
return { std::make_shared<RemoveResponse>(response), undo };
|
||||
}
|
||||
|
||||
std::pair<ResponsePtr, Undo> TestKeeperRemoveRecursiveRequest::process(TestKeeper::Container & container, int64_t zxid) const
|
||||
{
|
||||
RemoveRecursiveResponse response;
|
||||
response.zxid = zxid;
|
||||
Undo undo;
|
||||
|
||||
auto root_it = container.find(path);
|
||||
if (root_it == container.end())
|
||||
{
|
||||
response.error = Error::ZNONODE;
|
||||
return { std::make_shared<RemoveRecursiveResponse>(response), undo };
|
||||
}
|
||||
|
||||
std::vector<std::pair<std::string, Coordination::TestKeeper::Node>> children;
|
||||
|
||||
for (const auto & [child_path, child_node] : container)
|
||||
if (child_path.starts_with(path))
|
||||
children.emplace_back(child_path, child_node);
|
||||
|
||||
if (children.size() > remove_nodes_limit)
|
||||
{
|
||||
response.error = Error::ZNOTEMPTY;
|
||||
return { std::make_shared<RemoveRecursiveResponse>(response), undo };
|
||||
}
|
||||
|
||||
auto & parent = container.at(parentPath(path));
|
||||
--parent.stat.numChildren;
|
||||
++parent.stat.cversion;
|
||||
|
||||
for (const auto & [child_path, child_node] : children)
|
||||
{
|
||||
auto child_it = container.find(child_path);
|
||||
chassert(child_it != container.end());
|
||||
container.erase(child_it);
|
||||
}
|
||||
|
||||
response.error = Error::ZOK;
|
||||
undo = [&container, dead = std::move(children), root_path = path]()
|
||||
{
|
||||
for (auto && [child_path, child_node] : dead)
|
||||
container.emplace(child_path, child_node);
|
||||
|
||||
auto & undo_parent = container.at(parentPath(root_path));
|
||||
++undo_parent.stat.numChildren;
|
||||
--undo_parent.stat.cversion;
|
||||
};
|
||||
|
||||
return { std::make_shared<RemoveRecursiveResponse>(response), undo };
|
||||
}
|
||||
|
||||
std::pair<ResponsePtr, Undo> TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t zxid) const
|
||||
{
|
||||
ExistsResponse response;
|
||||
@ -530,6 +614,7 @@ std::pair<ResponsePtr, Undo> TestKeeperMultiRequest::process(TestKeeper::Contain
|
||||
|
||||
ResponsePtr TestKeeperCreateRequest::createResponse() const { return std::make_shared<CreateResponse>(); }
|
||||
ResponsePtr TestKeeperRemoveRequest::createResponse() const { return std::make_shared<RemoveResponse>(); }
|
||||
ResponsePtr TestKeeperRemoveRecursiveRequest::createResponse() const { return std::make_shared<RemoveRecursiveResponse>(); }
|
||||
ResponsePtr TestKeeperExistsRequest::createResponse() const { return std::make_shared<ExistsResponse>(); }
|
||||
ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shared<GetResponse>(); }
|
||||
ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared<SetResponse>(); }
|
||||
@ -772,11 +857,18 @@ void TestKeeper::remove(
|
||||
}
|
||||
|
||||
void TestKeeper::removeRecursive(
|
||||
[[maybe_unused]] const String & path,
|
||||
[[maybe_unused]] uint32_t remove_nodes_limit,
|
||||
[[maybe_unused]] RemoveRecursiveCallback callback)
|
||||
const String & path,
|
||||
uint32_t remove_nodes_limit,
|
||||
RemoveRecursiveCallback callback)
|
||||
{
|
||||
/// TODO(michicosun) implement
|
||||
TestKeeperRemoveRecursiveRequest request;
|
||||
request.path = path;
|
||||
request.remove_nodes_limit = remove_nodes_limit;
|
||||
|
||||
RequestInfo request_info;
|
||||
request_info.request = std::make_shared<TestKeeperRemoveRecursiveRequest>(std::move(request));
|
||||
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveRecursiveResponse &>(response)); };
|
||||
pushRequest(std::move(request_info));
|
||||
}
|
||||
|
||||
void TestKeeper::exists(
|
||||
|
Loading…
Reference in New Issue
Block a user