Merge pull request #43947 from azat/backups/dedup

RFC: Add ability to disable deduplication for BACKUP
Vitaly Baranov 2022-12-28 15:24:53 +01:00 committed by GitHub
commit 8a2fbbe88c
15 changed files with 52 additions and 37 deletions
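The setting introduced here is exposed through the BACKUP query's SETTINGS clause and defaults to 1 (deduplication enabled), matching the `deduplicate_files = true` defaults added in the diff below. A minimal usage sketch; the disk name and backup path are illustrative, while the `settings deduplicate_files=0` syntax is the same one used in the test change at the bottom of this diff:

    -- Default behaviour: files with identical size and checksum are stored once per backup.
    BACKUP TABLE db.table TO Disk('backups', 'backup_with_dedup.zip');

    -- Disable deduplication so every file is written out verbatim
    -- (the test below does this when backing up to an s3_plain disk).
    BACKUP TABLE db.table TO Disk('backups', 'backup_without_dedup') SETTINGS deduplicate_files = 0;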


@@ -188,15 +188,6 @@ std::optional<FileInfo> BackupCoordinationLocal::getFileInfo(const SizeAndChecks
     return it->second;
 }
 
-std::optional<SizeAndChecksum> BackupCoordinationLocal::getFileSizeAndChecksum(const String & file_name) const
-{
-    std::lock_guard lock{mutex};
-    auto it = file_names.find(file_name);
-    if (it == file_names.end())
-        return std::nullopt;
-    return it->second;
-}
-
 String BackupCoordinationLocal::getNextArchiveSuffix()
 {
     std::lock_guard lock{mutex};


@@ -48,7 +48,6 @@ public:
     std::optional<FileInfo> getFileInfo(const String & file_name) const override;
     std::optional<FileInfo> getFileInfo(const SizeAndChecksum & size_and_checksum) const override;
-    std::optional<SizeAndChecksum> getFileSizeAndChecksum(const String & file_name) const override;
     String getNextArchiveSuffix() override;
     Strings getAllArchiveSuffixes() const override;


@@ -575,15 +575,6 @@ std::optional<FileInfo> BackupCoordinationRemote::getFileInfo(const SizeAndCheck
     return deserializeFileInfo(file_info_str);
 }
 
-std::optional<SizeAndChecksum> BackupCoordinationRemote::getFileSizeAndChecksum(const String & file_name) const
-{
-    auto zk = getZooKeeper();
-    String size_and_checksum;
-    if (!zk->tryGet(zookeeper_path + "/file_names/" + escapeForFileName(file_name), size_and_checksum))
-        return std::nullopt;
-    return deserializeSizeAndChecksum(size_and_checksum);
-}
-
 String BackupCoordinationRemote::getNextArchiveSuffix()
 {
     auto zk = getZooKeeper();


@@ -51,7 +51,6 @@ public:
     bool hasFiles(const String & directory) const override;
     std::optional<FileInfo> getFileInfo(const String & file_name) const override;
     std::optional<FileInfo> getFileInfo(const SizeAndChecksum & size_and_checksum) const override;
-    std::optional<SizeAndChecksum> getFileSizeAndChecksum(const String & file_name) const override;
     String getNextArchiveSuffix() override;
     Strings getAllArchiveSuffixes() const override;


@@ -34,6 +34,7 @@ public:
         bool is_internal_backup = false;
         std::shared_ptr<IBackupCoordination> backup_coordination;
         std::optional<UUID> backup_uuid;
+        bool deduplicate_files = true;
     };
 
     static BackupFactory & instance();


@@ -80,6 +80,12 @@ namespace
             throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
         return outcome.GetResult().GetContents();
     }
+
+    bool isNotFoundError(Aws::S3::S3Errors error)
+    {
+        return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND
+            || error == Aws::S3::S3Errors::NO_SUCH_KEY;
+    }
 }
@@ -370,7 +376,7 @@ void BackupWriterS3::removeFile(const String & file_name)
     request.SetBucket(s3_uri.bucket);
     request.SetKey(fs::path(s3_uri.key) / file_name);
     auto outcome = client->DeleteObject(request);
-    if (!outcome.IsSuccess())
+    if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
         throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
 }
@@ -428,7 +434,7 @@ void BackupWriterS3::removeFilesBatch(const Strings & file_names)
         request.SetDelete(delkeys);
         auto outcome = client->DeleteObjects(request);
-        if (!outcome.IsSuccess())
+        if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
             throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
     }
 }


@@ -167,17 +167,19 @@ BackupImpl::BackupImpl(
     const ContextPtr & context_,
     bool is_internal_backup_,
     const std::shared_ptr<IBackupCoordination> & coordination_,
-    const std::optional<UUID> & backup_uuid_)
+    const std::optional<UUID> & backup_uuid_,
+    bool deduplicate_files_)
     : backup_name_for_logging(backup_name_for_logging_)
     , archive_params(archive_params_)
     , use_archives(!archive_params.archive_name.empty())
     , open_mode(OpenMode::WRITE)
     , writer(std::move(writer_))
     , is_internal_backup(is_internal_backup_)
-    , coordination(coordination_ ? coordination_ : std::make_shared<BackupCoordinationLocal>())
+    , coordination(coordination_)
     , uuid(backup_uuid_)
     , version(CURRENT_BACKUP_VERSION)
     , base_backup_info(base_backup_info_)
+    , deduplicate_files(deduplicate_files_)
     , log(&Poco::Logger::get("BackupImpl"))
 {
     open(context_);
@@ -287,6 +289,7 @@ void BackupImpl::writeBackupMetadata()
     Poco::AutoPtr<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration()};
     config->setInt("version", CURRENT_BACKUP_VERSION);
+    config->setBool("deduplicate_files", deduplicate_files);
     config->setString("timestamp", toString(LocalDateTime{timestamp}));
     config->setString("uuid", toString(*uuid));
@@ -759,7 +762,7 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
     };
 
     /// Empty file, nothing to backup
-    if (info.size == 0)
+    if (info.size == 0 && deduplicate_files)
     {
         coordination->addFileInfo(info);
         return;
@@ -828,7 +831,7 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
     }
 
     /// Maybe we have a copy of this file in the backup already.
-    if (coordination->getFileInfo(std::pair{info.size, info.checksum}))
+    if (coordination->getFileInfo(std::pair{info.size, info.checksum}) && deduplicate_files)
     {
         LOG_TRACE(log, "File {} already exist in current backup, adding reference", adjusted_path);
         coordination->addFileInfo(info);
@@ -861,7 +864,7 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
     bool is_data_file_required;
     coordination->addFileInfo(info, is_data_file_required);
 
-    if (!is_data_file_required)
+    if (!is_data_file_required && deduplicate_files)
     {
         LOG_TRACE(log, "File {} doesn't exist in current backup, but we have file with same size and checksum", adjusted_path);
         return; /// We copy data only if it's a new combination of size & checksum.


@@ -47,9 +47,10 @@ public:
         const std::optional<BackupInfo> & base_backup_info_,
         std::shared_ptr<IBackupWriter> writer_,
         const ContextPtr & context_,
-        bool is_internal_backup_ = false,
-        const std::shared_ptr<IBackupCoordination> & coordination_ = {},
-        const std::optional<UUID> & backup_uuid_ = {});
+        bool is_internal_backup_,
+        const std::shared_ptr<IBackupCoordination> & coordination_,
+        const std::optional<UUID> & backup_uuid_,
+        bool deduplicate_files_);
 
     ~BackupImpl() override;
@@ -132,6 +133,7 @@ private:
     String lock_file_name;
     std::atomic<size_t> num_files_written = 0;
     bool writing_finalized = false;
+    bool deduplicate_files = true;
     const Poco::Logger * log;
 };


@@ -65,6 +65,7 @@ namespace
     M(String, password) \
     M(Bool, structure_only) \
     M(Bool, async) \
+    M(Bool, deduplicate_files) \
    M(UInt64, shard_num) \
    M(UInt64, replica_num) \
    M(Bool, internal) \


@@ -32,6 +32,9 @@ struct BackupSettings
     /// Whether the BACKUP command must return immediately without waiting until the backup has completed.
     bool async = false;
 
+    /// Whether the BACKUP will omit similar files (within one backup only).
+    bool deduplicate_files = true;
+
     /// 1-based shard index to store in the backup. 0 means all shards.
     /// Can only be used with BACKUP ON CLUSTER.
     size_t shard_num = 0;


@@ -286,6 +286,7 @@ void BackupsWorker::doBackup(
         backup_create_params.is_internal_backup = backup_settings.internal;
         backup_create_params.backup_coordination = backup_coordination;
         backup_create_params.backup_uuid = backup_settings.backup_uuid;
+        backup_create_params.deduplicate_files = backup_settings.deduplicate_files;
         BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params);
 
         /// Write the backup.


@@ -108,7 +108,6 @@ public:
     virtual std::optional<FileInfo> getFileInfo(const String & file_name) const = 0;
     virtual std::optional<FileInfo> getFileInfo(const SizeAndChecksum & size_and_checksum) const = 0;
-    virtual std::optional<SizeAndChecksum> getFileSizeAndChecksum(const String & file_name) const = 0;
 
     /// Generates a new archive suffix, e.g. "001", "002", "003", ...
     virtual String getNextArchiveSuffix() = 0;


@@ -116,7 +116,16 @@ void registerBackupEngineS3(BackupFactory & factory)
     else
     {
         auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.context);
-        return std::make_unique<BackupImpl>(backup_name_for_logging, archive_params, params.base_backup_info, writer, params.context, params.is_internal_backup, params.backup_coordination, params.backup_uuid);
+        return std::make_unique<BackupImpl>(
+            backup_name_for_logging,
+            archive_params,
+            params.base_backup_info,
+            writer,
+            params.context,
+            params.is_internal_backup,
+            params.backup_coordination,
+            params.backup_uuid,
+            params.deduplicate_files);
     }
 #else
     throw Exception("S3 support is disabled", ErrorCodes::SUPPORT_IS_DISABLED);


@@ -181,7 +181,16 @@ void registerBackupEnginesFileAndDisk(BackupFactory & factory)
             writer = std::make_shared<BackupWriterFile>(path);
         else
             writer = std::make_shared<BackupWriterDisk>(disk, path);
-        return std::make_unique<BackupImpl>(backup_name_for_logging, archive_params, params.base_backup_info, writer, params.context, params.is_internal_backup, params.backup_coordination, params.backup_uuid);
+        return std::make_unique<BackupImpl>(
+            backup_name_for_logging,
+            archive_params,
+            params.base_backup_info,
+            writer,
+            params.context,
+            params.is_internal_backup,
+            params.backup_coordination,
+            params.backup_uuid,
+            params.deduplicate_files);
     }
 };


@@ -41,16 +41,17 @@ def test_attach_part(table_name, backup_name, storage_policy, min_bytes_for_wide
         create database ordinary_db engine=Ordinary;
-        create table ordinary_db.{table_name} engine=MergeTree() order by tuple() as select * from numbers(100);
+        create table ordinary_db.{table_name} engine=MergeTree() order by key partition by part as select number%5 part, number key from numbers(100);
 
-        backup table ordinary_db.{table_name} TO Disk('backup_disk_s3_plain', '{backup_name}');
+        -- NOTE: name of backup ("backup") is significant.
+        backup table ordinary_db.{table_name} TO Disk('backup_disk_s3_plain', '{backup_name}') settings deduplicate_files=0;
 
         drop table ordinary_db.{table_name};
-        attach table ordinary_db.{table_name} (number UInt64)
+        attach table ordinary_db.{table_name} (part UInt8, key UInt64)
         engine=MergeTree()
-        order by tuple()
+        order by key partition by part
         settings
             min_bytes_for_wide_part={min_bytes_for_wide_part},
             max_suspicious_broken_parts=0,
             storage_policy='{storage_policy}';
         """
     )