diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index 8d5df7c35e9..5c009116010 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -526,5 +526,33 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx) existing_snapshots.erase(itr); } +/* Serializes `snapshot` into a "tmp_"-prefixed file under snapshots_path and then renames it to its final name. Returns the final path together with the error code of the rename; on success the snapshot is registered in existing_snapshots and removeOutdatedSnapshotsIfNeeded() is called. */ std::pair KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot) +{ + auto up_to_log_idx = snapshot.snapshot_meta->get_last_log_idx(); + auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd); + auto tmp_snapshot_file_name = "tmp_" + snapshot_file_name; + std::string tmp_snapshot_path = std::filesystem::path{snapshots_path} / tmp_snapshot_file_name; + std::string new_snapshot_path = std::filesystem::path{snapshots_path} / snapshot_file_name; + /* Write to the temporary path; Zstd wrapping is used only when compress_snapshots_zstd is set. */ + auto writer = std::make_unique(tmp_snapshot_path, O_WRONLY | O_TRUNC | O_CREAT | O_CLOEXEC| O_APPEND); + std::unique_ptr compressed_writer; + if (compress_snapshots_zstd) + compressed_writer = wrapWriteBufferWithCompressionMethod(std::move(writer), CompressionMethod::Zstd, 3); + else + compressed_writer = std::make_unique(*writer); + /* Serialize, then finalize and sync the output before the file is renamed. */ + KeeperStorageSnapshot::serialize(snapshot, *compressed_writer); + compressed_writer->finalize(); + compressed_writer->sync(); + /* Non-throwing rename overload: a failure is reported through ec and handed back to the caller. */ + std::error_code ec; + std::filesystem::rename(tmp_snapshot_path, new_snapshot_path, ec); + if (!ec) + { + existing_snapshots.emplace(up_to_log_idx, new_snapshot_path); + removeOutdatedSnapshotsIfNeeded(); + } + return {new_snapshot_path, ec}; +} } diff --git a/src/Coordination/KeeperSnapshotManager.h b/src/Coordination/KeeperSnapshotManager.h index 174864a0ceb..81eb2b91893 100644 --- a/src/Coordination/KeeperSnapshotManager.h +++ b/src/Coordination/KeeperSnapshotManager.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include @@ -101,6 +102,9 @@ public: /// Serialize already compressed snapshot to disk (return path) std::string 
serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx); + /// Serialize snapshot directly to disk (returns the final path and the error code of the rename) + std::pair serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot); + SnapshotDeserializationResult deserializeSnapshotFromBuffer(nuraft::ptr buffer) const; /// Deserialize snapshot with log index up_to_log_idx from disk into compressed nuraft buffer. @@ -126,6 +130,13 @@ public: return 0; } + /* Path stored in existing_snapshots for getLatestSnapshotIndex(); empty string when there are no snapshots. */ std::string getLatestSnapshotPath() const + { + if (!existing_snapshots.empty()) + return existing_snapshots.at(getLatestSnapshotIndex()); + return ""; + } + private: void removeOutdatedSnapshotsIfNeeded(); diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 20d3bcbfd30..b0cf91e51a2 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -73,8 +74,8 @@ void KeeperStateMachine::init() try { - latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index); - auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf); + auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index)); + latest_snapshot_path = snapshot_manager.getLatestSnapshotPath(); /* keep the on-disk path; read_logical_snp_obj() reads the snapshot from this file */ storage = std::move(snapshot_deserialization_result.storage); latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta; cluster_config = snapshot_deserialization_result.cluster_config; @@ -155,7 +156,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) { /// deserialize and apply snapshot to storage std::lock_guard lock(storage_and_responses_lock); - auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr); + auto snapshot_deserialization_result = 
snapshot_manager.deserializeSnapshotFromBuffer(snapshot_manager.deserializeSnapshotBufferFromDisk(s.get_last_log_idx())); storage = std::move(snapshot_deserialization_result.storage); latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta; cluster_config = snapshot_deserialization_result.cluster_config; @@ -207,8 +208,14 @@ void KeeperStateMachine::create_snapshot( auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx()); latest_snapshot_buf = snapshot_buf; latest_snapshot_meta = snapshot->snapshot_meta; - - LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), result_path); + auto [path, error_code]= snapshot_manager.serializeSnapshotToDisk(*snapshot); /* yields the final path and the error code of the rename. NOTE(review): the context lines above already wrote this snapshot through serializeSnapshotBufferToDisk(); confirm that serializing it a second time is intended. */ + if (error_code) /* the rename to the final snapshot name failed */ + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Snapshot {} was created failed, error: {}", + snapshot->snapshot_meta->get_last_log_idx(), error_code.message()); + } + latest_snapshot_path = path; /* read_logical_snp_obj() opens this file */ + LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), path); } { @@ -258,7 +265,6 @@ void KeeperStateMachine::save_logical_snp_obj( else { /// copy snapshot into memory - cloned_buffer = nuraft::buffer::clone(data); } /// copy snapshot meta into memory @@ -268,9 +274,9 @@ void KeeperStateMachine::save_logical_snp_obj( try { std::lock_guard lock(snapshots_lock); - /// Serialize snapshot to disk and switch in memory pointers. 
- auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, s.get_last_log_idx()); - latest_snapshot_buf = cloned_buffer; + /// Serialize snapshot to disk + auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(data, s.get_last_log_idx()); + latest_snapshot_path = result_path; latest_snapshot_meta = cloned_meta; LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), result_path); obj_id++; @@ -308,7 +314,25 @@ int KeeperStateMachine::read_logical_snp_obj( s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx()); return -1; } - data_out = nuraft::buffer::clone(*latest_snapshot_buf); + int fd = ::open(latest_snapshot_path.c_str(), O_RDONLY); + LOG_INFO(log, "Opening file {} for read_logical_snp_obj", latest_snapshot_path); + if (fd < 0) + { + LOG_WARNING(log, "Error opening {}.", latest_snapshot_path); + return -1; + } + auto file_size = ::lseek(fd, 0, SEEK_END); + ::lseek(fd, 0, SEEK_SET); + auto* chunk = reinterpret_cast(::mmap(nullptr, file_size, PROT_READ, MAP_FILE | MAP_SHARED, fd, 0)); + if (chunk == MAP_FAILED) + { + LOG_WARNING(log, "Error mmapping {}.", latest_snapshot_path); + ::close(fd); + return -1; + } + data_out->put_raw(chunk, file_size); + ::munmap(chunk, file_size); + ::close(fd); is_last_obj = true; } diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 291b58e2498..a6d116ca745 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -105,6 +105,7 @@ private: /// In our state machine we always have a single snapshot which is stored /// in memory in compressed (serialized) format. SnapshotMetadataPtr latest_snapshot_meta = nullptr; + std::string latest_snapshot_path; nuraft::ptr latest_snapshot_buf = nullptr; CoordinationSettingsPtr coordination_settings;