mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Abort on invalid snapshots
This commit is contained in:
parent
f6d35cf4b1
commit
f7abe7601e
@ -780,7 +780,7 @@ DiskPtr KeeperSnapshotManager::getLatestSnapshotDisk() const
|
||||
void KeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
|
||||
{
|
||||
while (existing_snapshots.size() > snapshots_to_keep)
|
||||
removeSnapshot(existing_snapshots.begin()->first, /*detach=*/false);
|
||||
removeSnapshot(existing_snapshots.begin()->first);
|
||||
}
|
||||
|
||||
void KeeperSnapshotManager::moveSnapshotsIfNeeded()
|
||||
@ -813,50 +813,14 @@ void KeeperSnapshotManager::moveSnapshotsIfNeeded()
|
||||
|
||||
}
|
||||
|
||||
void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx, bool detach)
|
||||
void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
|
||||
{
|
||||
auto itr = existing_snapshots.find(log_idx);
|
||||
if (itr == existing_snapshots.end())
|
||||
throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);
|
||||
|
||||
const auto & [path_string, snapshot_disk] = itr->second;
|
||||
std::filesystem::path path(path_string);
|
||||
|
||||
if (!detach)
|
||||
{
|
||||
snapshot_disk->removeFileIfExists(path);
|
||||
existing_snapshots.erase(itr);
|
||||
return;
|
||||
}
|
||||
|
||||
auto disk = getDisk();
|
||||
|
||||
const auto timestamp_folder = (fs::path(snapshots_detached_dir) / getCurrentTimestampFolder()).generic_string();
|
||||
|
||||
if (!disk->exists(timestamp_folder))
|
||||
{
|
||||
LOG_WARNING(log, "Moving broken snapshot to {}", timestamp_folder);
|
||||
disk->createDirectories(timestamp_folder);
|
||||
}
|
||||
|
||||
LOG_WARNING(log, "Removing snapshot {}", path);
|
||||
const auto new_path = timestamp_folder / path.filename();
|
||||
|
||||
if (snapshot_disk == disk)
|
||||
{
|
||||
try
|
||||
{
|
||||
disk->moveFile(path.generic_string(), new_path.generic_string());
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
{
|
||||
if (e.code() == DB::ErrorCodes::NOT_IMPLEMENTED)
|
||||
moveSnapshotBetweenDisks(snapshot_disk, path, disk, new_path, keeper_context);
|
||||
}
|
||||
}
|
||||
else
|
||||
moveSnapshotBetweenDisks(snapshot_disk, path, disk, new_path, keeper_context);
|
||||
|
||||
const auto & [path, disk] = itr->second;
|
||||
disk->removeFileIfExists(path);
|
||||
existing_snapshots.erase(itr);
|
||||
}
|
||||
|
||||
|
@ -136,7 +136,7 @@ public:
|
||||
nuraft::ptr<nuraft::buffer> deserializeLatestSnapshotBufferFromDisk();
|
||||
|
||||
/// Remove snapshot with this log_index
|
||||
void removeSnapshot(uint64_t log_idx, bool detach);
|
||||
void removeSnapshot(uint64_t log_idx);
|
||||
|
||||
/// Total amount of snapshots
|
||||
size_t totalSnapshots() const { return existing_snapshots.size(); }
|
||||
|
@ -83,29 +83,32 @@ void KeeperStateMachine::init()
|
||||
uint64_t latest_log_index = snapshot_manager.getLatestSnapshotIndex();
|
||||
LOG_DEBUG(log, "Trying to load state machine from snapshot up to log index {}", latest_log_index);
|
||||
|
||||
try
|
||||
if (has_snapshots)
|
||||
{
|
||||
latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index);
|
||||
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
|
||||
latest_snapshot_info = snapshot_manager.getLatestSnapshotInfo();
|
||||
try
|
||||
{
|
||||
latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index);
|
||||
auto snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
|
||||
latest_snapshot_info = snapshot_manager.getLatestSnapshotInfo();
|
||||
|
||||
if (isLocalDisk(*latest_snapshot_info.disk))
|
||||
latest_snapshot_buf = nullptr;
|
||||
if (isLocalDisk(*latest_snapshot_info.disk))
|
||||
latest_snapshot_buf = nullptr;
|
||||
|
||||
storage = std::move(snapshot_deserialization_result.storage);
|
||||
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
|
||||
cluster_config = snapshot_deserialization_result.cluster_config;
|
||||
keeper_context->setLastCommitIndex(latest_snapshot_meta->get_last_log_idx());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(
|
||||
log,
|
||||
fmt::format(
|
||||
"Aborting because of failure to load from latest snapshot with index {}. Problematic snapshot can be removed but it will "
|
||||
"lead to data loss",
|
||||
latest_log_index));
|
||||
std::abort();
|
||||
storage = std::move(snapshot_deserialization_result.storage);
|
||||
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
|
||||
cluster_config = snapshot_deserialization_result.cluster_config;
|
||||
keeper_context->setLastCommitIndex(latest_snapshot_meta->get_last_log_idx());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(
|
||||
log,
|
||||
fmt::format(
|
||||
"Aborting because of failure to load from latest snapshot with index {}. Problematic snapshot can be removed but it will "
|
||||
"lead to data loss",
|
||||
latest_log_index));
|
||||
std::abort();
|
||||
}
|
||||
}
|
||||
|
||||
auto last_committed_idx = keeper_context->lastCommittedIndex();
|
||||
|
@ -1873,8 +1873,7 @@ void testLogAndStateMachine(
|
||||
SnapshotsQueue snapshots_queue2{1};
|
||||
keeper_context = get_keeper_context();
|
||||
auto invalid_snapshot_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue2, keeper_context, nullptr);
|
||||
invalid_snapshot_machine->init();
|
||||
assertBrokenFileRemoved("./snapshots", fs::path(path).filename());
|
||||
ASSERT_DEATH(invalid_snapshot_machine->init(), "Aborting because of failure to load from latest snapshot with");
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user