From 39307f9dba215e13c97b87768cfc58ca6914f4da Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 15 Sep 2022 16:14:53 +0000 Subject: [PATCH] Extract into function --- src/Coordination/KeeperDispatcher.cpp | 134 +++++++++++++------------- src/Coordination/KeeperDispatcher.h | 2 + 2 files changed, 70 insertions(+), 66 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index cf7b82eda13..3445ef5ea23 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -84,70 +84,6 @@ void KeeperDispatcher::requestThread() KeeperStorage::RequestsForSessions quorum_requests; KeeperStorage::RequestsForSessions read_requests; - auto process_read_requests = [&, this](const auto & coordination_settings) mutable - { - if (coordination_settings->read_mode.toString() == "fastlinear") - { - // we just want to know what's the current latest committed log on Leader node - auto leader_info_result = server->getLeaderInfo(); - if (leader_info_result) - { - leader_info_result->when_ready([&, requests_for_sessions = std::move(read_requests)](nuraft::cmd_result> & result, nuraft::ptr & exception) mutable - { - if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT) - { - addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT); - return; - } - else if (result.get_result_code() != nuraft::cmd_result_code::OK) - { - addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); - return; - } - else if (exception) - { - LOG_INFO(log, "Got exception while waiting for read results {}", exception->what()); - addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); - return; - } - - auto & leader_info_ctx = result.get(); - - if (!leader_info_ctx) - { - addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); - return; - } - - KeeperServer::NodeInfo leader_info; - leader_info.term = leader_info_ctx->get_ulong(); - leader_info.last_committed_index = leader_info_ctx->get_ulong(); - std::lock_guard lock(leader_waiter_mutex); - auto node_info = server->getNodeInfo(); - - /// we're behind, we need to wait - if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index) - { - auto & leader_waiter = leader_waiters[leader_info]; - leader_waiter.insert(leader_waiter.end(), requests_for_sessions.begin(), requests_for_sessions.end()); - LOG_TRACE(log, "waiting for term {}, idx {}", leader_info.term, leader_info.last_committed_index); - } - /// process it in background thread - else if (!read_requests_queue.push(std::move(requests_for_sessions))) - throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); - }); - } - } - else - { - assert(coordination_settings->read_mode.toString() == "nonlinear"); - if (!read_requests_queue.push(std::move(read_requests))) - throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); - } - - read_requests.clear(); - }; - auto process_quorum_requests = [&, this]() mutable { /// Forcefully process all previous pending requests @@ -205,7 +141,7 @@ void KeeperDispatcher::requestThread() if (read_requests.size() > max_batch_size) { - process_read_requests(coordination_settings); + processReadRequests(coordination_settings, read_requests); if (previous_quorum_done()) break; @@ -225,7 +161,7 @@ void KeeperDispatcher::requestThread() /// batch of read requests can send at most one request /// so we don't care if the previous batch hasn't received response if (!read_requests.empty()) - process_read_requests(coordination_settings); + processReadRequests(coordination_settings, read_requests); /// if we still didn't process previous batch we can /// increase are current batch even more @@ -252,6 +188,72 @@ void KeeperDispatcher::requestThread() } } +void KeeperDispatcher::processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests) +{ + if (coordination_settings->read_mode.toString() == "fastlinear") + { + // we just want to know what's the current latest committed log on Leader node + auto leader_info_result = server->getLeaderInfo(); + if (leader_info_result) + { + leader_info_result->when_ready([&, requests_for_sessions = std::move(read_requests)](nuraft::cmd_result> & result, nuraft::ptr & exception) mutable + { + if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT) + { + addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT); + return; + } + + if (result.get_result_code() != nuraft::cmd_result_code::OK) + { + addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); + return; + } + + if (exception) + { + LOG_INFO(log, "Got exception while waiting for read results {}", exception->what()); + addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); + return; + } + + auto & leader_info_ctx = result.get(); + + if (!leader_info_ctx) + { + addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); + return; + } + + KeeperServer::NodeInfo leader_info; + leader_info.term = leader_info_ctx->get_ulong(); + leader_info.last_committed_index = leader_info_ctx->get_ulong(); + std::lock_guard lock(leader_waiter_mutex); + auto node_info = server->getNodeInfo(); + + /// we're behind, we need to wait + if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index) + { + auto & leader_waiter = leader_waiters[leader_info]; + leader_waiter.insert(leader_waiter.end(), requests_for_sessions.begin(), requests_for_sessions.end()); + LOG_TRACE(log, "waiting for term {}, idx {}", leader_info.term, leader_info.last_committed_index); + } + /// process it in background thread + else if (!read_requests_queue.push(std::move(requests_for_sessions))) + throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); + }); + } + } + else + { + assert(coordination_settings->read_mode.toString() == "nonlinear"); + if (!read_requests_queue.push(std::move(read_requests))) + throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); + } + + read_requests.clear(); +} + void KeeperDispatcher::responseThread() { setThreadName("KeeperRspT"); diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h index 0ebe67a4f39..6421db87793 100644 --- a/src/Coordination/KeeperDispatcher.h +++ b/src/Coordination/KeeperDispatcher.h @@ -125,6 +125,8 @@ private: void finalizeRequestsThread(); + void processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests); + void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response); /// Add error responses for requests to responses queue.