From 560999d587a1c25ec8f20151b077b0775e7d171a Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 9 Jun 2022 13:14:30 +0000 Subject: [PATCH 01/10] Block reads until sync is finished --- src/Coordination/KeeperDispatcher.cpp | 144 +++++++++++++++++++++++- src/Coordination/KeeperDispatcher.h | 35 +++++- src/Coordination/KeeperServer.cpp | 6 +- src/Coordination/KeeperServer.h | 3 +- src/Coordination/KeeperStateMachine.cpp | 13 ++- src/Coordination/KeeperStateMachine.h | 13 ++- src/Coordination/KeeperStorage.cpp | 11 ++ src/Coordination/KeeperStorage.h | 1 + 8 files changed, 212 insertions(+), 14 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index f97c65e7056..b2540b5b819 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -1,4 +1,6 @@ #include +#include "Common/ZooKeeper/ZooKeeperCommon.h" +#include "Common/ZooKeeper/ZooKeeperConstants.h" #include #include #include @@ -6,7 +8,10 @@ #include #include #include +#include #include +#include "Core/UUID.h" +#include "base/find_symbols.h" namespace fs = std::filesystem; @@ -229,7 +234,34 @@ void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKe } } -bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id) +namespace +{ + +const auto & getPathFromReadRequest(const Coordination::ZooKeeperRequestPtr & request) +{ + const auto op_num = request->getOpNum(); + switch (op_num) + { + case Coordination::OpNum::Check: + return dynamic_cast(*request).path; + case Coordination::OpNum::Get: + return dynamic_cast(*request).path; + case Coordination::OpNum::GetACL: + return dynamic_cast(*request).path; + case Coordination::OpNum::Exists: + return dynamic_cast(*request).path; + case Coordination::OpNum::List: + return dynamic_cast(*request).path; + case Coordination::OpNum::SimpleList: + return dynamic_cast(*request).path; + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fetch path for request of type {}", op_num); + } +} + +} + +bool KeeperDispatcher::putRequest(Coordination::ZooKeeperRequestPtr & request, int64_t session_id) { { /// If session was already disconnected than we will ignore requests @@ -244,6 +276,32 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); request_info.session_id = session_id; + std::unique_lock sync_lock{sync_waiters_mutex, std::defer_lock}; + std::optional> cached_sync_path; + if (request->getOpNum() == Coordination::OpNum::Sync) + { + // keep the lock until we insert the Sync into queue + sync_lock.lock(); + auto & sync_request = dynamic_cast(*request); + auto uuid = toString(UUIDHelpers::generateV4()); + cached_sync_path.emplace(uuid, sync_request.path); + sync_request.path = fmt::format("{}{}{}", uuid, KeeperStorage::sync_path_delimiter, sync_request.path); + } + else if (request->isReadRequest()) + { + sync_lock.lock(); + const auto & request_path = getPathFromReadRequest(request); + for (auto & [path, sync_waiter] : sync_waiters_for_path) + { + if (request_path.starts_with(path)) + { + sync_waiter.request_queues_for_uuid[sync_waiter.last_uuid].push_back(std::move(request_info)); + return true; + } + } + sync_lock.unlock(); + } + std::lock_guard lock(push_request_mutex); if (shutdown_called) @@ -259,6 +317,32 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ { throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); } + else if (request->getOpNum() == Coordination::OpNum::Sync) + { + assert(cached_sync_path); + auto & [uuid, request_path] = *cached_sync_path; + bool found = false; + for (auto & [path, sync_waiter] : sync_waiters_for_path) + { + if (path == request_path) + { + sync_waiter.request_queues_for_uuid.emplace(std::pair{uuid, std::vector{}}); + sync_waiter.last_uuid = std::move(uuid); + found = true; + break; + } + } + + if (!found) + { + auto [sync_waiter_it, _] = sync_waiters_for_path.emplace(std::pair{std::move(request_path), SyncWaiter{}}); + sync_waiter_it->second.request_queues_for_uuid.emplace(std::pair{uuid, std::vector{}}); + sync_waiter_it->second.last_uuid = std::move(uuid); + } + + sync_lock.unlock(); + } + return true; } @@ -273,7 +357,12 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf responses_thread = ThreadFromGlobalPool([this] { responseThread(); }); snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); }); - server = std::make_unique(configuration_and_settings, config, responses_queue, snapshots_queue); + server = std::make_unique( + configuration_and_settings, + config, + responses_queue, + snapshots_queue, + [this](KeeperStorage::RequestForSession & request_for_session) { onRequestCommit(request_for_session); }); try { @@ -473,6 +562,57 @@ void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, Keep requests_for_sessions.clear(); } +void KeeperDispatcher::onRequestCommit(KeeperStorage::RequestForSession & request_for_session) +{ + if (request_for_session.request->getOpNum() != Coordination::OpNum::Sync) + return; + + const auto & sync_request = dynamic_cast(*request_for_session.request); + const auto & path = sync_request.path; + + const auto * delimiter = find_first_symbols_or_null(path.data(), path.data() + path.size()); + + if (!delimiter) + return; + + assert(delimiter != path.data() && delimiter != path.data() + path.size()); + + const std::string_view zk_path{delimiter + 1, path.data() + path.size()}; + const std::string_view uuid{path.data(), delimiter}; + + std::unique_lock lock{sync_waiters_mutex}; + + auto sync_waiter_it = sync_waiters_for_path.find(zk_path); + if (sync_waiter_it == sync_waiters_for_path.end()) + return; + + auto & [_, sync_waiter] = *sync_waiter_it; + auto queue_it = sync_waiter.request_queues_for_uuid.find(uuid); + if (queue_it == sync_waiter.request_queues_for_uuid.end()) + return; + + auto requests = std::move(queue_it->second); + + sync_waiter.request_queues_for_uuid.erase(queue_it); + if (sync_waiter.request_queues_for_uuid.empty()) + sync_waiters_for_path.erase(sync_waiter_it); + + lock.unlock(); + + std::lock_guard push_lock(push_request_mutex); + for (auto & request_info : requests) + { + if (shutdown_called) + return; + + assert(request_info.request->isReadRequest()); + if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) + { + throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); + } + } +} + int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms) { /// New session id allocation is a special request, because we cannot process it in normal diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h index b632327a165..5c64bcaf623 100644 --- a/src/Coordination/KeeperDispatcher.h +++ b/src/Coordination/KeeperDispatcher.h @@ -18,6 +18,7 @@ namespace DB { + using ZooKeeperResponseCallback = std::function; /// Highlevel wrapper for ClickHouse Keeper. @@ -74,6 +75,35 @@ private: Poco::Logger * log; + struct StringHash + { + using is_transparent = void; + + size_t operator()(std::string_view s) const + { + return std::hash()(s); + } + }; + + struct StringCmp + { + using is_transparent = void; + + bool operator()(std::string_view a, std::string_view b) const + { + return a == b; + } + }; + + struct SyncWaiter + { + std::unordered_map, StringHash, StringCmp> request_queues_for_uuid; + std::string last_uuid; + }; + + std::unordered_map sync_waiters_for_path; + std::mutex sync_waiters_mutex; + /// Counter for new session_id requests. std::atomic internal_session_id_counter{0}; @@ -98,7 +128,10 @@ private: /// Clears both arguments void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions); + void onRequestCommit(KeeperStorage::RequestForSession & request_for_session); + public: + /// Just allocate some objects, real initialization is done by `intialize method` KeeperDispatcher(); @@ -130,7 +163,7 @@ public: void forceRecovery(); /// Put request to ClickHouse Keeper - bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id); + bool putRequest(Coordination::ZooKeeperRequestPtr & request, int64_t session_id); /// Get new session ID int64_t getSessionID(int64_t session_timeout_ms); diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 5f77e996744..bd65a9c8c2e 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -100,7 +100,8 @@ KeeperServer::KeeperServer( const KeeperConfigurationAndSettingsPtr & configuration_and_settings_, const Poco::Util::AbstractConfiguration & config, ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_) + SnapshotsQueue & snapshots_queue_, + KeeperStateMachine::CommitCallback commit_callback) : server_id(configuration_and_settings_->server_id) , coordination_settings(configuration_and_settings_->coordination_settings) , state_machine(nuraft::cs_new( @@ -108,7 +109,8 @@ KeeperServer::KeeperServer( snapshots_queue_, configuration_and_settings_->snapshot_storage_path, coordination_settings, - checkAndGetSuperdigest(configuration_and_settings_->super_digest))) + checkAndGetSuperdigest(configuration_and_settings_->super_digest), + std::move(commit_callback))) , state_manager(nuraft::cs_new( server_id, "keeper_server", configuration_and_settings_->log_storage_path, config, coordination_settings)) , log(&Poco::Logger::get("KeeperServer")) diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index 8c21cf47d94..72ea223feca 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -65,7 +65,8 @@ public: const KeeperConfigurationAndSettingsPtr & settings_, const Poco::Util::AbstractConfiguration & config_, ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_); + SnapshotsQueue & snapshots_queue_, + KeeperStateMachine::CommitCallback commit_callback); /// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings. void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true); diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 8e595c7393a..71681174616 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -50,11 +50,12 @@ namespace } KeeperStateMachine::KeeperStateMachine( - ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_, - const std::string & snapshots_path_, - const CoordinationSettingsPtr & coordination_settings_, - const std::string & superdigest_) + ResponsesQueue & responses_queue_, + SnapshotsQueue & snapshots_queue_, + const std::string & snapshots_path_, + const CoordinationSettingsPtr & coordination_settings_, + const std::string & superdigest_, + CommitCallback commit_callback_) : coordination_settings(coordination_settings_) , snapshot_manager( snapshots_path_, coordination_settings->snapshots_to_keep, @@ -64,6 +65,7 @@ KeeperStateMachine::KeeperStateMachine( , snapshots_queue(snapshots_queue_) , last_committed_idx(0) , log(&Poco::Logger::get("KeeperStateMachine")) + , commit_callback(std::move(commit_callback_)) , superdigest(superdigest_) { } @@ -146,6 +148,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response_for_session.session_id); } + commit_callback(request_for_session); last_committed_idx = log_idx; return nullptr; } diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 73578e6a2ba..39055b172f9 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -19,10 +19,15 @@ using SnapshotsQueue = ConcurrentBoundedQueue; class KeeperStateMachine : public nuraft::state_machine { public: + using CommitCallback = std::function; + KeeperStateMachine( - ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, - const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_, - const std::string & superdigest_ = ""); + ResponsesQueue & responses_queue_, + SnapshotsQueue & snapshots_queue_, + const std::string & snapshots_path_, + const CoordinationSettingsPtr & coordination_settings_, + const std::string & superdigest_ = "", + CommitCallback commit_callback_ = [](KeeperStorage::RequestForSession &) {}); /// Read state from the latest snapshot void init(); @@ -137,6 +142,8 @@ private: Poco::Logger * log; + CommitCallback commit_callback; + /// Cluster config for our quorum. /// It's a copy of config stored in StateManager, but here /// we also write it to disk during snapshot. Must be used with lock. diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index e47a0905194..cfd9f2ac5f9 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -234,9 +234,20 @@ struct KeeperStorageSyncRequestProcessor final : public KeeperStorageRequestProc using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { + std::string_view path = dynamic_cast(*zk_request).path; auto response = zk_request->makeResponse(); dynamic_cast(*response).path = dynamic_cast(*zk_request).path; + + const auto * delimiter = find_first_symbols_or_null(path.data(), path.data() + path.size()); + + if (delimiter) + { + assert(delimiter != path.data() && delimiter != path.data() + path.size()); + path = std::string_view{delimiter + 1, path.data() + path.size()}; + } + + dynamic_cast(*response).path = path; return {response, {}}; } }; diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index b7fb76a72e6..7e57a048838 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -29,6 +29,7 @@ struct KeeperStorageSnapshot; class KeeperStorage { public: + static constexpr char sync_path_delimiter = ':'; struct Node { From 367df12626bdc0485f5dd774bbb4a39eb39db321 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 13 Jun 2022 09:51:12 +0000 Subject: [PATCH 02/10] Add sync support to client --- src/Common/ProfileEvents.cpp | 1 + src/Common/ZooKeeper/IKeeper.cpp | 1 + src/Common/ZooKeeper/IKeeper.h | 24 +++++++++++ src/Common/ZooKeeper/TestKeeper.cpp | 30 +++++++++++++ src/Common/ZooKeeper/TestKeeper.h | 4 ++ src/Common/ZooKeeper/ZooKeeper.cpp | 59 ++++++++++++++++++++++++++ src/Common/ZooKeeper/ZooKeeper.h | 10 +++++ src/Common/ZooKeeper/ZooKeeperCommon.h | 5 +-- src/Common/ZooKeeper/ZooKeeperImpl.cpp | 17 ++++++++ src/Common/ZooKeeper/ZooKeeperImpl.h | 4 ++ 10 files changed, 151 insertions(+), 4 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index b8e552f6023..6a15b3be699 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -92,6 +92,7 @@ M(ZooKeeperSet, "") \ M(ZooKeeperMulti, "") \ M(ZooKeeperCheck, "") \ + M(ZooKeeperSync, "") \ M(ZooKeeperClose, "") \ M(ZooKeeperWatchResponse, "") \ M(ZooKeeperUserExceptions, "") \ diff --git a/src/Common/ZooKeeper/IKeeper.cpp b/src/Common/ZooKeeper/IKeeper.cpp index 70fe33b3f6e..23d29ed3019 100644 --- a/src/Common/ZooKeeper/IKeeper.cpp +++ b/src/Common/ZooKeeper/IKeeper.cpp @@ -144,6 +144,7 @@ void ListRequest::addRootPath(const String & root_path) { Coordination::addRootP void CheckRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void SetACLRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void GetACLRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } +void SyncRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); } void MultiRequest::addRootPath(const String & root_path) { diff --git a/src/Common/ZooKeeper/IKeeper.h b/src/Common/ZooKeeper/IKeeper.h index 1e79468b7e3..5cf776aa6cb 100644 --- a/src/Common/ZooKeeper/IKeeper.h +++ b/src/Common/ZooKeeper/IKeeper.h @@ -320,6 +320,25 @@ struct CheckResponse : virtual Response { }; +struct SyncRequest : virtual Request +{ + String path; + + void addRootPath(const String & root_path) override; + String getPath() const override { return path; } + + size_t bytesSize() const override { return path.size(); } +}; + +struct SyncResponse : virtual Response +{ + String path; + + size_t bytesSize() const override { return path.size(); } +}; + + + struct MultiRequest : virtual Request { Requests requests; @@ -364,6 +383,7 @@ using GetCallback = std::function; using SetCallback = std::function; using ListCallback = std::function; using CheckCallback = std::function; +using SyncCallback = std::function; using MultiCallback = std::function; @@ -482,6 +502,10 @@ public: int32_t version, CheckCallback callback) = 0; + virtual void sync( + const String & path, + SyncCallback callback) = 0; + virtual void multi( const Requests & requests, MultiCallback callback) = 0; diff --git a/src/Common/ZooKeeper/TestKeeper.cpp b/src/Common/ZooKeeper/TestKeeper.cpp index 62d5fc811df..3d2d5fcb667 100644 --- a/src/Common/ZooKeeper/TestKeeper.cpp +++ b/src/Common/ZooKeeper/TestKeeper.cpp @@ -133,6 +133,14 @@ struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest std::pair process(TestKeeper::Container & container, int64_t zxid) const override; }; +struct TestKeeperSyncRequest final : SyncRequest, TestKeeperRequest +{ + TestKeeperSyncRequest() = default; + explicit TestKeeperSyncRequest(const SyncRequest & base) : SyncRequest(base) {} + ResponsePtr createResponse() const override; + std::pair process(TestKeeper::Container & container, int64_t zxid) const override; +}; + struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest { explicit TestKeeperMultiRequest(const Requests & generic_requests) @@ -413,6 +421,14 @@ std::pair TestKeeperCheckRequest::process(TestKeeper::Contain return { std::make_shared(response), {} }; } +std::pair TestKeeperSyncRequest::process(TestKeeper::Container & /*container*/, int64_t) const +{ + SyncResponse response; + response.path = path; + + return { std::make_shared(std::move(response)), {} }; +} + std::pair TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const { MultiResponse response; @@ -471,6 +487,7 @@ ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shar ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperListRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperCheckRequest::createResponse() const { return std::make_shared(); } +ResponsePtr TestKeeperSyncRequest::createResponse() const { return std::make_shared(); } ResponsePtr TestKeeperMultiRequest::createResponse() const { return std::make_shared(); } @@ -779,6 +796,19 @@ void TestKeeper::check( pushRequest(std::move(request_info)); } +void TestKeeper::sync( + const String & path, + SyncCallback callback) +{ + TestKeeperSyncRequest request; + request.path = path; + + RequestInfo request_info; + request_info.request = std::make_shared(std::move(request)); + request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; + pushRequest(std::move(request_info)); +} + void TestKeeper::multi( const Requests & requests, MultiCallback callback) diff --git a/src/Common/ZooKeeper/TestKeeper.h b/src/Common/ZooKeeper/TestKeeper.h index e57471341e8..40cac3094f1 100644 --- a/src/Common/ZooKeeper/TestKeeper.h +++ b/src/Common/ZooKeeper/TestKeeper.h @@ -79,6 +79,10 @@ public: int32_t version, CheckCallback callback) override; + void sync( + const String & path, + SyncCallback callback) override; + void multi( const Requests & requests, MultiCallback callback) override; diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 9fc68d0bbff..193be0dbe5a 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -667,6 +667,34 @@ Coordination::Error ZooKeeper::tryMulti(const Coordination::Requests & requests, return code; } +Coordination::Error ZooKeeper::syncImpl(const std::string & path, std::string & returned_path) +{ + auto future_result = asyncTrySyncNoThrow(path); + + if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready) + { + impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Sync), path)); + return Coordination::Error::ZOPERATIONTIMEOUT; + } + else + { + auto response = future_result.get(); + Coordination::Error code = response.error; + returned_path = std::move(response.path); + return code; + } +} +std::string ZooKeeper::sync(const std::string & path) +{ + std::string returned_path; + check(syncImpl(path, returned_path), path); + return returned_path; +} + +Coordination::Error ZooKeeper::trySync(const std::string & path, std::string & returned_path) +{ + return syncImpl(path, returned_path); +} void ZooKeeper::removeChildren(const std::string & path) { @@ -1144,6 +1172,37 @@ Coordination::Error ZooKeeper::tryMultiNoThrow(const Coordination::Requests & re } } +std::future ZooKeeper::asyncTrySyncNoThrow(const std::string & path) +{ + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::SyncResponse & response) mutable + { + promise->set_value(response); + }; + + impl->sync(path, std::move(callback)); + return future; +} + +std::future ZooKeeper::asyncSync(const std::string & path) +{ + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto callback = [promise](const Coordination::SyncResponse & response) mutable + { + if (response.error != Coordination::Error::ZOK) + promise->set_exception(std::make_exception_ptr(KeeperException(response.error))); + else + promise->set_value(response); + }; + + impl->sync(path, std::move(callback)); + return future; +} + void ZooKeeper::finalize(const String & reason) { impl->finalize(reason); diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index a36d379669e..6aebccd2b4e 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -209,6 +209,10 @@ public: /// Throws nothing (even session expired errors) Coordination::Error tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses); + std::string sync(const std::string & path); + + Coordination::Error trySync(const std::string & path, std::string & returned_path); + Int64 getClientID(); /// Remove the node with the subtree. If someone concurrently adds or removes a node @@ -294,6 +298,11 @@ public: /// Like the previous one but don't throw any exceptions on future.get() FutureMulti asyncTryMultiNoThrow(const Coordination::Requests & ops); + using FutureSync = std::future; + FutureSync asyncSync(const std::string & path); + /// Like the previous one but don't throw any exceptions on future.get() + FutureSync asyncTrySyncNoThrow(const std::string & path); + /// Very specific methods introduced without following general style. Implements /// some custom throw/no throw logic on future.get(). /// @@ -329,6 +338,7 @@ private: const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback); Coordination::Error multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses); Coordination::Error existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback); + Coordination::Error syncImpl(const std::string & path, std::string & returned_path); std::unique_ptr impl; diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index 532488c08f8..77fe99282a7 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -106,14 +106,11 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest size_t bytesSize() const override { return ZooKeeperRequest::bytesSize() + path.size(); } }; -struct ZooKeeperSyncResponse final : ZooKeeperResponse +struct ZooKeeperSyncResponse final : SyncResponse, ZooKeeperResponse { - String path; void readImpl(ReadBuffer & in) override; void writeImpl(WriteBuffer & out) const override; OpNum getOpNum() const override { return OpNum::Sync; } - - size_t bytesSize() const override { return path.size(); } }; struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 791e6a3b5f5..2327c1af79d 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1,3 +1,4 @@ +#include "Common/ZooKeeper/ZooKeeperCommon.h" #include #include #include @@ -31,6 +32,7 @@ namespace ProfileEvents extern const Event ZooKeeperSet; extern const Event ZooKeeperList; extern const Event ZooKeeperCheck; + extern const Event ZooKeeperSync; extern const Event ZooKeeperClose; extern const Event ZooKeeperWaitMicroseconds; extern const Event ZooKeeperBytesSent; @@ -1199,6 +1201,21 @@ void ZooKeeper::check( ProfileEvents::increment(ProfileEvents::ZooKeeperCheck); } +void ZooKeeper::sync( + const String & path, + SyncCallback callback) +{ + ZooKeeperSyncRequest request; + request.path = path; + + RequestInfo request_info; + request_info.request = std::make_shared(std::move(request)); + request_info.callback = [callback](const Response & response) { callback(dynamic_cast(response)); }; + + pushRequest(std::move(request_info)); + ProfileEvents::increment(ProfileEvents::ZooKeeperSync); +} + void ZooKeeper::multi( const Requests & requests, diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 58c5947e8ea..bc9f974a721 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -171,6 +171,10 @@ public: int32_t version, CheckCallback callback) override; + void sync( + const String & path, + SyncCallback callback) override; + void multi( const Requests & requests, MultiCallback callback) override; From 200f8df8d8840d5b95b4ba4b2f774e416bbea1f3 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 14 Jun 2022 08:14:44 +0000 Subject: [PATCH 03/10] Fix putReqeust --- src/Coordination/KeeperDispatcher.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index b2540b5b819..c45fbff328a 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -312,12 +312,14 @@ bool KeeperDispatcher::putRequest(Coordination::ZooKeeperRequestPtr & request, i { if (!requests_queue->push(std::move(request_info))) throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR); + + return true; } - else if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) - { + + if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); - } - else if (request->getOpNum() == Coordination::OpNum::Sync) + + if (request->getOpNum() == Coordination::OpNum::Sync) { assert(cached_sync_path); auto & [uuid, request_path] = *cached_sync_path; From 5787509083c85c23e8665b842782c0de0203645e Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 14 Jun 2022 10:19:39 +0000 Subject: [PATCH 04/10] Remove trailing whitespace --- src/Coordination/KeeperDispatcher.cpp | 2 +- src/Coordination/KeeperDispatcher.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index c45fbff328a..38f4e89a270 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -568,7 +568,7 @@ void KeeperDispatcher::onRequestCommit(KeeperStorage::RequestForSession & reques { if (request_for_session.request->getOpNum() != Coordination::OpNum::Sync) return; - + const auto & sync_request = dynamic_cast(*request_for_session.request); const auto & path = sync_request.path; diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h index 5c64bcaf623..31ed97f0b7c 100644 --- a/src/Coordination/KeeperDispatcher.h +++ b/src/Coordination/KeeperDispatcher.h @@ -75,7 +75,7 @@ private: Poco::Logger * log; - struct StringHash + struct StringHash { using is_transparent = void; @@ -89,7 +89,7 @@ private: { using is_transparent = void; - bool operator()(std::string_view a, std::string_view b) const + bool operator()(std::string_view a, std::string_view b) const { return a == b; } From 38a88e7d8cf0f670385611e532dc7a09f88fc80e Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 14 Jun 2022 11:48:10 +0000 Subject: [PATCH 05/10] Use session for sync --- src/Coordination/KeeperDispatcher.cpp | 28 ++++++--------------- src/Coordination/KeeperDispatcher.h | 36 ++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 21 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 38f4e89a270..2e9605722bc 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -291,9 +291,9 @@ bool KeeperDispatcher::putRequest(Coordination::ZooKeeperRequestPtr & request, i { sync_lock.lock(); const auto & request_path = getPathFromReadRequest(request); - for (auto & [path, sync_waiter] : sync_waiters_for_path) + for (auto & [path_with_session, sync_waiter] : sync_waiters_for_path) { - if (request_path.starts_with(path)) + if (path_with_session.session_id == session_id && request_path.starts_with(path_with_session.path)) { sync_waiter.request_queues_for_uuid[sync_waiter.last_uuid].push_back(std::move(request_info)); return true; @@ -323,24 +323,12 @@ bool KeeperDispatcher::putRequest(Coordination::ZooKeeperRequestPtr & request, i { assert(cached_sync_path); auto & [uuid, request_path] = *cached_sync_path; - bool found = false; - for (auto & [path, sync_waiter] : sync_waiters_for_path) - { - if (path == request_path) - { - sync_waiter.request_queues_for_uuid.emplace(std::pair{uuid, std::vector{}}); - sync_waiter.last_uuid = std::move(uuid); - found = true; - break; - } - } + auto sync_waiter_it = sync_waiters_for_path.find(std::pair{std::string_view{request_path}, session_id}); + if (sync_waiter_it == sync_waiters_for_path.end()) + std::tie(sync_waiter_it, std::ignore) = sync_waiters_for_path.emplace(std::pair{PathWithSessionId{std::move(request_path), session_id}, SyncWaiter{}}); - if (!found) - { - auto [sync_waiter_it, _] = sync_waiters_for_path.emplace(std::pair{std::move(request_path), SyncWaiter{}}); - sync_waiter_it->second.request_queues_for_uuid.emplace(std::pair{uuid, std::vector{}}); - sync_waiter_it->second.last_uuid = std::move(uuid); - } + sync_waiter_it->second.request_queues_for_uuid.emplace(std::pair{uuid, std::vector{}}); + sync_waiter_it->second.last_uuid = std::move(uuid); sync_lock.unlock(); } @@ -584,7 +572,7 @@ void KeeperDispatcher::onRequestCommit(KeeperStorage::RequestForSession & reques std::unique_lock lock{sync_waiters_mutex}; - auto sync_waiter_it = sync_waiters_for_path.find(zk_path); + auto sync_waiter_it = sync_waiters_for_path.find(std::pair{zk_path, request_for_session.session_id}); if (sync_waiter_it == sync_waiters_for_path.end()) return; diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h index 31ed97f0b7c..aeea94c71ef 100644 --- a/src/Coordination/KeeperDispatcher.h +++ b/src/Coordination/KeeperDispatcher.h @@ -101,7 +101,41 @@ private: std::string last_uuid; }; - std::unordered_map sync_waiters_for_path; + struct PathWithSessionId + { + std::string path; + int64_t session_id; + + operator std::pair() const // NOLINT + { + return {path, session_id}; + } + }; + + struct PathWithSessionIdHash + { + using is_transparent = void; + + size_t operator()(const std::pair & path_with_session) const + { + SipHash hash; + hash.update(path_with_session.first); + hash.update(path_with_session.second); + return hash.get64(); + } + }; + + struct PathWithSessionIdCmp + { + using is_transparent = void; + + size_t operator()(const std::pair & a, const std::pair & b) const + { + return a.first == a.first && a.second == b.second; + } + }; + + std::unordered_map sync_waiters_for_path; std::mutex sync_waiters_mutex; /// Counter for new session_id requests. From c98c6566bfb1cc98bd536938fa3b60f601c2cd82 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 14 Jun 2022 12:49:42 +0000 Subject: [PATCH 06/10] Use std::pair --- src/Common/SipHash.h | 5 +++++ src/Coordination/KeeperDispatcher.cpp | 2 +- src/Coordination/KeeperDispatcher.h | 13 ++----------- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/Common/SipHash.h b/src/Common/SipHash.h index 108f7b31f43..f604721fcc9 100644 --- a/src/Common/SipHash.h +++ b/src/Common/SipHash.h @@ -147,6 +147,11 @@ public: update(x.data(), x.length()); } + ALWAYS_INLINE void update(const std::string_view x) + { + update(x.data(), x.length()); + } + /// Get the result in some form. This can only be done once! void get128(char * out) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 2e9605722bc..fa98ce702f8 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -293,7 +293,7 @@ bool KeeperDispatcher::putRequest(Coordination::ZooKeeperRequestPtr & request, i const auto & request_path = getPathFromReadRequest(request); for (auto & [path_with_session, sync_waiter] : sync_waiters_for_path) { - if (path_with_session.session_id == session_id && request_path.starts_with(path_with_session.path)) + if (path_with_session.second == session_id && request_path.starts_with(path_with_session.first)) { sync_waiter.request_queues_for_uuid[sync_waiter.last_uuid].push_back(std::move(request_info)); return true; diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h index aeea94c71ef..e2126cb88c2 100644 --- a/src/Coordination/KeeperDispatcher.h +++ b/src/Coordination/KeeperDispatcher.h @@ -101,16 +101,7 @@ private: std::string last_uuid; }; - struct PathWithSessionId - { - std::string path; - int64_t session_id; - - operator std::pair() const // NOLINT - { - return {path, session_id}; - } - }; + using PathWithSessionId = std::pair; struct PathWithSessionIdHash { @@ -131,7 +122,7 @@ private: size_t operator()(const std::pair & a, const std::pair & b) const { - return a.first == a.first && a.second == b.second; + return a.first == b.first && a.second == b.second; } }; From 5b9e81bdf9f99bb60de03a7842d818377f1d295e Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 16 Jun 2022 07:53:29 +0000 Subject: [PATCH 07/10] Revert sync waiter --- src/Coordination/KeeperDispatcher.cpp | 138 +----------------------- src/Coordination/KeeperDispatcher.h | 60 +---------- src/Coordination/KeeperServer.cpp | 6 +- src/Coordination/KeeperServer.h | 3 +- src/Coordination/KeeperStateMachine.cpp | 13 +-- src/Coordination/KeeperStateMachine.h | 13 +-- src/Coordination/KeeperStorage.cpp | 11 -- src/Coordination/KeeperStorage.h | 1 - 8 files changed, 16 insertions(+), 229 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index fa98ce702f8..f97c65e7056 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -1,6 +1,4 @@ #include -#include "Common/ZooKeeper/ZooKeeperCommon.h" -#include "Common/ZooKeeper/ZooKeeperConstants.h" #include #include #include @@ -8,10 +6,7 @@ #include #include #include -#include #include -#include "Core/UUID.h" -#include "base/find_symbols.h" namespace fs = std::filesystem; @@ -234,34 +229,7 @@ void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKe } } -namespace -{ - -const auto & getPathFromReadRequest(const Coordination::ZooKeeperRequestPtr & request) -{ - const auto op_num = request->getOpNum(); - switch (op_num) - { - case Coordination::OpNum::Check: - return dynamic_cast(*request).path; - case Coordination::OpNum::Get: - return dynamic_cast(*request).path; - case Coordination::OpNum::GetACL: - return dynamic_cast(*request).path; - case Coordination::OpNum::Exists: - return dynamic_cast(*request).path; - case Coordination::OpNum::List: - return dynamic_cast(*request).path; - case Coordination::OpNum::SimpleList: - return dynamic_cast(*request).path; - default: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to fetch path for request of type {}", op_num); - } -} - -} - -bool KeeperDispatcher::putRequest(Coordination::ZooKeeperRequestPtr & request, int64_t session_id) +bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id) { { /// If session was already disconnected than we will ignore requests @@ -276,32 +244,6 @@ bool KeeperDispatcher::putRequest(Coordination::ZooKeeperRequestPtr & request, i request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); request_info.session_id = session_id; - std::unique_lock sync_lock{sync_waiters_mutex, std::defer_lock}; - std::optional> cached_sync_path; - if (request->getOpNum() == Coordination::OpNum::Sync) - { - // keep the lock until we insert the Sync into queue - sync_lock.lock(); - auto & sync_request = dynamic_cast(*request); - auto uuid = toString(UUIDHelpers::generateV4()); - cached_sync_path.emplace(uuid, sync_request.path); - sync_request.path = fmt::format("{}{}{}", uuid, KeeperStorage::sync_path_delimiter, sync_request.path); - } - else if (request->isReadRequest()) - { - sync_lock.lock(); - const auto & request_path = getPathFromReadRequest(request); - for (auto & [path_with_session, sync_waiter] : sync_waiters_for_path) - { - if (path_with_session.second == session_id && request_path.starts_with(path_with_session.first)) - { - sync_waiter.request_queues_for_uuid[sync_waiter.last_uuid].push_back(std::move(request_info)); - return true; - } - } - sync_lock.unlock(); - } - std::lock_guard lock(push_request_mutex); if (shutdown_called) @@ -312,27 +254,11 @@ bool KeeperDispatcher::putRequest(Coordination::ZooKeeperRequestPtr & request, i { if (!requests_queue->push(std::move(request_info))) throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR); - - return true; } - - if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) - throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); - - if (request->getOpNum() == Coordination::OpNum::Sync) + else if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) { - assert(cached_sync_path); - auto & [uuid, request_path] = *cached_sync_path; - auto sync_waiter_it = sync_waiters_for_path.find(std::pair{std::string_view{request_path}, session_id}); - if (sync_waiter_it == sync_waiters_for_path.end()) - std::tie(sync_waiter_it, std::ignore) = sync_waiters_for_path.emplace(std::pair{PathWithSessionId{std::move(request_path), session_id}, SyncWaiter{}}); - - sync_waiter_it->second.request_queues_for_uuid.emplace(std::pair{uuid, std::vector{}}); - sync_waiter_it->second.last_uuid = std::move(uuid); - - sync_lock.unlock(); + throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); } - return true; } @@ -347,12 +273,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf responses_thread = ThreadFromGlobalPool([this] { responseThread(); }); snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); }); - server = std::make_unique( - configuration_and_settings, - config, - responses_queue, - snapshots_queue, - [this](KeeperStorage::RequestForSession & request_for_session) { onRequestCommit(request_for_session); }); + server = std::make_unique(configuration_and_settings, config, responses_queue, snapshots_queue); try { @@ -552,57 +473,6 @@ void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, Keep requests_for_sessions.clear(); } -void KeeperDispatcher::onRequestCommit(KeeperStorage::RequestForSession & request_for_session) -{ - if (request_for_session.request->getOpNum() != Coordination::OpNum::Sync) - return; - - const auto & sync_request = dynamic_cast(*request_for_session.request); - const auto & path = sync_request.path; - - const auto * delimiter = find_first_symbols_or_null(path.data(), path.data() + path.size()); - - if (!delimiter) - return; - - assert(delimiter != path.data() && delimiter != path.data() + path.size()); - - const std::string_view zk_path{delimiter + 1, path.data() + path.size()}; - const std::string_view uuid{path.data(), delimiter}; - - std::unique_lock lock{sync_waiters_mutex}; - - auto sync_waiter_it = sync_waiters_for_path.find(std::pair{zk_path, request_for_session.session_id}); - if (sync_waiter_it == sync_waiters_for_path.end()) - return; - - auto & [_, sync_waiter] = *sync_waiter_it; - auto queue_it = sync_waiter.request_queues_for_uuid.find(uuid); - if (queue_it == sync_waiter.request_queues_for_uuid.end()) - return; - - auto requests = std::move(queue_it->second); - - sync_waiter.request_queues_for_uuid.erase(queue_it); - if (sync_waiter.request_queues_for_uuid.empty()) - sync_waiters_for_path.erase(sync_waiter_it); - - lock.unlock(); - - std::lock_guard push_lock(push_request_mutex); - for (auto & request_info : requests) - { - if (shutdown_called) - return; - - assert(request_info.request->isReadRequest()); - if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) - { - throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); - } - } -} - int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms) { /// New session id allocation is a special request, because we cannot process it in normal diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h index e2126cb88c2..b632327a165 100644 --- a/src/Coordination/KeeperDispatcher.h +++ b/src/Coordination/KeeperDispatcher.h @@ -18,7 +18,6 @@ namespace DB { - using ZooKeeperResponseCallback = std::function; /// Highlevel wrapper for ClickHouse Keeper. @@ -75,60 +74,6 @@ private: Poco::Logger * log; - struct StringHash - { - using is_transparent = void; - - size_t operator()(std::string_view s) const - { - return std::hash()(s); - } - }; - - struct StringCmp - { - using is_transparent = void; - - bool operator()(std::string_view a, std::string_view b) const - { - return a == b; - } - }; - - struct SyncWaiter - { - std::unordered_map, StringHash, StringCmp> request_queues_for_uuid; - std::string last_uuid; - }; - - using PathWithSessionId = std::pair; - - struct PathWithSessionIdHash - { - using is_transparent = void; - - size_t operator()(const std::pair & path_with_session) const - { - SipHash hash; - hash.update(path_with_session.first); - hash.update(path_with_session.second); - return hash.get64(); - } - }; - - struct PathWithSessionIdCmp - { - using is_transparent = void; - - size_t operator()(const std::pair & a, const std::pair & b) const - { - return a.first == b.first && a.second == b.second; - } - }; - - std::unordered_map sync_waiters_for_path; - std::mutex sync_waiters_mutex; - /// Counter for new session_id requests. std::atomic internal_session_id_counter{0}; @@ -153,10 +98,7 @@ private: /// Clears both arguments void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions); - void onRequestCommit(KeeperStorage::RequestForSession & request_for_session); - public: - /// Just allocate some objects, real initialization is done by `intialize method` KeeperDispatcher(); @@ -188,7 +130,7 @@ public: void forceRecovery(); /// Put request to ClickHouse Keeper - bool putRequest(Coordination::ZooKeeperRequestPtr & request, int64_t session_id); + bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id); /// Get new session ID int64_t getSessionID(int64_t session_timeout_ms); diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index bd65a9c8c2e..5f77e996744 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -100,8 +100,7 @@ KeeperServer::KeeperServer( const KeeperConfigurationAndSettingsPtr & configuration_and_settings_, const Poco::Util::AbstractConfiguration & config, ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_, - KeeperStateMachine::CommitCallback commit_callback) + SnapshotsQueue & snapshots_queue_) : server_id(configuration_and_settings_->server_id) , coordination_settings(configuration_and_settings_->coordination_settings) , state_machine(nuraft::cs_new( @@ -109,8 +108,7 @@ KeeperServer::KeeperServer( snapshots_queue_, configuration_and_settings_->snapshot_storage_path, coordination_settings, - checkAndGetSuperdigest(configuration_and_settings_->super_digest), - std::move(commit_callback))) + checkAndGetSuperdigest(configuration_and_settings_->super_digest))) , state_manager(nuraft::cs_new( server_id, "keeper_server", configuration_and_settings_->log_storage_path, config, coordination_settings)) , log(&Poco::Logger::get("KeeperServer")) diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index 72ea223feca..8c21cf47d94 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -65,8 +65,7 @@ public: const KeeperConfigurationAndSettingsPtr & settings_, const Poco::Util::AbstractConfiguration & config_, ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_, - KeeperStateMachine::CommitCallback commit_callback); + SnapshotsQueue & snapshots_queue_); /// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings. void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true); diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 71681174616..8e595c7393a 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -50,12 +50,11 @@ namespace } KeeperStateMachine::KeeperStateMachine( - ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_, - const std::string & snapshots_path_, - const CoordinationSettingsPtr & coordination_settings_, - const std::string & superdigest_, - CommitCallback commit_callback_) + ResponsesQueue & responses_queue_, + SnapshotsQueue & snapshots_queue_, + const std::string & snapshots_path_, + const CoordinationSettingsPtr & coordination_settings_, + const std::string & superdigest_) : coordination_settings(coordination_settings_) , snapshot_manager( snapshots_path_, coordination_settings->snapshots_to_keep, @@ -65,7 +64,6 @@ KeeperStateMachine::KeeperStateMachine( , snapshots_queue(snapshots_queue_) , last_committed_idx(0) , log(&Poco::Logger::get("KeeperStateMachine")) - , commit_callback(std::move(commit_callback_)) , superdigest(superdigest_) { } @@ -148,7 +146,6 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response_for_session.session_id); } - commit_callback(request_for_session); last_committed_idx = log_idx; return nullptr; } diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 39055b172f9..73578e6a2ba 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -19,15 +19,10 @@ using SnapshotsQueue = ConcurrentBoundedQueue; class KeeperStateMachine : public nuraft::state_machine { public: - using CommitCallback = std::function; - KeeperStateMachine( - ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_, - const std::string & snapshots_path_, - const CoordinationSettingsPtr & coordination_settings_, - const std::string & superdigest_ = "", - CommitCallback commit_callback_ = [](KeeperStorage::RequestForSession &) {}); + ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, + const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_, + const std::string & superdigest_ = ""); /// Read state from the latest snapshot void init(); @@ -142,8 +137,6 @@ private: Poco::Logger * log; - CommitCallback commit_callback; - /// Cluster config for our quorum. /// It's a copy of config stored in StateManager, but here /// we also write it to disk during snapshot. Must be used with lock. diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index cfd9f2ac5f9..e47a0905194 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -234,20 +234,9 @@ struct KeeperStorageSyncRequestProcessor final : public KeeperStorageRequestProc using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { - std::string_view path = dynamic_cast(*zk_request).path; auto response = zk_request->makeResponse(); dynamic_cast(*response).path = dynamic_cast(*zk_request).path; - - const auto * delimiter = find_first_symbols_or_null(path.data(), path.data() + path.size()); - - if (delimiter) - { - assert(delimiter != path.data() && delimiter != path.data() + path.size()); - path = std::string_view{delimiter + 1, path.data() + path.size()}; - } - - dynamic_cast(*response).path = path; return {response, {}}; } }; diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 7e57a048838..b7fb76a72e6 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -29,7 +29,6 @@ struct KeeperStorageSnapshot; class KeeperStorage { public: - static constexpr char sync_path_delimiter = ':'; struct Node { From 6c225bb04df44950adf3edc804a7a7c670d0fd37 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 20 Jun 2022 14:55:59 +0200 Subject: [PATCH 08/10] Update src/Common/ZooKeeper/ZooKeeperImpl.cpp Co-authored-by: Alexander Tokmakov --- src/Common/ZooKeeper/ZooKeeperImpl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 2327c1af79d..bd284ed0c91 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1,4 +1,4 @@ -#include "Common/ZooKeeper/ZooKeeperCommon.h" +#include #include #include #include From df31b562ec6489c8acdc2d4190ed85356968b04b Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 20 Jun 2022 14:05:51 +0000 Subject: [PATCH 09/10] Use sync after reconnect to avoid stale reads --- src/Interpreters/TransactionLog.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/TransactionLog.cpp b/src/Interpreters/TransactionLog.cpp index 0ddc726ff7f..3b8ec205c5c 100644 --- a/src/Interpreters/TransactionLog.cpp +++ b/src/Interpreters/TransactionLog.cpp @@ -233,9 +233,8 @@ void TransactionLog::runUpdatingThread() } /// It's possible that we connected to different [Zoo]Keeper instance - /// so we may read a bit stale state. Run some writing request before loading log entries - /// to make that instance up-to-date. - zookeeper->set(zookeeper_path_log, ""); + /// so we may read a bit stale state. + zookeeper->sync(zookeeper_path_log); } loadNewEntries(); From 0c76fc1121622613bdd7161bba3194d091b4a856 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 20 Jun 2022 14:21:59 +0000 Subject: [PATCH 10/10] Fix style --- src/Common/ZooKeeper/IKeeper.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Common/ZooKeeper/IKeeper.h b/src/Common/ZooKeeper/IKeeper.h index c879058a6fa..73c7da25a8b 100644 --- a/src/Common/ZooKeeper/IKeeper.h +++ b/src/Common/ZooKeeper/IKeeper.h @@ -337,8 +337,6 @@ struct SyncResponse : virtual Response size_t bytesSize() const override { return path.size(); } }; - - struct MultiRequest : virtual Request { Requests requests;