keeper snapshot memory opt.

This commit is contained in:
zhanglistar 2022-02-14 19:43:08 +08:00
parent 34092fa2a3
commit edcea7dc31
4 changed files with 74 additions and 10 deletions

View File

@ -526,5 +526,33 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
existing_snapshots.erase(itr);
}
std::pair<std::string, std::error_code> KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot)
{
auto up_to_log_idx = snapshot.snapshot_meta->get_last_log_idx();
auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
auto tmp_snapshot_file_name = "tmp_" + snapshot_file_name;
std::string tmp_snapshot_path = std::filesystem::path{snapshots_path} / tmp_snapshot_file_name;
std::string new_snapshot_path = std::filesystem::path{snapshots_path} / snapshot_file_name;
auto writer = std::make_unique<WriteBufferFromFile>(tmp_snapshot_path, O_WRONLY | O_TRUNC | O_CREAT | O_CLOEXEC| O_APPEND);
std::unique_ptr<WriteBuffer> compressed_writer;
if (compress_snapshots_zstd)
compressed_writer = wrapWriteBufferWithCompressionMethod(std::move(writer), CompressionMethod::Zstd, 3);
else
compressed_writer = std::make_unique<CompressedWriteBuffer>(*writer);
KeeperStorageSnapshot::serialize(snapshot, *compressed_writer);
compressed_writer->finalize();
compressed_writer->sync();
std::error_code ec;
std::filesystem::rename(tmp_snapshot_path, new_snapshot_path, ec);
if (!ec)
{
existing_snapshots.emplace(up_to_log_idx, new_snapshot_path);
removeOutdatedSnapshotsIfNeeded();
}
return {new_snapshot_path, ec};
}
}

View File

@ -1,4 +1,5 @@
#pragma once
#include <system_error>
#include <libnuraft/nuraft.hxx>
#include <Coordination/KeeperStorage.h>
#include <IO/WriteBuffer.h>
@ -101,6 +102,9 @@ public:
/// Serialize already compressed snapshot to disk (return path)
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx);
/// Serialize snapshot directly to disk
std::pair<std::string, std::error_code> serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot);
SnapshotDeserializationResult deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
/// Deserialize snapshot with log index up_to_log_idx from disk into compressed nuraft buffer.
@ -126,6 +130,13 @@ public:
return 0;
}
std::string getLatestSnapshotPath() const
{
if (!existing_snapshots.empty())
return existing_snapshots.at(getLatestSnapshotIndex());
return "";
}
private:
void removeOutdatedSnapshotsIfNeeded();

View File

@ -1,3 +1,4 @@
#include <sys/mman.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
@ -73,8 +74,8 @@ void KeeperStateMachine::init()
try
{
latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index);
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index));
latest_snapshot_path = snapshot_manager.getLatestSnapshotPath();
storage = std::move(snapshot_deserialization_result.storage);
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
cluster_config = snapshot_deserialization_result.cluster_config;
@ -155,7 +156,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{ /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_and_responses_lock);
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(snapshot_manager.deserializeSnapshotBufferFromDisk(s.get_last_log_idx()));
storage = std::move(snapshot_deserialization_result.storage);
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
cluster_config = snapshot_deserialization_result.cluster_config;
@ -207,8 +208,14 @@ void KeeperStateMachine::create_snapshot(
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
latest_snapshot_buf = snapshot_buf;
latest_snapshot_meta = snapshot->snapshot_meta;
LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), result_path);
auto [path, error_code]= snapshot_manager.serializeSnapshotToDisk(*snapshot);
if (error_code)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Snapshot {} was created failed, error: {}",
snapshot->snapshot_meta->get_last_log_idx(), error_code.message());
}
latest_snapshot_path = path;
LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), path);
}
{
@ -258,7 +265,6 @@ void KeeperStateMachine::save_logical_snp_obj(
else
{
/// copy snapshot into memory
cloned_buffer = nuraft::buffer::clone(data);
}
/// copy snapshot meta into memory
@ -268,9 +274,9 @@ void KeeperStateMachine::save_logical_snp_obj(
try
{
std::lock_guard lock(snapshots_lock);
/// Serialize snapshot to disk and switch in memory pointers.
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, s.get_last_log_idx());
latest_snapshot_buf = cloned_buffer;
/// Serialize snapshot to disk
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(data, s.get_last_log_idx());
latest_snapshot_path = result_path;
latest_snapshot_meta = cloned_meta;
LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), result_path);
obj_id++;
@ -308,7 +314,25 @@ int KeeperStateMachine::read_logical_snp_obj(
s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx());
return -1;
}
data_out = nuraft::buffer::clone(*latest_snapshot_buf);
int fd = ::open(latest_snapshot_path.c_str(), O_RDONLY);
LOG_INFO(log, "Opening file {} for read_logical_snp_obj", latest_snapshot_path);
if (fd < 0)
{
LOG_WARNING(log, "Error opening {}.", latest_snapshot_path);
return -1;
}
auto file_size = ::lseek(fd, 0, SEEK_END);
::lseek(fd, 0, SEEK_SET);
auto* chunk = reinterpret_cast<nuraft::byte*>(::mmap(nullptr, file_size, PROT_READ, MAP_FILE | MAP_SHARED, fd, 0));
if (chunk == MAP_FAILED)
{
LOG_WARNING(log, "Error mmapping {}.", latest_snapshot_path);
::close(fd);
return -1;
}
data_out->put_raw(chunk, file_size);
::munmap(chunk, file_size);
::close(fd);
is_last_obj = true;
}

View File

@ -105,6 +105,7 @@ private:
/// In our state machine we always have a single snapshot which is stored
/// in memory in compressed (serialized) format.
SnapshotMetadataPtr latest_snapshot_meta = nullptr;
std::string latest_snapshot_path;
nuraft::ptr<nuraft::buffer> latest_snapshot_buf = nullptr;
CoordinationSettingsPtr coordination_settings;