Some improvements

This commit is contained in:
alesapin 2021-04-01 15:19:39 +03:00
parent 06639666ff
commit 8fc01195ce
3 changed files with 20 additions and 30 deletions

View File

@ -150,6 +150,13 @@ void KeeperServer::putRequest(const KeeperStorage::RequestForSession & request_f
int64_t KeeperServer::getSessionID(int64_t session_timeout_ms)
{
/// Just some sanity check. We don't want to make a lot of clients wait with lock.
if (active_session_id_requests > 10)
throw Exception(ErrorCodes::RAFT_ERROR, "Too many concurrent SessionID requests already in flight");
++active_session_id_requests;
SCOPE_EXIT({ --active_session_id_requests; });
auto entry = nuraft::buffer::alloc(sizeof(int64_t));
/// Just special session request
nuraft::buffer_serializer bs(entry);

View File

@ -34,6 +34,7 @@ private:
std::atomic<bool> initialized_flag = false;
std::condition_variable initialized_cv;
std::atomic<bool> initial_batch_committed = false;
std::atomic<size_t> active_session_id_requests = 0;
nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param);

View File

@ -169,15 +169,15 @@ void KeeperStateMachine::create_snapshot(
bool ret = true;
try
{
auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot);
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
{
std::lock_guard lock(snapshots_lock);
auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot);
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
latest_snapshot_buf = snapshot_buf;
latest_snapshot_meta = snapshot->snapshot_meta;
}
LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), result_path);
LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), result_path);
}
{
/// Must do it with lock (clearing elements from list)
@ -228,37 +228,19 @@ void KeeperStateMachine::save_logical_snp_obj(
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
cloned_meta = nuraft::snapshot::deserialize(*snp_buf);
/// Sometimes NuRaft can call save and create snapshots from different threads
/// at once. To avoid race conditions we serialize snapshots through snapshots_queue
/// TODO: make something better
CreateSnapshotTask snapshot_task;
std::shared_ptr<std::promise<void>> waiter = std::make_shared<std::promise<void>>();
auto future = waiter->get_future();
snapshot_task.snapshot = nullptr;
snapshot_task.create_snapshot = [this, waiter, cloned_buffer, log_idx = s.get_last_log_idx()] (KeeperStorageSnapshotPtr &&)
{
try
{
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, log_idx);
LOG_DEBUG(log, "Saved snapshot {} to path {}", log_idx, result_path);
}
catch (...)
{
tryLogCurrentException(log);
}
waiter->set_value();
};
snapshots_queue.push(std::move(snapshot_task));
future.wait();
try
{
std::lock_guard lock(snapshots_lock);
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, s.get_last_log_idx());
latest_snapshot_buf = cloned_buffer;
latest_snapshot_meta = cloned_meta;
LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), result_path);
obj_id++;
}
catch (...)
{
tryLogCurrentException(log);
}
obj_id++;
}
int KeeperStateMachine::read_logical_snp_obj(