// Patch summary: while collecting a batch, requestThread() no longer stops at the first
// locally-processed read request. Such reads are stored in related_read_requests, keyed by the
// (session_id, xid) of the request that is last in current_batch at that moment, and are
// dispatched by a callback that KeeperStateMachine::commit invokes with the committed request.
diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp
index 2aa11dd9eed..fc97bc5e0e1 100644
--- a/src/Coordination/KeeperDispatcher.cpp
+++ b/src/Coordination/KeeperDispatcher.cpp
@@ -90,7 +90,7 @@ void KeeperDispatcher::requestThread()
 
             KeeperStorage::RequestsForSessions current_batch;
 
-            bool has_read_request = false;
+            bool has_read_request{false}; // NOTE(review): no hunk in this diff assigns this flag any more - check whether it is still needed.
 
             /// If new request is not read request or we must to process it through quorum.
             /// Otherwise we will process it locally.
@@ -98,6 +98,8 @@ void KeeperDispatcher::requestThread()
             {
                 current_batch.emplace_back(request);
 
+                size_t read_requests = 0; // Counts read requests taken from the queue but kept out of current_batch.
+
                 const auto try_get_request = [&]
                 {
                     /// Trying to get batch requests as fast as possible
@@ -106,7 +108,12 @@ void KeeperDispatcher::requestThread()
                         CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
                         /// Don't append read request into batch, we have to process them separately
                         if (!coordination_settings->quorum_reads && request.request->isReadRequest())
-                            has_read_request = true;
+                        {
+                            ++read_requests;
+                            std::pair key{current_batch.back().session_id, current_batch.back().request->xid}; // Key is taken from the request currently last in current_batch, not from the read itself.
+                            std::lock_guard lock(read_mutex); // The callback built in initialize() takes the same mutex before touching the map.
+                            related_read_requests[key].push_back(request); // Park the read under that key. NOTE(review): no visible hunk removes the entry if that request never commits - confirm.
+                        }
                         else
                             current_batch.emplace_back(request);
 
@@ -118,7 +125,7 @@ void KeeperDispatcher::requestThread()
 
                 /// If we have enough requests in queue, we will try to batch at least max_quick_batch_size of them.
                 size_t max_quick_batch_size = coordination_settings->max_requests_quick_batch_size;
-                while (!shutdown_called && !has_read_request && current_batch.size() < max_quick_batch_size && try_get_request())
+                while (!shutdown_called && current_batch.size() + read_requests < max_quick_batch_size && try_get_request()) // Parked reads count toward the quick-batch limit.
                     ;
 
                 const auto prev_result_done = [&]
@@ -129,7 +136,7 @@ void KeeperDispatcher::requestThread()
                 };
 
                 /// Waiting until previous append will be successful, or batch is big enough
-                while (!shutdown_called && !has_read_request && !prev_result_done() && current_batch.size() <= max_batch_size)
+                while (!shutdown_called && !prev_result_done() && current_batch.size() <= max_batch_size) // NOTE(review): unlike the loop above, this bound does not include read_requests - confirm intended.
                 {
                     try_get_request();
                 }
@@ -319,7 +326,22 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
 
     snapshot_s3.startup(config, macros);
 
-    server = std::make_unique(configuration_and_settings, config, responses_queue, snapshots_queue, snapshot_s3);
+    server = std::make_unique(configuration_and_settings, config, responses_queue, snapshots_queue, snapshot_s3, [this](const KeeperStorage::RequestForSession & request_for_session) // New last argument: the KeeperStateMachine::CommitCallback parameter of the KeeperServer constructor.
+    {
+        std::lock_guard lock(read_mutex); // NOTE(review): held for the whole body, including the calls into server and addErrorResponses below.
+        if (auto it = related_read_requests.find(std::pair{request_for_session.session_id, request_for_session.request->xid}); it != related_read_requests.end()) // Find reads parked under this request's (session_id, xid).
+        {
+            for (const auto & read_request : it->second)
+            {
+                if (server->isLeaderAlive())
+                    server->putLocalReadRequest(read_request);
+                else
+                    addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS); // Otherwise the read is answered with ZCONNECTIONLOSS.
+            }
+
+            related_read_requests.erase(it); // Drop the entry once every parked read has been handled.
+        }
+    });
 
     try
     {
diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h
index 9371d2fbbac..e7570727b9a 100644
--- a/src/Coordination/KeeperDispatcher.h
+++ b/src/Coordination/KeeperDispatcher.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include "Common/ZooKeeper/ZooKeeperCommon.h"
 #include "config.h"
 
 #if USE_NURAFT
@@ -103,6 +104,21 @@ private:
     void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions);
 
 public:
