From 367df12626bdc0485f5dd774bbb4a39eb39db321 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 13 Jun 2022 09:51:12 +0000 Subject: [PATCH] Add sync support to client --- src/Common/ProfileEvents.cpp | 1 + src/Common/ZooKeeper/IKeeper.cpp | 1 + src/Common/ZooKeeper/IKeeper.h | 24 +++++++++++ src/Common/ZooKeeper/TestKeeper.cpp | 30 +++++++++++++ src/Common/ZooKeeper/TestKeeper.h | 4 ++ src/Common/ZooKeeper/ZooKeeper.cpp | 59 ++++++++++++++++++++++++++ src/Common/ZooKeeper/ZooKeeper.h | 10 +++++ src/Common/ZooKeeper/ZooKeeperCommon.h | 5 +-- src/Common/ZooKeeper/ZooKeeperImpl.cpp | 17 ++++++++ src/Common/ZooKeeper/ZooKeeperImpl.h | 4 ++ 10 files changed, 151 insertions(+), 4 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index b8e552f6023..6a15b3be699 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -92,6 +92,7 @@ M(ZooKeeperSet, "") \ M(ZooKeeperMulti, "") \ M(ZooKeeperCheck, "") \ + M(ZooKeeperSync, "") \ M(ZooKeeperClose, "") \ M(ZooKeeperWatchResponse, "") \ M(ZooKeeperUserExceptions, "") \ diff --git a/src/Common/ZooKeeper/IKeeper.cpp b/src/Common/ZooKeeper/IKeeper.cpp index 70fe33b3f6e..23d29ed3019 100644 --- a/src/Common/ZooKeeper/IKeeper.cpp +++ b/src/Common/ZooKeeper/IKeeper.cpp @@ -144,6 +144,7 @@ void ListRequest::addRootPath(const String & root_path) { Coordination::addRootP void CheckRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void SetACLRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void GetACLRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } +void SyncRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void MultiRequest::addRootPath(const String & root_path) { diff --git a/src/Common/ZooKeeper/IKeeper.h b/src/Common/ZooKeeper/IKeeper.h index 1e79468b7e3..5cf776aa6cb 100644 --- a/src/Common/ZooKeeper/IKeeper.h +++ b/src/Common/ZooKeeper/IKeeper.h @@ -320,6 +320,25 @@ struct CheckResponse : virtual Response { }; +struct SyncRequest : virtual Request +{ + String path; + + void addRootPath(const String & root_path) override; + String getPath() const override { return path; } + + size_t bytesSize() const override { return path.size(); } +}; + +struct SyncResponse : virtual Response +{ + String path; + + size_t bytesSize() const override { return path.size(); } +}; + + + struct MultiRequest : virtual Request { Requests requests; @@ -364,6 +383,7 @@ using GetCallback = std::function; using SetCallback = std::function; using ListCallback = std::function; using CheckCallback = std::function; +using SyncCallback = std::function; using MultiCallback = std::function; @@ -482,6 +502,10 @@ public: int32_t version, CheckCallback callback) = 0; + virtual void sync( + const String & path, + SyncCallback callback) = 0; + virtual void multi( const Requests & requests, MultiCallback callback) = 0; diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 62d5fc811df..3d2d5fcb667 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -133,6 +133,14 @@ struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest std::pair process(TestKeeper::Container & container, int64_t zxid) const override; }; +struct TestKeeperSyncRequest final : SyncRequest, TestKeeperRequest +{ + TestKeeperSyncRequest() = default; + explicit TestKeeperSyncRequest(const SyncRequest & base) : SyncRequest(base) {} + ResponsePtr createResponse() const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; +}; + struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest { explicit TestKeeperMultiRequest(const Requests & generic_requests) @@ -413,6 +421,14 @@ std::pair TestKeeperCheckRequest::process(TestKeeper::Contain return { std::make_shared(response), {} }; } +std::pair TestKeeperSyncRequest::process(TestKeeper::Container & /*container*/, int64_t) const +{ + SyncResponse response; + response.path = path; + + return { std::make_shared(std::move(response)), {} }; +} + std::pair TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const { MultiResponse response; @@ -471,6 +487,7 @@ ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shar ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperListRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperCheckRequest::createResponse() const { return std::make_shared(); } +ResponsePtr TestKeeperSyncRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperMultiRequest::createResponse() const { return std::make_shared(); } @@ -779,6 +796,19 @@ void TestKeeper::check( pushRequest(std::move(request_info)); } +void TestKeeper::sync( + const String & path, + SyncCallback callback) +{ + TestKeeperSyncRequest request; + request.path = path; + + RequestInfo request_info; + request_info.request = std::make_shared(std::move(request)); + request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; + pushRequest(std::move(request_info)); +} + void TestKeeper::multi( const Requests & requests, MultiCallback callback) diff --git a/src/Common/ZooKeeper/TestKeeper.h b/src/Common/ZooKeeper/TestKeeper.h index e57471341e8..40cac3094f1 100644 --- a/src/Common/ZooKeeper/TestKeeper.h +++ b/src/Common/ZooKeeper/TestKeeper.h @@ -79,6 +79,10 @@ public: int32_t version, CheckCallback callback) override; + void sync( + const String & path, + SyncCallback callback) override; + void multi( const Requests & requests, MultiCallback callback) override; diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 9fc68d0bbff..193be0dbe5a 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -667,6 +667,34 @@ Coordination::Error ZooKeeper::tryMulti(const Coordination::Requests & requests, return code; } +Coordination::Error ZooKeeper::syncImpl(const std::string & path, std::string & returned_path) +{ + auto future_result = asyncTrySyncNoThrow(path); + + if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready) + { + impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Sync), path)); + return Coordination::Error::ZOPERATIONTIMEOUT; + } + else + { + auto response = future_result.get(); + Coordination::Error code = response.error; + returned_path = std::move(response.path); + return code; + } +} +std::string ZooKeeper::sync(const std::string & path) +{ + std::string returned_path; + check(syncImpl(path, returned_path), path); + return returned_path; +} + +Coordination::Error ZooKeeper::trySync(const std::string & path, std::string & returned_path) +{ + return syncImpl(path, returned_path); +} void ZooKeeper::removeChildren(const std::string & path) { @@ -1144,6 +1172,37 @@ Coordination::Error ZooKeeper::tryMultiNoThrow(const Coordination::Requests & re } } +std::future ZooKeeper::asyncTrySyncNoThrow(const std::string & path) +{ + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::SyncResponse & response) mutable + { + promise->set_value(response); + }; + + impl->sync(path, std::move(callback)); + return future; +} + +std::future ZooKeeper::asyncSync(const std::string & path) +{ + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::SyncResponse & response) mutable + { + if (response.error != Coordination::Error::ZOK) + promise->set_exception(std::make_exception_ptr(KeeperException(response.error))); + else + promise->set_value(response); + }; + + impl->sync(path, std::move(callback)); + return future; +} + void ZooKeeper::finalize(const String & reason) { impl->finalize(reason); diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index a36d379669e..6aebccd2b4e 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -209,6 +209,10 @@ public: /// Throws nothing (even session expired errors) Coordination::Error tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses); + std::string sync(const std::string & path); + + Coordination::Error trySync(const std::string & path, std::string & returned_path); + Int64 getClientID(); /// Remove the node with the subtree. If someone concurrently adds or removes a node @@ -294,6 +298,11 @@ public: /// Like the previous one but don't throw any exceptions on future.get() FutureMulti asyncTryMultiNoThrow(const Coordination::Requests & ops); + using FutureSync = std::future; + FutureSync asyncSync(const std::string & path); + /// Like the previous one but don't throw any exceptions on future.get() + FutureSync asyncTrySyncNoThrow(const std::string & path); + /// Very specific methods introduced without following general style. Implements /// some custom throw/no throw logic on future.get(). /// @@ -329,6 +338,7 @@ private: const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback); Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses); Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback); + Coordination::Error syncImpl(const std::string & path, std::string & returned_path); std::unique_ptr impl; diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index 532488c08f8..77fe99282a7 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -106,14 +106,11 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest size_t bytesSize() const override { return ZooKeeperRequest::bytesSize() + path.size(); } }; -struct ZooKeeperSyncResponse final : ZooKeeperResponse +struct ZooKeeperSyncResponse final : SyncResponse, ZooKeeperResponse { - String path; void readImpl(ReadBuffer & in) override; void writeImpl(WriteBuffer & out) const override; OpNum getOpNum() const override { return OpNum::Sync; } - - size_t bytesSize() const override { return path.size(); } }; struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 791e6a3b5f5..2327c1af79d 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1,3 +1,4 @@ +#include "Common/ZooKeeper/ZooKeeperCommon.h" #include #include #include @@ -31,6 +32,7 @@ namespace ProfileEvents extern const Event ZooKeeperSet; extern const Event ZooKeeperList; extern const Event ZooKeeperCheck; + extern const Event ZooKeeperSync; extern const Event ZooKeeperClose; extern const Event ZooKeeperWaitMicroseconds; extern const Event ZooKeeperBytesSent; @@ -1199,6 +1201,21 @@ void ZooKeeper::check( ProfileEvents::increment(ProfileEvents::ZooKeeperCheck); } +void ZooKeeper::sync( + const String & path, + SyncCallback callback) +{ + ZooKeeperSyncRequest request; + request.path = path; + + RequestInfo request_info; + request_info.request = std::make_shared(std::move(request)); + request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; + + pushRequest(std::move(request_info)); + ProfileEvents::increment(ProfileEvents::ZooKeeperSync); +} + void ZooKeeper::multi( const Requests & requests, diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 58c5947e8ea..bc9f974a721 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -171,6 +171,10 @@ public: int32_t version, CheckCallback callback) override; + void sync( + const String & path, + SyncCallback callback) override; + void multi( const Requests & requests, MultiCallback callback) override;