Add sync support to client

This commit is contained in:
Antonio Andelic 2022-06-13 09:51:12 +00:00
parent 560999d587
commit 367df12626
10 changed files with 151 additions and 4 deletions

View File

@ -92,6 +92,7 @@
M(ZooKeeperSet, "") \
M(ZooKeeperMulti, "") \
M(ZooKeeperCheck, "") \
M(ZooKeeperSync, "") \
M(ZooKeeperClose, "") \
M(ZooKeeperWatchResponse, "") \
M(ZooKeeperUserExceptions, "") \

View File

@ -144,6 +144,7 @@ void ListRequest::addRootPath(const String & root_path) { Coordination::addRootP
void CheckRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void SetACLRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void GetACLRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void SyncRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void MultiRequest::addRootPath(const String & root_path)
{

View File

@ -320,6 +320,25 @@ struct CheckResponse : virtual Response
{
};
struct SyncRequest : virtual Request
{
String path;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size(); }
};
struct SyncResponse : virtual Response
{
String path;
size_t bytesSize() const override { return path.size(); }
};
struct MultiRequest : virtual Request
{
Requests requests;
@ -364,6 +383,7 @@ using GetCallback = std::function<void(const GetResponse &)>;
using SetCallback = std::function<void(const SetResponse &)>;
using ListCallback = std::function<void(const ListResponse &)>;
using CheckCallback = std::function<void(const CheckResponse &)>;
using SyncCallback = std::function<void(const SyncResponse &)>;
using MultiCallback = std::function<void(const MultiResponse &)>;
@ -482,6 +502,10 @@ public:
int32_t version,
CheckCallback callback) = 0;
virtual void sync(
const String & path,
SyncCallback callback) = 0;
virtual void multi(
const Requests & requests,
MultiCallback callback) = 0;

View File

@ -133,6 +133,14 @@ struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperSyncRequest final : SyncRequest, TestKeeperRequest
{
TestKeeperSyncRequest() = default;
explicit TestKeeperSyncRequest(const SyncRequest & base) : SyncRequest(base) {}
ResponsePtr createResponse() const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
{
explicit TestKeeperMultiRequest(const Requests & generic_requests)
@ -413,6 +421,14 @@ std::pair<ResponsePtr, Undo> TestKeeperCheckRequest::process(TestKeeper::Contain
return { std::make_shared<CheckResponse>(response), {} };
}
std::pair<ResponsePtr, Undo> TestKeeperSyncRequest::process(TestKeeper::Container & /*container*/, int64_t) const
{
SyncResponse response;
response.path = path;
return { std::make_shared<SyncResponse>(std::move(response)), {} };
}
std::pair<ResponsePtr, Undo> TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
MultiResponse response;
@ -471,6 +487,7 @@ ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shar
ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared<SetResponse>(); }
ResponsePtr TestKeeperListRequest::createResponse() const { return std::make_shared<ListResponse>(); }
ResponsePtr TestKeeperCheckRequest::createResponse() const { return std::make_shared<CheckResponse>(); }
ResponsePtr TestKeeperSyncRequest::createResponse() const { return std::make_shared<SyncResponse>(); }
ResponsePtr TestKeeperMultiRequest::createResponse() const { return std::make_shared<MultiResponse>(); }
@ -779,6 +796,19 @@ void TestKeeper::check(
pushRequest(std::move(request_info));
}
void TestKeeper::sync(
const String & path,
SyncCallback callback)
{
TestKeeperSyncRequest request;
request.path = path;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperSyncRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const SyncResponse &>(response)); };
pushRequest(std::move(request_info));
}
void TestKeeper::multi(
const Requests & requests,
MultiCallback callback)

View File

@ -79,6 +79,10 @@ public:
int32_t version,
CheckCallback callback) override;
void sync(
const String & path,
SyncCallback callback) override;
void multi(
const Requests & requests,
MultiCallback callback) override;

View File