+    std::mutex read_mutex; // Locked around every access to related_read_requests in this patch. NOTE(review): added under public: - confirm that is intended.
+
+    struct PairHash // Hasher for the pair keys of related_read_requests.
+    {
+        auto operator()(std::pair pair) const
+        {
+            SipHash hash;
+            hash.update(pair.first);
+            hash.update(pair.second);
+            return hash.get64(); // Both members are fed into one SipHash; its get64() value is the hash.
+        }
+    };
+
+    std::unordered_map, KeeperStorage::RequestsForSessions, PairHash> related_read_requests; // Parked read requests, keyed by (session_id, xid) of the request they were queued behind in requestThread().
+
     /// Just allocate some objects, real initialization is done by `intialize method`
     KeeperDispatcher();
 
diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp
index 78a095f8c8d..56ed8f4eafe 100644
--- a/src/Coordination/KeeperServer.cpp
+++ b/src/Coordination/KeeperServer.cpp
@@ -107,7 +107,8 @@ KeeperServer::KeeperServer(
     const Poco::Util::AbstractConfiguration & config,
     ResponsesQueue & responses_queue_,
     SnapshotsQueue & snapshots_queue_,
-    KeeperSnapshotManagerS3 & snapshot_manager_s3)
+    KeeperSnapshotManagerS3 & snapshot_manager_s3,
+    KeeperStateMachine::CommitCallback commit_callback) // Passed on unchanged in the construction call patched in the next hunk.
     : server_id(configuration_and_settings_->server_id)
     , coordination_settings(configuration_and_settings_->coordination_settings)
     , log(&Poco::Logger::get("KeeperServer"))
@@ -128,6 +129,7 @@ KeeperServer::KeeperServer(
         coordination_settings,
         keeper_context,
         config.getBool("keeper_server.upload_snapshot_on_exit", true) ? &snapshot_manager_s3 : nullptr,
+        commit_callback, // Inserted before the superdigest argument; presumably the KeeperStateMachine constructor call - its start is outside this hunk.
         checkAndGetSuperdigest(configuration_and_settings_->super_digest));
 
     state_manager = nuraft::cs_new(
diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h
index bcff81f66f2..db4e9c1962e 100644
--- a/src/Coordination/KeeperServer.h
+++ b/src/Coordination/KeeperServer.h
@@ -72,7 +72,8 @@ public:
         const Poco::Util::AbstractConfiguration & config_,
         ResponsesQueue & responses_queue_,
         SnapshotsQueue & snapshots_queue_,
-        KeeperSnapshotManagerS3 & snapshot_manager_s3);
+        KeeperSnapshotManagerS3 & snapshot_manager_s3,
+        KeeperStateMachine::CommitCallback commit_callback); // New required parameter with no default value.
 
     /// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings.
     void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true);
diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp
index d0bd18b63e2..0b69b00bf0e 100644
--- a/src/Coordination/KeeperStateMachine.cpp
+++ b/src/Coordination/KeeperStateMachine.cpp
@@ -46,8 +46,10 @@ KeeperStateMachine::KeeperStateMachine(
     const CoordinationSettingsPtr & coordination_settings_,
     const KeeperContextPtr & keeper_context_,
     KeeperSnapshotManagerS3 * snapshot_manager_s3_,
+    CommitCallback commit_callback_, // Inserted before superdigest_, so positional callers must supply it.
     const std::string & superdigest_)
-    : coordination_settings(coordination_settings_)
+    : commit_callback(commit_callback_) // Stored by copy; now the first entry in the initializer list.
+    , coordination_settings(coordination_settings_)
     , snapshot_manager(
           snapshots_path_,
           coordination_settings->snapshots_to_keep,
@@ -274,6 +276,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n
 
     ProfileEvents::increment(ProfileEvents::KeeperCommits);
     last_committed_idx = log_idx;
+    commit_callback(request_for_session); // Runs right after last_committed_idx is updated. NOTE(review): no emptiness check - an empty std::function would throw std::bad_function_call.
     return nullptr;
 }
 
diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h
index d8181532f09..6babf741dbd 100644
--- a/src/Coordination/KeeperStateMachine.h
+++ b/src/Coordination/KeeperStateMachine.h
@@ -22,6 +22,8 @@ using SnapshotsQueue = ConcurrentBoundedQueue;
 class KeeperStateMachine : public nuraft::state_machine
 {
 public:
+    using CommitCallback = std::function; // Type of the callable that commit() invokes with request_for_session.
+
     KeeperStateMachine(
         ResponsesQueue & responses_queue_,
         SnapshotsQueue & snapshots_queue_,
@@ -29,6 +31,7 @@ public:
         const CoordinationSettingsPtr & coordination_settings_,
         const KeeperContextPtr & keeper_context_,
         KeeperSnapshotManagerS3 * snapshot_manager_s3_,
+        CommitCallback commit_callback_, // No default value, unlike superdigest_ below.
         const std::string & superdigest_ = "");
 
     /// Read state from the latest snapshot
@@ -105,6 +108,7 @@ public:
     void recalculateStorageStats();
 
 private:
+    CommitCallback commit_callback; // Set from the constructor argument; called from commit().
     /// In our state machine we always have a single snapshot which is stored
     /// in memory in compressed (serialized) format.
     SnapshotMetadataPtr latest_snapshot_meta = nullptr;