Use hash_of_all_files from system.parts to check identity of parts during on-cluster backups.

This commit is contained in:
Vitaly Baranov 2023-06-14 16:27:17 +02:00
parent cbed327077
commit 6366940a37
5 changed files with 38 additions and 54 deletions

View File

@ -185,11 +185,10 @@ void BackupCoordinationReplicatedTables::addPartNames(PartNamesForTableReplica &
const String & other_replica_name = **other.replica_names.begin();
throw Exception(
ErrorCodes::CANNOT_BACKUP_TABLE,
"Table {} on replica {} has part {} which is different from the part on replica {}. Must be the same",
table_name_for_logs,
replica_name,
part_name,
other_replica_name);
"Table {} on replica {} has part {} different from the part on replica {} "
"(checksum '{}' on replica {} != checksum '{}' on replica {})",
table_name_for_logs, replica_name, part_name, other_replica_name,
getHexUIntLowercase(checksum), replica_name, getHexUIntLowercase(other.checksum), other_replica_name);
}
}

View File

@ -5118,14 +5118,13 @@ Pipe MergeTreeData::alterPartition(
return {};
}
BackupEntries MergeTreeData::backupParts(
MergeTreeData::PartsBackupEntries MergeTreeData::backupParts(
const DataPartsVector & data_parts,
const String & data_path_in_backup,
const BackupSettings & backup_settings,
const ContextPtr & local_context)
{
BackupEntries backup_entries;
MergeTreeData::PartsBackupEntries res;
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
TableLockHolder table_lock;
ReadSettings read_settings = local_context->getBackupReadSettings();
@ -5190,10 +5189,13 @@ BackupEntries MergeTreeData::backupParts(
wrapBackupEntriesWith(backup_entries_from_part, storage_and_part);
}
insertAtEnd(backup_entries, std::move(backup_entries_from_part));
auto & part_backup_entries = res.emplace_back();
part_backup_entries.part_name = part->name;
part_backup_entries.part_checksum = part->checksums.getTotalChecksumUInt128();
part_backup_entries.backup_entries = std::move(backup_entries_from_part);
}
return backup_entries;
return res;
}
void MergeTreeData::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions)

View File

@ -1324,8 +1324,16 @@ protected:
/// Moves part to specified space, used in ALTER ... MOVE ... queries
MovePartsOutcome movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
struct PartBackupEntries
{
String part_name;
UInt128 part_checksum; /// same as MinimalisticDataPartChecksums::hash_of_all_files
BackupEntries backup_entries;
};
using PartsBackupEntries = std::vector<PartBackupEntries>;
/// Makes backup entries to backup the parts of this table.
BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const BackupSettings & backup_settings, const ContextPtr & local_context);
PartsBackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const BackupSettings & backup_settings, const ContextPtr & local_context);
class RestoredPartsHolder;

View File

@ -2160,7 +2160,10 @@ void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collec
for (const auto & data_part : data_parts)
min_data_version = std::min(min_data_version, data_part->info.getDataVersion() + 1);
backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, backup_settings, local_context));
auto parts_backup_entries = backupParts(data_parts, data_path_in_backup, backup_settings, local_context);
for (auto & part_backup_entries : parts_backup_entries)
backup_entries_collector.addBackupEntries(std::move(part_backup_entries.backup_entries));
backup_entries_collector.addBackupEntries(backupMutations(min_data_version, data_path_in_backup));
}

View File

@ -9343,45 +9343,17 @@ void StorageReplicatedMergeTree::backupData(
else
data_parts = getVisibleDataPartsVector(local_context);
auto backup_entries = backupParts(data_parts, /* data_path_in_backup */ "", backup_settings, local_context);
auto parts_backup_entries = backupParts(data_parts, /* data_path_in_backup */ "", backup_settings, local_context);
auto coordination = backup_entries_collector.getBackupCoordination();
String shared_id = getTableSharedID();
coordination->addReplicatedDataPath(shared_id, data_path_in_backup);
std::unordered_map<String, SipHash> part_names_with_hashes_calculating;
for (auto & [relative_path, backup_entry] : backup_entries)
{
size_t slash_pos = relative_path.find('/');
if (slash_pos != String::npos)
{
String part_name = relative_path.substr(0, slash_pos);
if (MergeTreePartInfo::tryParsePartName(part_name, MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING))
{
auto & hash = part_names_with_hashes_calculating[part_name];
if (relative_path.ends_with(".bin"))
{
hash.update(relative_path);
hash.update(backup_entry->getSize());
hash.update(backup_entry->getChecksum());
}
continue;
}
}
/// Not a part name, probably error.
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} doesn't follow the format <part_name>/<path>", quoteString(relative_path));
}
std::vector<IBackupCoordination::PartNameAndChecksum> part_names_with_hashes;
part_names_with_hashes.reserve(part_names_with_hashes_calculating.size());
for (auto & [part_name, hash] : part_names_with_hashes_calculating)
{
UInt128 checksum;
hash.get128(checksum);
auto & part_name_with_hash = part_names_with_hashes.emplace_back();
part_name_with_hash.part_name = part_name;
part_name_with_hash.checksum = checksum;
}
using PartNameAndChecksum = IBackupCoordination::PartNameAndChecksum;
std::vector<PartNameAndChecksum> part_names_with_hashes;
part_names_with_hashes.reserve(parts_backup_entries.size());
for (const auto & part_backup_entries : parts_backup_entries)
part_names_with_hashes.emplace_back(PartNameAndChecksum{part_backup_entries.part_name, part_backup_entries.part_checksum});
/// Send our list of part names to the coordination (to compare with other replicas).
coordination->addReplicatedPartNames(shared_id, getStorageID().getFullTableName(), getReplicaName(), part_names_with_hashes);
@ -9409,7 +9381,7 @@ void StorageReplicatedMergeTree::backupData(
auto post_collecting_task = [shared_id,
my_replica_name = getReplicaName(),
coordination,
my_backup_entries = std::move(backup_entries),
parts_backup_entries = std::move(parts_backup_entries),
&backup_entries_collector]()
{
Strings data_paths = coordination->getReplicatedDataPaths(shared_id);
@ -9421,14 +9393,14 @@ void StorageReplicatedMergeTree::backupData(
Strings part_names = coordination->getReplicatedPartNames(shared_id, my_replica_name);
std::unordered_set<std::string_view> part_names_set{part_names.begin(), part_names.end()};
for (const auto & [relative_path, backup_entry] : my_backup_entries)
for (const auto & part_backup_entries : parts_backup_entries)
{
size_t slash_pos = relative_path.find('/');
String part_name = relative_path.substr(0, slash_pos);
if (!part_names_set.contains(part_name))
continue;
for (const auto & data_path : data_paths_fs)
backup_entries_collector.addBackupEntry(data_path / relative_path, backup_entry);
if (part_names_set.contains(part_backup_entries.part_name))
{
for (const auto & [relative_path, backup_entry] : part_backup_entries.backup_entries)
for (const auto & data_path : data_paths_fs)
backup_entries_collector.addBackupEntry(data_path / relative_path, backup_entry);
}
}
auto mutation_infos = coordination->getReplicatedMutations(shared_id, my_replica_name);