2021-08-24 12:30:31 +00:00
|
|
|
#include <Coordination/KeeperDispatcher.h>
|
2022-07-01 13:19:28 +00:00
|
|
|
#include <libnuraft/async.hxx>
|
2021-01-19 14:22:28 +00:00
|
|
|
#include <Common/setThreadName.h>
|
2021-04-16 13:50:09 +00:00
|
|
|
#include <Common/ZooKeeper/KeeperException.h>
|
|
|
|
#include <future>
|
|
|
|
#include <chrono>
|
2021-11-05 10:21:34 +00:00
|
|
|
#include <Poco/Path.h>
|
|
|
|
#include <Common/hex.h>
|
2021-11-12 12:48:42 +00:00
|
|
|
#include <filesystem>
|
2022-09-07 11:51:56 +00:00
|
|
|
#include <iterator>
|
2022-06-28 08:48:19 +00:00
|
|
|
#include <limits>
|
2021-11-18 20:17:22 +00:00
|
|
|
#include <Common/checkStackSize.h>
|
2022-06-10 07:49:46 +00:00
|
|
|
#include <Common/CurrentMetrics.h>
|
|
|
|
|
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
|
|
|
extern const Metric KeeperAliveConnections;
|
|
|
|
extern const Metric KeeperOutstandingRequets;
|
|
|
|
}
|
2021-11-12 12:48:42 +00:00
|
|
|
|
|
|
|
namespace fs = std::filesystem;
|
2021-01-19 14:22:28 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes thrown from this file; they are only declared here.
namespace ErrorCodes
{
    /// Thrown when a session id is registered twice.
    extern const int LOGICAL_ERROR;
    /// Thrown when a request cannot be queued within the operation timeout.
    extern const int TIMEOUT_EXCEEDED;
    /// Thrown when a push to one of the internal queues is rejected.
    extern const int SYSTEM_ERROR;
}
|
|
|
|
|
2021-01-19 14:22:28 +00:00
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
/// Capacity passed to the queues constructed below: the largest value representable by size_t.
static constexpr size_t MAX_QUEUE_CAPACITY = std::numeric_limits<size_t>::max();

