Add support for old storage disks

This commit is contained in:
Antonio Andelic 2023-05-25 13:31:11 +00:00
parent f519aa4613
commit bb77441acb
5 changed files with 103 additions and 54 deletions

View File

@ -8,7 +8,6 @@
namespace DB namespace DB
{ {
KeeperContext::KeeperContext(bool standalone_keeper_) KeeperContext::KeeperContext(bool standalone_keeper_)
: disk_selector(std::make_shared<DiskSelector>()) : disk_selector(std::make_shared<DiskSelector>())
, standalone_keeper(standalone_keeper_) , standalone_keeper(standalone_keeper_)
@ -28,13 +27,19 @@ void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config)
else else
current_log_storage = log_storage; current_log_storage = log_storage;
Poco::Util::AbstractConfiguration::Keys old_log_disk_name_keys; const auto collect_old_disk_names = [&](const std::string_view key_prefix, std::vector<std::string> & disk_names)
config.keys("keeper_server", old_log_disk_name_keys);
for (const auto & key : old_log_disk_name_keys)
{ {
if (key.starts_with("old_log_storage_disk")) Poco::Util::AbstractConfiguration::Keys disk_name_keys;
old_log_disk_names.push_back(config.getString("keeper_server." + key)); config.keys("keeper_server", disk_name_keys);
for (const auto & key : disk_name_keys)
{
if (key.starts_with(key_prefix))
disk_names.push_back(config.getString(fmt::format("keeper_server.{}", key_prefix)));
} }
};
collect_old_disk_names("old_log_storage_disk", old_log_disk_names);
collect_old_disk_names("old_snapshot_storage_disk", old_snapshot_disk_names);
snapshot_storage = getSnapshotsPathFromConfig(config); snapshot_storage = getSnapshotsPathFromConfig(config);
@ -107,6 +112,17 @@ DiskPtr KeeperContext::getSnapshotDisk() const
return getDisk(snapshot_storage); return getDisk(snapshot_storage);
} }
std::vector<DiskPtr> KeeperContext::getOldSnapshotDisks() const
{
std::vector<DiskPtr> old_snapshot_disks;
old_snapshot_disks.reserve(old_snapshot_disk_names.size());
for (const auto & disk_name : old_snapshot_disk_names)
old_snapshot_disks.push_back(disk_selector->get(disk_name));
return old_snapshot_disks;
}
void KeeperContext::setSnapshotDisk(DiskPtr disk) void KeeperContext::setSnapshotDisk(DiskPtr disk)
{ {
snapshot_storage = std::move(disk); snapshot_storage = std::move(disk);

View File

@ -40,6 +40,7 @@ public:
void setLogDisk(DiskPtr disk); void setLogDisk(DiskPtr disk);
DiskPtr getSnapshotDisk() const; DiskPtr getSnapshotDisk() const;
std::vector<DiskPtr> getOldSnapshotDisks() const;
void setSnapshotDisk(DiskPtr disk); void setSnapshotDisk(DiskPtr disk);
DiskPtr getStateFileDisk() const; DiskPtr getStateFileDisk() const;
@ -69,6 +70,7 @@ private:
Storage state_file_storage; Storage state_file_storage;
std::vector<std::string> old_log_disk_names; std::vector<std::string> old_log_disk_names;
std::vector<std::string> old_snapshot_disk_names;
bool standalone_keeper; bool standalone_keeper;
}; };

View File

