Move broken Keeper snapshots to detached folder

This commit is contained in:
Antonio Andelic 2024-02-27 15:10:30 +01:00
parent c8db5403d7
commit 2c8759442d
10 changed files with 147 additions and 81 deletions

View File

@ -4,6 +4,7 @@
#include <Coordination/CoordinationSettings.h>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/ZooKeeperDataReader.h>
#include <Coordination/KeeperContext.h>
#include <Common/TerminalSize.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/AutoPtr.h>

View File

@ -1843,24 +1843,6 @@ void Changelog::initWriter(ChangelogFileDescriptionPtr description)
current_writer->setFile(std::move(description), WriteMode::Append);
}
namespace
{
std::string getCurrentTimestampFolder()
{
const auto timestamp = LocalDateTime{std::time(nullptr)};
return fmt::format(
"{:02}{:02}{:02}T{:02}{:02}{:02}",
timestamp.year(),
timestamp.month(),
timestamp.day(),
timestamp.hour(),
timestamp.minute(),
timestamp.second());
}
}
DiskPtr Changelog::getDisk() const
{
return keeper_context->getLogDisk();

View File

@ -119,4 +119,18 @@ void moveFileBetweenDisks(
if (!run_with_retries([&] { disk_from->removeFileIfExists(path_from); }, "removing file from source disk"))
return;
}
std::string getCurrentTimestampFolder()
{
const auto timestamp = LocalDateTime{std::time(nullptr)};
return fmt::format(
"{:02}{:02}{:02}T{:02}{:02}{:02}",
timestamp.year(),
timestamp.month(),
timestamp.day(),
timestamp.hour(),
timestamp.minute(),
timestamp.second());
}
}

View File

@ -26,4 +26,6 @@ void moveFileBetweenDisks(
LoggerPtr logger,
const KeeperContextPtr & keeper_context);
std::string getCurrentTimestampFolder();
}

View File

@ -13,6 +13,8 @@
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
#include <Disks/IDisk.h>
#include <atomic>
#include <future>
#include <chrono>

View File

@ -1,24 +1,24 @@
#include <filesystem>
#include <memory>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Coordination/CoordinationSettings.h>
#include <Coordination/KeeperCommon.h>
#include <Coordination/KeeperConstants.h>
#include <Coordination/KeeperContext.h>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/CoordinationSettings.h>
#include <Core/Field.h>
#include <Disks/DiskLocal.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <filesystem>
#include <memory>
#include <Common/logger_useful.h>
#include <Coordination/KeeperContext.h>
#include <Coordination/KeeperCommon.h>
#include <Coordination/KeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Core/Field.h>
#include <Disks/DiskLocal.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -569,7 +569,8 @@ KeeperSnapshotManager::KeeperSnapshotManager(
bool compress_snapshots_zstd_,
const std::string & superdigest_,
size_t storage_tick_time_)
: snapshots_to_keep(snapshots_to_keep_)
: snapshots_detached_dir("detached")
, snapshots_to_keep(snapshots_to_keep_)
, compress_snapshots_zstd(compress_snapshots_zstd_)
, superdigest(superdigest_)
, storage_tick_time(storage_tick_time_)
@ -779,7 +780,7 @@ DiskPtr KeeperSnapshotManager::getLatestSnapshotDisk() const
void KeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
{
while (existing_snapshots.size() > snapshots_to_keep)
removeSnapshot(existing_snapshots.begin()->first);
removeSnapshot(existing_snapshots.begin()->first, /*detach=*/false);
}
void KeeperSnapshotManager::moveSnapshotsIfNeeded()
@ -812,13 +813,50 @@ void KeeperSnapshotManager::moveSnapshotsIfNeeded()
}
void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx, bool detach)
{
auto itr = existing_snapshots.find(log_idx);
if (itr == existing_snapshots.end())
throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);
const auto & [path, disk] = itr->second;
disk->removeFileIfExists(path);
const auto & [path_string, snapshot_disk] = itr->second;
std::filesystem::path path(path_string);
if (!detach)
{
snapshot_disk->removeFileIfExists(path);
existing_snapshots.erase(itr);
return;
}
auto disk = getDisk();
const auto timestamp_folder = (fs::path(snapshots_detached_dir) / getCurrentTimestampFolder()).generic_string();
if (!disk->exists(timestamp_folder))
{
LOG_WARNING(log, "Moving broken snapshot to {}", timestamp_folder);
disk->createDirectories(timestamp_folder);
}
LOG_WARNING(log, "Removing snapshot {}", path);
const auto new_path = timestamp_folder / path.filename();
if (snapshot_disk == disk)
{
try
{
disk->moveFile(path.generic_string(), new_path.generic_string());
}
catch (const DB::Exception & e)
{
if (e.code() == DB::ErrorCodes::NOT_IMPLEMENTED)
moveSnapshotBetweenDisks(snapshot_disk, path, disk, new_path, keeper_context);
}
}
else
moveSnapshotBetweenDisks(snapshot_disk, path, disk, new_path, keeper_context);
existing_snapshots.erase(itr);
}
@ -862,4 +900,29 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
return {snapshot_file_name, disk};
}
size_t KeeperSnapshotManager::getLatestSnapshotIndex() const
{
if (!existing_snapshots.empty())
return existing_snapshots.rbegin()->first;
return 0;
}
SnapshotFileInfo KeeperSnapshotManager::getLatestSnapshotInfo() const
{
if (!existing_snapshots.empty())
{
const auto & [path, disk] = existing_snapshots.at(getLatestSnapshotIndex());
try
{
if (disk->exists(path))
return {path, disk};
}
catch (...)
{
}
}
return {"", nullptr};
}
}