/// Creates a dispatcher with default-constructed configuration and a logger named "KeeperDispatcher".
/// The response, read-request and finalize-request queues are created with the maximum capacity.
/// No threads are started and no server is created here.
KeeperDispatcher::KeeperDispatcher()
    : responses_queue(MAX_QUEUE_CAPACITY)
    , read_requests_queue(MAX_QUEUE_CAPACITY)
    , finalize_requests_queue(MAX_QUEUE_CAPACITY)
    , configuration_and_settings(std::make_shared<KeeperConfigurationAndSettings>())
    , log(&Poco::Logger::get("KeeperDispatcher"))
{
}
|
|
|
|
|
2021-04-16 13:50:09 +00:00
|
|
|
|
2022-08-29 10:50:18 +00:00
|
|
|
/// ZooKeeper has 2 requirements:
|
|
|
|
/// - writes need to be linearizable
|
|
|
|
/// - all requests from single session need to be processed in the order of their arrival
|
|
|
|
///
|
|
|
|
/// Because of that, we cannot process read and write requests from SAME session at the same time.
|
2022-08-29 11:56:51 +00:00
|
|
|
/// To be able to process read and write requests in parallel we need to make sure that only 1 type
|
2022-08-29 10:50:18 +00:00
|
|
|
/// of request is being processed from a single session.
|
|
|
|
/// Multiple types from different sessions can be processed at the same time.
|
|
|
|
///
|
|
|
|
/// We do some in-session housekeeping to make sure that the multithreaded request processing is correct.
|
|
|
|
/// When a request is received from a client, we check if there are requests being processed from that same
|
|
|
|
/// session, and if yes, of what type. If the types are the same, and there are no requests of different
|
|
|
|
/// type in between, we can instantly add it to the active request queue. Otherwise, we need to wait until
|
2022-09-05 07:51:02 +00:00
|
|
|
/// all requests of the other type are processed.
|
2022-08-29 10:50:18 +00:00
|
|
|
///
|
|
|
|
/// There are multiple threads used for processing the request, each of them communicating with a queue.
|
|
|
|
/// Assumption: only one type of request is being processed from a same session at any point in time (read or write).
|
|
|
|
///
|
|
|
|
/// requestThread -> requests currently being processed
|
|
|
|
/// readRequestThread -> thread for processing read requests
|
|
|
|
/// finalizeRequestThread -> thread for finalizing requests:
|
|
|
|
/// - in-session housekeeping, add requests to the active request queue if there are any
|
|
|
|
///
|
|
|
|
/// If reads are linearizable without quorum, a request can possibly wait for a certain log to be committed.
|
|
|
|
/// In that case we add it to the waiting queue for that log.
|
|
|
|
/// When that log is committed, the committing thread will send that read request to readRequestThread so it can be processed.
|
|
|
|
///
|
2021-08-24 12:30:31 +00:00
|
|
|
void KeeperDispatcher::requestThread()
|
2021-01-19 14:22:28 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
setThreadName("KeeperReqT");
|
2021-04-16 13:50:09 +00:00
|
|
|
|
|
|
|
/// Result of requests batch from previous iteration
|
2022-07-13 11:35:06 +00:00
|
|
|
RaftResult prev_result = nullptr;
|
2022-07-01 13:19:28 +00:00
|
|
|
const auto previous_quorum_done = [&] { return !prev_result || prev_result->has_result() || prev_result->get_result_code() != nuraft::cmd_result_code::OK; };
|
2021-04-16 13:50:09 +00:00
|
|
|
|
2022-06-24 14:42:24 +00:00
|
|
|
const auto needs_quorum = [](const auto & coordination_settings, const auto & request)
|
2022-06-24 11:36:12 +00:00
|
|
|
{
|
2022-07-01 09:15:25 +00:00
|
|
|
return coordination_settings->quorum_reads || coordination_settings->read_mode.toString() == "quorum" || !request.request->isReadRequest();
|
2022-06-24 11:36:12 +00:00
|
|
|
};
|
|
|
|
|
2022-07-01 09:15:25 +00:00
|
|
|
KeeperStorage::RequestsForSessions quorum_requests;
|
2022-06-27 12:05:27 +00:00
|
|
|
KeeperStorage::RequestsForSessions read_requests;
|
|
|
|
|
2022-07-01 09:15:25 +00:00
|
|
|
auto process_quorum_requests = [&, this]() mutable
|
2022-06-28 09:54:16 +00:00
|
|
|
{
|
|
|
|
/// Forcefully process all previous pending requests
|
|
|
|
if (prev_result)
|
|
|
|
forceWaitAndProcessResult(prev_result);
|
|
|
|
|
2022-07-01 09:15:25 +00:00
|
|
|
prev_result = server->putRequestBatch(quorum_requests);
|
2022-07-13 11:35:06 +00:00
|
|
|
|
|
|
|
if (prev_result)
|
2022-06-28 09:54:16 +00:00
|
|
|
{
|
2022-07-13 11:35:06 +00:00
|
|
|
prev_result->when_ready([&, requests_for_sessions = std::move(quorum_requests)](nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & result, nuraft::ptr<std::exception> &) mutable
|
|
|
|
{
|
|
|
|
if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT)
|
|
|
|
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
|
|
|
|
else if (result.get_result_code() != nuraft::cmd_result_code::OK)
|
|
|
|
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2022-07-01 09:15:25 +00:00
|
|
|
quorum_requests.clear();
|
2022-06-28 09:54:16 +00:00
|
|
|
};
|
|
|
|
|
2022-08-29 10:50:18 +00:00
|
|
|
/// ZooKeeper requires that the requests inside a single session are processed in a strict order
|
|
|
|
/// (we cannot process later requests before all the previous once are processed)
|
|
|
|
/// By making sure that at this point we can either have just read requests or just write requests
|
|
|
|
/// from a single session, we can process them independently
|
2021-01-26 14:08:31 +00:00
|
|
|
while (!shutdown_called)
|
2021-01-19 14:22:28 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
KeeperStorage::RequestForSession request;
|
2021-01-19 14:22:28 +00:00
|
|
|
|
2021-11-18 20:17:22 +00:00
|
|
|
auto coordination_settings = configuration_and_settings->coordination_settings;
|
|
|
|
uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds();
|
|
|
|
uint64_t max_batch_size = coordination_settings->max_requests_batch_size;
|
2021-04-16 13:50:09 +00:00
|
|
|
|
|
|
|
try
|
2021-01-26 07:47:04 +00:00
|
|
|
{
|
2022-08-29 10:50:18 +00:00
|
|
|
if (active_requests_queue->tryPop(request, max_wait))
|
2021-01-19 14:22:28 +00:00
|
|
|
{
|
2022-06-10 07:49:46 +00:00
|
|
|
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
|
2021-04-16 13:50:09 +00:00
|
|
|
if (shutdown_called)
|
|
|
|
break;
|
|
|
|
|
2022-06-27 12:05:27 +00:00
|
|
|
if (needs_quorum(coordination_settings, request))
|
2022-07-01 09:15:25 +00:00
|
|
|
quorum_requests.emplace_back(request);
|
2021-04-16 13:50:09 +00:00
|
|
|
else
|
2022-06-27 12:05:27 +00:00
|
|
|
read_requests.emplace_back(request);
|
|
|
|
|
|
|
|
/// Waiting until previous append will be successful, or batch is big enough
|
|
|
|
/// has_result == false && get_result_code == OK means that our request still not processed.
|
|
|
|
/// Sometimes NuRaft set errorcode without setting result, so we check both here.
|
|
|
|
while (true)
|
|
|
|
{
|
2022-07-01 09:15:25 +00:00
|
|
|
if (quorum_requests.size() > max_batch_size)
|
2022-06-27 12:05:27 +00:00
|
|
|
break;
|
|
|
|
|
|
|
|
if (read_requests.size() > max_batch_size)
|
|
|
|
{
|
2022-09-15 16:14:53 +00:00
|
|
|
processReadRequests(coordination_settings, read_requests);
|
2022-07-01 13:19:28 +00:00
|
|
|
|
|
|
|
if (previous_quorum_done())
|
|
|
|
break;
|
2022-06-27 12:05:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Trying to get batch requests as fast as possible
|
2022-08-29 10:50:18 +00:00
|
|
|
if (active_requests_queue->tryPop(request, 1))
|
2022-06-27 12:05:27 +00:00
|
|
|
{
|
2022-07-12 07:33:34 +00:00
|
|
|
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
|
2022-06-27 12:05:27 +00:00
|
|
|
if (needs_quorum(coordination_settings, request))
|
2022-07-01 09:15:25 +00:00
|
|
|
quorum_requests.emplace_back(request);
|
2022-06-27 12:05:27 +00:00
|
|
|
else
|
|
|
|
read_requests.emplace_back(request);
|
|
|
|
}
|
2022-07-01 09:15:25 +00:00
|
|
|
else
|
2022-06-28 09:54:16 +00:00
|
|
|
{
|
2022-08-29 10:50:18 +00:00
|
|
|
/// batch of read requests can send at most one request
|
|
|
|
/// so we don't care if the previous batch hasn't received response
|
2022-06-28 09:54:16 +00:00
|
|
|
if (!read_requests.empty())
|
2022-09-15 16:14:53 +00:00
|
|
|
processReadRequests(coordination_settings, read_requests);
|
2022-06-28 09:54:16 +00:00
|
|
|
|
2022-08-29 10:50:18 +00:00
|
|
|
/// if we still didn't process previous batch we can
|
|
|
|
/// increase are current batch even more
|
2022-07-01 13:19:28 +00:00
|
|
|
if (previous_quorum_done())
|
2022-06-28 09:54:16 +00:00
|
|
|
break;
|
|
|
|
}
|
2022-06-27 12:05:27 +00:00
|
|
|
|
|
|
|
if (shutdown_called)
|
|
|
|
break;
|
2022-06-24 11:36:12 +00:00
|
|
|
}
|
2021-04-16 13:50:09 +00:00
|
|
|
|
|
|
|
if (shutdown_called)
|
|
|
|
break;
|
|
|
|
|
2022-07-01 09:15:25 +00:00
|
|
|
if (!quorum_requests.empty())
|
|
|
|
process_quorum_requests();
|
2021-04-16 13:50:09 +00:00
|
|
|
|
2021-02-08 13:06:55 +00:00
|
|
|
}
|
|
|
|
}
|
2021-04-16 13:50:09 +00:00
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
2021-02-08 13:06:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-09-15 16:14:53 +00:00
|
|
|
void KeeperDispatcher::processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests)
|
|
|
|
{
|
|
|
|
if (coordination_settings->read_mode.toString() == "fastlinear")
|
|
|
|
{
|
|
|
|
// we just want to know what's the current latest committed log on Leader node
|
|
|
|
auto leader_info_result = server->getLeaderInfo();
|
|
|
|
if (leader_info_result)
|
|
|
|
{
|
|
|
|
leader_info_result->when_ready([&, requests_for_sessions = std::move(read_requests)](nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & result, nuraft::ptr<std::exception> & exception) mutable
|
|
|
|
{
|
|
|
|
if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT)
|
|
|
|
{
|
|
|
|
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (result.get_result_code() != nuraft::cmd_result_code::OK)
|
|
|
|
{
|
|
|
|
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (exception)
|
|
|
|
{
|
|
|
|
LOG_INFO(log, "Got exception while waiting for read results {}", exception->what());
|
|
|
|
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
auto & leader_info_ctx = result.get();
|
|
|
|
|
|
|
|
if (!leader_info_ctx)
|
|
|
|
{
|
|
|
|
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
KeeperServer::NodeInfo leader_info;
|
|
|
|
leader_info.term = leader_info_ctx->get_ulong();
|
|
|
|
leader_info.last_committed_index = leader_info_ctx->get_ulong();
|
|
|
|
std::lock_guard lock(leader_waiter_mutex);
|
|
|
|
auto node_info = server->getNodeInfo();
|
|
|
|
|
|
|
|
/// we're behind, we need to wait
|
|
|
|
if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index)
|
|
|
|
{
|
|
|
|
auto & leader_waiter = leader_waiters[leader_info];
|
|
|
|
leader_waiter.insert(leader_waiter.end(), requests_for_sessions.begin(), requests_for_sessions.end());
|
|
|
|
LOG_TRACE(log, "waiting for term {}, idx {}", leader_info.term, leader_info.last_committed_index);
|
|
|
|
}
|
|
|
|
/// process it in background thread
|
|
|
|
else if (!read_requests_queue.push(std::move(requests_for_sessions)))
|
|
|
|
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
assert(coordination_settings->read_mode.toString() == "nonlinear");
|
|
|
|
if (!read_requests_queue.push(std::move(read_requests)))
|
|
|
|
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
|
|
|
}
|
|
|
|
|
|
|
|
read_requests.clear();
|
|
|
|
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
void KeeperDispatcher::responseThread()
|
2021-02-08 13:06:55 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
setThreadName("KeeperRspT");
|
2021-02-08 13:06:55 +00:00
|
|
|
while (!shutdown_called)
|
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
KeeperStorage::ResponseForSession response_for_session;
|
2021-02-08 13:06:55 +00:00
|
|
|
|
2021-11-18 20:17:22 +00:00
|
|
|
uint64_t max_wait = configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds();
|
2021-02-08 13:06:55 +00:00
|
|
|
|
|
|
|
if (responses_queue.tryPop(response_for_session, max_wait))
|
|
|
|
{
|
|
|
|
if (shutdown_called)
|
|
|
|
break;
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
setResponse(response_for_session.session_id, response_for_session.response);
|
2021-01-19 14:22:28 +00:00
|
|
|
}
|
2021-01-26 07:47:04 +00:00
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
2021-01-19 14:22:28 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
void KeeperDispatcher::snapshotThread()
|
2021-03-05 10:40:24 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
setThreadName("KeeperSnpT");
|
2021-03-05 10:40:24 +00:00
|
|
|
while (!shutdown_called)
|
|
|
|
{
|
|
|
|
CreateSnapshotTask task;
|
2021-10-06 11:02:40 +00:00
|
|
|
if (!snapshots_queue.pop(task))
|
|
|
|
break;
|
2021-03-05 10:40:24 +00:00
|
|
|
|
|
|
|
if (shutdown_called)
|
|
|
|
break;
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
2021-03-07 21:40:32 +00:00
|
|
|
task.create_snapshot(std::move(task.snapshot));
|
2021-03-05 10:40:24 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-08-29 10:50:18 +00:00
|
|
|
/// Background thread for processing read requests
|
2022-06-28 08:48:19 +00:00
|
|
|
void KeeperDispatcher::readRequestThread()
|
|
|
|
{
|
|
|
|
setThreadName("KeeperReadT");
|
|
|
|
while (!shutdown_called)
|
|
|
|
{
|
|
|
|
KeeperStorage::RequestsForSessions requests;
|
|
|
|
if (!read_requests_queue.pop(requests))
|
|
|
|
break;
|
|
|
|
|
|
|
|
if (shutdown_called)
|
|
|
|
break;
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
for (const auto & request_info : requests)
|
|
|
|
{
|
|
|
|
if (server->isLeaderAlive())
|
|
|
|
server->putLocalReadRequest(request_info);
|
|
|
|
else
|
|
|
|
addErrorResponses({request_info}, Coordination::Error::ZCONNECTIONLOSS);
|
|
|
|
}
|
|
|
|
|
2022-07-01 13:19:28 +00:00
|
|
|
if (!finalize_requests_queue.push(std::move(requests)))
|
|
|
|
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-08-29 10:50:18 +00:00
|
|
|
/// We finalize requests every time we commit a single log with request
|
|
|
|
/// or process a batch of read requests.
|
|
|
|
/// Because it can get heavy, we do it in background thread.
|
2022-07-01 13:19:28 +00:00
|
|
|
void KeeperDispatcher::finalizeRequestsThread()
|
|
|
|
{
|
|
|
|
setThreadName("KeeperFinalT");
|
|
|
|
while (!shutdown_called)
|
|
|
|
{
|
|
|
|
KeeperStorage::RequestsForSessions requests;
|
|
|
|
if (!finalize_requests_queue.pop(requests))
|
|
|
|
break;
|
|
|
|
|
|
|
|
if (shutdown_called)
|
|
|
|
break;
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
2022-06-28 08:48:19 +00:00
|
|
|
finalizeRequests(requests);
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
/// Delivers `response` to whoever waits for it, under `session_to_response_callback_mutex`.
///
/// - A SessionID response (not a watch) goes to the callback registered under its internal id,
///   but only if it was issued for this server; the callback is removed after the call.
/// - Any other response goes to the callback registered for `session_id`. If the session is not
///   registered the response is dropped. After a Close response the session is unregistered.
void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
    std::lock_guard lock(session_to_response_callback_mutex);

    /// Special new session response.
    if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::SessionID)
    {
        const auto & new_session_response = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);

        /// Response for a session requested through another server: nobody here waits for it.
        if (new_session_response.server_id != server->getServerID())
            return;

        const auto waiter_it = new_session_id_response_callback.find(new_session_response.internal_id);

        /// Nobody waits for this session id
        if (waiter_it == new_session_id_response_callback.end())
            return;

        /// The callback is copied before the call, the entry is erased only after it returns.
        auto waiter = waiter_it->second;
        waiter(response);
        new_session_id_response_callback.erase(waiter_it);
        return;
    }

    /// Normal response, just write to client
    const auto session_it = session_to_response_callback.find(session_id);

    /// Session was disconnected, just skip this response
    if (session_it == session_to_response_callback.end())
    {
        LOG_TEST(log, "Cannot write response xid={}, op={}, session {} disconnected",
            response->xid, response->xid == Coordination::WATCH_XID ? "Watch" : toString(response->getOpNum()), session_id);
        return;
    }

    session_it->second(response);

    /// Session closed, no more writes
    if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
    {
        session_to_response_callback.erase(session_it);
        CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections);
    }
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
/// Accepts a request from a session.
///
/// Returns false if the session is not registered or shutdown was already requested.
/// Returns true if the request was pushed to the active queue, or parked in the session's
/// waiting queue (when that queue is non-empty or the request type differs from the type
/// currently tracked for the session).
/// Throws SYSTEM_ERROR if a Close request cannot be pushed, TIMEOUT_EXCEEDED if any other
/// request cannot be pushed within the operation timeout.
bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
    {
        /// If the session was already disconnected then we will ignore requests
        std::lock_guard lock(session_to_response_callback_mutex);
        if (!session_to_response_callback.contains(session_id))
            return false;
    }

    /// The request is stamped with the wall-clock time in milliseconds since epoch.
    KeeperStorage::RequestForSession request_info;
    request_info.request = request;
    request_info.time = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::system_clock::now().time_since_epoch()).count();
    request_info.session_id = session_id;

    {
        std::lock_guard lock{unprocessed_request_mutex};

        const auto session_it = unprocessed_requests_for_session.find(session_id);
        if (session_it != unprocessed_requests_for_session.end())
        {
            auto & session_state = session_it->second;

            /// queue is not empty, or the request types don't match, put it in the waiting queue
            const bool must_wait = !session_state.request_queue.empty() || session_state.is_read != request->isReadRequest();
            if (must_wait)
            {
                session_state.request_queue.push_back(std::move(request_info));
                return true;
            }

            ++session_state.unprocessed_num;
        }
        else
        {
            /// First tracked request of this session: it defines the type being processed.
            auto & session_state = unprocessed_requests_for_session[session_id];
            session_state.unprocessed_num = 1;
            session_state.is_read = request->isReadRequest();
        }
    }

    /// NOTE(review): the counter above is already updated if one of the paths below returns
    /// false or throws — confirm that this is accounted for elsewhere.
    std::lock_guard lock(push_request_mutex);

    if (shutdown_called)
        return false;

    const bool is_close_request = request->getOpNum() == Coordination::OpNum::Close;

    /// Put close requests without timeouts
    if (is_close_request)
    {
        if (!active_requests_queue->push(std::move(request_info)))
            throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
    }
    else if (!active_requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
    {
        throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
    }

    CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
    return true;
}
|
|
|
|
|
2021-10-14 10:21:41 +00:00
|
|
|
/// Loads the configuration, creates the active request queue, starts the worker threads,
/// creates and starts the Keeper server, then starts the session cleaner and the
/// configuration update thread.
///
/// Unless `start_async` is set, blocks in `server->waitInit()`.
/// Exceptions thrown while starting the server are logged and rethrown.
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async)
{
    LOG_DEBUG(log, "Initializing storage dispatcher");

    configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);

    /// The active queue capacity equals the maximum batch size.
    const auto & coordination_settings = configuration_and_settings->coordination_settings;
    active_requests_queue = std::make_unique<RequestsQueue>(coordination_settings->max_requests_batch_size);

    /// Worker threads are started before the server object is created.
    request_thread = ThreadFromGlobalPool([this] { requestThread(); });
    responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
    snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
    read_request_thread = ThreadFromGlobalPool([this] { readRequestThread(); });
    finalize_requests_thread = ThreadFromGlobalPool([this] { finalizeRequestsThread(); });

    /// Callbacks through which the server notifies the dispatcher.
    auto on_request_commit = [this](const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx)
    {
        onRequestCommit(request_for_session, log_term, log_idx);
    };
    auto on_apply_snapshot = [this](uint64_t term, uint64_t last_idx)
    {
        onApplySnapshot(term, last_idx);
    };

    server = std::make_unique<KeeperServer>(
        configuration_and_settings,
        config,
        responses_queue,
        snapshots_queue,
        std::move(on_request_commit),
        std::move(on_apply_snapshot));

    try
    {
        LOG_DEBUG(log, "Waiting server to initialize");
        server->startup(config, configuration_and_settings->enable_ipv6);
        LOG_DEBUG(log, "Server initialized, waiting for quorum");

        if (start_async)
        {
            LOG_INFO(log, "Starting Keeper asynchronously, server will accept connections to Keeper when it will be ready");
        }
        else
        {
            server->waitInit();
            LOG_DEBUG(log, "Quorum initialized");
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
        throw;
    }

    /// Start it after keeper server start
    session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
    update_configuration_thread = ThreadFromGlobalPool([this] { updateConfigurationThread(); });
    updateConfiguration(config);

    LOG_DEBUG(log, "Dispatcher initialized");
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
void KeeperDispatcher::shutdown()
|
2021-01-19 14:22:28 +00:00
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
2021-01-26 14:08:31 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock(push_request_mutex);
|
|
|
|
|
|
|
|
if (shutdown_called)
|
|
|
|
return;
|
|
|
|
|
2021-02-01 11:27:26 +00:00
|
|
|
LOG_DEBUG(log, "Shutting down storage dispatcher");
|
2021-01-26 14:08:31 +00:00
|
|
|
shutdown_called = true;
|
|
|
|
|
2021-02-03 20:32:15 +00:00
|
|
|
if (session_cleaner_thread.joinable())
|
|
|
|
session_cleaner_thread.join();
|
|
|
|
|
2022-08-29 10:50:18 +00:00
|
|
|
if (active_requests_queue)
|
2021-08-30 13:23:31 +00:00
|
|
|
{
|
2022-08-29 10:50:18 +00:00
|
|
|
active_requests_queue->finish();
|
2021-10-08 13:32:02 +00:00
|
|
|
|
|
|
|
if (request_thread.joinable())
|
|
|
|
request_thread.join();
|
2021-08-30 13:23:31 +00:00
|
|
|
}
|
2021-02-08 13:06:55 +00:00
|
|
|
|
2021-10-06 11:02:40 +00:00
|
|
|
responses_queue.finish();
|
2021-02-08 13:06:55 +00:00
|
|
|
if (responses_thread.joinable())
|
|
|
|
responses_thread.join();
|
2021-03-05 10:40:24 +00:00
|
|
|
|
2021-10-06 11:02:40 +00:00
|
|
|
snapshots_queue.finish();
|
2021-03-05 10:40:24 +00:00
|
|
|
if (snapshot_thread.joinable())
|
|
|
|
snapshot_thread.join();
|
2021-10-19 12:00:26 +00:00
|
|
|
|
2022-06-28 08:48:19 +00:00
|
|
|
read_requests_queue.finish();
|
|
|
|
if (read_request_thread.joinable())
|
|
|
|
read_request_thread.join();
|
|
|
|
|
2022-07-01 13:19:28 +00:00
|
|
|
finalize_requests_queue.finish();
|
|
|
|
if (finalize_requests_thread.joinable())
|
|
|
|
finalize_requests_thread.join();
|
|
|
|
|
2021-10-19 12:00:26 +00:00
|
|
|
update_configuration_queue.finish();
|
|
|
|
if (update_configuration_thread.joinable())
|
|
|
|
update_configuration_thread.join();
|
2021-01-26 14:08:31 +00:00
|
|
|
}
|
|
|
|
|
2021-09-02 09:20:11 +00:00
|
|
|
KeeperStorage::RequestForSession request_for_session;
|
2021-08-30 13:23:31 +00:00
|
|
|
|
2021-09-02 09:20:11 +00:00
|
|
|
/// Set session expired for all pending requests
|
2022-08-29 10:50:18 +00:00
|
|
|
while (active_requests_queue && active_requests_queue->tryPop(request_for_session))
|
2021-09-02 09:20:11 +00:00
|
|
|
{
|
2022-06-10 07:49:46 +00:00
|
|
|
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
|
2021-10-07 17:13:56 +00:00
|
|
|
auto response = request_for_session.request->makeResponse();
|
|
|
|
response->error = Coordination::Error::ZSESSIONEXPIRED;
|
|
|
|
setResponse(request_for_session.session_id, response);
|
2021-01-26 14:08:31 +00:00
|
|
|
}
|
2021-05-22 07:50:23 +00:00
|
|
|
|
2022-09-12 12:22:48 +00:00
|
|
|
KeeperStorage::RequestsForSessions close_requests;
|
|
|
|
{
|
|
|
|
/// Clear all registered sessions
|
|
|
|
std::lock_guard lock(session_to_response_callback_mutex);
|
|
|
|
|
|
|
|
if (hasLeader())
|
|
|
|
{
|
2022-09-12 18:19:41 +00:00
|
|
|
close_requests.reserve(session_to_response_callback.size());
|
2022-09-12 12:22:48 +00:00
|
|
|
// send to leader CLOSE requests for active sessions
|
|
|
|
for (const auto & [session, response] : session_to_response_callback)
|
|
|
|
{
|
2022-09-13 09:51:46 +00:00
|
|
|
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
|
2022-09-12 12:22:48 +00:00
|
|
|
request->xid = Coordination::CLOSE_XID;
|
|
|
|
using namespace std::chrono;
|
2022-09-13 09:51:46 +00:00
|
|
|
KeeperStorage::RequestForSession request_info
|
|
|
|
{
|
|
|
|
.session_id = session,
|
|
|
|
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
|
|
|
|
.request = std::move(request),
|
|
|
|
};
|
2022-09-12 12:22:48 +00:00
|
|
|
|
|
|
|
close_requests.push_back(std::move(request_info));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
session_to_response_callback.clear();
|
|
|
|
}
|
|
|
|
|
|
|
|
// if there is no leader, there is no reason to do CLOSE because it's a write request
|
|
|
|
if (hasLeader() && !close_requests.empty())
|
|
|
|
{
|
|
|
|
LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
|
|
|
|
const auto raft_result = server->putRequestBatch(close_requests);
|
2022-09-12 18:19:41 +00:00
|
|
|
auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
|
|
|
|
auto sessions_closing_done = sessions_closing_done_promise->get_future();
|
|
|
|
raft_result->when_ready([sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
|
|
|
|
nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
|
|
|
|
nuraft::ptr<std::exception> & /*exception*/) { sessions_closing_done_promise->set_value(); });
|
2022-09-12 12:22:48 +00:00
|
|
|
|
|
|
|
auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
|
2022-09-12 18:19:41 +00:00
|
|
|
if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
|
|
|
|
LOG_WARNING(
|
|
|
|
log,
|
|
|
|
"Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
|
|
|
|
session_shutdown_timeout);
|
2022-09-12 12:22:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (server)
|
|
|
|
server->shutdown();
|
|
|
|
|
2022-06-10 07:49:46 +00:00
|
|
|
CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0);
|
2022-09-12 12:22:48 +00:00
|
|
|
|
2021-01-19 14:22:28 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
2021-02-01 11:27:26 +00:00
|
|
|
|
|
|
|
LOG_DEBUG(log, "Dispatcher shut down");
|
2021-01-19 14:22:28 +00:00
|
|
|
}
|
|
|
|
|
2022-04-13 14:08:13 +00:00
|
|
|
void KeeperDispatcher::forceRecovery()
|
|
|
|
{
|
|
|
|
server->forceRecovery();
|
|
|
|
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
/// Shuts the dispatcher down on destruction; shutdown() is a no-op if it was already called.
KeeperDispatcher::~KeeperDispatcher()
{
    shutdown();
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
void KeeperDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
|
2021-01-19 14:22:28 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock(session_to_response_callback_mutex);
|
|
|
|
if (!session_to_response_callback.try_emplace(session_id, callback).second)
|
|
|
|
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
|
2022-06-10 07:49:46 +00:00
|
|
|
CurrentMetrics::add(CurrentMetrics::KeeperAliveConnections);
|
2021-01-19 14:22:28 +00:00
|
|
|
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
void KeeperDispatcher::sessionCleanerTask()
|
2021-02-03 20:32:15 +00:00
|
|
|
{
|
|
|
|
while (true)
|
|
|
|
{
|
|
|
|
if (shutdown_called)
|
|
|
|
return;
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
2021-08-24 12:30:31 +00:00
|
|
|
/// Only leader node must check dead sessions
|
2021-10-14 10:21:41 +00:00
|
|
|
if (server->checkInit() && isLeader())
|
2021-02-03 20:32:15 +00:00
|
|
|
{
|
|
|
|
auto dead_sessions = server->getDeadSessions();
|
2021-08-24 12:30:31 +00:00
|
|
|
|
2021-02-03 20:32:15 +00:00
|
|
|
for (int64_t dead_session : dead_sessions)
|
|
|
|
{
|
|
|
|
LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
|
2021-08-24 12:30:31 +00:00
|
|
|
|
|
|
|
/// Close session == send close request to raft server
|
2022-09-13 09:51:46 +00:00
|
|
|
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
|
2021-02-03 20:32:15 +00:00
|
|
|
request->xid = Coordination::CLOSE_XID;
|
2022-01-13 09:49:39 +00:00
|
|
|
using namespace std::chrono;
|
2022-09-13 09:51:46 +00:00
|
|
|
KeeperStorage::RequestForSession request_info
|
|
|
|
{
|
|
|
|
.session_id = dead_session,
|
|
|
|
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
|
|
|
|
.request = std::move(request),
|
|
|
|
};
|
2021-02-09 18:29:06 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock(push_request_mutex);
|
2022-08-29 10:50:18 +00:00
|
|
|
if (!active_requests_queue->push(std::move(request_info)))
|
2021-10-08 13:32:02 +00:00
|
|
|
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
|
2022-06-10 07:49:46 +00:00
|
|
|
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
2021-02-09 18:29:06 +00:00
|
|
|
}
|
2021-08-24 12:30:31 +00:00
|
|
|
|
|
|
|
/// Remove session from registered sessions
|
2021-02-08 13:06:55 +00:00
|
|
|
finishSession(dead_session);
|
2021-02-10 09:28:53 +00:00
|
|
|
LOG_INFO(log, "Dead session close request pushed");
|
2021-02-03 20:32:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
|
|
|
|
2021-11-18 20:17:22 +00:00
|
|
|
auto time_to_sleep = configuration_and_settings->coordination_settings->dead_session_check_period_ms.totalMilliseconds();
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(time_to_sleep));
|
2021-02-03 20:32:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
void KeeperDispatcher::finishSession(int64_t session_id)
|
2021-01-21 11:37:20 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock(session_to_response_callback_mutex);
|
|
|
|
auto session_it = session_to_response_callback.find(session_id);
|
|
|
|
if (session_it != session_to_response_callback.end())
|
2022-06-10 07:49:46 +00:00
|
|
|
{
|
2021-01-21 11:37:20 +00:00
|
|
|
session_to_response_callback.erase(session_it);
|
2022-06-10 07:49:46 +00:00
|
|
|
CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections);
|
|
|
|
}
|
2021-01-21 11:37:20 +00:00
|
|
|
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
/// For every request in `requests_for_sessions` builds a response of the matching type,
/// marks it with `error` (same xid as the request, zxid 0) and pushes it to the responses queue.
/// Throws SYSTEM_ERROR if a response cannot be pushed; responses pushed before the failure stay in the queue.
void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error)
{
    for (const auto & request_for_session : requests_for_sessions)
    {
        auto response = request_for_session.request->makeResponse();
        response->xid = request_for_session.request->xid;
        response->zxid = 0;
        response->error = error;

        if (!responses_queue.push(DB::KeeperStorage::ResponseForSession{request_for_session.session_id, response}))
            throw Exception(ErrorCodes::SYSTEM_ERROR,
                "Could not push error response xid {} zxid {} error message {} to responses queue",
                response->xid,
                response->zxid,
                errorMessage(error));
    }
}
|
|
|
|
|
2022-07-13 11:35:06 +00:00
|
|
|
/// If `result` does not hold a result yet, calls get() on it
/// (presumably this blocks until the Raft operation completes — confirm against NuRaft),
/// then resets the caller's handle to nullptr.
void KeeperDispatcher::forceWaitAndProcessResult(RaftResult & result)
{
    const bool result_is_ready = result->has_result();
    if (!result_is_ready)
        result->get();

    result = nullptr;
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
|
2021-04-16 13:50:09 +00:00
|
|
|
{
|
2021-08-24 12:30:31 +00:00
|
|
|
/// New session id allocation is a special request, because we cannot process it in normal
|
|
|
|
/// way: get request -> put to raft -> set response for registered callback.
|
2021-04-16 13:50:09 +00:00
|
|
|
KeeperStorage::RequestForSession request_info;
|
|
|
|
std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
|
2021-08-24 12:30:31 +00:00
|
|
|
/// Internal session id. It's a temporary number which is unique for each client on this server
|
|
|
|
/// but can be same on different servers.
|
2021-04-16 13:50:09 +00:00
|
|
|
request->internal_id = internal_session_id_counter.fetch_add(1);
|
|
|
|
request->session_timeout_ms = session_timeout_ms;
|
2021-04-16 18:31:23 +00:00
|
|
|
request->server_id = server->getServerID();
|
|
|
|
|
2021-04-16 13:50:09 +00:00
|
|
|
request_info.request = request;
|
2022-01-13 09:49:39 +00:00
|
|
|
using namespace std::chrono;
|
|
|
|
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
|
2021-04-16 13:50:09 +00:00
|
|
|
request_info.session_id = -1;
|
|
|
|
|
|
|
|
auto promise = std::make_shared<std::promise<int64_t>>();
|
|
|
|
auto future = promise->get_future();
|
2021-08-24 12:30:31 +00:00
|
|
|
|
2021-04-16 13:50:09 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock(session_to_response_callback_mutex);
|
2021-04-16 18:31:23 +00:00
|
|
|
new_session_id_response_callback[request->internal_id] = [promise, internal_id = request->internal_id] (const Coordination::ZooKeeperResponsePtr & response)
|
2021-04-16 13:50:09 +00:00
|
|
|
{
|
|
|
|
if (response->getOpNum() != Coordination::OpNum::SessionID)
|
|
|
|
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Incorrect response of type {} instead of SessionID response", Coordination::toString(response->getOpNum()))));
|
|
|
|
|
2021-04-16 18:31:23 +00:00
|
|
|
auto session_id_response = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
|
|
|
|
if (session_id_response.internal_id != internal_id)
|
|
|
|
{
|
|
|
|
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
|
|
|
|
"Incorrect response with internal id {} instead of {}", session_id_response.internal_id, internal_id)));
|
|
|
|
}
|
|
|
|
|
2021-04-16 13:50:09 +00:00
|
|
|
if (response->error != Coordination::Error::ZOK)
|
|
|
|
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException("SessionID request failed with error", response->error)));
|
|
|
|
|
2021-04-16 18:31:23 +00:00
|
|
|
promise->set_value(session_id_response.session_id);
|
2021-04-16 13:50:09 +00:00
|
|
|
};
|
|
|
|
}
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
/// Push new session request to queue
|
2021-04-16 13:50:09 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock(push_request_mutex);
|
2022-08-29 10:50:18 +00:00
|
|
|
if (!active_requests_queue->tryPush(std::move(request_info), session_timeout_ms))
|
2021-04-16 13:50:09 +00:00
|
|
|
throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
|
2022-06-10 07:49:46 +00:00
|
|
|
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
2021-04-16 13:50:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
|
|
|
|
throw Exception("Cannot receive session id within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
|
|
|
|
|
2021-08-24 12:30:31 +00:00
|
|
|
/// Forcefully wait for request execution because we cannot process any other
|
|
|
|
/// requests for this client until it get new session id.
|
2021-04-16 13:50:09 +00:00
|
|
|
return future.get();
|
|
|
|
}
|
|
|
|
|
2021-10-19 12:00:26 +00:00
|
|
|
|
|
|
|
void KeeperDispatcher::updateConfigurationThread()
|
2021-10-18 15:27:51 +00:00
|
|
|
{
|
2021-10-19 12:00:26 +00:00
|
|
|
while (true)
|
2021-10-18 15:27:51 +00:00
|
|
|
{
|
2021-10-19 12:00:26 +00:00
|
|
|
if (shutdown_called)
|
|
|
|
return;
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
2022-04-26 07:32:02 +00:00
|
|
|
using namespace std::chrono_literals;
|
2021-10-19 12:00:26 +00:00
|
|
|
if (!server->checkInit())
|
|
|
|
{
|
|
|
|
LOG_INFO(log, "Server still not initialized, will not apply configuration until initialization finished");
|
2022-04-26 07:32:02 +00:00
|
|
|
std::this_thread::sleep_for(5000ms);
|
2021-10-19 12:00:26 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2022-04-19 08:08:13 +00:00
|
|
|
if (server->isRecovering())
|
|
|
|
{
|
|
|
|
LOG_INFO(log, "Server is recovering, will not apply configuration until recovery is finished");
|
2022-04-26 07:32:02 +00:00
|
|
|
std::this_thread::sleep_for(5000ms);
|
2022-04-19 08:08:13 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2021-10-19 12:00:26 +00:00
|
|
|
ConfigUpdateAction action;
|
|
|
|
if (!update_configuration_queue.pop(action))
|
|
|
|
break;
|
2021-10-19 13:11:29 +00:00
|
|
|
|
|
|
|
|
|
|
|
/// We must wait this update from leader or apply it ourself (if we are leader)
|
|
|
|
bool done = false;
|
|
|
|
while (!done)
|
2021-10-19 12:00:26 +00:00
|
|
|
{
|
2022-04-19 08:08:13 +00:00
|
|
|
if (server->isRecovering())
|
|
|
|
break;
|
|
|
|
|
2021-10-19 13:11:29 +00:00
|
|
|
if (shutdown_called)
|
|
|
|
return;
|
2021-10-19 12:00:26 +00:00
|
|
|
|
2021-10-19 13:11:29 +00:00
|
|
|
if (isLeader())
|
|
|
|
{
|
|
|
|
server->applyConfigurationUpdate(action);
|
|
|
|
done = true;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
done = server->waitConfigurationUpdate(action);
|
|
|
|
if (!done)
|
|
|
|
LOG_INFO(log, "Cannot wait for configuration update, maybe we become leader, or maybe update is invalid, will try to wait one more time");
|
|
|
|
}
|
2021-10-19 12:00:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
2021-10-18 15:27:51 +00:00
|
|
|
}
|
2021-10-19 12:00:26 +00:00
|
|
|
}
|
|
|
|
|
2022-08-29 10:50:18 +00:00
|
|
|
// Used to update the state for a session based on the requests
|
|
|
|
// - update the number of current unprocessed requests for the session
|
|
|
|
// - if the number of unprocessed requests is 0, we can start adding next type of requests
|
|
|
|
// from unprocessed requests queue to the active queue
|
2022-06-27 14:34:27 +00:00
|
|
|
void KeeperDispatcher::finalizeRequests(const KeeperStorage::RequestsForSessions & requests_for_sessions)
|
2022-06-08 09:22:46 +00:00
|
|
|
{
|
2022-06-27 14:34:27 +00:00
|
|
|
std::unordered_map<int64_t, size_t> counts_for_session;
|
2022-06-27 12:05:27 +00:00
|
|
|
|
2022-06-27 14:34:27 +00:00
|
|
|
for (const auto & request_for_session : requests_for_sessions)
|
|
|
|
{
|
|
|
|
++counts_for_session[request_for_session.session_id];
|
|
|
|
}
|
2022-06-27 12:05:27 +00:00
|
|
|
|
2022-06-27 14:34:27 +00:00
|
|
|
std::lock_guard lock{unprocessed_request_mutex};
|
|
|
|
for (const auto [session_id, count] : counts_for_session)
|
2022-06-27 12:05:27 +00:00
|
|
|
{
|
2022-06-27 14:34:27 +00:00
|
|
|
auto unprocessed_requests_it = unprocessed_requests_for_session.find(session_id);
|
|
|
|
if (unprocessed_requests_it == unprocessed_requests_for_session.end())
|
|
|
|
continue;
|
|
|
|
|
|
|
|
auto & unprocessed_requests = unprocessed_requests_it->second;
|
|
|
|
unprocessed_requests.unprocessed_num -= count;
|
|
|
|
|
|
|
|
if (unprocessed_requests.unprocessed_num == 0)
|
2022-06-27 12:05:27 +00:00
|
|
|
{
|
2022-06-27 14:34:27 +00:00
|
|
|
if (!unprocessed_requests.request_queue.empty())
|
2022-06-27 12:05:27 +00:00
|
|
|
{
|
2022-08-29 10:50:18 +00:00
|
|
|
auto & unprocessed_requests_queue = unprocessed_requests.request_queue;
|
2022-06-27 14:34:27 +00:00
|
|
|
unprocessed_requests.is_read = !unprocessed_requests.is_read;
|
2022-08-23 08:22:03 +00:00
|
|
|
// start adding next type of requests
|
2022-08-29 10:50:18 +00:00
|
|
|
while (!unprocessed_requests_queue.empty() && unprocessed_requests_queue.front().request->isReadRequest() == unprocessed_requests.is_read)
|
2022-06-27 12:05:27 +00:00
|
|
|
{
|
2022-08-29 10:50:18 +00:00
|
|
|
auto & front_request = unprocessed_requests_queue.front();
|
2022-06-27 14:34:27 +00:00
|
|
|
|
|
|
|
/// Put close requests without timeouts
|
|
|
|
if (front_request.request->getOpNum() == Coordination::OpNum::Close)
|
|
|
|
{
|
2022-08-29 10:50:18 +00:00
|
|
|
if (!active_requests_queue->push(std::move(front_request)))
|
2022-06-27 14:34:27 +00:00
|
|
|
throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
|
|
|
|
}
|
2022-08-29 10:50:18 +00:00
|
|
|
else if (!active_requests_queue->tryPush(std::move(front_request), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
|
2022-06-27 14:34:27 +00:00
|
|
|
{
|
|
|
|
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
|
|
|
|
}
|
2022-06-27 12:05:27 +00:00
|
|
|
|
2022-06-27 14:34:27 +00:00
|
|
|
++unprocessed_requests.unprocessed_num;
|
2022-08-29 10:50:18 +00:00
|
|
|
unprocessed_requests_queue.pop_front();
|
2022-06-27 14:34:27 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
unprocessed_requests_for_session.erase(unprocessed_requests_it);
|
2022-06-27 12:05:27 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-08-29 10:50:18 +00:00
|
|
|
// Finalize request
|
|
|
|
// Process read requests that were waiting for this commit
|
2022-06-27 12:05:27 +00:00
|
|
|
void KeeperDispatcher::onRequestCommit(const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx)
|
|
|
|
{
|
2022-07-01 13:19:28 +00:00
|
|
|
if (!finalize_requests_queue.push({request_for_session}))
|
|
|
|
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
2022-06-27 12:05:27 +00:00
|
|
|
|
2022-06-27 14:22:53 +00:00
|
|
|
KeeperStorage::RequestsForSessions requests;
|
2022-06-08 09:22:46 +00:00
|
|
|
{
|
2022-06-27 14:22:53 +00:00
|
|
|
std::lock_guard lock(leader_waiter_mutex);
|
|
|
|
auto request_queue_it = leader_waiters.find(KeeperServer::NodeInfo{.term = log_term, .last_committed_index = log_idx});
|
|
|
|
if (request_queue_it != leader_waiters.end())
|
|
|
|
{
|
|
|
|
requests = std::move(request_queue_it->second);
|
|
|
|
leader_waiters.erase(request_queue_it);
|
|
|
|
}
|
2022-06-08 09:22:46 +00:00
|
|
|
}
|
2022-08-29 10:50:18 +00:00
|
|
|
|
|
|
|
if (requests.empty())
|
|
|
|
return;
|
|
|
|
|
2022-06-28 08:48:19 +00:00
|
|
|
if (!read_requests_queue.push(std::move(requests)))
|
|
|
|
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
2022-06-08 09:22:46 +00:00
|
|
|
}
|
|
|
|
|
2022-09-07 11:51:56 +00:00
|
|
|
/// Process all read request that are waiting for lower or currently last processed log index
|
|
|
|
void KeeperDispatcher::onApplySnapshot(uint64_t term, uint64_t last_idx)
|
|
|
|
{
|
|
|
|
KeeperServer::NodeInfo current_node_info{term, last_idx};
|
|
|
|
KeeperStorage::RequestsForSessions requests;
|
|
|
|
{
|
|
|
|
std::lock_guard lock(leader_waiter_mutex);
|
|
|
|
for (auto leader_waiter_it = leader_waiters.begin(); leader_waiter_it != leader_waiters.end();)
|
|
|
|
{
|
|
|
|
auto waiting_node_info = leader_waiter_it->first;
|
|
|
|
if (waiting_node_info.term <= current_node_info.term
|
|
|
|
&& waiting_node_info.last_committed_index <= current_node_info.last_committed_index)
|
|
|
|
{
|
|
|
|
for (auto & request : leader_waiter_it->second)
|
|
|
|
{
|
|
|
|
requests.push_back(std::move(request));
|
|
|
|
}
|
|
|
|
|
|
|
|
leader_waiter_it = leader_waiters.erase(leader_waiter_it);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
++leader_waiter_it;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (requests.empty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
if (!read_requests_queue.push(std::move(requests)))
|
|
|
|
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
|
|
|
}
|
|
|
|
|
2022-04-14 12:00:47 +00:00
|
|
|
bool KeeperDispatcher::isServerActive() const
|
|
|
|
{
|
|
|
|
return checkInit() && hasLeader() && !server->isRecovering();
|
|
|
|
}
|
|
|
|
|
2021-10-19 12:00:26 +00:00
|
|
|
/// Computes the difference between `config` and the current server configuration
/// and pushes every change to `update_configuration_queue`.
/// Logs how many changes were found; more than one change at once only produces a warning,
/// the changes are still pushed.
/// Throws SYSTEM_ERROR if a change cannot be pushed to the queue.
void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
    auto diff = server->getConfigurationDiff(config);

    if (diff.empty())
    {
        LOG_TRACE(log, "Configuration update triggered, but nothing changed for RAFT");
    }
    else if (diff.size() > 1)
    {
        LOG_WARNING(log, "Configuration changed for more than one server ({}) from cluster, it's strictly not recommended", diff.size());
    }
    else
    {
        LOG_DEBUG(log, "Configuration change size ({})", diff.size());
    }

    for (auto & change : diff)
    {
        const bool pushed = update_configuration_queue.push(change);
        if (!pushed)
            throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push configuration update to queue");
    }
}
|
|
|
|
|
2021-11-18 20:17:22 +00:00
|
|
|
void KeeperDispatcher::updateKeeperStatLatency(uint64_t process_time_ms)
|
2021-10-27 12:26:42 +00:00
|
|
|
{
|
2021-11-18 20:17:22 +00:00
|
|
|
keeper_stats.updateLatency(process_time_ms);
|
2021-10-27 12:26:42 +00:00
|
|
|
}
|
|
|
|
|
2021-11-18 20:17:22 +00:00
|
|
|
/// Recursively sums the sizes (in bytes) of all regular files below `dir`.
/// Returns 0 when `dir` does not exist or is not a directory.
/// checkStackSize() is called on every level because the function recurses into subdirectories.
static uint64_t getDirSize(const fs::path & dir)
{
    checkStackSize();

    /// Entries which are neither regular files nor directories (sockets, FIFOs, dangling symlinks, ...)
    /// get here through the recursion below. They cannot be iterated — constructing
    /// a directory_iterator on them would throw — so they contribute nothing to the size.
    if (!fs::is_directory(dir))
        return 0;

    uint64_t size{0};
    for (const auto & entry : fs::directory_iterator(dir))
    {
        if (entry.is_regular_file())
            size += fs::file_size(entry.path());
        else
            size += getDirSize(entry.path());
    }
    return size;
}
|
|
|
|
|
2021-11-18 20:17:22 +00:00
|
|
|
uint64_t KeeperDispatcher::getLogDirSize() const
|
2021-11-05 10:21:34 +00:00
|
|
|
{
|
2021-11-18 20:17:22 +00:00
|
|
|
return getDirSize(configuration_and_settings->log_storage_path);
|
2021-11-05 10:21:34 +00:00
|
|
|
}
|
|
|
|
|
2021-11-18 20:17:22 +00:00
|
|
|
uint64_t KeeperDispatcher::getSnapDirSize() const
|
2021-11-05 10:21:34 +00:00
|
|
|
{
|
2021-11-18 20:17:22 +00:00
|
|
|
return getDirSize(configuration_and_settings->snapshot_storage_path);
|
2021-11-05 10:21:34 +00:00
|
|
|
}
|
|
|
|
|
2021-11-19 09:30:58 +00:00
|
|
|
/// Builds the info structure for four-letter-word commands: takes the part filled by the server
/// and adds the dispatcher-level counters — the current size of the active requests queue
/// and the number of registered session callbacks. Each counter is read under its own mutex.
Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
{
    Keeper4LWInfo info = server->getPartiallyFilled4LWInfo();

    {
        std::lock_guard requests_lock(push_request_mutex);
        info.outstanding_requests_count = active_requests_queue->size();
    }

    {
        std::lock_guard sessions_lock(session_to_response_callback_mutex);
        info.alive_connections_count = session_to_response_callback.size();
    }

    return info;
}
|
|
|
|
|
2021-01-19 14:22:28 +00:00
|
|
|
}
|