ClickHouse/src/Coordination/KeeperDispatcher.cpp

830 lines
30 KiB
C++
Raw Normal View History

#include <Coordination/KeeperDispatcher.h>
2021-01-19 14:22:28 +00:00
#include <Common/setThreadName.h>
2021-04-16 13:50:09 +00:00
#include <Common/ZooKeeper/KeeperException.h>
#include <future>
#include <chrono>
2021-11-05 10:21:34 +00:00
#include <Poco/Path.h>
#include <Common/hex.h>
2021-11-12 12:48:42 +00:00
#include <filesystem>
2021-11-18 20:17:22 +00:00
#include <Common/checkStackSize.h>
2021-11-12 12:48:42 +00:00
namespace fs = std::filesystem;
2021-01-19 14:22:28 +00:00
namespace DB
{
/// Error codes thrown from this file. They are only declared here (extern);
/// the definitions are not visible in this translation unit.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
2021-10-08 13:32:02 +00:00
extern const int SYSTEM_ERROR;
2021-10-27 12:26:42 +00:00
}
2021-01-19 14:22:28 +00:00
/// Creates a dispatcher that is not yet usable for requests:
///  - the responses queue is created with the largest representable capacity;
///  - configuration_and_settings starts as a default-constructed object and is replaced
///    by the values loaded from the config in initialize();
///  - all log messages of the dispatcher go to the "KeeperDispatcher" logger.
KeeperDispatcher::KeeperDispatcher()
    : responses_queue(std::numeric_limits<size_t>::max())
    , configuration_and_settings(std::make_shared<KeeperConfigurationAndSettings>())
    , log(&Poco::Logger::get("KeeperDispatcher"))
{
}
2021-04-16 13:50:09 +00:00
void KeeperDispatcher::requestThread()
2021-01-19 14:22:28 +00:00
{
2021-03-29 08:24:56 +00:00
setThreadName("KeeperReqT");
2021-04-16 13:50:09 +00:00
/// Result of requests batch from previous iteration
RaftAppendResult prev_result = nullptr;
KeeperStorage::RequestsForSessions prev_batch;
2022-06-24 14:42:24 +00:00
const auto needs_quorum = [](const auto & coordination_settings, const auto & request)
2022-06-24 11:36:12 +00:00
{
return coordination_settings->quorum_reads || !request.request->isReadRequest();
};
KeeperStorage::RequestsForSessions write_requests;
KeeperStorage::RequestsForSessions read_requests;
2021-01-26 14:08:31 +00:00
while (!shutdown_called)
2021-01-19 14:22:28 +00:00
{
2021-03-29 08:24:56 +00:00
KeeperStorage::RequestForSession request;
2021-01-19 14:22:28 +00:00
2021-11-18 20:17:22 +00:00
auto coordination_settings = configuration_and_settings->coordination_settings;
uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds();
uint64_t max_batch_size = coordination_settings->max_requests_batch_size;
2021-04-16 13:50:09 +00:00
2021-04-16 19:08:52 +00:00
/// The code below do a very simple thing: batch all write (quorum) requests into vector until
2021-04-16 13:50:09 +00:00
/// previous write batch is not finished or max_batch size achieved. The main complexity goes from
/// the ability to process read requests without quorum (from local state). So when we are collecting
/// requests into a batch we must check that the new request is not read request. Otherwise we have to
/// process all already accumulated write requests, wait them synchronously and only after that process
/// read request. So reads are some kind of "separator" for writes.
try
2021-01-26 07:47:04 +00:00
{
KeeperStorage::RequestsForSessions * current_batch{nullptr};
bool is_current_read = false;
2022-06-24 11:36:12 +00:00
2021-04-16 13:50:09 +00:00
if (requests_queue->tryPop(request, max_wait))
2021-01-19 14:22:28 +00:00
{
2021-04-16 13:50:09 +00:00
if (shutdown_called)
break;
if (needs_quorum(coordination_settings, request))
2021-04-16 13:50:09 +00:00
{
write_requests.emplace_back(request);
2021-04-16 13:50:09 +00:00
if (!current_batch)
2021-04-16 13:50:09 +00:00
{
current_batch = &write_requests;
is_current_read = false;
2021-04-16 13:50:09 +00:00
}
}
else
2022-06-24 11:36:12 +00:00
{
read_requests.emplace_back(request);
if (!current_batch)
{
current_batch = &read_requests;
is_current_read = true;
}
}
/// Waiting until previous append will be successful, or batch is big enough
/// has_result == false && get_result_code == OK means that our request still not processed.
/// Sometimes NuRaft set errorcode without setting result, so we check both here.
while (true)
{
if (write_requests.size() > max_batch_size)
{
current_batch = &write_requests;
is_current_read = false;
break;
}
if (read_requests.size() > max_batch_size)
{
current_batch = &read_requests;
is_current_read = true;
break;
}
/// Trying to get batch requests as fast as possible
if (requests_queue->tryPop(request, 1))
{
if (needs_quorum(coordination_settings, request))
write_requests.emplace_back(request);
else
read_requests.emplace_back(request);
}
else if (!prev_result || prev_result->has_result() || prev_result->get_result_code() != nuraft::cmd_result_code::OK)
break;
if (shutdown_called)
break;
2022-06-24 11:36:12 +00:00
}
2021-04-16 13:50:09 +00:00
if (shutdown_called)
break;
/// Forcefully process all previous pending requests
if (prev_result)
2021-04-17 14:06:49 +00:00
forceWaitAndProcessResult(prev_result, prev_batch);
2021-04-16 13:50:09 +00:00
if (current_batch && !current_batch->empty())
2021-04-16 13:50:09 +00:00
{
LOG_INFO(&Poco::Logger::get("BATCHING"), "Processing {} batch of {}", is_current_read, current_batch->size());
if (!is_current_read)
2021-04-16 13:50:09 +00:00
{
prev_result = server->putRequestBatch(*current_batch);
prev_batch = std::move(*current_batch);
2021-04-16 13:50:09 +00:00
if (!read_requests.empty())
2022-06-24 11:36:12 +00:00
{
current_batch = &read_requests;
is_current_read = true;
2022-06-24 11:36:12 +00:00
}
else
current_batch = nullptr;
}
else
{
prev_result = server->getLeaderInfo();
prev_result->when_ready([&, requests_for_sessions = *current_batch](nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & result, nuraft::ptr<std::exception> &)
2022-06-24 11:36:12 +00:00
{
auto & leader_info_ctx = result.get();
2022-06-24 11:36:12 +00:00
KeeperServer::NodeInfo leader_info;
leader_info.term = leader_info_ctx->get_ulong();
leader_info.last_committed_index = leader_info_ctx->get_ulong();
std::lock_guard lock(leader_waiter_mutex);
auto node_info = server->getNodeInfo();
if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index)
{
auto & leader_waiter = leader_waiters[node_info];
leader_waiter.insert(leader_waiter.end(), requests_for_sessions.begin(), requests_for_sessions.end());
2022-06-24 11:36:12 +00:00
LOG_INFO(log, "waiting for {}, idx {}", leader_info.term, leader_info.last_committed_index);
}
else
{
for (const auto & read_request : requests_for_sessions)
2022-06-24 11:36:12 +00:00
{
if (server->isLeaderAlive())
server->putLocalReadRequest(read_request);
else
addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS);
finalizeRequest(read_request);
2022-06-24 11:36:12 +00:00
}
}
});
prev_batch = std::move(*current_batch);
if (!write_requests.empty())
{
current_batch = &write_requests;
is_current_read = false;
2022-06-24 11:36:12 +00:00
}
else
current_batch = nullptr;
2022-06-24 11:36:12 +00:00
}
2021-04-16 13:56:57 +00:00
}
}
}
2021-04-16 13:50:09 +00:00
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
void KeeperDispatcher::responseThread()
{
2021-03-29 08:24:56 +00:00
setThreadName("KeeperRspT");
while (!shutdown_called)
{
2021-03-29 08:24:56 +00:00
KeeperStorage::ResponseForSession response_for_session;
2021-11-18 20:17:22 +00:00
uint64_t max_wait = configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds();
if (responses_queue.tryPop(response_for_session, max_wait))
{
if (shutdown_called)
break;
try
{
setResponse(response_for_session.session_id, response_for_session.response);
2021-01-19 14:22:28 +00:00
}
2021-01-26 07:47:04 +00:00
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2021-01-19 14:22:28 +00:00
}
}
}
void KeeperDispatcher::snapshotThread()
2021-03-05 10:40:24 +00:00
{
2021-03-29 08:24:56 +00:00
setThreadName("KeeperSnpT");
2021-03-05 10:40:24 +00:00
while (!shutdown_called)
{
CreateSnapshotTask task;
2021-10-06 11:02:40 +00:00
if (!snapshots_queue.pop(task))
break;
2021-03-05 10:40:24 +00:00
if (shutdown_called)
break;
try
{
2021-03-07 21:40:32 +00:00
task.create_snapshot(std::move(task.snapshot));
2021-03-05 10:40:24 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
/// Hands `response` over to the callback that is waiting for it.
///
/// Two kinds of responses are distinguished:
///  - a response to a "new session id" request (not a watch, op num SessionID): it is routed by
///    its internal id to the callback stored in new_session_id_response_callback, which is removed
///    after it has been called. Responses produced for another server, or for an internal id
///    nobody waits for, are dropped;
///  - any other response: it is routed by `session_id` to the session's registered callback.
///    If the session is not registered the response is dropped. After a Close response
///    the session callback is unregistered.
///
/// The whole function runs under session_to_response_callback_mutex.
void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
    std::lock_guard lock(session_to_response_callback_mutex);

    const bool is_new_session_response
        = response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::SessionID;

    /// Special new session response.
    if (is_new_session_response)
    {
        const auto & new_session_response = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);

        /// Nobody waits for this session id
        if (new_session_response.server_id != server->getServerID())
            return;

        auto waiter_it = new_session_id_response_callback.find(new_session_response.internal_id);
        if (waiter_it == new_session_id_response_callback.end())
            return;

        /// Call a copy of the stored callback and only then forget about it.
        auto waiter = waiter_it->second;
        waiter(response);
        new_session_id_response_callback.erase(new_session_response.internal_id);
        return;
    }

    /// Normal response, just write to client
    auto client_it = session_to_response_callback.find(session_id);

    /// Session was disconnected, just skip this response
    if (client_it == session_to_response_callback.end())
    {
        LOG_TEST(log, "Cannot write response xid={}, op={}, session {} disconnected", response->xid, response->getOpNum(), session_id);
        return;
    }

    client_it->second(response);

    /// Session closed, no more writes
    const bool session_closed
        = response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close;
    if (session_closed)
        session_to_response_callback.erase(client_it);
}
/// Accepts a request of session `session_id` for processing.
///
/// Returns false if the session is not registered or the dispatcher is shutting down, true if the
/// request was accepted. Accepted means one of two things:
///  - the request was pushed to requests_queue, or
///  - it was stored in the session's own request_queue because the session already has queued
///    requests or its in-flight requests are of the other kind (read vs. non-read). Such requests
///    are pushed later by finalizeRequest().
///
/// Throws SYSTEM_ERROR if a Close request cannot be pushed and TIMEOUT_EXCEEDED if any other
/// request cannot be pushed within the operation timeout.
bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
    {
        /// If session was already disconnected than we will ignore requests
        std::lock_guard lock(session_to_response_callback_mutex);
        const bool session_is_registered = session_to_response_callback.contains(session_id);
        if (!session_is_registered)
            return false;
    }

    using namespace std::chrono;

    KeeperStorage::RequestForSession request_info;
    request_info.request = request;
    request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    request_info.session_id = session_id;

    {
        std::lock_guard lock{unprocessed_request_mutex};

        auto state_it = unprocessed_requests_for_session.find(session_id);
        if (state_it != unprocessed_requests_for_session.end())
        {
            auto & session_state = state_it->second;

            /// The request has to wait for its turn inside the session's own queue.
            const bool must_wait
                = !session_state.request_queue.empty() || session_state.is_read != request->isReadRequest();
            if (must_wait)
            {
                session_state.request_queue.push_back(std::move(request_info));
                return true;
            }

            ++session_state.unprocessed_num;
        }
        else
        {
            /// First tracked request of the session: it defines the kind of the in-flight requests.
            auto & session_state = unprocessed_requests_for_session[session_id];
            session_state.unprocessed_num = 1;
            session_state.is_read = request->isReadRequest();
        }
    }

    std::lock_guard lock(push_request_mutex);

    if (shutdown_called)
        return false;

    /// Put close requests without timeouts
    const bool is_close_request = request->getOpNum() == Coordination::OpNum::Close;
    if (is_close_request)
    {
        if (!requests_queue->push(std::move(request_info)))
            throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
        return true;
    }

    const auto push_timeout_ms = configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds();
    if (!requests_queue->tryPush(std::move(request_info), push_timeout_ms))
        throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);

    return true;
}
/// Brings the dispatcher into working state:
///  1. loads configuration and settings from `config`;
///  2. creates the requests queue (its capacity is the max_requests_batch_size setting);
///  3. starts the request, response and snapshot threads;
///  4. creates and starts the Keeper server. Unless `start_async` is set, waits until
///     the server reports that it is initialized;
///  5. starts the session cleaner and configuration update threads and submits the
///     configuration changes found in `config`.
/// An exception thrown while starting the server is logged and rethrown.
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async)
{
    LOG_DEBUG(log, "Initializing storage dispatcher");

    configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);
    requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_requests_batch_size);

    request_thread = ThreadFromGlobalPool([this] { requestThread(); });
    responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
    snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });

    /// The server notifies the dispatcher about every committed request through this callback.
    auto on_commit = [this](const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx)
    {
        onRequestCommit(request_for_session, log_term, log_idx);
    };
    server = std::make_unique<KeeperServer>(configuration_and_settings, config, responses_queue, snapshots_queue, on_commit);

    try
    {
        LOG_DEBUG(log, "Waiting server to initialize");
        server->startup(config, configuration_and_settings->enable_ipv6);
        LOG_DEBUG(log, "Server initialized, waiting for quorum");

        if (start_async)
        {
            LOG_INFO(log, "Starting Keeper asynchronously, server will accept connections to Keeper when it will be ready");
        }
        else
        {
            server->waitInit();
            LOG_DEBUG(log, "Quorum initialized");
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
        throw;
    }

    /// Start it after keeper server start
    session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
    update_configuration_thread = ThreadFromGlobalPool([this] { updateConfigurationThread(); });
    updateConfiguration(config);

    LOG_DEBUG(log, "Dispatcher initialized");
}
void KeeperDispatcher::shutdown()
2021-01-19 14:22:28 +00:00
{
try
{
2021-01-26 14:08:31 +00:00
{
std::lock_guard lock(push_request_mutex);
if (shutdown_called)
return;
LOG_DEBUG(log, "Shutting down storage dispatcher");
2021-01-26 14:08:31 +00:00
shutdown_called = true;
if (session_cleaner_thread.joinable())
session_cleaner_thread.join();
2021-08-30 13:23:31 +00:00
if (requests_queue)
{
2021-10-06 11:02:40 +00:00
requests_queue->finish();
2021-10-08 13:32:02 +00:00
if (request_thread.joinable())
request_thread.join();
2021-08-30 13:23:31 +00:00
}
2021-10-06 11:02:40 +00:00
responses_queue.finish();
if (responses_thread.joinable())
responses_thread.join();
2021-03-05 10:40:24 +00:00
2021-10-06 11:02:40 +00:00
snapshots_queue.finish();
2021-03-05 10:40:24 +00:00
if (snapshot_thread.joinable())
snapshot_thread.join();
2021-10-19 12:00:26 +00:00
update_configuration_queue.finish();
if (update_configuration_thread.joinable())
update_configuration_thread.join();
2021-01-26 14:08:31 +00:00
}
if (server)
server->shutdown();
2021-01-26 14:08:31 +00:00
2021-09-02 09:20:11 +00:00
KeeperStorage::RequestForSession request_for_session;
2021-08-30 13:23:31 +00:00
2021-09-02 09:20:11 +00:00
/// Set session expired for all pending requests
while (requests_queue && requests_queue->tryPop(request_for_session))
{
2021-10-07 17:13:56 +00:00
auto response = request_for_session.request->makeResponse();
response->error = Coordination::Error::ZSESSIONEXPIRED;
setResponse(request_for_session.session_id, response);
2021-01-26 14:08:31 +00:00
}
2021-05-22 07:50:23 +00:00
/// Clear all registered sessions
2021-05-22 07:50:23 +00:00
std::lock_guard lock(session_to_response_callback_mutex);
session_to_response_callback.clear();
2021-01-19 14:22:28 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
LOG_DEBUG(log, "Dispatcher shut down");
2021-01-19 14:22:28 +00:00
}
/// Forwards the force-recovery command to the Keeper server.
void KeeperDispatcher::forceRecovery()
{
    server->forceRecovery();
}
/// Makes sure the dispatcher is shut down before it is destroyed.
/// shutdown() returns early when it has already been called.
KeeperDispatcher::~KeeperDispatcher()
{
    shutdown();
}
void KeeperDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
2021-01-19 14:22:28 +00:00
{
std::lock_guard lock(session_to_response_callback_mutex);
if (!session_to_response_callback.try_emplace(session_id, callback).second)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
}
void KeeperDispatcher::sessionCleanerTask()
{
while (true)
{
if (shutdown_called)
return;
try
{
/// Only leader node must check dead sessions
if (server->checkInit() && isLeader())
{
auto dead_sessions = server->getDeadSessions();
for (int64_t dead_session : dead_sessions)
{
LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
/// Close session == send close request to raft server
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
2021-03-29 08:24:56 +00:00
KeeperStorage::RequestForSession request_info;
2021-02-09 18:29:06 +00:00
request_info.request = request;
2022-01-13 09:49:39 +00:00
using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
2021-02-09 18:29:06 +00:00
request_info.session_id = dead_session;
{
std::lock_guard lock(push_request_mutex);
2021-10-07 22:06:33 +00:00
if (!requests_queue->push(std::move(request_info)))
2021-10-08 13:32:02 +00:00
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
2021-02-09 18:29:06 +00:00
}
/// Remove session from registered sessions
finishSession(dead_session);
LOG_INFO(log, "Dead session close request pushed");
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2021-11-18 20:17:22 +00:00
auto time_to_sleep = configuration_and_settings->coordination_settings->dead_session_check_period_ms.totalMilliseconds();
std::this_thread::sleep_for(std::chrono::milliseconds(time_to_sleep));
}
}
/// Unregisters the response callback of session `session_id`.
/// Does nothing if the session is not registered.
void KeeperDispatcher::finishSession(int64_t session_id)
{
    std::lock_guard lock(session_to_response_callback_mutex);

    /// Erasing by key is a no-op for an unknown session.
    session_to_response_callback.erase(session_id);
}
/// For every request in `requests_for_sessions` creates a response of the matching type that
/// carries `error` (same xid as the request, zxid 0) and pushes it to responses_queue.
/// Throws SYSTEM_ERROR if a response cannot be pushed to the queue.
void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error)
{
    for (const auto & request_for_session : requests_for_sessions)
    {
        auto response = request_for_session.request->makeResponse();
        response->xid = request_for_session.request->xid;
        response->zxid = 0;
        response->error = error;

        if (!responses_queue.push(DB::KeeperStorage::ResponseForSession{request_for_session.session_id, response}))
            throw Exception(ErrorCodes::SYSTEM_ERROR,
                "Could not push error response xid {} zxid {} error message {} to responses queue",
                response->xid,
                response->zxid,
                errorMessage(error));
    }
}
/// Waits for `result` of a previously submitted batch and reports a failure to the clients:
///  - ZOPERATIONTIMEOUT if the batch was not accepted or the result code is TIMEOUT;
///  - ZCONNECTIONLOSS for any other result code different from OK.
/// Nothing is sent for a successful result. In every case `requests_for_sessions` is cleared
/// and `result` is reset to nullptr afterwards.
void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions)
{
    /// Make sure the result is there before it is inspected.
    if (!result->has_result())
        result->get();

    const bool rejected_or_timed_out
        = !result->get_accepted() || result->get_result_code() == nuraft::cmd_result_code::TIMEOUT;

    if (rejected_or_timed_out)
    {
        addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
    }
    else if (result->get_result_code() != nuraft::cmd_result_code::OK)
    {
        addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
    }

    requests_for_sessions.clear();
    result = nullptr;
}
int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
2021-04-16 13:50:09 +00:00
{
/// New session id allocation is a special request, because we cannot process it in normal
/// way: get request -> put to raft -> set response for registered callback.
2021-04-16 13:50:09 +00:00
KeeperStorage::RequestForSession request_info;
std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
/// Internal session id. It's a temporary number which is unique for each client on this server
/// but can be same on different servers.
2021-04-16 13:50:09 +00:00
request->internal_id = internal_session_id_counter.fetch_add(1);
request->session_timeout_ms = session_timeout_ms;
2021-04-16 18:31:23 +00:00
request->server_id = server->getServerID();
2021-04-16 13:50:09 +00:00
request_info.request = request;
2022-01-13 09:49:39 +00:00
using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
2021-04-16 13:50:09 +00:00
request_info.session_id = -1;
auto promise = std::make_shared<std::promise<int64_t>>();
auto future = promise->get_future();
2021-04-16 13:50:09 +00:00
{
std::lock_guard lock(session_to_response_callback_mutex);
2021-04-16 18:31:23 +00:00
new_session_id_response_callback[request->internal_id] = [promise, internal_id = request->internal_id] (const Coordination::ZooKeeperResponsePtr & response)
2021-04-16 13:50:09 +00:00
{
if (response->getOpNum() != Coordination::OpNum::SessionID)
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response of type {} instead of SessionID response", Coordination::toString(response->getOpNum()))));
2021-04-16 18:31:23 +00:00
auto session_id_response = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
if (session_id_response.internal_id != internal_id)
{
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response with internal id {} instead of {}", session_id_response.internal_id, internal_id)));
}
2021-04-16 13:50:09 +00:00
if (response->error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException("SessionID request failed with error", response->error)));
2021-04-16 18:31:23 +00:00
promise->set_value(session_id_response.session_id);
2021-04-16 13:50:09 +00:00
};
}
/// Push new session request to queue
2021-04-16 13:50:09 +00:00
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
throw Exception("Cannot receive session id within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
/// Forcefully wait for request execution because we cannot process any other
/// requests for this client until it get new session id.
2021-04-16 13:50:09 +00:00
return future.get();
}
2021-10-19 12:00:26 +00:00
void KeeperDispatcher::updateConfigurationThread()
2021-10-18 15:27:51 +00:00
{
2021-10-19 12:00:26 +00:00
while (true)
2021-10-18 15:27:51 +00:00
{
2021-10-19 12:00:26 +00:00
if (shutdown_called)
return;
try
{
2022-04-26 07:32:02 +00:00
using namespace std::chrono_literals;
2021-10-19 12:00:26 +00:00
if (!server->checkInit())
{
LOG_INFO(log, "Server still not initialized, will not apply configuration until initialization finished");
2022-04-26 07:32:02 +00:00
std::this_thread::sleep_for(5000ms);
2021-10-19 12:00:26 +00:00
continue;
}
2022-04-19 08:08:13 +00:00
if (server->isRecovering())
{
LOG_INFO(log, "Server is recovering, will not apply configuration until recovery is finished");
2022-04-26 07:32:02 +00:00
std::this_thread::sleep_for(5000ms);
2022-04-19 08:08:13 +00:00
continue;
}
2021-10-19 12:00:26 +00:00
ConfigUpdateAction action;
if (!update_configuration_queue.pop(action))
break;
2021-10-19 13:11:29 +00:00
/// We must wait this update from leader or apply it ourself (if we are leader)
bool done = false;
while (!done)
2021-10-19 12:00:26 +00:00
{
2022-04-19 08:08:13 +00:00
if (server->isRecovering())
break;
2021-10-19 13:11:29 +00:00
if (shutdown_called)
return;
2021-10-19 12:00:26 +00:00
2021-10-19 13:11:29 +00:00
if (isLeader())
{
server->applyConfigurationUpdate(action);
done = true;
}
else
{
done = server->waitConfigurationUpdate(action);
if (!done)
LOG_INFO(log, "Cannot wait for configuration update, maybe we become leader, or maybe update is invalid, will try to wait one more time");
}
2021-10-19 12:00:26 +00:00
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2021-10-18 15:27:51 +00:00
}
2021-10-19 12:00:26 +00:00
}
/// Marks one in-flight request of the session as finished.
///
/// When the last in-flight request of a session is done:
///  - if the session has no queued requests, its bookkeeping entry is removed;
///  - otherwise the session switches to the other request kind (read <-> non-read) and the
///    leading run of queued requests of that kind is pushed to requests_queue, each of them
///    becoming an in-flight request.
/// Sessions without a bookkeeping entry are ignored.
///
/// Throws SYSTEM_ERROR if a Close request cannot be pushed and TIMEOUT_EXCEEDED if any other
/// request cannot be pushed within the operation timeout.
void KeeperDispatcher::finalizeRequest(const KeeperStorage::RequestForSession & request_for_session)
{
    std::lock_guard lock{unprocessed_request_mutex};

    auto state_it = unprocessed_requests_for_session.find(request_for_session.session_id);
    if (state_it == unprocessed_requests_for_session.end())
        return;

    auto & session_state = state_it->second;

    /// Other requests of the session are still in flight: nothing more to do yet.
    --session_state.unprocessed_num;
    if (session_state.unprocessed_num != 0)
        return;

    auto & waiting_requests = session_state.request_queue;
    if (waiting_requests.empty())
    {
        unprocessed_requests_for_session.erase(state_it);
        return;
    }

    /// Queued requests are released in runs of the same kind, so flip the kind first.
    session_state.is_read = !session_state.is_read;

    while (!waiting_requests.empty() && waiting_requests.front().request->isReadRequest() == session_state.is_read)
    {
        auto & next_request = waiting_requests.front();

        /// Put close requests without timeouts
        const bool is_close_request = next_request.request->getOpNum() == Coordination::OpNum::Close;
        if (is_close_request)
        {
            if (!requests_queue->push(std::move(next_request)))
                throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
        }
        else if (!requests_queue->tryPush(std::move(next_request), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
        {
            throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
        }

        ++session_state.unprocessed_num;
        waiting_requests.pop_front();
    }
}
/// Called for a committed request together with the term and index of its log entry.
/// Finalizes the committed request and then releases the read requests that were parked in
/// leader_waiters for exactly this (term, index) position: each of them is executed against
/// local state if the leader is alive, or answered with ZCONNECTIONLOSS otherwise, and is
/// finalized afterwards.
void KeeperDispatcher::onRequestCommit(const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx)
{
    finalizeRequest(request_for_session);

    /// Take the parked requests out under the lock, process them after it is released.
    KeeperStorage::RequestsForSessions released_reads;
    {
        std::lock_guard lock(leader_waiter_mutex);

        auto waiters_it = leader_waiters.find(KeeperServer::NodeInfo{.term = log_term, .last_committed_index = log_idx});
        if (waiters_it != leader_waiters.end())
        {
            released_reads = std::move(waiters_it->second);
            leader_waiters.erase(waiters_it);
        }
    }

    for (const auto & read_request : released_reads)
    {
        if (server->isLeaderAlive())
            server->putLocalReadRequest(read_request);
        else
            addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS);

        finalizeRequest(read_request);
    }
}
2022-04-14 12:00:47 +00:00
/// The server counts as active when it is initialized, a leader is known
/// and the server is not in recovery.
bool KeeperDispatcher::isServerActive() const
{
    if (!checkInit())
        return false;

    if (!hasLeader())
        return false;

    return !server->isRecovering();
}
2021-10-19 12:00:26 +00:00
/// Compares `config` with the current server configuration and queues every difference for
/// the configuration update thread. Logs how many changes were found and warns when there is
/// more than one. Throws SYSTEM_ERROR if a change cannot be pushed to the queue.
void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
    auto diff = server->getConfigurationDiff(config);

    if (diff.empty())
    {
        LOG_TRACE(log, "Configuration update triggered, but nothing changed for RAFT");
    }
    else if (diff.size() > 1)
    {
        LOG_WARNING(log, "Configuration changed for more than one server ({}) from cluster, it's strictly not recommended", diff.size());
    }
    else
    {
        LOG_DEBUG(log, "Configuration change size ({})", diff.size());
    }

    for (auto & change : diff)
    {
        if (!update_configuration_queue.push(change))
            throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push configuration update to queue");
    }
}
2021-11-18 20:17:22 +00:00
void KeeperDispatcher::updateKeeperStatLatency(uint64_t process_time_ms)
2021-10-27 12:26:42 +00:00
{
2021-11-18 20:17:22 +00:00
keeper_stats.updateLatency(process_time_ms);
2021-10-27 12:26:42 +00:00
}
2021-11-18 20:17:22 +00:00
/// Returns the total size in bytes of all regular files under `dir`, including subdirectories.
/// Returns 0 if `dir` does not exist or is not a directory. Entries that are neither regular
/// files nor directories (sockets, FIFOs, devices, dangling symlinks) are skipped: trying to
/// iterate them as directories would throw.
/// The function is recursive, so the remaining stack is checked on every level.
static uint64_t getDirSize(const fs::path & dir)
{
    checkStackSize();

    /// Also covers a missing path: is_directory() is false for a path that does not exist.
    if (!fs::is_directory(dir))
        return 0;

    uint64_t size{0};
    for (const auto & entry : fs::directory_iterator(dir))
    {
        if (entry.is_regular_file())
            size += fs::file_size(entry.path());
        else if (entry.is_directory())
            size += getDirSize(entry.path());
    }
    return size;
}
2021-11-18 20:17:22 +00:00
uint64_t KeeperDispatcher::getLogDirSize() const
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
return getDirSize(configuration_and_settings->log_storage_path);
2021-11-05 10:21:34 +00:00
}
2021-11-18 20:17:22 +00:00
uint64_t KeeperDispatcher::getSnapDirSize() const
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
return getDirSize(configuration_and_settings->snapshot_storage_path);
2021-11-05 10:21:34 +00:00
}
2021-11-19 09:30:58 +00:00
/// Collects a snapshot of the dispatcher and server state:
/// the role of this node, whether a leader is known, the number of queued requests and of
/// registered sessions, follower counters (filled only on the leader), and the node count and
/// last processed zxid of the state machine.
Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
{
    Keeper4LWInfo info;

    /// Role of this node.
    info.is_follower = server->isFollower();
    info.is_standalone = !info.is_follower && server->getFollowerCount() == 0;
    info.is_leader = isLeader();
    info.is_observer = server->isObserver();
    info.has_leader = hasLeader();

    /// Each counter is read under the mutex used by the code that changes it.
    {
        std::lock_guard lock(push_request_mutex);
        info.outstanding_requests_count = requests_queue->size();
    }
    {
        std::lock_guard lock(session_to_response_callback_mutex);
        info.alive_connections_count = session_to_response_callback.size();
    }

    /// Follower statistics are requested from the server only on the leader.
    if (info.is_leader)
    {
        info.follower_count = server->getFollowerCount();
        info.synced_follower_count = server->getSyncedFollowerCount();
    }

    info.total_nodes_count = server->getKeeperStateMachine()->getNodesCount();
    info.last_zxid = server->getKeeperStateMachine()->getLastProcessedZxid();

    return info;
}
2021-01-19 14:22:28 +00:00
}