From bb77441acb488bb2a32f691ce724b2950ec0d9ba Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 25 May 2023 13:31:11 +0000 Subject: [PATCH] Add support for old storage disks --- src/Coordination/KeeperContext.cpp | 30 ++++-- src/Coordination/KeeperContext.h | 2 + src/Coordination/KeeperSnapshotManager.cpp | 118 +++++++++++++-------- src/Coordination/KeeperSnapshotManager.h | 5 +- src/Coordination/KeeperStateMachine.cpp | 2 +- 5 files changed, 103 insertions(+), 54 deletions(-) diff --git a/src/Coordination/KeeperContext.cpp b/src/Coordination/KeeperContext.cpp index da49868f706..1d6f1be9bfb 100644 --- a/src/Coordination/KeeperContext.cpp +++ b/src/Coordination/KeeperContext.cpp @@ -8,7 +8,6 @@ namespace DB { - KeeperContext::KeeperContext(bool standalone_keeper_) : disk_selector(std::make_shared()) , standalone_keeper(standalone_keeper_) @@ -28,13 +27,19 @@ void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config) else current_log_storage = log_storage; - Poco::Util::AbstractConfiguration::Keys old_log_disk_name_keys; - config.keys("keeper_server", old_log_disk_name_keys); - for (const auto & key : old_log_disk_name_keys) + const auto collect_old_disk_names = [&](const std::string_view key_prefix, std::vector & disk_names) { - if (key.starts_with("old_log_storage_disk")) - old_log_disk_names.push_back(config.getString("keeper_server." + key)); - } + Poco::Util::AbstractConfiguration::Keys disk_name_keys; + config.keys("keeper_server", disk_name_keys); + for (const auto & key : disk_name_keys) + { + if (key.starts_with(key_prefix)) + disk_names.push_back(config.getString(fmt::format("keeper_server.{}", key_prefix))); + } + }; + + collect_old_disk_names("old_log_storage_disk", old_log_disk_names); + collect_old_disk_names("old_snapshot_storage_disk", old_snapshot_disk_names); snapshot_storage = getSnapshotsPathFromConfig(config); @@ -107,6 +112,17 @@ DiskPtr KeeperContext::getSnapshotDisk() const return getDisk(snapshot_storage); } +std::vector KeeperContext::getOldSnapshotDisks() const +{ + std::vector old_snapshot_disks; + old_snapshot_disks.reserve(old_snapshot_disk_names.size()); + + for (const auto & disk_name : old_snapshot_disk_names) + old_snapshot_disks.push_back(disk_selector->get(disk_name)); + + return old_snapshot_disks; +} + void KeeperContext::setSnapshotDisk(DiskPtr disk) { snapshot_storage = std::move(disk); diff --git a/src/Coordination/KeeperContext.h b/src/Coordination/KeeperContext.h index 6b7af3a60db..e41d8e35032 100644 --- a/src/Coordination/KeeperContext.h +++ b/src/Coordination/KeeperContext.h @@ -40,6 +40,7 @@ public: void setLogDisk(DiskPtr disk); DiskPtr getSnapshotDisk() const; + std::vector getOldSnapshotDisks() const; void setSnapshotDisk(DiskPtr disk); DiskPtr getStateFileDisk() const; @@ -69,6 +70,7 @@ private: Storage state_file_storage; std::vector old_log_disk_names; + std::vector old_snapshot_disk_names; bool standalone_keeper; }; diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index 3bfe700bcd5..77abbfb2054 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -519,49 +519,79 @@ KeeperSnapshotManager::KeeperSnapshotManager( , storage_tick_time(storage_tick_time_) , keeper_context(keeper_context_) { + const auto load_snapshot_from_disk = [&](DiskPtr disk) + { + std::unordered_set invalid_snapshots; + /// collect invalid snapshots + for (auto it = disk->iterateDirectory(""); it->isValid(); it->next()) + { + const auto & name = it->name(); + if (name.empty()) + continue; + + if (startsWith(name, "tmp_")) + { + disk->removeFile(it->path()); + invalid_snapshots.insert(name.substr(4)); + continue; + } + } + + /// process snapshots + for (auto it = disk->iterateDirectory(""); it->isValid(); it->next()) + { + const auto & name = it->name(); + if (name.empty()) + continue; + + /// Not snapshot file + if (!startsWith(name, "snapshot_")) + continue; + + if (invalid_snapshots.contains(name)) + { + disk->removeFile(it->path()); + continue; + } + + size_t snapshot_up_to = getSnapshotPathUpToLogIdx(name); + auto [_, inserted] = existing_snapshots.insert_or_assign(snapshot_up_to, SnapshotFileInfo{it->path(), disk}); + + if (!inserted) + LOG_WARNING( + &Poco::Logger::get("KeeperSnapshotManager"), + "Found another snapshots with last log idx {}, will use snapshot from disk {}", + snapshot_up_to, + disk->getName()); + } + }; + + for (const auto & disk : keeper_context->getOldSnapshotDisks()) + load_snapshot_from_disk(disk); + auto disk = getDisk(); - - std::unordered_set invalid_snapshots; - /// collect invalid snapshots - for (auto it = disk->iterateDirectory(""); it->isValid(); it->next()) - { - const auto & name = it->name(); - if (name.empty()) - continue; - - if (startsWith(name, "tmp_")) - { - disk->removeFile(it->path()); - invalid_snapshots.insert(name.substr(4)); - continue; - } - - } - - /// process snapshots - for (auto it = disk->iterateDirectory(""); it->isValid(); it->next()) - { - const auto & name = it->name(); - if (name.empty()) - continue; - - /// Not snapshot file - if (!startsWith(name, "snapshot_")) - continue; - - if (invalid_snapshots.contains(name)) - { - disk->removeFile(it->path()); - continue; - } - - size_t snapshot_up_to = getSnapshotPathUpToLogIdx(name); - existing_snapshots[snapshot_up_to] = it->path(); - } + load_snapshot_from_disk(disk); removeOutdatedSnapshotsIfNeeded(); -} + /// move snapshots from old disks to new one + for (auto & [_, file_info] : existing_snapshots) + { + if (file_info.disk == disk) + continue; + + auto file_path = fs::path(file_info.path); + auto tmp_snapshot_path = file_path.parent_path() / ("tmp_" + file_path.filename().generic_string()); + + { + disk->writeFile(tmp_snapshot_path); + } + + file_info.disk->copyFile(file_info.path, *disk, file_info.path, {}); + disk->removeFile(tmp_snapshot_path); + file_info.disk = disk; + } +} SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx) { @@ -599,7 +629,8 @@ nuraft::ptr KeeperSnapshotManager::deserializeLatestSnapshotBuff } catch (const DB::Exception &) { - getDisk()->removeFile(latest_itr->second); + const auto & [path, disk] = latest_itr->second; + disk->removeFile(path); existing_snapshots.erase(latest_itr->first); tryLogCurrentException(__PRETTY_FUNCTION__); } @@ -610,9 +641,9 @@ nuraft::ptr KeeperSnapshotManager::deserializeLatestSnapshotBuff nuraft::ptr KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const { - const std::string & snapshot_path = existing_snapshots.at(up_to_log_idx); + const auto & [snapshot_path, snapshot_disk] = existing_snapshots.at(up_to_log_idx); WriteBufferFromNuraftBuffer writer; - auto reader = getDisk()->readFile(snapshot_path); + auto reader = snapshot_disk->readFile(snapshot_path); copyData(*reader, writer); return writer.getBuffer(); } @@ -690,7 +721,8 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx) auto itr = existing_snapshots.find(log_idx); if (itr == existing_snapshots.end()) throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx); - getDisk()->removeFile(itr->second); + const auto & [path, disk] = itr->second; + disk->removeFile(path); existing_snapshots.erase(itr); } diff --git a/src/Coordination/KeeperSnapshotManager.h b/src/Coordination/KeeperSnapshotManager.h index 036c0cab62b..0afe582ef59 100644 --- a/src/Coordination/KeeperSnapshotManager.h +++ b/src/Coordination/KeeperSnapshotManager.h @@ -149,11 +149,10 @@ public: { if (!existing_snapshots.empty()) { - const auto & path = existing_snapshots.at(getLatestSnapshotIndex()); + const auto & [path, disk] = existing_snapshots.at(getLatestSnapshotIndex()); try { - auto disk = getDisk(); if (disk->exists(path)) return {path, disk}; } @@ -176,7 +175,7 @@ private: /// How many snapshots to keep before remove const size_t snapshots_to_keep; /// All existing snapshots in our path (log_index -> path) - std::map existing_snapshots; + std::map existing_snapshots; /// Compress snapshots in common ZSTD format instead of custom ClickHouse block LZ4 format const bool compress_snapshots_zstd; /// Superdigest for deserialization of storage diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 5cfc9333a66..9a1cb1941b7 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -404,11 +404,11 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res } else { + latest_snapshot_meta = snapshot->snapshot_meta; /// we rely on the fact that the snapshot disk cannot be changed during runtime if (isLocalDisk(*keeper_context->getSnapshotDisk())) { latest_snapshot_info = snapshot_manager.serializeSnapshotToDisk(*snapshot); - latest_snapshot_meta = snapshot->snapshot_meta; latest_snapshot_buf = nullptr; } else