Add backup setting "check_parts" and restore setting "restore_broken_parts_as_detached".

This commit is contained in:
Vitaly Baranov 2023-09-04 17:51:20 +02:00
parent 9bbd81ca6c
commit 8060407c70
9 changed files with 157 additions and 26 deletions

View File

@ -31,6 +31,7 @@ namespace ErrorCodes
M(Bool, read_from_filesystem_cache) \
M(UInt64, shard_num) \
M(UInt64, replica_num) \
M(Bool, check_parts) \
M(Bool, internal) \
M(String, host_id) \
M(OptionalUUID, backup_uuid)

View File

@ -59,6 +59,9 @@ struct BackupSettings
/// Can only be used with BACKUP ON CLUSTER.
size_t replica_num = 0;
/// Check checksums of the data parts before writing them to a backup.
bool check_parts = true;
/// Internal, should not be specified by user.
/// Whether this backup is a part of a distributed backup created by BACKUP ON CLUSTER.
bool internal = false;

View File

@ -164,6 +164,7 @@ namespace
M(RestoreUDFCreationMode, create_function) \
M(Bool, allow_s3_native_copy) \
M(Bool, use_same_s3_credentials_for_base_backup) \
M(Bool, restore_broken_parts_as_detached) \
M(Bool, internal) \
M(String, host_id) \
M(OptionalString, storage_policy) \

View File

@ -113,6 +113,10 @@ struct RestoreSettings
/// Whether base backup from S3 should inherit credentials from the RESTORE query.
bool use_same_s3_credentials_for_base_backup = false;
/// If it's true RESTORE won't stop on broken parts while restoring, instead they will be restored as detached parts
/// to the `detached` folder with names starting with `broken-from-backup'.
bool restore_broken_parts_as_detached = false;
/// Internal, should not be specified by user.
bool internal = false;

View File

@ -1909,6 +1909,13 @@ void IMergeTreeDataPart::checkConsistency(bool /* require_part_metadata */) cons
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method 'checkConsistency' is not implemented for part with type {}", getType().toString());
}
void IMergeTreeDataPart::checkConsistencyWithProjections(bool require_part_metadata) const
{
checkConsistency(require_part_metadata);
for (const auto & [_, proj_part] : projection_parts)
proj_part->checkConsistency(require_part_metadata);
}
void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk()
{
calculateColumnsSizesOnDisk();

View File

@ -489,6 +489,12 @@ public:
void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings);
/// Checks the consistency of this data part.
virtual void checkConsistency(bool require_part_metadata) const;
/// Checks the consistency of this data part, and check the consistency of its projections (if any) as well.
void checkConsistencyWithProjections(bool require_part_metadata) const;
/// "delete-on-destroy.txt" is deprecated. It is no longer being created, only is removed.
/// TODO: remove this method after some time.
void removeDeleteOnDestroyMarker();
@ -534,7 +540,6 @@ protected:
void removeIfNeeded();
virtual void checkConsistency(bool require_part_metadata) const;
void checkConsistencyBase() const;
/// Fill each_columns_size and total_size with sizes from columns files on

View File

