mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Merge pull request #38200 from ClickHouse/keeper-linearizable-reads
Keeper faster linearizable reads
This commit is contained in:
commit
222b964070
2
contrib/NuRaft
vendored
2
contrib/NuRaft
vendored
@ -1 +1 @@
|
||||
Subproject commit 1be805e7cb2494aa8170015493474379b0362dfc
|
||||
Subproject commit e15858f8ad0ce8aba85cf74e3763874c76bf927c
|
@ -189,6 +189,9 @@ KeeperConfigurationAndSettings::loadFromConfig(const Poco::Util::AbstractConfigu
|
||||
|
||||
ret->coordination_settings->loadFromConfig("keeper_server.coordination_settings", config);
|
||||
|
||||
if (ret->coordination_settings->quorum_reads)
|
||||
LOG_WARNING(&Poco::Logger::get("KeeperConfigurationAndSettings"), "Setting 'quorum_reads' is deprecated. Please use 'read_mode'");
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,7 @@ struct Settings;
|
||||
M(Milliseconds, heart_beat_interval_ms, 500, "Heartbeat interval between quorum nodes", 0) \
|
||||
M(Milliseconds, election_timeout_lower_bound_ms, 1000, "Lower bound of election timer (avoid too often leader elections)", 0) \
|
||||
M(Milliseconds, election_timeout_upper_bound_ms, 2000, "Upper bound of election timer (avoid too often leader elections)", 0) \
|
||||
M(Milliseconds, leadership_expiry, 0, "How often will leader node check if it still has majority. Set it lower or equal to election_timeout_lower_bound_ms to have linearizable reads.", 0) \
|
||||
M(UInt64, reserved_log_items, 100000, "How many log items to store (don't remove during compaction)", 0) \
|
||||
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
|
||||
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
|
||||
@ -38,11 +39,12 @@ struct Settings;
|
||||
M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
|
||||
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
|
||||
M(UInt64, max_requests_batch_size, 100, "Max size of batch in requests count before it will be sent to RAFT", 0) \
|
||||
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \
|
||||
M(Bool, quorum_reads, false, "Deprecated - use read_mode. Execute read requests as writes through whole RAFT consesus with similar speed", 0) \
|
||||
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \
|
||||
M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \
|
||||
M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0) \
|
||||
M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0)
|
||||
M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0) \
|
||||
M(String, read_mode, "nonlinear", "How should reads be processed. Valid values: 'nonlinear', 'fastlinear', 'quorum'. 'nonlinear' is the fastest option because there are no consistency requirements", 0)
|
||||
|
||||
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
#include <Coordination/KeeperDispatcher.h>
|
||||
#include <libnuraft/async.hxx>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
#include <future>
|
||||
@ -6,6 +7,8 @@
|
||||
#include <Poco/Path.h>
|
||||
#include <Common/hex.h>
|
||||
#include <filesystem>
|
||||
#include <iterator>
|
||||
#include <limits>
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
||||
@ -30,22 +33,83 @@ namespace ErrorCodes
|
||||
|
||||
KeeperDispatcher::KeeperDispatcher()
|
||||
: responses_queue(std::numeric_limits<size_t>::max())
|
||||
, read_requests_queue(std::numeric_limits<size_t>::max())
|
||||
, finalize_requests_queue(std::numeric_limits<size_t>::max())
|
||||
, configuration_and_settings(std::make_shared<KeeperConfigurationAndSettings>())
|
||||
, log(&Poco::Logger::get("KeeperDispatcher"))
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
/// ZooKeepers has 2 requirements:
|
||||
/// - writes need to be linearizable
|
||||
/// - all requests from single session need to be processed in the order of their arrival
|
||||
///
|
||||
/// Because of that, we cannot process read and write requests from SAME session at the same time.
|
||||
/// To be able to process read and write requests in parallel we need to make sure that only 1 type
|
||||
/// of request is being processed from a single session.
|
||||
/// Multiple types from different sessions can be processed at the same time.
|
||||
///
|
||||
/// We do some in-session housekeeping to make sure that the multithreaded request processing is correct.
|
||||
/// When a request is received from a client, we check if there are requests being processed from that same
|
||||
/// session, and if yes, of what type. If the types are the same, and there are no requests of different
|
||||
/// type inbetetween, we can instanly add it to active request queue. Otherwise, we need to wait until
|
||||
/// all requests of the other type are processed.
|
||||
///
|
||||
/// There are multiple threads used for processing the request, each of them communicating with a queue.
|
||||
/// Assumption: only one type of request is being processed from a same session at any point in time (read or write).
|
||||
///
|
||||
/// requestThread -> requests currently being processed
|
||||
/// readRequestThread -> thread for processing read requests
|
||||
/// finalizeRequestThread -> thread for finalizing requests:
|
||||
/// - in-session housekeeping, add requests to the active request queue if there are any
|
||||
///
|
||||
/// If reads are linearizable without quorum, a request can possibly wait for a certain log to be committed.
|
||||
/// In that case we add it to the waiting queue for that log.
|
||||
/// When that log is committed, the committing thread will send that read request to readRequestThread so it can be processed.
|
||||
///
|
||||
void KeeperDispatcher::requestThread()
|
||||
{
|
||||
setThreadName("KeeperReqT");
|
||||
|
||||
/// Result of requests batch from previous iteration
|
||||
RaftAppendResult prev_result = nullptr;
|
||||
/// Requests from previous iteration. We store them to be able
|
||||
/// to send errors to the client.
|
||||
KeeperStorage::RequestsForSessions prev_batch;
|
||||
RaftResult prev_result = nullptr;
|
||||
const auto previous_quorum_done = [&] { return !prev_result || prev_result->has_result() || prev_result->get_result_code() != nuraft::cmd_result_code::OK; };
|
||||
|
||||
const auto needs_quorum = [](const auto & coordination_settings, const auto & request)
|
||||
{
|
||||
return coordination_settings->quorum_reads || coordination_settings->read_mode.toString() == "quorum" || !request.request->isReadRequest();
|
||||
};
|
||||
|
||||
KeeperStorage::RequestsForSessions quorum_requests;
|
||||
KeeperStorage::RequestsForSessions read_requests;
|
||||
|
||||
auto process_quorum_requests = [&, this]() mutable
|
||||
{
|
||||
/// Forcefully process all previous pending requests
|
||||
if (prev_result)
|
||||
forceWaitAndProcessResult(prev_result);
|
||||
|
||||
prev_result = server->putRequestBatch(quorum_requests);
|
||||
|
||||
if (prev_result)
|
||||
{
|
||||
prev_result->when_ready([&, requests_for_sessions = std::move(quorum_requests)](nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & result, nuraft::ptr<std::exception> &) mutable
|
||||
{
|
||||
if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT)
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
|
||||
else if (result.get_result_code() != nuraft::cmd_result_code::OK)
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
});
|
||||
}
|
||||
|
||||
quorum_requests.clear();
|
||||
};
|
||||
|
||||
/// ZooKeeper requires that the requests inside a single session are processed in a strict order
|
||||
/// (we cannot process later requests before all the previous once are processed)
|
||||
/// By making sure that at this point we can either have just read requests or just write requests
|
||||
/// from a single session, we can process them independently
|
||||
while (!shutdown_called)
|
||||
{
|
||||
KeeperStorage::RequestForSession request;
|
||||
@ -54,94 +118,67 @@ void KeeperDispatcher::requestThread()
|
||||
uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds();
|
||||
uint64_t max_batch_size = coordination_settings->max_requests_batch_size;
|
||||
|
||||
/// The code below do a very simple thing: batch all write (quorum) requests into vector until
|
||||
/// previous write batch is not finished or max_batch size achieved. The main complexity goes from
|
||||
/// the ability to process read requests without quorum (from local state). So when we are collecting
|
||||
/// requests into a batch we must check that the new request is not read request. Otherwise we have to
|
||||
/// process all already accumulated write requests, wait them synchronously and only after that process
|
||||
/// read request. So reads are some kind of "separator" for writes.
|
||||
try
|
||||
{
|
||||
if (requests_queue->tryPop(request, max_wait))
|
||||
if (active_requests_queue->tryPop(request, max_wait))
|
||||
{
|
||||
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
|
||||
if (shutdown_called)
|
||||
break;
|
||||
|
||||
KeeperStorage::RequestsForSessions current_batch;
|
||||
|
||||
bool has_read_request = false;
|
||||
|
||||
/// If new request is not read request or we must to process it through quorum.
|
||||
/// Otherwise we will process it locally.
|
||||
if (coordination_settings->quorum_reads || !request.request->isReadRequest())
|
||||
{
|
||||
current_batch.emplace_back(request);
|
||||
if (needs_quorum(coordination_settings, request))
|
||||
quorum_requests.emplace_back(request);
|
||||
else
|
||||
read_requests.emplace_back(request);
|
||||
|
||||
/// Waiting until previous append will be successful, or batch is big enough
|
||||
/// has_result == false && get_result_code == OK means that our request still not processed.
|
||||
/// Sometimes NuRaft set errorcode without setting result, so we check both here.
|
||||
while (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size)
|
||||
while (true)
|
||||
{
|
||||
if (quorum_requests.size() > max_batch_size)
|
||||
break;
|
||||
|
||||
if (read_requests.size() > max_batch_size)
|
||||
{
|
||||
processReadRequests(coordination_settings, read_requests);
|
||||
|
||||
if (previous_quorum_done())
|
||||
break;
|
||||
}
|
||||
|
||||
/// Trying to get batch requests as fast as possible
|
||||
if (requests_queue->tryPop(request, 1))
|
||||
if (active_requests_queue->tryPop(request, 1))
|
||||
{
|
||||
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
|
||||
/// Don't append read request into batch, we have to process them separately
|
||||
if (!coordination_settings->quorum_reads && request.request->isReadRequest())
|
||||
{
|
||||
has_read_request = true;
|
||||
break;
|
||||
if (needs_quorum(coordination_settings, request))
|
||||
quorum_requests.emplace_back(request);
|
||||
else
|
||||
read_requests.emplace_back(request);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// batch of read requests can send at most one request
|
||||
/// so we don't care if the previous batch hasn't received response
|
||||
if (!read_requests.empty())
|
||||
processReadRequests(coordination_settings, read_requests);
|
||||
|
||||
current_batch.emplace_back(request);
|
||||
}
|
||||
/// if we still didn't process previous batch we can
|
||||
/// increase are current batch even more
|
||||
if (previous_quorum_done())
|
||||
break;
|
||||
}
|
||||
|
||||
if (shutdown_called)
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
has_read_request = true;
|
||||
|
||||
if (shutdown_called)
|
||||
break;
|
||||
|
||||
/// Forcefully process all previous pending requests
|
||||
if (prev_result)
|
||||
forceWaitAndProcessResult(prev_result, prev_batch);
|
||||
if (!quorum_requests.empty())
|
||||
process_quorum_requests();
|
||||
|
||||
/// Process collected write requests batch
|
||||
if (!current_batch.empty())
|
||||
{
|
||||
auto result = server->putRequestBatch(current_batch);
|
||||
|
||||
if (result)
|
||||
{
|
||||
if (has_read_request) /// If we will execute read request next, than we have to process result now
|
||||
forceWaitAndProcessResult(result, current_batch);
|
||||
}
|
||||
else
|
||||
{
|
||||
addErrorResponses(current_batch, Coordination::Error::ZCONNECTIONLOSS);
|
||||
current_batch.clear();
|
||||
}
|
||||
|
||||
prev_batch = std::move(current_batch);
|
||||
prev_result = result;
|
||||
}
|
||||
|
||||
/// Read request always goes after write batch (last request)
|
||||
if (has_read_request)
|
||||
{
|
||||
if (server->isLeaderAlive())
|
||||
server->putLocalReadRequest(request);
|
||||
else
|
||||
addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
@ -151,6 +188,72 @@ void KeeperDispatcher::requestThread()
|
||||
}
|
||||
}
|
||||
|
||||
void KeeperDispatcher::processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests)
|
||||
{
|
||||
if (coordination_settings->read_mode.toString() == "fastlinear")
|
||||
{
|
||||
// we just want to know what's the current latest committed log on Leader node
|
||||
auto leader_info_result = server->getLeaderInfo();
|
||||
if (leader_info_result)
|
||||
{
|
||||
leader_info_result->when_ready([&, requests_for_sessions = std::move(read_requests)](nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & result, nuraft::ptr<std::exception> & exception) mutable
|
||||
{
|
||||
if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT)
|
||||
{
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.get_result_code() != nuraft::cmd_result_code::OK)
|
||||
{
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
return;
|
||||
}
|
||||
|
||||
if (exception)
|
||||
{
|
||||
LOG_INFO(log, "Got exception while waiting for read results {}", exception->what());
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
return;
|
||||
}
|
||||
|
||||
auto & leader_info_ctx = result.get();
|
||||
|
||||
if (!leader_info_ctx)
|
||||
{
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
return;
|
||||
}
|
||||
|
||||
KeeperServer::NodeInfo leader_info;
|
||||
leader_info.term = leader_info_ctx->get_ulong();
|
||||
leader_info.last_committed_index = leader_info_ctx->get_ulong();
|
||||
std::lock_guard lock(leader_waiter_mutex);
|
||||
auto node_info = server->getNodeInfo();
|
||||
|
||||
/// we're behind, we need to wait
|
||||
if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index)
|
||||
{
|
||||
auto & leader_waiter = leader_waiters[leader_info];
|
||||
leader_waiter.insert(leader_waiter.end(), requests_for_sessions.begin(), requests_for_sessions.end());
|
||||
LOG_TRACE(log, "waiting for term {}, idx {}", leader_info.term, leader_info.last_committed_index);
|
||||
}
|
||||
/// process it in background thread
|
||||
else if (!read_requests_queue.push(std::move(requests_for_sessions)))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
||||
});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(coordination_settings->read_mode.toString() == "nonlinear");
|
||||
if (!read_requests_queue.push(std::move(read_requests)))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
||||
}
|
||||
|
||||
read_requests.clear();
|
||||
}
|
||||
|
||||
void KeeperDispatcher::responseThread()
|
||||
{
|
||||
setThreadName("KeeperRspT");
|
||||
@ -200,6 +303,65 @@ void KeeperDispatcher::snapshotThread()
|
||||
}
|
||||
}
|
||||
|
||||
/// Background thread for processing read requests
|
||||
void KeeperDispatcher::readRequestThread()
|
||||
{
|
||||
setThreadName("KeeperReadT");
|
||||
while (!shutdown_called)
|
||||
{
|
||||
KeeperStorage::RequestsForSessions requests;
|
||||
if (!read_requests_queue.pop(requests))
|
||||
break;
|
||||
|
||||
if (shutdown_called)
|
||||
break;
|
||||
|
||||
try
|
||||
{
|
||||
for (const auto & request_info : requests)
|
||||
{
|
||||
if (server->isLeaderAlive())
|
||||
server->putLocalReadRequest(request_info);
|
||||
else
|
||||
addErrorResponses({request_info}, Coordination::Error::ZCONNECTIONLOSS);
|
||||
}
|
||||
|
||||
if (!finalize_requests_queue.push(std::move(requests)))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// We finalize requests every time we commit a single log with request
|
||||
/// or process a batch of read requests.
|
||||
/// Because it can get heavy, we do it in background thread.
|
||||
void KeeperDispatcher::finalizeRequestsThread()
|
||||
{
|
||||
setThreadName("KeeperFinalT");
|
||||
while (!shutdown_called)
|
||||
{
|
||||
KeeperStorage::RequestsForSessions requests;
|
||||
if (!finalize_requests_queue.pop(requests))
|
||||
break;
|
||||
|
||||
if (shutdown_called)
|
||||
break;
|
||||
|
||||
try
|
||||
{
|
||||
finalizeRequests(requests);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
|
||||
{
|
||||
std::lock_guard lock(session_to_response_callback_mutex);
|
||||
@ -255,6 +417,30 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
|
||||
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
|
||||
request_info.session_id = session_id;
|
||||
|
||||
{
|
||||
std::lock_guard lock{unprocessed_request_mutex};
|
||||
auto unprocessed_requests_it = unprocessed_requests_for_session.find(session_id);
|
||||
if (unprocessed_requests_it == unprocessed_requests_for_session.end())
|
||||
{
|
||||
auto & unprocessed_requests = unprocessed_requests_for_session[session_id];
|
||||
unprocessed_requests.unprocessed_num = 1;
|
||||
unprocessed_requests.is_read = request->isReadRequest();
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & unprocessed_requests = unprocessed_requests_it->second;
|
||||
|
||||
/// queue is not empty, or the request types don't match, put it in the waiting queue
|
||||
if (!unprocessed_requests.request_queue.empty() || unprocessed_requests.is_read != request->isReadRequest())
|
||||
{
|
||||
unprocessed_requests.request_queue.push_back(std::move(request_info));
|
||||
return true;
|
||||
}
|
||||
|
||||
++unprocessed_requests.unprocessed_num;
|
||||
}
|
||||
}
|
||||
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
|
||||
if (shutdown_called)
|
||||
@ -263,10 +449,10 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
|
||||
/// Put close requests without timeouts
|
||||
if (request->getOpNum() == Coordination::OpNum::Close)
|
||||
{
|
||||
if (!requests_queue->push(std::move(request_info)))
|
||||
if (!active_requests_queue->push(std::move(request_info)))
|
||||
throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
|
||||
}
|
||||
else if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
|
||||
else if (!active_requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
|
||||
{
|
||||
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
}
|
||||
@ -279,13 +465,23 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
|
||||
LOG_DEBUG(log, "Initializing storage dispatcher");
|
||||
|
||||
configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);
|
||||
requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_requests_batch_size);
|
||||
active_requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_requests_batch_size);
|
||||
|
||||
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
|
||||
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
|
||||
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
|
||||
read_request_thread = ThreadFromGlobalPool([this] { readRequestThread(); });
|
||||
finalize_requests_thread = ThreadFromGlobalPool([this] { finalizeRequestsThread(); });
|
||||
|
||||
server = std::make_unique<KeeperServer>(configuration_and_settings, config, responses_queue, snapshots_queue);
|
||||
server = std::make_unique<KeeperServer>(
|
||||
configuration_and_settings,
|
||||
config,
|
||||
responses_queue,
|
||||
snapshots_queue,
|
||||
[this](const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx)
|
||||
{ onRequestCommit(request_for_session, log_term, log_idx); },
|
||||
[this](uint64_t term, uint64_t last_idx)
|
||||
{ onApplySnapshot(term, last_idx); });
|
||||
|
||||
try
|
||||
{
|
||||
@ -333,9 +529,9 @@ void KeeperDispatcher::shutdown()
|
||||
if (session_cleaner_thread.joinable())
|
||||
session_cleaner_thread.join();
|
||||
|
||||
if (requests_queue)
|
||||
if (active_requests_queue)
|
||||
{
|
||||
requests_queue->finish();
|
||||
active_requests_queue->finish();
|
||||
|
||||
if (request_thread.joinable())
|
||||
request_thread.join();
|
||||
@ -349,6 +545,14 @@ void KeeperDispatcher::shutdown()
|
||||
if (snapshot_thread.joinable())
|
||||
snapshot_thread.join();
|
||||
|
||||
read_requests_queue.finish();
|
||||
if (read_request_thread.joinable())
|
||||
read_request_thread.join();
|
||||
|
||||
finalize_requests_queue.finish();
|
||||
if (finalize_requests_thread.joinable())
|
||||
finalize_requests_thread.join();
|
||||
|
||||
update_configuration_queue.finish();
|
||||
if (update_configuration_thread.joinable())
|
||||
update_configuration_thread.join();
|
||||
@ -357,7 +561,7 @@ void KeeperDispatcher::shutdown()
|
||||
KeeperStorage::RequestForSession request_for_session;
|
||||
|
||||
/// Set session expired for all pending requests
|
||||
while (requests_queue && requests_queue->tryPop(request_for_session))
|
||||
while (active_requests_queue && active_requests_queue->tryPop(request_for_session))
|
||||
{
|
||||
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
|
||||
auto response = request_for_session.request->makeResponse();
|
||||
@ -474,7 +678,7 @@ void KeeperDispatcher::sessionCleanerTask()
|
||||
};
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
if (!requests_queue->push(std::move(request_info)))
|
||||
if (!active_requests_queue->push(std::move(request_info)))
|
||||
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
|
||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||
}
|
||||
@ -524,19 +728,12 @@ void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSession
|
||||
}
|
||||
}
|
||||
|
||||
void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions)
|
||||
void KeeperDispatcher::forceWaitAndProcessResult(RaftResult & result)
|
||||
{
|
||||
if (!result->has_result())
|
||||
result->get();
|
||||
|
||||
/// If we get some errors, than send them to clients
|
||||
if (!result->get_accepted() || result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
|
||||
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
|
||||
addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS);
|
||||
|
||||
result = nullptr;
|
||||
requests_for_sessions.clear();
|
||||
}
|
||||
|
||||
int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
|
||||
@ -584,7 +781,7 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
|
||||
/// Push new session request to queue
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
|
||||
if (!active_requests_queue->tryPush(std::move(request_info), session_timeout_ms))
|
||||
throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||
}
|
||||
@ -657,6 +854,122 @@ void KeeperDispatcher::updateConfigurationThread()
|
||||
}
|
||||
}
|
||||
|
||||
// Used to update the state for a session based on the requests
|
||||
// - update the number of current unprocessed requests for the session
|
||||
// - if the number of unprocessed requests is 0, we can start adding next type of requests
|
||||
// from unprocessed requests queue to the active queue
|
||||
void KeeperDispatcher::finalizeRequests(const KeeperStorage::RequestsForSessions & requests_for_sessions)
|
||||
{
|
||||
std::unordered_map<int64_t, size_t> counts_for_session;
|
||||
|
||||
for (const auto & request_for_session : requests_for_sessions)
|
||||
{
|
||||
++counts_for_session[request_for_session.session_id];
|
||||
}
|
||||
|
||||
std::lock_guard lock{unprocessed_request_mutex};
|
||||
for (const auto [session_id, count] : counts_for_session)
|
||||
{
|
||||
auto unprocessed_requests_it = unprocessed_requests_for_session.find(session_id);
|
||||
if (unprocessed_requests_it == unprocessed_requests_for_session.end())
|
||||
continue;
|
||||
|
||||
auto & unprocessed_requests = unprocessed_requests_it->second;
|
||||
unprocessed_requests.unprocessed_num -= count;
|
||||
|
||||
if (unprocessed_requests.unprocessed_num == 0)
|
||||
{
|
||||
if (!unprocessed_requests.request_queue.empty())
|
||||
{
|
||||
auto & unprocessed_requests_queue = unprocessed_requests.request_queue;
|
||||
unprocessed_requests.is_read = !unprocessed_requests.is_read;
|
||||
// start adding next type of requests
|
||||
while (!unprocessed_requests_queue.empty() && unprocessed_requests_queue.front().request->isReadRequest() == unprocessed_requests.is_read)
|
||||
{
|
||||
auto & front_request = unprocessed_requests_queue.front();
|
||||
|
||||
/// Put close requests without timeouts
|
||||
if (front_request.request->getOpNum() == Coordination::OpNum::Close)
|
||||
{
|
||||
if (!active_requests_queue->push(std::move(front_request)))
|
||||
throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR);
|
||||
}
|
||||
else if (!active_requests_queue->tryPush(std::move(front_request), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds()))
|
||||
{
|
||||
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
}
|
||||
|
||||
++unprocessed_requests.unprocessed_num;
|
||||
unprocessed_requests_queue.pop_front();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
unprocessed_requests_for_session.erase(unprocessed_requests_it);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Finalize request
|
||||
// Process read requests that were waiting for this commit
|
||||
void KeeperDispatcher::onRequestCommit(const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx)
|
||||
{
|
||||
if (!finalize_requests_queue.push({request_for_session}))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
||||
|
||||
KeeperStorage::RequestsForSessions requests;
|
||||
{
|
||||
std::lock_guard lock(leader_waiter_mutex);
|
||||
auto request_queue_it = leader_waiters.find(KeeperServer::NodeInfo{.term = log_term, .last_committed_index = log_idx});
|
||||
if (request_queue_it != leader_waiters.end())
|
||||
{
|
||||
requests = std::move(request_queue_it->second);
|
||||
leader_waiters.erase(request_queue_it);
|
||||
}
|
||||
}
|
||||
|
||||
if (requests.empty())
|
||||
return;
|
||||
|
||||
if (!read_requests_queue.push(std::move(requests)))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
||||
}
|
||||
|
||||
/// Process all read request that are waiting for lower or currently last processed log index
|
||||
void KeeperDispatcher::onApplySnapshot(uint64_t term, uint64_t last_idx)
|
||||
{
|
||||
KeeperServer::NodeInfo current_node_info{term, last_idx};
|
||||
KeeperStorage::RequestsForSessions requests;
|
||||
{
|
||||
std::lock_guard lock(leader_waiter_mutex);
|
||||
for (auto leader_waiter_it = leader_waiters.begin(); leader_waiter_it != leader_waiters.end();)
|
||||
{
|
||||
auto waiting_node_info = leader_waiter_it->first;
|
||||
if (waiting_node_info.term <= current_node_info.term
|
||||
&& waiting_node_info.last_committed_index <= current_node_info.last_committed_index)
|
||||
{
|
||||
for (auto & request : leader_waiter_it->second)
|
||||
{
|
||||
requests.push_back(std::move(request));
|
||||
}
|
||||
|
||||
leader_waiter_it = leader_waiters.erase(leader_waiter_it);
|
||||
}
|
||||
else
|
||||
{
|
||||
++leader_waiter_it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (requests.empty())
|
||||
return;
|
||||
|
||||
if (!read_requests_queue.push(std::move(requests)))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
|
||||
}
|
||||
|
||||
bool KeeperDispatcher::isServerActive() const
|
||||
{
|
||||
return checkInit() && hasLeader() && !server->isRecovering();
|
||||
@ -721,7 +1034,7 @@ Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
|
||||
Keeper4LWInfo result = server->getPartiallyFilled4LWInfo();
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
result.outstanding_requests_count = requests_queue->size();
|
||||
result.outstanding_requests_count = active_requests_queue->size();
|
||||
}
|
||||
{
|
||||
std::lock_guard lock(session_to_response_callback_mutex);
|
||||
|
@ -32,9 +32,12 @@ private:
|
||||
using UpdateConfigurationQueue = ConcurrentBoundedQueue<ConfigUpdateAction>;
|
||||
|
||||
/// Size depends on coordination settings
|
||||
std::unique_ptr<RequestsQueue> requests_queue;
|
||||
/// Request currently being processed
|
||||
std::unique_ptr<RequestsQueue> active_requests_queue;
|
||||
ResponsesQueue responses_queue;
|
||||
SnapshotsQueue snapshots_queue{1};
|
||||
ConcurrentBoundedQueue<KeeperStorage::RequestsForSessions> read_requests_queue;
|
||||
ConcurrentBoundedQueue<KeeperStorage::RequestsForSessions> finalize_requests_queue;
|
||||
|
||||
/// More than 1k updates is definitely misconfiguration.
|
||||
UpdateConfigurationQueue update_configuration_queue{1000};
|
||||
@ -64,6 +67,8 @@ private:
|
||||
ThreadFromGlobalPool snapshot_thread;
|
||||
/// Apply or wait for configuration changes
|
||||
ThreadFromGlobalPool update_configuration_thread;
|
||||
ThreadFromGlobalPool read_request_thread;
|
||||
ThreadFromGlobalPool finalize_requests_thread;
|
||||
|
||||
/// RAFT wrapper.
|
||||
std::unique_ptr<KeeperServer> server;
|
||||
@ -77,6 +82,34 @@ private:
|
||||
/// Counter for new session_id requests.
|
||||
std::atomic<int64_t> internal_session_id_counter{0};
|
||||
|
||||
/// A read request needs to have at least the log it was the last committed log on the leader
|
||||
/// at the time the request was being made.
|
||||
/// If the node is stale, we need to wait to commit that log before doing local read requests to achieve
|
||||
/// linearizability.
|
||||
std::unordered_map<KeeperServer::NodeInfo, KeeperStorage::RequestsForSessions> leader_waiters;
|
||||
std::mutex leader_waiter_mutex;
|
||||
|
||||
/// We can be actively processing one type of requests (either read or write) from a single session.
|
||||
/// If we receive a request of a type that is not currently being processed, we put it in the waiting queue.
|
||||
/// Also, we want to process them in ariving order, so if we have a different type in the queue, we cannot process that request
|
||||
/// but wait for all the previous requests to finish.
|
||||
/// E.g. READ -> WRITE -> READ, the last READ will go to the waiting queue even though we are currently processing the first READ
|
||||
/// because we have WRITE request before it that needs to be processed.
|
||||
struct UnprocessedRequests
|
||||
{
|
||||
/// how many requests are currently in the active request queue
|
||||
size_t unprocessed_num{0};
|
||||
/// is_read currently being processed
|
||||
bool is_read{false};
|
||||
std::list<KeeperStorage::RequestForSession> request_queue;
|
||||
};
|
||||
|
||||
// Called every time a batch of requests are processed.
|
||||
void finalizeRequests(const KeeperStorage::RequestsForSessions & requests_for_sessions);
|
||||
|
||||
std::unordered_map<int64_t, UnprocessedRequests> unprocessed_requests_for_session;
|
||||
std::mutex unprocessed_request_mutex;
|
||||
|
||||
/// Thread put requests to raft
|
||||
void requestThread();
|
||||
/// Thread put responses for subscribed sessions
|
||||
@ -88,6 +121,12 @@ private:
|
||||
/// Thread apply or wait configuration changes from leader
|
||||
void updateConfigurationThread();
|
||||
|
||||
void readRequestThread();
|
||||
|
||||
void finalizeRequestsThread();
|
||||
|
||||
void processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests);
|
||||
|
||||
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
|
||||
|
||||
/// Add error responses for requests to responses queue.
|
||||
@ -96,7 +135,7 @@ private:
|
||||
|
||||
/// Forcefully wait for result and sets errors if something when wrong.
|
||||
/// Clears both arguments
|
||||
void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions);
|
||||
static void forceWaitAndProcessResult(RaftResult & result);
|
||||
|
||||
public:
|
||||
/// Just allocate some objects, real initialization is done by `intialize method`
|
||||
@ -116,6 +155,12 @@ public:
|
||||
return server && server->checkInit();
|
||||
}
|
||||
|
||||
/// Called when a single log with request is committed.
|
||||
void onRequestCommit(const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx);
|
||||
|
||||
/// Called when a snapshot is applied
|
||||
void onApplySnapshot(uint64_t term, uint64_t last_idx);
|
||||
|
||||
/// Is server accepting requests, i.e. connected to the cluster
|
||||
/// and achieved quorum
|
||||
bool isServerActive() const;
|
||||
|
@ -105,7 +105,9 @@ KeeperServer::KeeperServer(
|
||||
const KeeperConfigurationAndSettingsPtr & configuration_and_settings_,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
ResponsesQueue & responses_queue_,
|
||||
SnapshotsQueue & snapshots_queue_)
|
||||
SnapshotsQueue & snapshots_queue_,
|
||||
KeeperStateMachine::CommitCallback commit_callback,
|
||||
KeeperStateMachine::ApplySnapshotCallback apply_snapshot_callback)
|
||||
: server_id(configuration_and_settings_->server_id)
|
||||
, coordination_settings(configuration_and_settings_->coordination_settings)
|
||||
, log(&Poco::Logger::get("KeeperServer"))
|
||||
@ -113,7 +115,7 @@ KeeperServer::KeeperServer(
|
||||
, keeper_context{std::make_shared<KeeperContext>()}
|
||||
, create_snapshot_on_exit(config.getBool("keeper_server.create_snapshot_on_exit", true))
|
||||
{
|
||||
if (coordination_settings->quorum_reads)
|
||||
if (coordination_settings->quorum_reads || coordination_settings->read_mode.toString() == "quorum")
|
||||
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
|
||||
|
||||
keeper_context->digest_enabled = config.getBool("keeper_server.digest_enabled", false);
|
||||
@ -125,7 +127,9 @@ KeeperServer::KeeperServer(
|
||||
configuration_and_settings_->snapshot_storage_path,
|
||||
coordination_settings,
|
||||
keeper_context,
|
||||
checkAndGetSuperdigest(configuration_and_settings_->super_digest));
|
||||
checkAndGetSuperdigest(configuration_and_settings_->super_digest),
|
||||
std::move(commit_callback),
|
||||
std::move(apply_snapshot_callback));
|
||||
|
||||
state_manager = nuraft::cs_new<KeeperStateManager>(
|
||||
server_id,
|
||||
@ -176,6 +180,13 @@ struct KeeperServer::KeeperRaftServer : public nuraft::raft_server
|
||||
reconfigure(new_config);
|
||||
}
|
||||
|
||||
RaftResult getLeaderInfo()
|
||||
{
|
||||
nuraft::ptr<nuraft::req_msg> req
|
||||
= nuraft::cs_new<nuraft::req_msg>(0ull, nuraft::msg_type::leader_status_request, 0, 0, 0ull, 0ull, 0ull);
|
||||
return send_msg_to_leader(req);
|
||||
}
|
||||
|
||||
void commit_in_bg() override
|
||||
{
|
||||
// For NuRaft, if any commit fails (uncaught exception) the whole server aborts as a safety
|
||||
@ -269,6 +280,20 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co
|
||||
coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds(), "election_timeout_lower_bound_ms", log);
|
||||
params.election_timeout_upper_bound_ = getValueOrMaxInt32AndLogWarning(
|
||||
coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds(), "election_timeout_upper_bound_ms", log);
|
||||
|
||||
params.leadership_expiry_ = getValueOrMaxInt32AndLogWarning(coordination_settings->leadership_expiry.totalMilliseconds(), "leadership_expiry", log);
|
||||
|
||||
if (coordination_settings->read_mode.toString() == "fastlinear")
|
||||
{
|
||||
if (params.leadership_expiry_ == 0)
|
||||
params.leadership_expiry_ = params.election_timeout_lower_bound_;
|
||||
else if (params.leadership_expiry_ > params.election_timeout_lower_bound_)
|
||||
{
|
||||
LOG_WARNING(log, "To use fast linearizable reads, leadership_expiry should be set to a value that is less or equal to the election_timeout_upper_bound_ms. "
|
||||
"Based on current settings, there are no guarantees for linearizability of reads.");
|
||||
}
|
||||
}
|
||||
|
||||
params.reserved_log_items_ = getValueOrMaxInt32AndLogWarning(coordination_settings->reserved_log_items, "reserved_log_items", log);
|
||||
params.snapshot_distance_ = getValueOrMaxInt32AndLogWarning(coordination_settings->snapshot_distance, "snapshot_distance", log);
|
||||
|
||||
@ -487,7 +512,7 @@ void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession &
|
||||
state_machine->processReadRequest(request_for_session);
|
||||
}
|
||||
|
||||
RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
|
||||
RaftResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
|
||||
{
|
||||
std::vector<nuraft::ptr<nuraft::buffer>> entries;
|
||||
for (const auto & request_for_session : requests_for_sessions)
|
||||
@ -713,6 +738,20 @@ std::vector<int64_t> KeeperServer::getDeadSessions()
|
||||
return state_machine->getDeadSessions();
|
||||
}
|
||||
|
||||
RaftResult KeeperServer::getLeaderInfo()
|
||||
{
|
||||
std::lock_guard lock{server_write_mutex};
|
||||
if (is_recovering)
|
||||
return nullptr;
|
||||
|
||||
return raft_instance->getLeaderInfo();
|
||||
}
|
||||
|
||||
KeeperServer::NodeInfo KeeperServer::getNodeInfo()
|
||||
{
|
||||
return { .term = raft_instance->get_term(), .last_committed_index = state_machine->last_commit_index() };
|
||||
}
|
||||
|
||||
ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
auto diff = state_manager->getConfigurationDiff(config);
|
||||
|
@ -14,7 +14,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using RaftAppendResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buffer>>>;
|
||||
using RaftResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buffer>>>;
|
||||
|
||||
class KeeperServer
|
||||
{
|
||||
@ -71,7 +71,9 @@ public:
|
||||
const KeeperConfigurationAndSettingsPtr & settings_,
|
||||
const Poco::Util::AbstractConfiguration & config_,
|
||||
ResponsesQueue & responses_queue_,
|
||||
SnapshotsQueue & snapshots_queue_);
|
||||
SnapshotsQueue & snapshots_queue_,
|
||||
KeeperStateMachine::CommitCallback commit_callback,
|
||||
KeeperStateMachine::ApplySnapshotCallback apply_snapshot_callback);
|
||||
|
||||
/// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings.
|
||||
void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true);
|
||||
@ -84,7 +86,7 @@ public:
|
||||
|
||||
/// Put batch of requests into Raft and get result of put. Responses will be set separately into
|
||||
/// responses_queue.
|
||||
RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);
|
||||
RaftResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);
|
||||
|
||||
/// Return set of the non-active sessions
|
||||
std::vector<int64_t> getDeadSessions();
|
||||
@ -119,6 +121,17 @@ public:
|
||||
|
||||
int getServerID() const { return server_id; }
|
||||
|
||||
struct NodeInfo
|
||||
{
|
||||
uint64_t term;
|
||||
uint64_t last_committed_index;
|
||||
|
||||
bool operator==(const NodeInfo &) const = default;
|
||||
};
|
||||
|
||||
RaftResult getLeaderInfo();
|
||||
NodeInfo getNodeInfo();
|
||||
|
||||
/// Get configuration diff between current configuration in RAFT and in XML file
|
||||
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
@ -126,10 +139,23 @@ public:
|
||||
/// Synchronously check for update results with retries.
|
||||
void applyConfigurationUpdate(const ConfigUpdateAction & task);
|
||||
|
||||
|
||||
/// Wait configuration update for action. Used by followers.
|
||||
/// Return true if update was successfully received.
|
||||
bool waitConfigurationUpdate(const ConfigUpdateAction & task);
|
||||
};
|
||||
|
||||
}
|
||||
namespace std
|
||||
{
|
||||
template <>
|
||||
struct hash<DB::KeeperServer::NodeInfo>
|
||||
{
|
||||
size_t operator()(const DB::KeeperServer::NodeInfo & info) const
|
||||
{
|
||||
SipHash hash_state;
|
||||
hash_state.update(info.term);
|
||||
hash_state.update(info.last_committed_index);
|
||||
return hash_state.get64();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -44,7 +44,9 @@ KeeperStateMachine::KeeperStateMachine(
|
||||
const std::string & snapshots_path_,
|
||||
const CoordinationSettingsPtr & coordination_settings_,
|
||||
const KeeperContextPtr & keeper_context_,
|
||||
const std::string & superdigest_)
|
||||
const std::string & superdigest_,
|
||||
CommitCallback commit_callback_,
|
||||
ApplySnapshotCallback apply_snapshot_callback_)
|
||||
: coordination_settings(coordination_settings_)
|
||||
, snapshot_manager(
|
||||
snapshots_path_,
|
||||
@ -58,6 +60,8 @@ KeeperStateMachine::KeeperStateMachine(
|
||||
, last_committed_idx(0)
|
||||
, log(&Poco::Logger::get("KeeperStateMachine"))
|
||||
, superdigest(superdigest_)
|
||||
, commit_callback(std::move(commit_callback_))
|
||||
, apply_snapshot_callback(std::move(apply_snapshot_callback_))
|
||||
, keeper_context(keeper_context_)
|
||||
{
|
||||
}
|
||||
@ -223,11 +227,11 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
|
||||
return true;
|
||||
}
|
||||
|
||||
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
|
||||
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit_ext(const ext_op_params & params)
|
||||
{
|
||||
auto request_for_session = parseRequest(data);
|
||||
auto request_for_session = parseRequest(*params.data);
|
||||
if (!request_for_session.zxid)
|
||||
request_for_session.zxid = log_idx;
|
||||
request_for_session.zxid = params.log_idx;
|
||||
|
||||
/// Special processing of session_id request
|
||||
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
|
||||
@ -272,8 +276,9 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
|
||||
assertDigest(*request_for_session.digest, storage->getNodesDigest(true), *request_for_session.request, true);
|
||||
}
|
||||
|
||||
last_committed_idx = params.log_idx;
|
||||
commit_callback(request_for_session, params.log_term, params.log_idx);
|
||||
ProfileEvents::increment(ProfileEvents::KeeperCommits);
|
||||
last_committed_idx = log_idx;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@ -306,6 +311,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplys);
|
||||
last_committed_idx = s.get_last_log_idx();
|
||||
apply_snapshot_callback(s.get_last_log_term(), s.get_last_log_idx());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -320,6 +326,10 @@ void KeeperStateMachine::commit_config(const uint64_t /* log_idx */, nuraft::ptr
|
||||
void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)
|
||||
{
|
||||
auto request_for_session = parseRequest(data);
|
||||
|
||||
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
|
||||
return;
|
||||
|
||||
// If we received a log from an older node, use the log_idx as the zxid
|
||||
// log_idx will always be larger or equal to the zxid so we can safely do this
|
||||
// (log_idx is increased for all logs, while zxid is only increased for requests)
|
||||
|
@ -20,13 +20,18 @@ using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;
|
||||
class KeeperStateMachine : public nuraft::state_machine
|
||||
{
|
||||
public:
|
||||
using CommitCallback = std::function<void(const KeeperStorage::RequestForSession &, uint64_t, uint64_t)>;
|
||||
using ApplySnapshotCallback = std::function<void(uint64_t, uint64_t)>;
|
||||
|
||||
KeeperStateMachine(
|
||||
ResponsesQueue & responses_queue_,
|
||||
SnapshotsQueue & snapshots_queue_,
|
||||
const std::string & snapshots_path_,
|
||||
const CoordinationSettingsPtr & coordination_settings_,
|
||||
const KeeperContextPtr & keeper_context_,
|
||||
const std::string & superdigest_ = "");
|
||||
const std::string & superdigest_ = "",
|
||||
CommitCallback commit_callback_ = [](const KeeperStorage::RequestForSession &, uint64_t, uint64_t){},
|
||||
ApplySnapshotCallback apply_snapshot_callback_ = [](uint64_t, uint64_t){});
|
||||
|
||||
/// Read state from the latest snapshot
|
||||
void init();
|
||||
@ -37,7 +42,7 @@ public:
|
||||
|
||||
nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;
|
||||
|
||||
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT
|
||||
nuraft::ptr<nuraft::buffer> commit_ext(const ext_op_params & params) override; /// NOLINT
|
||||
|
||||
/// Save new cluster config to our snapshot (copy of the config stored in StateManager)
|
||||
void commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf) override; /// NOLINT
|
||||
@ -145,6 +150,11 @@ private:
|
||||
/// Special part of ACL system -- superdigest specified in server config.
|
||||
const std::string superdigest;
|
||||
|
||||
/// call when a request is committed
|
||||
const CommitCallback commit_callback;
|
||||
/// call when snapshot is applied
|
||||
const ApplySnapshotCallback apply_snapshot_callback;
|
||||
|
||||
KeeperContextPtr keeper_context;
|
||||
};
|
||||
|
||||
|
@ -1330,8 +1330,9 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
|
||||
changelog.append(entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
state_machine->pre_commit(i, changelog.entry_at(i)->get_buf());
|
||||
state_machine->commit(i, changelog.entry_at(i)->get_buf());
|
||||
auto entry_buf = changelog.entry_at(i)->get_buf_ptr();
|
||||
state_machine->pre_commit(i, *entry_buf);
|
||||
state_machine->commit_ext(nuraft::state_machine::ext_op_params{i, entry_buf});
|
||||
bool snapshot_created = false;
|
||||
if (i % settings->snapshot_distance == 0)
|
||||
{
|
||||
@ -1375,8 +1376,9 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
|
||||
|
||||
for (size_t i = restore_machine->last_commit_index() + 1; i < restore_changelog.next_slot(); ++i)
|
||||
{
|
||||
restore_machine->pre_commit(i, changelog.entry_at(i)->get_buf());
|
||||
restore_machine->commit(i, changelog.entry_at(i)->get_buf());
|
||||
auto entry = changelog.entry_at(i)->get_buf_ptr();
|
||||
restore_machine->pre_commit(i, *entry);
|
||||
restore_machine->commit_ext(nuraft::state_machine::ext_op_params{i, entry});
|
||||
}
|
||||
|
||||
auto & source_storage = state_machine->getStorage();
|
||||
@ -1477,18 +1479,18 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove)
|
||||
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();
|
||||
request_c->path = "/hello";
|
||||
request_c->is_ephemeral = true;
|
||||
auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c);
|
||||
state_machine->pre_commit(1, entry_c->get_buf());
|
||||
state_machine->commit(1, entry_c->get_buf());
|
||||
auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c)->get_buf_ptr();
|
||||
state_machine->pre_commit(1, *entry_c);
|
||||
state_machine->commit_ext(nuraft::state_machine::ext_op_params{1, entry_c});
|
||||
const auto & storage = state_machine->getStorage();
|
||||
|
||||
EXPECT_EQ(storage.ephemerals.size(), 1);
|
||||
std::shared_ptr<ZooKeeperRemoveRequest> request_d = std::make_shared<ZooKeeperRemoveRequest>();
|
||||
request_d->path = "/hello";
|
||||
/// Delete from other session
|
||||
auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d);
|
||||
state_machine->pre_commit(2, entry_d->get_buf());
|
||||
state_machine->commit(2, entry_d->get_buf());
|
||||
auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d)->get_buf_ptr();
|
||||
state_machine->pre_commit(2, *entry_d);
|
||||
state_machine->commit_ext(nuraft::state_machine::ext_op_params{2, entry_d});
|
||||
|
||||
EXPECT_EQ(storage.ephemerals.size(), 0);
|
||||
}
|
||||
|
@ -21,7 +21,8 @@
|
||||
<heart_beat_interval_ms>1000</heart_beat_interval_ms>
|
||||
<election_timeout_lower_bound_ms>2000</election_timeout_lower_bound_ms>
|
||||
<election_timeout_upper_bound_ms>4000</election_timeout_upper_bound_ms>
|
||||
<quorum_reads>{quorum_reads}</quorum_reads>
|
||||
<quorum_reads>0</quorum_reads>
|
||||
<read_mode>fastlinear</read_mode>
|
||||
<snapshot_distance>{snapshot_distance}</snapshot_distance>
|
||||
<stale_log_gap>{stale_log_gap}</stale_log_gap>
|
||||
<reserved_log_items>{reserved_log_items}</reserved_log_items>
|
||||
|
@ -27,7 +27,12 @@
|
||||
|
||||
(invoke! [this test op]
|
||||
(case (:f op)
|
||||
:read (exec-with-retries 30 (fn []
|
||||
:read (try
|
||||
(assoc op
|
||||
:type :ok
|
||||
:value (count (zk-list conn root-path)))
|
||||
(catch Exception _ (assoc op :type :info, :error :connect-error)))
|
||||
:final-read (exec-with-retries 30 (fn []
|
||||
(assoc op
|
||||
:type :ok
|
||||
:value (count (zk-list conn root-path)))))
|
||||
@ -49,7 +54,5 @@
|
||||
:checker (checker/compose
|
||||
{:counter (checker/counter)
|
||||
:perf (checker/perf)})
|
||||
:generator (->> (range)
|
||||
(map (fn [x]
|
||||
(->> (gen/mix [r add])))))
|
||||
:final-generator (gen/once {:type :invoke, :f :read, :value nil})})
|
||||
:generator (gen/mix [r add])
|
||||
:final-generator (gen/once {:type :invoke, :f :final-read, :value nil})})
|
||||
|
@ -98,7 +98,6 @@
|
||||
#"\{srv2\}" (get nodes 1)
|
||||
#"\{srv3\}" (get nodes 2)
|
||||
#"\{id\}" (str (inc (.indexOf nodes node)))
|
||||
#"\{quorum_reads\}" (str (boolean (:quorum test)))
|
||||
#"\{snapshot_distance\}" (str (:snapshot-distance test))
|
||||
#"\{stale_log_gap\}" (str (:stale-log-gap test))
|
||||
#"\{reserved_log_items\}" (str (:reserved-log-items test))}]
|
||||
|
@ -103,7 +103,7 @@
|
||||
current-nemesis (get custom-nemesis/custom-nemesises (:nemesis opts))]
|
||||
(merge tests/noop-test
|
||||
opts
|
||||
{:name (str "clickhouse-keeper-quorum=" quorum "-" (name (:workload opts)) "-" (name (:nemesis opts)))
|
||||
{:name (str "clickhouse-keeper-" (name (:workload opts)) "-" (name (:nemesis opts)))
|
||||
:os ubuntu/os
|
||||
:db (get-db opts)
|
||||
:pure-generators true
|
||||
|
@ -20,7 +20,8 @@
|
||||
(assoc this :conn (zk-connect node 9181 30000)))
|
||||
|
||||
(setup! [this test]
|
||||
(zk-create-range conn 300)) ; 300 nodes to be sure
|
||||
(exec-with-retries 30 (fn []
|
||||
(zk-create-range conn 300))))
|
||||
|
||||
(invoke! [_ test op]
|
||||
(let [[k v] (:value op)
|
||||
|
@ -45,7 +45,7 @@
|
||||
|
||||
(defn zk-connect
|
||||
[host port timeout]
|
||||
(exec-with-retries 30 (fn [] (zk/connect (str host ":" port) :timeout-msec timeout))))
|
||||
(zk/connect (str host ":" port) :timeout-msec timeout))
|
||||
|
||||
(defn zk-create-range
|
||||
[conn n]
|
||||
|
Loading…
Reference in New Issue
Block a user