Avoid breaking batches with read requests

This commit is contained in:
Antonio Andelic 2023-03-24 14:37:40 +00:00
parent 52541e5e23
commit f353561204
6 changed files with 56 additions and 8 deletions

View File

@ -90,7 +90,7 @@ void KeeperDispatcher::requestThread()
KeeperStorage::RequestsForSessions current_batch;
bool has_read_request = false;
bool has_read_request{false};
/// If new request is not read request or we must to process it through quorum.
/// Otherwise we will process it locally.
@ -98,6 +98,8 @@ void KeeperDispatcher::requestThread()
{
current_batch.emplace_back(request);
size_t read_requests = 0;
const auto try_get_request = [&]
{
/// Trying to get batch requests as fast as possible
@ -106,7 +108,12 @@ void KeeperDispatcher::requestThread()
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
/// Don't append read request into batch, we have to process them separately
if (!coordination_settings->quorum_reads && request.request->isReadRequest())
has_read_request = true;
{
++read_requests;
std::pair<int64_t, int32_t> key{current_batch.back().session_id, current_batch.back().request->xid};
std::lock_guard lock(read_mutex);
related_read_requests[key].push_back(request);
}
else
current_batch.emplace_back(request);
@ -118,7 +125,7 @@ void KeeperDispatcher::requestThread()
/// If we have enough requests in queue, we will try to batch at least max_quick_batch_size of them.
size_t max_quick_batch_size = coordination_settings->max_requests_quick_batch_size;
while (!shutdown_called && !has_read_request && current_batch.size() < max_quick_batch_size && try_get_request())
while (!shutdown_called && current_batch.size() + read_requests < max_quick_batch_size && try_get_request())
;
const auto prev_result_done = [&]
@ -129,7 +136,7 @@ void KeeperDispatcher::requestThread()
};
/// Waiting until previous append will be successful, or batch is big enough
while (!shutdown_called && !has_read_request && !prev_result_done() && current_batch.size() <= max_batch_size)
while (!shutdown_called && !prev_result_done() && current_batch.size() <= max_batch_size)
{
try_get_request();
}
@ -319,7 +326,22 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
snapshot_s3.startup(config, macros);
server = std::make_unique<KeeperServer>(configuration_and_settings, config, responses_queue, snapshots_queue, snapshot_s3);
server = std::make_unique<KeeperServer>(configuration_and_settings, config, responses_queue, snapshots_queue, snapshot_s3, [this](const KeeperStorage::RequestForSession & request_for_session)
{
std::lock_guard lock(read_mutex);
if (auto it = related_read_requests.find(std::pair{request_for_session.session_id, request_for_session.request->xid}); it != related_read_requests.end())
{
for (const auto & read_request : it->second)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(read_request);
else
addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS);
}
related_read_requests.erase(it);
}
});
try
{

View File

@ -1,5 +1,6 @@
#pragma once
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include "config.h"
#if USE_NURAFT
@ -103,6 +104,21 @@ private:
void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions);
public:
std::mutex read_mutex;
struct PairHash
{
auto operator()(std::pair<int64_t, int32_t> pair) const
{
SipHash hash;
hash.update(pair.first);
hash.update(pair.second);
return hash.get64();
}
};
std::unordered_map<std::pair<int64_t, int32_t>, KeeperStorage::RequestsForSessions, PairHash> related_read_requests;
/// Just allocate some objects, real initialization is done by `intialize method`
KeeperDispatcher();

View File

@ -107,7 +107,8 @@ KeeperServer::KeeperServer(
const Poco::Util::AbstractConfiguration & config,
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
KeeperSnapshotManagerS3 & snapshot_manager_s3)
KeeperSnapshotManagerS3 & snapshot_manager_s3,
KeeperStateMachine::CommitCallback commit_callback)
: server_id(configuration_and_settings_->server_id)
, coordination_settings(configuration_and_settings_->coordination_settings)
, log(&Poco::Logger::get("KeeperServer"))
@ -128,6 +129,7 @@ KeeperServer::KeeperServer(
coordination_settings,
keeper_context,
config.getBool("keeper_server.upload_snapshot_on_exit", true) ? &snapshot_manager_s3 : nullptr,
commit_callback,
checkAndGetSuperdigest(configuration_and_settings_->super_digest));
state_manager = nuraft::cs_new<KeeperStateManager>(

View File

@ -72,7 +72,8 @@ public:
const Poco::Util::AbstractConfiguration & config_,
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
KeeperSnapshotManagerS3 & snapshot_manager_s3);
KeeperSnapshotManagerS3 & snapshot_manager_s3,
KeeperStateMachine::CommitCallback commit_callback);
/// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings.
void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true);

View File

@ -46,8 +46,10 @@ KeeperStateMachine::KeeperStateMachine(
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
KeeperSnapshotManagerS3 * snapshot_manager_s3_,
CommitCallback commit_callback_,
const std::string & superdigest_)
: coordination_settings(coordination_settings_)
: commit_callback(commit_callback_)
, coordination_settings(coordination_settings_)
, snapshot_manager(
snapshots_path_,
coordination_settings->snapshots_to_keep,
@ -274,6 +276,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
ProfileEvents::increment(ProfileEvents::KeeperCommits);
last_committed_idx = log_idx;
commit_callback(request_for_session);
return nullptr;
}

View File

@ -22,6 +22,8 @@ using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;
class KeeperStateMachine : public nuraft::state_machine
{
public:
using CommitCallback = std::function<void(const KeeperStorage::RequestForSession &)>;
KeeperStateMachine(
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
@ -29,6 +31,7 @@ public:
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
KeeperSnapshotManagerS3 * snapshot_manager_s3_,
CommitCallback commit_callback_,
const std::string & superdigest_ = "");
/// Read state from the latest snapshot
@ -105,6 +108,7 @@ public:
void recalculateStorageStats();
private:
CommitCallback commit_callback;
/// In our state machine we always have a single snapshot which is stored
/// in memory in compressed (serialized) format.
SnapshotMetadataPtr latest_snapshot_meta = nullptr;