mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-03 13:02:00 +00:00
Extract into function
This commit is contained in:
parent
761d53b438
commit
39307f9dba
@ -84,70 +84,6 @@ void KeeperDispatcher::requestThread()
|
||||
KeeperStorage::RequestsForSessions quorum_requests;
|
||||
KeeperStorage::RequestsForSessions read_requests;
|
||||
|
||||
auto process_read_requests = [&, this](const auto & coordination_settings) mutable
|
||||
{
|
||||
if (coordination_settings->read_mode.toString() == "fastlinear")
|
||||
{
|
||||
// we just want to know what's the current latest committed log on Leader node
|
||||
auto leader_info_result = server->getLeaderInfo();
|
||||
if (leader_info_result)
|
||||
{
|
||||
leader_info_result->when_ready([&, requests_for_sessions = std::move(read_requests)](nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & result, nuraft::ptr<std::exception> & exception) mutable
|
||||
{
|
||||
if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT)
|
||||
{
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
|
||||
return;
|
||||
}
|
||||
else if (result.get_result_code() != nuraft::cmd_result_code::OK)
|
||||
{
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
return;
|
||||
}
|
||||
else if (exception)
|
||||
{
|
||||
LOG_INFO(log, "Got exception while waiting for read results {}", exception->what());
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
return;
|
||||
}
|
||||
|
||||
auto & leader_info_ctx = result.get();
|
||||
|
||||
if (!leader_info_ctx)
|
||||
{
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
return;
|
||||
}
|
||||
|
||||
KeeperServer::NodeInfo leader_info;
|
||||
leader_info.term = leader_info_ctx->get_ulong();
|
||||
leader_info.last_committed_index = leader_info_ctx->get_ulong();
|
||||
std::lock_guard lock(leader_waiter_mutex);
|
||||
auto node_info = server->getNodeInfo();
|
||||
|
||||
/// we're behind, we need to wait
|
||||
if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index)
|
||||
{
|
||||
auto & leader_waiter = leader_waiters[leader_info];
|
||||
leader_waiter.insert(leader_waiter.end(), requests_for_sessions.begin(), requests_for_sessions.end());
|
||||
LOG_TRACE(log, "waiting for term {}, idx {}", leader_info.term, leader_info.last_committed_index);
|
||||
}
|
||||
/// process it in background thread
|
||||
else if (!read_requests_queue.push(std::move(requests_for_sessions)))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
||||
});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(coordination_settings->read_mode.toString() == "nonlinear");
|
||||
if (!read_requests_queue.push(std::move(read_requests)))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
||||
}
|
||||
|
||||
read_requests.clear();
|
||||
};
|
||||
|
||||
auto process_quorum_requests = [&, this]() mutable
|
||||
{
|
||||
/// Forcefully process all previous pending requests
|
||||
@ -205,7 +141,7 @@ void KeeperDispatcher::requestThread()
|
||||
|
||||
if (read_requests.size() > max_batch_size)
|
||||
{
|
||||
process_read_requests(coordination_settings);
|
||||
processReadRequests(coordination_settings, read_requests);
|
||||
|
||||
if (previous_quorum_done())
|
||||
break;
|
||||
@ -225,7 +161,7 @@ void KeeperDispatcher::requestThread()
|
||||
/// batch of read requests can send at most one request
|
||||
/// so we don't care if the previous batch hasn't received response
|
||||
if (!read_requests.empty())
|
||||
process_read_requests(coordination_settings);
|
||||
processReadRequests(coordination_settings, read_requests);
|
||||
|
||||
/// if we still didn't process previous batch we can
|
||||
/// increase are current batch even more
|
||||
@ -252,6 +188,72 @@ void KeeperDispatcher::requestThread()
|
||||
}
|
||||
}
|
||||
|
||||
void KeeperDispatcher::processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests)
|
||||
{
|
||||
if (coordination_settings->read_mode.toString() == "fastlinear")
|
||||
{
|
||||
// we just want to know what's the current latest committed log on Leader node
|
||||
auto leader_info_result = server->getLeaderInfo();
|
||||
if (leader_info_result)
|
||||
{
|
||||
leader_info_result->when_ready([&, requests_for_sessions = std::move(read_requests)](nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & result, nuraft::ptr<std::exception> & exception) mutable
|
||||
{
|
||||
if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT)
|
||||
{
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.get_result_code() != nuraft::cmd_result_code::OK)
|
||||
{
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
return;
|
||||
}
|
||||
|
||||
if (exception)
|
||||
{
|
||||
LOG_INFO(log, "Got exception while waiting for read results {}", exception->what());
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
return;
|
||||
}
|
||||
|
||||
auto & leader_info_ctx = result.get();
|
||||
|
||||
if (!leader_info_ctx)
|
||||
{
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
return;
|
||||
}
|
||||
|
||||
KeeperServer::NodeInfo leader_info;
|
||||
leader_info.term = leader_info_ctx->get_ulong();
|
||||
leader_info.last_committed_index = leader_info_ctx->get_ulong();
|
||||
std::lock_guard lock(leader_waiter_mutex);
|
||||
auto node_info = server->getNodeInfo();
|
||||
|
||||
/// we're behind, we need to wait
|
||||
if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index)
|
||||
{
|
||||
auto & leader_waiter = leader_waiters[leader_info];
|
||||
leader_waiter.insert(leader_waiter.end(), requests_for_sessions.begin(), requests_for_sessions.end());
|
||||
LOG_TRACE(log, "waiting for term {}, idx {}", leader_info.term, leader_info.last_committed_index);
|
||||
}
|
||||
/// process it in background thread
|
||||
else if (!read_requests_queue.push(std::move(requests_for_sessions)))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
||||
});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(coordination_settings->read_mode.toString() == "nonlinear");
|
||||
if (!read_requests_queue.push(std::move(read_requests)))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
||||
}
|
||||
|
||||
read_requests.clear();
|
||||
}
|
||||
|
||||
void KeeperDispatcher::responseThread()
|
||||
{
|
||||
setThreadName("KeeperRspT");
|
||||
|
@ -125,6 +125,8 @@ private:
|
||||
|
||||
void finalizeRequestsThread();
|
||||
|
||||
void processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests);
|
||||
|
||||
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
|
||||
|
||||
/// Add error responses for requests to responses queue.
|
||||
|
Loading…
Reference in New Issue
Block a user