View File

@ -1,12 +1,6 @@
#pragma once
#include <filesystem>
#include <system_error>
#include <Coordination/KeeperStorage.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <libnuraft/nuraft.hxx>
#include <Coordination/KeeperContext.h>
#include <Disks/IDisk.h>
namespace DB
{
@ -16,6 +10,15 @@ using SnapshotMetadataPtr = std::shared_ptr<SnapshotMetadata>;
using ClusterConfig = nuraft::cluster_config;
using ClusterConfigPtr = nuraft::ptr<ClusterConfig>;
class WriteBuffer;
class ReadBuffer;
class KeeperContext;
using KeeperContextPtr = std::shared_ptr<KeeperContext>;
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
enum SnapshotVersion : uint8_t
{
V0 = 0,
@ -133,36 +136,15 @@ public:
nuraft::ptr<nuraft::buffer> deserializeLatestSnapshotBufferFromDisk();
/// Remove snapshot with this log_index
void removeSnapshot(uint64_t log_idx);
void removeSnapshot(uint64_t log_idx, bool detach);
/// Total amount of snapshots
size_t totalSnapshots() const { return existing_snapshots.size(); }
/// The most fresh snapshot log index we have
size_t getLatestSnapshotIndex() const
{
if (!existing_snapshots.empty())
return existing_snapshots.rbegin()->first;
return 0;
}
size_t getLatestSnapshotIndex() const;
SnapshotFileInfo getLatestSnapshotInfo() const
{
if (!existing_snapshots.empty())
{
const auto & [path, disk] = existing_snapshots.at(getLatestSnapshotIndex());
try
{
if (disk->exists(path))
return {path, disk};
}
catch (...)
{
}
}
return {"", nullptr};
}
SnapshotFileInfo getLatestSnapshotInfo() const;
private:
void removeOutdatedSnapshotsIfNeeded();
@ -175,6 +157,8 @@ private:
/// ZSTD codec.
static bool isZstdCompressed(nuraft::ptr<nuraft::buffer> buffer);
const std::string snapshots_detached_dir;
/// How many snapshots to keep before remove
const size_t snapshots_to_keep;
/// All existing snapshots in our path (log_index -> path)

View File

@ -6,6 +6,8 @@
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Disks/IDisk.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3/Credentials.h>
#include <IO/WriteBufferFromS3.h>

View File

@ -109,7 +109,7 @@ void KeeperStateMachine::init()
"Failed to load from snapshot with index {}, with error {}, will remove it from disk",
latest_log_index,
ex.displayText());
snapshot_manager.removeSnapshot(latest_log_index);
snapshot_manager.removeSnapshot(latest_log_index, /*detach=*/true);
}
}

View File

