diff --git a/src/Coordination/NuKeeperStateMachine.cpp b/src/Coordination/NuKeeperStateMachine.cpp index 32bb4269f20..23485cb8b5b 100644 --- a/src/Coordination/NuKeeperStateMachine.cpp +++ b/src/Coordination/NuKeeperStateMachine.cpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB { @@ -227,7 +228,28 @@ void NuKeeperStateMachine::save_logical_snp_obj( nuraft::ptr snp_buf = s.serialize(); cloned_meta = nuraft::snapshot::deserialize(*snp_buf); - auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, s.get_last_log_idx()); + /// Sometimes NuRaft can call save and create snapshots from different threads + /// at onces. To avoid race conditions we serialize snapshots through snapshots_queue + /// TODO: make something better + CreateSnapshotTask snapshot_task; + std::shared_ptr> waiter = std::make_shared>(); + auto future = waiter->get_future(); + snapshot_task.snapshot = nullptr; + snapshot_task.create_snapshot = [this, waiter, cloned_buffer, log_idx = s.get_last_log_idx()] (NuKeeperStorageSnapshotPtr &&) + { + try + { + auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, log_idx); + LOG_DEBUG(log, "Saved snapshot {} to path {}", log_idx, result_path); + } + catch (...) + { + tryLogCurrentException(log); + } + waiter->set_value(); + }; + snapshots_queue.push(std::move(snapshot_task)); + future.wait(); { std::lock_guard lock(snapshots_lock); @@ -235,7 +257,6 @@ void NuKeeperStateMachine::save_logical_snp_obj( latest_snapshot_meta = cloned_meta; } - LOG_DEBUG(log, "Created snapshot {} with path {}", s.get_last_log_idx(), result_path); obj_id++; }