@ -5254,6 +5254,9 @@ MergeTreeData::PartsBackupEntries MergeTreeData::backupParts(
if (hold_table_lock && !table_lock)
table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
if (backup_settings.check_parts)
part->checkConsistencyWithProjections(/* require_part_metadata= */ true);
BackupEntries backup_entries_from_part;
part->getDataPartStorage().backup(
part->checksums,
@ -5314,8 +5317,8 @@ void MergeTreeData::restoreDataFromBackup(RestorerFromBackup & restorer, const S
class MergeTreeData::RestoredPartsHolder
{
public:
RestoredPartsHolder(const std::shared_ptr<MergeTreeData> & storage_, const BackupPtr & backup_, size_t num_parts_)
: storage(storage_), backup(backup_), num_parts(num_parts_)
RestoredPartsHolder(const std::shared_ptr<MergeTreeData> & storage_, const BackupPtr & backup_)
: storage(storage_), backup(backup_)
{
}
@ -5328,6 +5331,13 @@ public:
attachIfAllPartsRestored();
}
void increaseNumBrokenParts()
{
std::lock_guard lock{mutex};
++num_broken_parts;
attachIfAllPartsRestored();
}
void addPart(MutableDataPartPtr part)
{
std::lock_guard lock{mutex};
@ -5347,7 +5357,7 @@ public:
private:
void attachIfAllPartsRestored()
{
if (!num_parts || (parts.size() < num_parts))
if (!num_parts || (parts.size() + num_broken_parts < num_parts))
return;
/// Sort parts by min_block (because we need to preserve the order of parts).
@ -5362,9 +5372,10 @@ private:
num_parts = 0;
}
std::shared_ptr<MergeTreeData> storage;
BackupPtr backup;
const std::shared_ptr<MergeTreeData> storage;
const BackupPtr backup;
size_t num_parts = 0;
size_t num_broken_parts = 0;
MutableDataPartsVector parts;
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
mutable std::mutex mutex;
@ -5380,8 +5391,9 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
Strings part_names = backup->listFiles(data_path_in_backup);
boost::remove_erase(part_names, "mutations");
auto restored_parts_holder
= std::make_shared<RestoredPartsHolder>(std::static_pointer_cast<MergeTreeData>(shared_from_this()), backup, part_names.size());
bool restore_broken_parts_as_detached = restorer.getRestoreSettings().restore_broken_parts_as_detached;
auto restored_parts_holder = std::make_shared<RestoredPartsHolder>(std::static_pointer_cast<MergeTreeData>(shared_from_this()), backup);
fs::path data_path_in_backup_fs = data_path_in_backup;
size_t num_parts = 0;
@ -5403,8 +5415,9 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
backup,
part_path_in_backup = data_path_in_backup_fs / part_name,
my_part_info = *part_info,
restore_broken_parts_as_detached,
restored_parts_holder]
{ storage->restorePartFromBackup(restored_parts_holder, my_part_info, part_path_in_backup); });
{ storage->restorePartFromBackup(restored_parts_holder, my_part_info, part_path_in_backup, restore_broken_parts_as_detached); });
++num_parts;
}
@ -5412,11 +5425,12 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
restored_parts_holder->setNumParts(num_parts);
}
void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup) const
void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup, bool detach_if_broken) const
{
String part_name = part_info.getPartNameAndCheckFormat(format_version);
auto backup = restored_parts_holder->getBackup();
/// Calculate the total size of the part.
UInt64 total_size_of_part = 0;
Strings filenames = backup->listFiles(part_path_in_backup, /* recursive= */ true);
fs::path part_path_in_backup_fs = part_path_in_backup;
@ -5424,21 +5438,22 @@ void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> r
total_size_of_part += backup->getFileSize(part_path_in_backup_fs / filename);
std::shared_ptr<IReservation> reservation = getStoragePolicy()->reserveAndCheck(total_size_of_part);
auto disk = reservation->getDisk();
fs::path temp_dir = restored_parts_holder->getTemporaryDirectory(disk);
fs::path temp_part_dir = temp_dir / part_path_in_backup_fs.relative_path();
disk->createDirectories(temp_part_dir);
/// For example:
/// Calculate paths, for example:
/// part_name = 0_1_1_0
/// part_path_in_backup = /data/test/table/0_1_1_0
/// tmp_dir = tmp/1aaaaaa
/// tmp_part_dir = tmp/1aaaaaa/data/test/table/0_1_1_0
auto disk = reservation->getDisk();
fs::path temp_dir = restored_parts_holder->getTemporaryDirectory(disk);
fs::path temp_part_dir = temp_dir / part_path_in_backup_fs.relative_path();
/// Subdirectories in the part's directory. It's used to restore projections.
std::unordered_set<String> subdirs;
/// Copy files from the backup to the directory `tmp_part_dir`.
disk->createDirectories(temp_part_dir);
for (const String & filename : filenames)
{
/// Needs to create subdirectories before copying the files. Subdirectories are used to represent projections.
@ -5458,14 +5473,106 @@ void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> r
reservation->update(reservation->getSize() - file_size);
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
MergeTreeDataPartBuilder builder(*this, part_name, single_disk_volume, temp_part_dir.parent_path(), part_name);
builder.withPartFormatFromDisk();
auto part = std::move(builder).build();
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
part->loadColumnsChecksumsIndexes(false, true);
if (auto part = loadPartRestoredFromBackup(disk, temp_part_dir.parent_path(), part_name, detach_if_broken))
restored_parts_holder->addPart(part);
else
restored_parts_holder->increaseNumBrokenParts();
}
restored_parts_holder->addPart(part);
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartRestoredFromBackup(const DiskPtr & disk, const String & temp_dir, const String & part_name, bool detach_if_broken) const
{
MutableDataPartPtr part;
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
/// Load this part from the directory `tmp_part_dir`.
auto load_part = [&]
{
MergeTreeDataPartBuilder builder(*this, part_name, single_disk_volume, temp_dir, part_name);
builder.withPartFormatFromDisk();
part = std::move(builder).build();
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
part->loadColumnsChecksumsIndexes(/* require_columns_checksums= */ false, /* check_consistency= */ true);
};
/// Broken parts can appear in a backup sometimes.
auto mark_broken = [&](const std::exception_ptr error)
{
tryLogException(error, log,
fmt::format("Part {} will be restored as detached because it's broken. You need to resolve this manually", part_name));
if (!part)
{
/// Make a fake data part only to copy its files to /detached/.
part = MergeTreeDataPartBuilder{*this, part_name, single_disk_volume, temp_dir, part_name}
.withPartStorageType(MergeTreeDataPartStorageType::Full)
.withPartType(MergeTreeDataPartType::Wide)
.build();
}
part->renameToDetached("broken-from-backup");
};
/// Try to load this part multiple times.
auto backoff_ms = loading_parts_initial_backoff_ms;
for (size_t try_no = 0; try_no < loading_parts_max_tries; ++try_no)
{
std::exception_ptr error;
bool retryable = false;
try
{
load_part();
}
catch (const Exception & e)
{
error = std::current_exception();
retryable = isRetryableException(e);
}
catch (const Poco::Net::NetException &)
{
error = std::current_exception();
retryable = true;
}
catch (const Poco::TimeoutException &)
{
error = std::current_exception();
retryable = true;
}
catch (...)
{
error = std::current_exception();
}
if (!error)
return part;
if (!retryable && detach_if_broken)
{
mark_broken(error);
return nullptr;
}
if (!retryable)
{
LOG_ERROR(log,
"Failed to restore part {} because it's broken. You can skip broken parts while restoring by setting "
"'restore_broken_parts_as_detached = true'",
part_name);
}
if (!retryable || (try_no + 1 == loading_parts_max_tries))
{
if (Exception * e = exception_cast<Exception *>(error))
e->addMessage("while restoring part {} of table {}", part->name, getStorageID());
std::rethrow_exception(error);
}
tryLogException(error, log,
fmt::format("Failed to load part {} at try {} with a retryable error. Will retry in {} ms", part_name, try_no, backoff_ms));
std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
backoff_ms = std::min(backoff_ms * 2, loading_parts_max_backoff_ms);
}
UNREACHABLE();
}

View File

@ -1357,7 +1357,8 @@ protected:
/// Restores the parts of this table from backup.
void restorePartsFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions);
void restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup) const;
void restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup, bool detach_if_broken) const;
MutableDataPartPtr loadPartRestoredFromBackup(const DiskPtr & disk, const String & temp_dir, const String & part_name, bool detach_if_broken) const;
/// Attaches restored parts to the storage.
virtual void attachRestoredParts(MutableDataPartsVector && parts) = 0;

View File

@ -163,7 +163,8 @@ struct DetachedPartInfo : public MergeTreePartInfo
"tmp-fetch",
"covered-by-broken",
"merge-not-byte-identical",
"mutate-not-byte-identical"
"mutate-not-byte-identical",
"broken-from-backup",
});
static constexpr auto DETACHED_REASONS_REMOVABLE_BY_TIMEOUT = std::to_array<std::string_view>({
@ -175,7 +176,8 @@ struct DetachedPartInfo : public MergeTreePartInfo
"deleting",
"clone",
"merge-not-byte-identical",
"mutate-not-byte-identical"
"mutate-not-byte-identical",
"broken-from-backup",
});
/// NOTE: It may parse part info incorrectly.