@ -667,6 +667,34 @@ Coordination::Error ZooKeeper::tryMulti(const Coordination::Requests & requests,
return code;
}
Coordination::Error ZooKeeper::syncImpl(const std::string & path, std::string & returned_path)
{
auto future_result = asyncTrySyncNoThrow(path);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Sync), path));
return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
{
auto response = future_result.get();
Coordination::Error code = response.error;
returned_path = std::move(response.path);
return code;
}
}
std::string ZooKeeper::sync(const std::string & path)
{
std::string returned_path;
check(syncImpl(path, returned_path), path);
return returned_path;
}
Coordination::Error ZooKeeper::trySync(const std::string & path, std::string & returned_path)
{
return syncImpl(path, returned_path);
}
void ZooKeeper::removeChildren(const std::string & path)
{
@ -1144,6 +1172,37 @@ Coordination::Error ZooKeeper::tryMultiNoThrow(const Coordination::Requests & re
}
}
std::future<Coordination::SyncResponse> ZooKeeper::asyncTrySyncNoThrow(const std::string & path)
{
auto promise = std::make_shared<std::promise<Coordination::SyncResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::SyncResponse & response) mutable
{
promise->set_value(response);
};
impl->sync(path, std::move(callback));
return future;
}
std::future<Coordination::SyncResponse> ZooKeeper::asyncSync(const std::string & path)
{
auto promise = std::make_shared<std::promise<Coordination::SyncResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::SyncResponse & response) mutable
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(KeeperException(response.error)));
else
promise->set_value(response);
};
impl->sync(path, std::move(callback));
return future;
}
void ZooKeeper::finalize(const String & reason)
{
impl->finalize(reason);

View File

@ -209,6 +209,10 @@ public:
/// Throws nothing (even session expired errors)
Coordination::Error tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses);
std::string sync(const std::string & path);
Coordination::Error trySync(const std::string & path, std::string & returned_path);
Int64 getClientID();
/// Remove the node with the subtree. If someone concurrently adds or removes a node
@ -294,6 +298,11 @@ public:
/// Like the previous one but don't throw any exceptions on future.get()
FutureMulti asyncTryMultiNoThrow(const Coordination::Requests & ops);
using FutureSync = std::future<Coordination::SyncResponse>;
FutureSync asyncSync(const std::string & path);
/// Like the previous one but don't throw any exceptions on future.get()
FutureSync asyncTrySyncNoThrow(const std::string & path);
/// Very specific methods introduced without following general style. Implements
/// some custom throw/no throw logic on future.get().
///
@ -329,6 +338,7 @@ private:
const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses);
Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);
Coordination::Error syncImpl(const std::string & path, std::string & returned_path);
std::unique_ptr<Coordination::IKeeper> impl;

View File

@ -106,14 +106,11 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest
size_t bytesSize() const override { return ZooKeeperRequest::bytesSize() + path.size(); }
};
struct ZooKeeperSyncResponse final : ZooKeeperResponse
struct ZooKeeperSyncResponse final : SyncResponse, ZooKeeperResponse
{
String path;
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Sync; }
size_t bytesSize() const override { return path.size(); }
};
struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse

View File

@ -1,3 +1,4 @@
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
@ -31,6 +32,7 @@ namespace ProfileEvents
extern const Event ZooKeeperSet;
extern const Event ZooKeeperList;
extern const Event ZooKeeperCheck;
extern const Event ZooKeeperSync;
extern const Event ZooKeeperClose;
extern const Event ZooKeeperWaitMicroseconds;
extern const Event ZooKeeperBytesSent;
@ -1199,6 +1201,21 @@ void ZooKeeper::check(
ProfileEvents::increment(ProfileEvents::ZooKeeperCheck);
}
void ZooKeeper::sync(
const String & path,
SyncCallback callback)
{
ZooKeeperSyncRequest request;
request.path = path;
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperSyncRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const SyncResponse &>(response)); };
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperSync);
}
void ZooKeeper::multi(
const Requests & requests,

View File

@ -171,6 +171,10 @@ public:
int32_t version,
CheckCallback callback) override;
void sync(
const String & path,
SyncCallback callback) override;
void multi(
const Requests & requests,
MultiCallback callback) override;