@ -949,12 +949,12 @@ TEST_P(CoordinationTest, ChangelogTestStartNewLogAfterRead)
namespace
{
void assertBrokenLogRemoved(const fs::path & log_folder, const fs::path & filename)
void assertBrokenFileRemoved(const fs::path & directory, const fs::path & filename)
{
EXPECT_FALSE(fs::exists(log_folder / filename));
// broken logs are sent to the detached/{timestamp} folder
EXPECT_FALSE(fs::exists(directory / filename));
// broken files are sent to the detached/{timestamp} folder
// we don't know timestamp so we iterate all of them
for (const auto & dir_entry : fs::recursive_directory_iterator(log_folder / "detached"))
for (const auto & dir_entry : fs::recursive_directory_iterator(directory / "detached"))
{
if (dir_entry.path().filename() == filename)
return;
@ -1014,10 +1014,10 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
assertBrokenLogRemoved(log_folder, "changelog_16_20.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_21_25.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_26_30.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_31_35.bin" + params.extension);
assertBrokenFileRemoved(log_folder, "changelog_16_20.bin" + params.extension);
assertBrokenFileRemoved(log_folder, "changelog_21_25.bin" + params.extension);
assertBrokenFileRemoved(log_folder, "changelog_26_30.bin" + params.extension);
assertBrokenFileRemoved(log_folder, "changelog_31_35.bin" + params.extension);
auto entry = getLogEntry("h", 7777);
changelog_reader.append(entry);
@ -1031,10 +1031,10 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin" + params.extension));
assertBrokenLogRemoved(log_folder, "changelog_16_20.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_21_25.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_26_30.bin" + params.extension);
assertBrokenLogRemoved(log_folder, "changelog_31_35.bin" + params.extension);
assertBrokenFileRemoved(log_folder, "changelog_16_20.bin" + params.extension);
assertBrokenFileRemoved(log_folder, "changelog_21_25.bin" + params.extension);
assertBrokenFileRemoved(log_folder, "changelog_26_30.bin" + params.extension);
assertBrokenFileRemoved(log_folder, "changelog_31_35.bin" + params.extension);
DB::KeeperLogStore changelog_reader2(
DB::LogFileSettings{.force_sync = true, .compress_logs = params.enable_compression, .rotate_interval = 5},
@ -1081,7 +1081,7 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
EXPECT_EQ(changelog_reader.size(), 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin" + params.extension));
assertBrokenLogRemoved("./logs", "changelog_21_40.bin" + params.extension);
assertBrokenFileRemoved("./logs", "changelog_21_40.bin" + params.extension);
auto entry = getLogEntry("hello_world", 7777);
changelog_reader.append(entry);
changelog_reader.end_of_append_batch(0, 0);
@ -1138,7 +1138,7 @@ TEST_F(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
EXPECT_EQ(changelog_reader.size(), 19);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin"));
assertBrokenLogRemoved("./logs", "changelog_21_40.bin");
assertBrokenFileRemoved("./logs", "changelog_21_40.bin");
EXPECT_TRUE(fs::exists("./logs/changelog_20_39.bin"));
auto entry = getLogEntry("hello_world", 7777);
changelog_reader.append(entry);
@ -1277,7 +1277,7 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles)
keeper_context);
/// It should print error message, but still able to start
changelog_reader.init(5, 0);
assertBrokenLogRemoved("./logs", "changelog_21_40.bin" + params.extension);
assertBrokenFileRemoved("./logs", "changelog_21_40.bin" + params.extension);
}
TEST_P(CoordinationTest, ChangelogTestLostFiles2)
@ -1318,7 +1318,7 @@ TEST_P(CoordinationTest, ChangelogTestLostFiles2)
EXPECT_TRUE(fs::exists("./logs/changelog_1_10.bin" + params.extension));
EXPECT_TRUE(fs::exists("./logs/changelog_11_20.bin" + params.extension));
assertBrokenLogRemoved("./logs", "changelog_31_40.bin" + params.extension);
assertBrokenFileRemoved("./logs", "changelog_31_40.bin" + params.extension);
}
struct IntNode
{
@ -1789,6 +1789,8 @@ void testLogAndStateMachine(
DB::FlushSettings(),
keeper_context);
changelog.init(state_machine->last_commit_index() + 1, settings->reserved_log_items);
std::optional<SnapshotFileInfo> latest_snapshot;
for (size_t i = 1; i < total_logs + 1; ++i)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
@ -1817,7 +1819,7 @@ void testLogAndStateMachine(
bool pop_result = snapshots_queue.pop(snapshot_task);
EXPECT_TRUE(pop_result);
snapshot_task.create_snapshot(std::move(snapshot_task.snapshot));
latest_snapshot = snapshot_task.create_snapshot(std::move(snapshot_task.snapshot));
}
if (snapshot_created && changelog.size() > settings->reserved_log_items)
@ -1860,6 +1862,20 @@ void testLogAndStateMachine(
auto path = "/hello_" + std::to_string(i);
EXPECT_EQ(source_storage.container.getValue(path).getData(), restored_storage.container.getValue(path).getData());
}
if (latest_snapshot.has_value())
{
const auto & [path, disk] = *latest_snapshot;
EXPECT_TRUE(disk->exists(path));
DB::WriteBufferFromFile plain_buf(
fs::path("./snapshots") / path, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(0);
SnapshotsQueue snapshots_queue2{1};
keeper_context = get_keeper_context();
auto invalid_snapshot_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue2, keeper_context, nullptr);
invalid_snapshot_machine->init();
assertBrokenFileRemoved("./snapshots", fs::path(path).filename());
}
}
TEST_P(CoordinationTest, TestStateMachineAndLogStore)