Add support for latest snapshot disk

This commit is contained in:
Antonio Andelic 2023-05-30 13:22:40 +00:00
parent 67746bfff2
commit 21aba94909
5 changed files with 119 additions and 49 deletions

View File

@ -43,6 +43,11 @@ void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config)
snapshot_storage = getSnapshotsPathFromConfig(config);
if (config.has("keeper_server.latest_snapshot_storage_disk"))
latest_snapshot_storage = config.getString("keeper_server.latest_snapshot_storage_disk");
else
latest_snapshot_storage = snapshot_storage;
state_file_storage = getStatePathFromConfig(config);
}
@ -107,6 +112,11 @@ void KeeperContext::setLogDisk(DiskPtr disk)
latest_log_storage = std::move(disk);
}
DiskPtr KeeperContext::getLatestSnapshotDisk() const
{
return getDisk(latest_snapshot_storage);
}
DiskPtr KeeperContext::getSnapshotDisk() const
{
return getDisk(snapshot_storage);
@ -126,6 +136,7 @@ std::vector<DiskPtr> KeeperContext::getOldSnapshotDisks() const
void KeeperContext::setSnapshotDisk(DiskPtr disk)
{
snapshot_storage = std::move(disk);
latest_snapshot_storage = snapshot_storage;
}
DiskPtr KeeperContext::getStateFileDisk() const

View File

@ -39,6 +39,7 @@ public:
std::vector<DiskPtr> getOldLogDisks() const;
void setLogDisk(DiskPtr disk);
DiskPtr getLatestSnapshotDisk() const;
DiskPtr getSnapshotDisk() const;
std::vector<DiskPtr> getOldSnapshotDisks() const;
void setSnapshotDisk(DiskPtr disk);
@ -67,6 +68,7 @@ private:
Storage log_storage;
Storage latest_log_storage;
Storage snapshot_storage;
Storage latest_snapshot_storage;
Storage state_file_storage;
std::vector<std::string> old_log_disk_names;

View File

@ -16,6 +16,7 @@
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include "Core/Field.h"
#include <Disks/DiskLocal.h>
@ -31,6 +32,24 @@ namespace ErrorCodes
namespace
{
constexpr std::string_view tmp_prefix = "tmp_";
void moveFileBetweenDisks(DiskPtr disk_from, const std::string & path_from, DiskPtr disk_to, const std::string & path_to)
{
/// we use empty file with prefix tmp_ to detect incomplete copies
/// if a copy is complete we don't care from which disk we use the same file
/// so it's okay if a failure happens after removing of tmp file but before we remove
/// the snapshot from the source disk
auto from_path = fs::path(path_from);
auto tmp_snapshot_name = from_path.parent_path() / (std::string{tmp_prefix} + from_path.filename().string());
{
disk_to->writeFile(tmp_snapshot_name);
}
disk_from->copyFile(from_path, *disk_to, path_to, {});
disk_to->removeFile(tmp_snapshot_name);
disk_from->removeFile(path_from);
}
uint64_t getSnapshotPathUpToLogIdx(const String & snapshot_path)
{
std::filesystem::path path(snapshot_path);
@ -519,43 +538,48 @@ KeeperSnapshotManager::KeeperSnapshotManager(
, storage_tick_time(storage_tick_time_)
, keeper_context(keeper_context_)
{
const auto load_snapshot_from_disk = [&](DiskPtr disk)
const auto load_snapshot_from_disk = [&](const auto & disk)
{
std::unordered_set<std::string> invalid_snapshots;
/// collect invalid snapshots
LOG_TRACE(log, "Reading from disk {}", disk->getName());
std::unordered_map<std::string, std::string> incomplete_files;
const auto clean_incomplete_file = [&](const auto & file_path)
{
if (auto incomplete_it = incomplete_files.find(fs::path(file_path).filename()); incomplete_it != incomplete_files.end())
{
LOG_TRACE(log, "Removing {} from {}", file_path, disk->getName());
disk->removeFile(file_path);
disk->removeFile(incomplete_it->second);
incomplete_files.erase(incomplete_it);
return true;
}
return false;
};
std::vector<std::string> snapshot_files;
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
{
const auto & name = it->name();
if (name.empty())
continue;
if (startsWith(name, "tmp_"))
if (it->name().starts_with(tmp_prefix))
{
disk->removeFile(it->path());
invalid_snapshots.insert(name.substr(4));
continue;
}
}
/// process snapshots
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
{
const auto & name = it->name();
if (name.empty())
continue;
/// Not snapshot file
if (!startsWith(name, "snapshot_"))
continue;
if (invalid_snapshots.contains(name))
{
disk->removeFile(it->path());
incomplete_files.emplace(it->name().substr(tmp_prefix.size()), it->path());
continue;
}
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(name);
auto [_, inserted] = existing_snapshots.insert_or_assign(snapshot_up_to, SnapshotFileInfo{it->path(), disk});
if (clean_incomplete_file(it->path()))
continue;
snapshot_files.push_back(it->path());
}
for (const auto & snapshot_file : snapshot_files)
{
if (clean_incomplete_file(fs::path(snapshot_file).filename()))
continue;
LOG_TRACE(log, "Found {} on {}", snapshot_file, disk->getName());
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(snapshot_file);
auto [_, inserted] = existing_snapshots.insert_or_assign(snapshot_up_to, SnapshotFileInfo{snapshot_file, disk});
if (!inserted)
LOG_WARNING(
@ -564,6 +588,9 @@ KeeperSnapshotManager::KeeperSnapshotManager(
snapshot_up_to,
disk->getName());
}
for (const auto & [name, path] : incomplete_files)
disk->removeFile(path);
};
for (const auto & disk : keeper_context->getOldSnapshotDisks())
@ -572,25 +599,12 @@ KeeperSnapshotManager::KeeperSnapshotManager(
auto disk = getDisk();
load_snapshot_from_disk(disk);
auto latest_snapshot_disk = getLatestSnapshotDisk();
if (latest_snapshot_disk != disk)
load_snapshot_from_disk(latest_snapshot_disk);
removeOutdatedSnapshotsIfNeeded();
/// move snapshots from old disks to new one
for (auto & [_, file_info] : existing_snapshots)
{
if (file_info.disk == disk)
continue;
auto file_path = fs::path(file_info.path);
auto tmp_snapshot_path = file_path.parent_path() / ("tmp_" + file_path.filename().generic_string());
{
disk->writeFile(tmp_snapshot_path);
}
file_info.disk->copyFile(file_info.path, *disk, file_info.path, {});
disk->removeFile(tmp_snapshot_path);
file_info.disk = disk;
}
moveSnapshotsIfNeeded();
}
SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx)
@ -600,7 +614,7 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::bu
auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
auto tmp_snapshot_file_name = "tmp_" + snapshot_file_name;
auto disk = getDisk();
auto disk = getLatestSnapshotDisk();
{
disk->writeFile(tmp_snapshot_file_name);
@ -614,6 +628,7 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::bu
existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
removeOutdatedSnapshotsIfNeeded();
moveSnapshotsIfNeeded();
return {snapshot_file_name, disk};
}
@ -710,12 +725,47 @@ DiskPtr KeeperSnapshotManager::getDisk() const
return keeper_context->getSnapshotDisk();
}
DiskPtr KeeperSnapshotManager::getLatestSnapshotDisk() const
{
return keeper_context->getLatestSnapshotDisk();
}
void KeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
{
while (existing_snapshots.size() > snapshots_to_keep)
removeSnapshot(existing_snapshots.begin()->first);
}
void KeeperSnapshotManager::moveSnapshotsIfNeeded()
{
/// move snapshots to correct disks
auto disk = getDisk();
auto latest_snapshot_disk = getLatestSnapshotDisk();
auto latest_snapshot_idx = getLatestSnapshotIndex();
for (auto & [idx, file_info] : existing_snapshots)
{
if (idx == latest_snapshot_idx)
{
if (file_info.disk != latest_snapshot_disk)
{
moveFileBetweenDisks(file_info.disk, file_info.path, latest_snapshot_disk, file_info.path);
file_info.disk = latest_snapshot_disk;
}
}
else
{
if (file_info.disk != disk)
{
moveFileBetweenDisks(file_info.disk, file_info.path, disk, file_info.path);
file_info.disk = disk;
}
}
}
}
void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
{
auto itr = existing_snapshots.find(log_idx);
@ -732,7 +782,7 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
auto tmp_snapshot_file_name = "tmp_" + snapshot_file_name;
auto disk = getDisk();
auto disk = getLatestSnapshotDisk();
{
disk->writeFile(tmp_snapshot_file_name);
}
@ -752,6 +802,7 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
removeOutdatedSnapshotsIfNeeded();
moveSnapshotsIfNeeded();
return {snapshot_file_name, disk};
}

View File

@ -165,8 +165,10 @@ public:
private:
void removeOutdatedSnapshotsIfNeeded();
void moveSnapshotsIfNeeded();
DiskPtr getDisk() const;
DiskPtr getLatestSnapshotDisk() const;
/// Checks first 4 buffer bytes to became sure that snapshot compressed with
/// ZSTD codec.
@ -184,6 +186,8 @@ private:
size_t storage_tick_time;
KeeperContextPtr keeper_context;
Poco::Logger * log = &Poco::Logger::get("KeeperSnapshotManager");
};
/// Keeper create snapshots in background thread. KeeperStateMachine just create

View File

@ -468,15 +468,17 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
{
latest_snapshot_meta = snapshot->snapshot_meta;
/// we rely on the fact that the snapshot disk cannot be changed during runtime
if (isLocalDisk(*keeper_context->getSnapshotDisk()))
if (isLocalDisk(*keeper_context->getLatestSnapshotDisk()))
{
latest_snapshot_info = snapshot_manager.serializeSnapshotToDisk(*snapshot);
auto snapshot_info = snapshot_manager.serializeSnapshotToDisk(*snapshot);
latest_snapshot_info = std::move(snapshot_info);
latest_snapshot_buf = nullptr;
}
else
{
auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot);
latest_snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
auto snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
latest_snapshot_info = std::move(snapshot_info);
latest_snapshot_buf = std::move(snapshot_buf);
}