Add list request type

This commit is contained in:
Antonio Andelic 2022-06-21 12:35:58 +00:00
parent ae2feacbd1
commit 3a71b63b5d
9 changed files with 49 additions and 4 deletions

View File

@ -281,9 +281,17 @@ struct SetResponse : virtual Response
size_t bytesSize() const override { return sizeof(stat); }
};
enum class ListRequestType : uint8_t
{
ALL,
PERSISTENT_ONLY,
EPHEMERAL_ONLY
};
struct ListRequest : virtual Request
{
String path;
ListRequestType list_request_type{ListRequestType::ALL};
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
@ -492,6 +500,7 @@ public:
virtual void list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) = 0;

View File

@ -768,11 +768,13 @@ void TestKeeper::set(
void TestKeeper::list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch)
{
TestKeeperListRequest request;
request.path = path;
request.list_request_type = list_request_type;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperListRequest>(std::move(request));

View File

@ -71,6 +71,7 @@ public:
void list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) override;

View File

@ -9,6 +9,7 @@
#include <base/find_symbols.h>
#include <base/sort.h>
#include <base/getFQDNOrHostName.h>
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h>
#include <Common/isLocalAddress.h>
@ -1059,7 +1060,7 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::s
promise->set_value(response);
};
impl->list(path, std::move(callback), watch_callback);
impl->list(path, Coordination::ListRequestType::ALL, std::move(callback), watch_callback);
return future;
}
@ -1073,7 +1074,7 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(co
promise->set_value(response);
};
impl->list(path, std::move(callback), watch_callback);
impl->list(path, Coordination::ListRequestType::ALL, std::move(callback), watch_callback);
return future;
}

View File

@ -1,3 +1,4 @@
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
@ -285,12 +286,20 @@ void ZooKeeperListRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
Coordination::write(has_watch, out);
Coordination::write(static_cast<int32_t>(list_request_type), out);
}
void ZooKeeperListRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
Coordination::read(has_watch, in);
if (!in.eof())
{
int32_t read_request_type;
Coordination::read(read_request_type, in);
list_request_type = static_cast<ListRequestType>(read_request_type);
}
}
std::string ZooKeeperListRequest::toStringImpl() const

View File

@ -1168,11 +1168,13 @@ void ZooKeeper::set(
void ZooKeeper::list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch)
{
ZooKeeperListRequest request;
request.path = path;
request.list_request_type = list_request_type;
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperListRequest>(std::move(request));

View File

@ -164,6 +164,7 @@ public:
void list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallback watch) override;

View File

@ -1161,6 +1161,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
}
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
@ -1178,8 +1179,27 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
const auto & children = node_it->value.getChildren();
response.names.reserve(children.size());
const auto add_child = [&](const auto child)
{
using enum Coordination::ListRequestType;
if (request.list_request_type == ALL)
return true;
auto child_it = container.find(fmt::format("{}/{}", request.path, child));
if (child_it == container.end())
onStorageInconsistency();
const auto is_ephemeral = child_it->value.stat.ephemeralOwner != 0;
return (is_ephemeral && request.list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && request.list_request_type == PERSISTENT_ONLY);
};
for (const auto child : children)
response.names.push_back(child.toString());
{
if (add_child(child))
response.names.push_back(child.toString());
}
response.stat = node_it->value.stat;
response.error = Coordination::Error::ZOK;

View File

@ -62,7 +62,7 @@ void removeRecursive(Coordination::ZooKeeper & zookeeper, const std::string & pa
promise->set_value();
};
zookeeper.list(path, list_callback, nullptr);
zookeeper.list(path, ListRequestType::ALL, list_callback, nullptr);
future.get();
while (!children.empty())