ClickHouse/src/Coordination/KeeperStorageDispatcher.cpp

461 lines
17 KiB
C++
Raw Normal View History

2021-03-29 08:24:56 +00:00
#include <Coordination/KeeperStorageDispatcher.h>
2021-01-19 14:22:28 +00:00
#include <Common/setThreadName.h>
2021-04-16 13:50:09 +00:00
#include <Common/Stopwatch.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <future>
#include <chrono>
2021-01-19 14:22:28 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
}
2021-03-29 08:24:56 +00:00
KeeperStorageDispatcher::KeeperStorageDispatcher()
2021-02-09 14:47:18 +00:00
: coordination_settings(std::make_shared<CoordinationSettings>())
2021-03-29 08:24:56 +00:00
, log(&Poco::Logger::get("KeeperDispatcher"))
{
}
2021-04-16 13:50:09 +00:00
2021-03-29 08:24:56 +00:00
void KeeperStorageDispatcher::requestThread()
2021-01-19 14:22:28 +00:00
{
2021-03-29 08:24:56 +00:00
setThreadName("KeeperReqT");
2021-04-16 13:50:09 +00:00
/// Result of requests batch from previous iteration
RaftAppendResult prev_result = nullptr;
/// Requests from previous iteration. We store them to be able
/// to send errors to the client.
KeeperStorage::RequestsForSessions prev_batch;
2021-01-26 14:08:31 +00:00
while (!shutdown_called)
2021-01-19 14:22:28 +00:00
{
2021-03-29 08:24:56 +00:00
KeeperStorage::RequestForSession request;
2021-01-19 14:22:28 +00:00
2021-02-09 14:47:18 +00:00
UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds());
2021-04-16 13:50:09 +00:00
uint64_t max_batch_size = coordination_settings->max_requests_batch_size;
2021-04-16 19:08:52 +00:00
/// The code below do a very simple thing: batch all write (quorum) requests into vector until
2021-04-16 13:50:09 +00:00
/// previous write batch is not finished or max_batch size achieved. The main complexity goes from
/// the ability to process read requests without quorum (from local state). So when we are collecting
/// requests into a batch we must check that the new request is not read request. Otherwise we have to
/// process all already accumulated write requests, wait them synchronously and only after that process
/// read request. So reads are some kind of "separator" for writes.
try
2021-01-26 07:47:04 +00:00
{
2021-04-16 13:50:09 +00:00
if (requests_queue->tryPop(request, max_wait))
2021-01-19 14:22:28 +00:00
{
2021-04-16 13:50:09 +00:00
if (shutdown_called)
break;
KeeperStorage::RequestsForSessions current_batch;
bool has_read_request = false;
/// If new request is not read request or we must to process it through quorum.
/// Otherwise we will process it locally.
2021-04-16 13:53:48 +00:00
if (coordination_settings->quorum_reads || !request.request->isReadRequest())
2021-04-16 13:50:09 +00:00
{
current_batch.emplace_back(request);
/// Waiting until previous append will be successful, or batch is big enough
2021-04-16 18:31:23 +00:00
/// has_result == false && get_result_code == OK means that our request still not processed.
/// Sometimes NuRaft set errorcode without setting result, so we check both here.
2021-04-16 13:50:09 +00:00
while (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size)
{
/// Trying to get batch requests as fast as possible
if (requests_queue->tryPop(request, 1))
{
/// Don't append read request into batch, we have to process them separately
2021-04-16 13:53:48 +00:00
if (!coordination_settings->quorum_reads && request.request->isReadRequest())
2021-04-16 13:50:09 +00:00
{
has_read_request = true;
break;
}
2021-04-16 18:31:23 +00:00
else
{
2021-04-16 13:50:09 +00:00
2021-04-16 18:31:23 +00:00
current_batch.emplace_back(request);
}
2021-04-16 13:50:09 +00:00
}
if (shutdown_called)
break;
}
}
else
has_read_request = true;
if (shutdown_called)
break;
/// Forcefully process all previous pending requests
if (prev_result)
forceWaitAndProcessResult(std::move(prev_result), std::move(prev_batch));
/// Process collected write requests batch
if (!current_batch.empty())
{
2021-04-16 18:31:23 +00:00
auto result = server->putRequestBatch(current_batch);
2021-04-16 13:50:09 +00:00
2021-04-16 18:31:23 +00:00
if (result)
2021-04-16 13:50:09 +00:00
{
if (has_read_request) /// If we will execute read request next, than we have to process result now
2021-04-16 18:31:23 +00:00
forceWaitAndProcessResult(std::move(result), std::move(current_batch));
2021-04-16 13:50:09 +00:00
}
else
{
addErrorResponses(std::move(current_batch), Coordination::Error::ZRUNTIMEINCONSISTENCY);
}
prev_batch = current_batch;
2021-04-16 18:31:23 +00:00
prev_result = result;
2021-04-16 13:50:09 +00:00
}
/// Read request always goes after write batch (last request)
2021-04-16 13:56:57 +00:00
if (has_read_request)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(request);
else
addErrorResponses({request}, Coordination::Error::ZRUNTIMEINCONSISTENCY);
}
}
}
2021-04-16 13:50:09 +00:00
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
2021-03-29 08:24:56 +00:00
void KeeperStorageDispatcher::responseThread()
{
2021-03-29 08:24:56 +00:00
setThreadName("KeeperRspT");
while (!shutdown_called)
{
2021-03-29 08:24:56 +00:00
KeeperStorage::ResponseForSession response_for_session;
2021-02-09 14:47:18 +00:00
UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds());
if (responses_queue.tryPop(response_for_session, max_wait))
{
if (shutdown_called)
break;
try
{
setResponse(response_for_session.session_id, response_for_session.response);
2021-01-19 14:22:28 +00:00
}
2021-01-26 07:47:04 +00:00
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2021-01-19 14:22:28 +00:00
}
}
}
2021-03-29 08:24:56 +00:00
void KeeperStorageDispatcher::snapshotThread()
2021-03-05 10:40:24 +00:00
{
2021-03-29 08:24:56 +00:00
setThreadName("KeeperSnpT");
2021-03-05 10:40:24 +00:00
while (!shutdown_called)
{
CreateSnapshotTask task;
snapshots_queue.pop(task);
if (shutdown_called)
break;
try
{
2021-03-07 21:40:32 +00:00
task.create_snapshot(std::move(task.snapshot));
2021-03-05 10:40:24 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
2021-03-29 08:24:56 +00:00
void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
2021-01-19 14:22:28 +00:00
{
std::lock_guard lock(session_to_response_callback_mutex);
2021-04-16 13:50:09 +00:00
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::SessionID)
{
const Coordination::ZooKeeperSessionIDResponse & session_id_resp = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
2021-04-16 18:31:23 +00:00
2021-04-16 13:50:09 +00:00
/// Nobody waits for this session id
2021-04-16 18:31:23 +00:00
if (session_id_resp.server_id != server->getServerID() || !new_session_id_response_callback.count(session_id_resp.internal_id))
2021-04-16 13:50:09 +00:00
return;
auto callback = new_session_id_response_callback[session_id_resp.internal_id];
callback(response);
new_session_id_response_callback.erase(session_id_resp.internal_id);
}
else
{
auto session_writer = session_to_response_callback.find(session_id);
if (session_writer == session_to_response_callback.end())
return;
session_writer->second(response);
2021-04-16 18:31:23 +00:00
2021-04-16 13:50:09 +00:00
/// Session closed, no more writes
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
{
session_to_response_callback.erase(session_writer);
}
}
2021-01-19 14:22:28 +00:00
}
2021-03-29 08:24:56 +00:00
bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
2021-01-19 14:22:28 +00:00
{
{
std::lock_guard lock(session_to_response_callback_mutex);
if (session_to_response_callback.count(session_id) == 0)
2021-01-25 12:29:12 +00:00
return false;
2021-01-19 14:22:28 +00:00
}
2021-03-29 08:24:56 +00:00
KeeperStorage::RequestForSession request_info;
2021-01-19 14:22:28 +00:00
request_info.request = request;
request_info.session_id = session_id;
std::lock_guard lock(push_request_mutex);
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
2021-04-16 13:50:09 +00:00
requests_queue->push(std::move(request_info));
else if (!requests_queue->tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds()))
2021-01-19 14:22:28 +00:00
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
2021-01-25 12:29:12 +00:00
return true;
2021-01-19 14:22:28 +00:00
}
2021-03-29 08:24:56 +00:00
void KeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config)
2021-01-19 14:22:28 +00:00
{
LOG_DEBUG(log, "Initializing storage dispatcher");
2021-03-29 08:24:56 +00:00
int myid = config.getInt("keeper_server.server_id");
2021-01-25 12:29:12 +00:00
2021-03-29 08:24:56 +00:00
coordination_settings->loadFromConfig("keeper_server.coordination_settings", config);
2021-04-16 13:50:09 +00:00
requests_queue = std::make_unique<RequestsQueue>(coordination_settings->max_requests_batch_size);
2021-01-25 12:29:12 +00:00
2021-03-26 11:18:31 +00:00
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
2021-03-29 08:24:56 +00:00
server = std::make_unique<KeeperServer>(myid, coordination_settings, config, responses_queue, snapshots_queue);
try
2021-01-25 14:10:18 +00:00
{
2021-02-11 09:17:57 +00:00
LOG_DEBUG(log, "Waiting server to initialize");
2021-02-11 10:25:10 +00:00
server->startup();
2021-02-11 09:58:02 +00:00
LOG_DEBUG(log, "Server initialized, waiting for quorum");
2021-02-11 09:49:49 +00:00
2021-02-11 09:17:57 +00:00
server->waitInit();
LOG_DEBUG(log, "Quorum initialized");
2021-01-27 17:54:25 +00:00
}
catch (...)
2021-01-27 17:54:25 +00:00
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
2021-01-25 14:10:18 +00:00
}
2021-01-25 12:29:12 +00:00
2021-03-26 11:18:31 +00:00
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
2021-01-25 12:29:12 +00:00
LOG_DEBUG(log, "Dispatcher initialized");
2021-01-19 14:22:28 +00:00
}
2021-03-29 08:24:56 +00:00
void KeeperStorageDispatcher::shutdown()
2021-01-19 14:22:28 +00:00
{
try
{
2021-01-26 14:08:31 +00:00
{
std::lock_guard lock(push_request_mutex);
if (shutdown_called)
return;
LOG_DEBUG(log, "Shutting down storage dispatcher");
2021-01-26 14:08:31 +00:00
shutdown_called = true;
if (session_cleaner_thread.joinable())
session_cleaner_thread.join();
2021-03-05 13:06:47 +00:00
/// FIXME not the best way to notify
2021-04-16 13:50:09 +00:00
requests_queue->push({});
if (request_thread.joinable())
request_thread.join();
2021-03-05 13:06:47 +00:00
responses_queue.push({});
if (responses_thread.joinable())
responses_thread.join();
2021-03-05 10:40:24 +00:00
2021-03-05 13:06:47 +00:00
snapshots_queue.push({});
2021-03-05 10:40:24 +00:00
if (snapshot_thread.joinable())
snapshot_thread.join();
2021-01-26 14:08:31 +00:00
}
if (server)
server->shutdown();
2021-01-26 14:08:31 +00:00
2021-03-29 08:24:56 +00:00
KeeperStorage::RequestForSession request_for_session;
2021-04-16 13:50:09 +00:00
while (requests_queue->tryPop(request_for_session))
{
2021-03-07 21:40:32 +00:00
if (request_for_session.request)
{
auto response = request_for_session.request->makeResponse();
response->error = Coordination::Error::ZSESSIONEXPIRED;
setResponse(request_for_session.session_id, response);
}
else
{
break;
}
2021-01-26 14:08:31 +00:00
}
session_to_response_callback.clear();
2021-01-19 14:22:28 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
LOG_DEBUG(log, "Dispatcher shut down");
2021-01-19 14:22:28 +00:00
}
2021-03-29 08:24:56 +00:00
KeeperStorageDispatcher::~KeeperStorageDispatcher()
2021-01-26 14:08:31 +00:00
{
shutdown();
}
2021-03-29 08:24:56 +00:00
void KeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
2021-01-19 14:22:28 +00:00
{
std::lock_guard lock(session_to_response_callback_mutex);
if (!session_to_response_callback.try_emplace(session_id, callback).second)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
}
2021-03-29 08:24:56 +00:00
void KeeperStorageDispatcher::sessionCleanerTask()
{
while (true)
{
if (shutdown_called)
return;
try
{
if (isLeader())
{
auto dead_sessions = server->getDeadSessions();
for (int64_t dead_session : dead_sessions)
{
LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
2021-03-29 08:24:56 +00:00
KeeperStorage::RequestForSession request_info;
2021-02-09 18:29:06 +00:00
request_info.request = request;
request_info.session_id = dead_session;
{
std::lock_guard lock(push_request_mutex);
2021-04-16 13:50:09 +00:00
requests_queue->push(std::move(request_info));
2021-02-09 18:29:06 +00:00
}
finishSession(dead_session);
LOG_INFO(log, "Dead session close request pushed");
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
2021-02-09 14:47:18 +00:00
std::this_thread::sleep_for(std::chrono::milliseconds(coordination_settings->dead_session_check_period_ms.totalMilliseconds()));
}
}
2021-03-29 08:24:56 +00:00
void KeeperStorageDispatcher::finishSession(int64_t session_id)
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_it = session_to_response_callback.find(session_id);
if (session_it != session_to_response_callback.end())
session_to_response_callback.erase(session_it);
}
2021-04-16 13:50:09 +00:00
void KeeperStorageDispatcher::addErrorResponses(KeeperStorage::RequestsForSessions && requests_for_sessions, Coordination::Error error)
{
for (const auto & [session_id, request] : requests_for_sessions)
{
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = error;
responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
}
requests_for_sessions.clear();
}
void KeeperStorageDispatcher::forceWaitAndProcessResult(RaftAppendResult && result, KeeperStorage::RequestsForSessions && requests_for_sessions)
{
if (!result->has_result())
result->get();
/// If we get some errors, than send them to clients
2021-04-16 18:31:23 +00:00
if (!result->get_accepted() || result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
2021-04-16 13:50:09 +00:00
addErrorResponses(std::move(requests_for_sessions), Coordination::Error::ZOPERATIONTIMEOUT);
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
addErrorResponses(std::move(requests_for_sessions), Coordination::Error::ZRUNTIMEINCONSISTENCY);
result = nullptr;
}
int64_t KeeperStorageDispatcher::getSessionID(long session_timeout_ms)
{
KeeperStorage::RequestForSession request_info;
std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
request->internal_id = internal_session_id_counter.fetch_add(1);
request->session_timeout_ms = session_timeout_ms;
2021-04-16 18:31:23 +00:00
request->server_id = server->getServerID();
2021-04-16 13:50:09 +00:00
request_info.request = request;
request_info.session_id = -1;
auto promise = std::make_shared<std::promise<int64_t>>();
auto future = promise->get_future();
{
std::lock_guard lock(session_to_response_callback_mutex);
2021-04-16 18:31:23 +00:00
new_session_id_response_callback[request->internal_id] = [promise, internal_id = request->internal_id] (const Coordination::ZooKeeperResponsePtr & response)
2021-04-16 13:50:09 +00:00
{
if (response->getOpNum() != Coordination::OpNum::SessionID)
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response of type {} instead of SessionID response", Coordination::toString(response->getOpNum()))));
2021-04-16 18:31:23 +00:00
auto session_id_response = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
if (session_id_response.internal_id != internal_id)
{
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response with internal id {} instead of {}", session_id_response.internal_id, internal_id)));
}
2021-04-16 13:50:09 +00:00
if (response->error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException("SessionID request failed with error", response->error)));
2021-04-16 18:31:23 +00:00
promise->set_value(session_id_response.session_id);
2021-04-16 13:50:09 +00:00
};
}
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
throw Exception("Cannot receive session id within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
return future.get();
}
2021-01-19 14:22:28 +00:00
}