Expose list request type in internal client

This commit is contained in:
Antonio Andelic 2022-06-23 10:25:34 +00:00
parent 568c957eb2
commit ae3d9fd962
3 changed files with 43 additions and 23 deletions

View File

@ -313,9 +313,10 @@ static Coordination::WatchCallback callbackForEvent(const EventPtr & watch)
Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback)
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type)
{
auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback);
auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback, list_request_type);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{
@ -336,26 +337,28 @@ Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings
}
}
Strings ZooKeeper::getChildren(
const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
Strings ZooKeeper::getChildren(const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
{
Strings res;
check(tryGetChildren(path, res, stat, watch), path);
return res;
}
Strings ZooKeeper::getChildrenWatch(
const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
Strings ZooKeeper::getChildrenWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{
Strings res;
check(tryGetChildrenWatch(path, res, stat, watch_callback), path);
return res;
}
Coordination::Error ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat, const EventPtr & watch)
Coordination::Error ZooKeeper::tryGetChildren(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
const EventPtr & watch,
Coordination::ListRequestType list_request_type)
{
Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch));
Coordination::Error code = getChildrenImpl(path, res, stat, callbackForEvent(watch), list_request_type);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path);
@ -363,10 +366,14 @@ Coordination::Error ZooKeeper::tryGetChildren(const std::string & path, Strings
return code;
}
Coordination::Error ZooKeeper::tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
Coordination::Error ZooKeeper::tryGetChildrenWatch(
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type)
{
Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback);
Coordination::Error code = getChildrenImpl(path, res, stat, watch_callback, list_request_type);
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
throw KeeperException(code, path);
@ -1047,7 +1054,8 @@ std::future<Coordination::SetResponse> ZooKeeper::asyncTrySetNoThrow(const std::
return future;
}
std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback)
std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(
const std::string & path, Coordination::WatchCallback watch_callback, Coordination::ListRequestType list_request_type)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
@ -1060,11 +1068,12 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncGetChildren(const std::s
promise->set_value(response);
};
impl->list(path, Coordination::ListRequestType::ALL, std::move(callback), watch_callback);
impl->list(path, list_request_type, std::move(callback), watch_callback);
return future;
}
std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback)
std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(
const std::string & path, Coordination::WatchCallback watch_callback, Coordination::ListRequestType list_request_type)
{
auto promise = std::make_shared<std::promise<Coordination::ListResponse>>();
auto future = promise->get_future();
@ -1074,7 +1083,7 @@ std::future<Coordination::ListResponse> ZooKeeper::asyncTryGetChildrenNoThrow(co
promise->set_value(response);
};
impl->list(path, Coordination::ListRequestType::ALL, std::move(callback), watch_callback);
impl->list(path, list_request_type, std::move(callback), watch_callback);
return future;
}

View File

@ -194,11 +194,13 @@ public:
/// * The node doesn't exist.
Coordination::Error tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr);
const EventPtr & watch = nullptr,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
Coordination::Error tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback);
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
/// Performs several operations in a transaction.
/// Throws on every error.
@ -279,9 +281,15 @@ public:
FutureExists asyncTryExistsNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {});
using FutureGetChildren = std::future<Coordination::ListResponse>;
FutureGetChildren asyncGetChildren(const std::string & path, Coordination::WatchCallback watch_callback = {});
FutureGetChildren asyncGetChildren(
const std::string & path,
Coordination::WatchCallback watch_callback = {},
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
/// Like the previous one but don't throw any exceptions on future.get()
FutureGetChildren asyncTryGetChildrenNoThrow(const std::string & path, Coordination::WatchCallback watch_callback = {});
FutureGetChildren asyncTryGetChildrenNoThrow(
const std::string & path,
Coordination::WatchCallback watch_callback = {},
Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);
using FutureSet = std::future<Coordination::SetResponse>;
FutureSet asyncSet(const std::string & path, const std::string & data, int32_t version = -1);
@ -335,7 +343,11 @@ private:
const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
Coordination::Error setImpl(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat);
Coordination::Error getChildrenImpl(
const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
const std::string & path,
Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback,
Coordination::ListRequestType list_request_type);
Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses);
Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);
Coordination::Error syncImpl(const std::string & path, std::string & returned_path);

View File

@ -1186,12 +1186,11 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
if (request.list_request_type == ALL)
return true;
auto child_it = container.find(fmt::format("{}/{}", request.path, child));
auto child_it = container.find(fmt::format("{}{}{}", request.path, (request.path.ends_with('/') ? "" : "/"), child));
if (child_it == container.end())
onStorageInconsistency();
const auto is_ephemeral = child_it->value.stat.ephemeralOwner != 0;
return (is_ephemeral && request.list_request_type == EPHEMERAL_ONLY) || (!is_ephemeral && request.list_request_type == PERSISTENT_ONLY);
};