diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index fef373a399e..c7446c296f0 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -150,6 +150,13 @@ void KeeperServer::putRequest(const KeeperStorage::RequestForSession & request_f int64_t KeeperServer::getSessionID(int64_t session_timeout_ms) { + /// Just some sanity check. We don't want to make a lot of clients wait with lock. + if (active_session_id_requests > 10) + throw Exception(ErrorCodes::RAFT_ERROR, "Too many concurrent SessionID requests already in flight"); + + ++active_session_id_requests; + SCOPE_EXIT({ --active_session_id_requests; }); + auto entry = nuraft::buffer::alloc(sizeof(int64_t)); /// Just special session request nuraft::buffer_serializer bs(entry); diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index 091a2b0150f..9a0260cda94 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -34,6 +34,7 @@ private: std::atomic initialized_flag = false; std::condition_variable initialized_cv; std::atomic initial_batch_committed = false; + std::atomic active_session_id_requests = 0; nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param); diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 1a1957b1979..227bb1aaa4e 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -169,15 +169,15 @@ void KeeperStateMachine::create_snapshot( bool ret = true; try { - auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot); - auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx()); { std::lock_guard lock(snapshots_lock); + auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot); + auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx()); latest_snapshot_buf = snapshot_buf; latest_snapshot_meta = snapshot->snapshot_meta; - } - LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), result_path); + LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), result_path); + } { /// Must do it with lock (clearing elements from list) @@ -228,37 +228,19 @@ void KeeperStateMachine::save_logical_snp_obj( nuraft::ptr snp_buf = s.serialize(); cloned_meta = nuraft::snapshot::deserialize(*snp_buf); - /// Sometimes NuRaft can call save and create snapshots from different threads - /// at once. To avoid race conditions we serialize snapshots through snapshots_queue - /// TODO: make something better - CreateSnapshotTask snapshot_task; - std::shared_ptr> waiter = std::make_shared>(); - auto future = waiter->get_future(); - snapshot_task.snapshot = nullptr; - snapshot_task.create_snapshot = [this, waiter, cloned_buffer, log_idx = s.get_last_log_idx()] (KeeperStorageSnapshotPtr &&) - { - try - { - auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, log_idx); - LOG_DEBUG(log, "Saved snapshot {} to path {}", log_idx, result_path); - } - catch (...) - { - tryLogCurrentException(log); - } - waiter->set_value(); - }; - snapshots_queue.push(std::move(snapshot_task)); - future.wait(); - + try { std::lock_guard lock(snapshots_lock); + auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, s.get_last_log_idx()); latest_snapshot_buf = cloned_buffer; latest_snapshot_meta = cloned_meta; + LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), result_path); + obj_id++; + } + catch (...) + { + tryLogCurrentException(log); } - - - obj_id++; } int KeeperStateMachine::read_logical_snp_obj(