@ -519,8 +519,8 @@ KeeperSnapshotManager::KeeperSnapshotManager(
, storage_tick_time(storage_tick_time_) , storage_tick_time(storage_tick_time_)
, keeper_context(keeper_context_) , keeper_context(keeper_context_)
{ {
auto disk = getDisk(); const auto load_snapshot_from_disk = [&](DiskPtr disk)
{
std::unordered_set<std::string> invalid_snapshots; std::unordered_set<std::string> invalid_snapshots;
/// collect invalid snapshots /// collect invalid snapshots
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next()) for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
@ -535,7 +535,6 @@ KeeperSnapshotManager::KeeperSnapshotManager(
invalid_snapshots.insert(name.substr(4)); invalid_snapshots.insert(name.substr(4));
continue; continue;
} }
} }
/// process snapshots /// process snapshots
@ -556,12 +555,43 @@ KeeperSnapshotManager::KeeperSnapshotManager(
} }
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(name); size_t snapshot_up_to = getSnapshotPathUpToLogIdx(name);
existing_snapshots[snapshot_up_to] = it->path(); auto [_, inserted] = existing_snapshots.insert_or_assign(snapshot_up_to, SnapshotFileInfo{it->path(), disk});
if (!inserted)
LOG_WARNING(
&Poco::Logger::get("KeeperSnapshotManager"),
"Found another snapshots with last log idx {}, will use snapshot from disk {}",
snapshot_up_to,
disk->getName());
} }
};
for (const auto & disk : keeper_context->getOldSnapshotDisks())
load_snapshot_from_disk(disk);
auto disk = getDisk();
load_snapshot_from_disk(disk);
removeOutdatedSnapshotsIfNeeded(); removeOutdatedSnapshotsIfNeeded();
}
/// move snapshots from old disks to new one
for (auto & [_, file_info] : existing_snapshots)
{
if (file_info.disk == disk)
continue;
auto file_path = fs::path(file_info.path);
auto tmp_snapshot_path = file_path.parent_path() / ("tmp_" + file_path.filename().generic_string());
{
disk->writeFile(tmp_snapshot_path);
}
file_info.disk->copyFile(file_info.path, *disk, file_info.path, {});
disk->removeFile(tmp_snapshot_path);
file_info.disk = disk;
}
}
SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx) SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx)
{ {
@ -599,7 +629,8 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBuff
} }
catch (const DB::Exception &) catch (const DB::Exception &)
{ {
getDisk()->removeFile(latest_itr->second); const auto & [path, disk] = latest_itr->second;
disk->removeFile(path);
existing_snapshots.erase(latest_itr->first); existing_snapshots.erase(latest_itr->first);
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
@ -610,9 +641,9 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBuff
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const
{ {
const std::string & snapshot_path = existing_snapshots.at(up_to_log_idx); const auto & [snapshot_path, snapshot_disk] = existing_snapshots.at(up_to_log_idx);
WriteBufferFromNuraftBuffer writer; WriteBufferFromNuraftBuffer writer;
auto reader = getDisk()->readFile(snapshot_path); auto reader = snapshot_disk->readFile(snapshot_path);
copyData(*reader, writer); copyData(*reader, writer);
return writer.getBuffer(); return writer.getBuffer();
} }
@ -690,7 +721,8 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
auto itr = existing_snapshots.find(log_idx); auto itr = existing_snapshots.find(log_idx);
if (itr == existing_snapshots.end()) if (itr == existing_snapshots.end())
throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx); throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);
getDisk()->removeFile(itr->second); const auto & [path, disk] = itr->second;
disk->removeFile(path);
existing_snapshots.erase(itr); existing_snapshots.erase(itr);
} }

View File

@ -149,11 +149,10 @@ public:
{ {
if (!existing_snapshots.empty()) if (!existing_snapshots.empty())
{ {
const auto & path = existing_snapshots.at(getLatestSnapshotIndex()); const auto & [path, disk] = existing_snapshots.at(getLatestSnapshotIndex());
try try
{ {
auto disk = getDisk();
if (disk->exists(path)) if (disk->exists(path))
return {path, disk}; return {path, disk};
} }
@ -176,7 +175,7 @@ private:
/// How many snapshots to keep before remove /// How many snapshots to keep before remove
const size_t snapshots_to_keep; const size_t snapshots_to_keep;
/// All existing snapshots in our path (log_index -> path) /// All existing snapshots in our path (log_index -> path)
std::map<uint64_t, std::string> existing_snapshots; std::map<uint64_t, SnapshotFileInfo> existing_snapshots;
/// Compress snapshots in common ZSTD format instead of custom ClickHouse block LZ4 format /// Compress snapshots in common ZSTD format instead of custom ClickHouse block LZ4 format
const bool compress_snapshots_zstd; const bool compress_snapshots_zstd;
/// Superdigest for deserialization of storage /// Superdigest for deserialization of storage

View File

@ -404,11 +404,11 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
} }
else else
{ {
latest_snapshot_meta = snapshot->snapshot_meta;
/// we rely on the fact that the snapshot disk cannot be changed during runtime /// we rely on the fact that the snapshot disk cannot be changed during runtime
if (isLocalDisk(*keeper_context->getSnapshotDisk())) if (isLocalDisk(*keeper_context->getSnapshotDisk()))
{ {
latest_snapshot_info = snapshot_manager.serializeSnapshotToDisk(*snapshot); latest_snapshot_info = snapshot_manager.serializeSnapshotToDisk(*snapshot);
latest_snapshot_meta = snapshot->snapshot_meta;
latest_snapshot_buf = nullptr; latest_snapshot_buf = nullptr;
} }
else else