Merge pull request #60002 from vitlibar/add-config-setting-remove-backup-files-after-failure

Add new config setting "backups.remove_backup_files_after_failure"
Vitaly Baranov 2024-02-23 12:42:55 +01:00 committed by GitHub
commit db119eed49
7 changed files with 333 additions and 349 deletions

programs/server/config.xml

@ -1569,6 +1569,11 @@
<backups>
<allowed_path>backups</allowed_path>
<!-- If the BACKUP command fails and this setting is true, then the files
copied before the failure will be removed automatically.
-->
<remove_backup_files_after_failure>true</remove_backup_files_after_failure>
</backups>
<!-- This allows to disable exposing addresses in stack traces for security reasons.
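
If an operator prefers to keep partially copied files for inspection, the new setting can be turned off in a configuration override. A minimal sketch, assuming a config.d override file (the file path below is hypothetical); only the setting name and its default come from this commit:

    <!-- /etc/clickhouse-server/config.d/backups.xml (hypothetical override file) -->
    <clickhouse>
        <backups>
            <!-- Default is true: partially copied backup files are removed automatically
                 when a BACKUP command fails. Set to false to keep them for inspection. -->
            <remove_backup_files_after_failure>false</remove_backup_files_after_failure>
        </backups>
    </clickhouse>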

src/Backups/BackupImpl.cpp

@ -144,6 +144,13 @@ BackupImpl::BackupImpl(
BackupImpl::~BackupImpl()
{
if ((open_mode == OpenMode::WRITE) && !is_internal_backup && !writing_finalized && !std::uncaught_exceptions() && !std::current_exception())
{
/// It is suspicious to destroy BackupImpl without finalization while writing a backup when there is no exception.
LOG_ERROR(log, "BackupImpl is not finalized when destructor is called. Stack trace: {}", StackTrace().toString());
chassert(false && "BackupImpl is not finalized when destructor is called.");
}
try
{
close();
@ -195,10 +202,6 @@ void BackupImpl::close()
{
std::lock_guard lock{mutex};
closeArchive(/* finalize= */ false);
if (!is_internal_backup && writer && !writing_finalized)
removeAllFilesAfterFailure();
writer.reset();
reader.reset();
coordination.reset();
@ -1005,14 +1008,18 @@ void BackupImpl::setCompressedSize()
}
void BackupImpl::removeAllFilesAfterFailure()
void BackupImpl::tryRemoveAllFiles()
{
if (open_mode != OpenMode::WRITE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup is not opened for writing");
if (is_internal_backup)
return; /// Let the initiator remove unnecessary files.
return;
try
{
LOG_INFO(log, "Removing all files of backup {} after failure", backup_name_for_logging);
LOG_INFO(log, "Removing all files of backup {}", backup_name_for_logging);
closeArchive(/* finalize= */ false);
Strings files_to_remove;
if (use_archive)

src/Backups/BackupImpl.h

@ -81,8 +81,9 @@ public:
size_t copyFileToDisk(const String & file_name, DiskPtr destination_disk, const String & destination_path, WriteMode write_mode) const override;
size_t copyFileToDisk(const SizeAndChecksum & size_and_checksum, DiskPtr destination_disk, const String & destination_path, WriteMode write_mode) const override;
void writeFile(const BackupFileInfo & info, BackupEntryPtr entry) override;
void finalizeWriting() override;
bool supportsWritingInMultipleThreads() const override { return !use_archive; }
void finalizeWriting() override;
void tryRemoveAllFiles() override;
private:
void open();
@ -107,8 +108,6 @@ private:
bool checkLockFile(bool throw_if_failed) const;
void removeLockFile();
void removeAllFilesAfterFailure();
/// Calculates and sets `compressed_size`.
void setCompressedSize();

src/Backups/BackupsWorker.cpp

@ -375,11 +375,12 @@ private:
};
BackupsWorker::BackupsWorker(ContextMutablePtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_, bool test_inject_sleep_)
BackupsWorker::BackupsWorker(ContextMutablePtr global_context, size_t num_backup_threads, size_t num_restore_threads)
: thread_pools(std::make_unique<ThreadPools>(num_backup_threads, num_restore_threads))
, allow_concurrent_backups(allow_concurrent_backups_)
, allow_concurrent_restores(allow_concurrent_restores_)
, test_inject_sleep(test_inject_sleep_)
, allow_concurrent_backups(global_context->getConfigRef().getBool("backups.allow_concurrent_backups", true))
, allow_concurrent_restores(global_context->getConfigRef().getBool("backups.allow_concurrent_restores", true))
, remove_backup_files_after_failure(global_context->getConfigRef().getBool("backups.remove_backup_files_after_failure", true))
, test_inject_sleep(global_context->getConfigRef().getBool("backups.test_inject_sleep", false))
, log(getLogger("BackupsWorker"))
, backup_log(global_context->getBackupLog())
, process_list(global_context->getProcessList())
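
The constructor above now reads all of its flags from the server configuration instead of taking them as arguments. For reference, a sketch of the corresponding backups section, assuming the usual top-level layout; the values shown are simply the defaults passed to the getBool() calls:

    <clickhouse>
        <backups>
            <allow_concurrent_backups>true</allow_concurrent_backups>
            <allow_concurrent_restores>true</allow_concurrent_restores>
            <remove_backup_files_after_failure>true</remove_backup_files_after_failure>
            <!-- Test-only knob, also read from the config now. -->
            <test_inject_sleep>false</test_inject_sleep>
        </backups>
    </clickhouse>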
@ -411,6 +412,9 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
auto backup_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
auto backup_settings = BackupSettings::fromBackupQuery(*backup_query);
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
String backup_name_for_logging = backup_info.toStringForLogging();
if (!backup_settings.backup_uuid)
backup_settings.backup_uuid = UUIDHelpers::generateV4();
@ -424,22 +428,28 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
backup_id = toString(*backup_settings.backup_uuid);
std::shared_ptr<IBackupCoordination> backup_coordination;
if (backup_settings.internal)
{
/// The following call of makeBackupCoordination() is not essential because doBackup() will later create a backup coordination
/// if it's not created here. However, it's better to create the coordination here: this way,
/// if an exception is thrown in startMakingBackup(), the other hosts will know about it.
backup_coordination = makeBackupCoordination(context, backup_settings, /* remote= */ true);
}
BackupMutablePtr backup;
auto backup_info = BackupInfo::fromAST(*backup_query->backup_name);
String backup_name_for_logging = backup_info.toStringForLogging();
String base_backup_name;
if (backup_settings.base_backup_info)
base_backup_name = backup_settings.base_backup_info->toStringForLogging();
/// Called in exception handlers below. This lambda function can be called on a separate thread, so it can't capture local variables by reference.
auto on_exception = [this](BackupMutablePtr & backup_, const OperationID & backup_id_, const String & backup_name_for_logging_,
const BackupSettings & backup_settings_, const std::shared_ptr<IBackupCoordination> & backup_coordination_)
{
/// Something bad happened, the backup has not been built.
tryLogCurrentException(log, fmt::format("Failed to make {} {}", (backup_settings_.internal ? "internal backup" : "backup"), backup_name_for_logging_));
setStatusSafe(backup_id_, getBackupStatusFromCurrentException());
sendCurrentExceptionToCoordination(backup_coordination_);
if (backup_ && remove_backup_files_after_failure)
backup_->tryRemoveAllFiles();
backup_.reset();
};
try
{
String base_backup_name;
if (backup_settings.base_backup_info)
base_backup_name = backup_settings.base_backup_info->toStringForLogging();
addInfo(backup_id,
backup_name_for_logging,
base_backup_name,
@ -448,6 +458,14 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
context->getProcessListElement(),
BackupStatus::CREATING_BACKUP);
if (backup_settings.internal)
{
/// The following call of makeBackupCoordination() is not essential because doBackup() will later create a backup coordination
/// if it's not created here. However, it's better to create the coordination here: this way,
/// if an exception is thrown in startMakingBackup(), the other hosts will know about it.
backup_coordination = makeBackupCoordination(context, backup_settings, /* remote= */ true);
}
/// Prepare context to use.
ContextPtr context_in_use = context;
ContextMutablePtr mutable_context;
@ -468,7 +486,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
/// process_list_element_holder is used to make an element in ProcessList live while BACKUP is working asynchronously.
auto process_list_element = context_in_use->getProcessListElement();
thread_pool.scheduleOrThrowOnError(
scheduleFromThreadPool<void>(
[this,
backup_query,
backup_id,
@ -478,25 +496,34 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
backup_coordination,
context_in_use,
mutable_context,
thread_group = CurrentThread::getGroup(),
on_exception,
process_list_element_holder = process_list_element ? process_list_element->getProcessListEntry() : nullptr]
{
doBackup(
backup_query,
backup_id,
backup_name_for_logging,
backup_info,
backup_settings,
backup_coordination,
context_in_use,
mutable_context,
thread_group,
/* called_async= */ true);
});
BackupMutablePtr backup_async;
try
{
doBackup(
backup_async,
backup_query,
backup_id,
backup_name_for_logging,
backup_info,
backup_settings,
backup_coordination,
context_in_use,
mutable_context);
}
catch (...)
{
on_exception(backup_async, backup_id, backup_name_for_logging, backup_settings, backup_coordination);
}
},
thread_pool, "BackupWorker");
}
else
{
doBackup(
backup,
backup_query,
backup_id,
backup_name_for_logging,
@ -504,25 +531,21 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
backup_settings,
backup_coordination,
context_in_use,
mutable_context,
nullptr,
/* called_async= */ false);
mutable_context);
}
return backup_id;
}
catch (...)
{
tryLogCurrentException(log, fmt::format("Failed to start {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_name_for_logging));
/// Something bad happened, the backup has not built.
setStatusSafe(backup_id, getBackupStatusFromCurrentException());
sendCurrentExceptionToCoordination(backup_coordination);
on_exception(backup, backup_id, backup_name_for_logging, backup_settings, backup_coordination);
throw;
}
}
void BackupsWorker::doBackup(
BackupMutablePtr & backup,
const std::shared_ptr<ASTBackupQuery> & backup_query,
const OperationID & backup_id,
const String & backup_name_for_logging,
@ -530,147 +553,120 @@ void BackupsWorker::doBackup(
BackupSettings backup_settings,
std::shared_ptr<IBackupCoordination> backup_coordination,
const ContextPtr & context,
ContextMutablePtr mutable_context,
ThreadGroupPtr thread_group,
bool called_async)
ContextMutablePtr mutable_context)
{
SCOPE_EXIT_SAFE(
if (called_async && thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
bool on_cluster = !backup_query->cluster.empty();
assert(!on_cluster || mutable_context);
try
/// Checks access rights if this is not ON CLUSTER query.
/// (If this is ON CLUSTER query executeDDLQueryOnCluster() will check access rights later.)
auto required_access = getRequiredAccessToBackup(backup_query->elements);
if (!on_cluster)
context->checkAccess(required_access);
ClusterPtr cluster;
if (on_cluster)
{
if (called_async && thread_group)
CurrentThread::attachToGroup(thread_group);
if (called_async)
setThreadName("BackupWorker");
bool on_cluster = !backup_query->cluster.empty();
assert(mutable_context || (!on_cluster && !called_async));
/// Checks access rights if this is not ON CLUSTER query.
/// (If this is ON CLUSTER query executeDDLQueryOnCluster() will check access rights later.)
auto required_access = getRequiredAccessToBackup(backup_query->elements);
if (!on_cluster)
context->checkAccess(required_access);
ClusterPtr cluster;
if (on_cluster)
{
backup_query->cluster = context->getMacros()->expand(backup_query->cluster);
cluster = context->getCluster(backup_query->cluster);
backup_settings.cluster_host_ids = cluster->getHostIDs();
}
/// Make a backup coordination.
if (!backup_coordination)
backup_coordination = makeBackupCoordination(context, backup_settings, /* remote= */ on_cluster);
if (!allow_concurrent_backups && backup_coordination->hasConcurrentBackups(std::ref(num_active_backups)))
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'");
/// Opens a backup for writing.
BackupFactory::CreateParams backup_create_params;
backup_create_params.open_mode = IBackup::OpenMode::WRITE;
backup_create_params.context = context;
backup_create_params.backup_info = backup_info;
backup_create_params.base_backup_info = backup_settings.base_backup_info;
backup_create_params.compression_method = backup_settings.compression_method;
backup_create_params.compression_level = backup_settings.compression_level;
backup_create_params.password = backup_settings.password;
backup_create_params.s3_storage_class = backup_settings.s3_storage_class;
backup_create_params.is_internal_backup = backup_settings.internal;
backup_create_params.backup_coordination = backup_coordination;
backup_create_params.backup_uuid = backup_settings.backup_uuid;
backup_create_params.deduplicate_files = backup_settings.deduplicate_files;
backup_create_params.allow_s3_native_copy = backup_settings.allow_s3_native_copy;
backup_create_params.use_same_s3_credentials_for_base_backup = backup_settings.use_same_s3_credentials_for_base_backup;
backup_create_params.read_settings = getReadSettingsForBackup(context, backup_settings);
backup_create_params.write_settings = getWriteSettingsForBackup(context);
BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params);
/// Write the backup.
if (on_cluster)
{
DDLQueryOnClusterParams params;
params.cluster = cluster;
params.only_shard_num = backup_settings.shard_num;
params.only_replica_num = backup_settings.replica_num;
params.access_to_check = required_access;
backup_settings.copySettingsToQuery(*backup_query);
// executeDDLQueryOnCluster() will return without waiting for completion
mutable_context->setSetting("distributed_ddl_task_timeout", Field{0});
mutable_context->setSetting("distributed_ddl_output_mode", Field{"none"});
executeDDLQueryOnCluster(backup_query, mutable_context, params);
/// Wait until all the hosts have written their backup entries.
backup_coordination->waitForStage(Stage::COMPLETED);
backup_coordination->setStage(Stage::COMPLETED,"");
}
else
{
backup_query->setCurrentDatabase(context->getCurrentDatabase());
/// Prepare backup entries.
BackupEntries backup_entries;
{
BackupEntriesCollector backup_entries_collector(
backup_query->elements, backup_settings, backup_coordination,
backup_create_params.read_settings, context, getThreadPool(ThreadPoolId::BACKUP_MAKE_FILES_LIST));
backup_entries = backup_entries_collector.run();
}
/// Write the backup entries to the backup.
buildFileInfosForBackupEntries(backup, backup_entries, backup_create_params.read_settings, backup_coordination, context->getProcessListElement());
writeBackupEntries(backup, std::move(backup_entries), backup_id, backup_coordination, backup_settings.internal, context->getProcessListElement());
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
backup_coordination->setStage(Stage::COMPLETED,"");
}
size_t num_files = 0;
UInt64 total_size = 0;
size_t num_entries = 0;
UInt64 uncompressed_size = 0;
UInt64 compressed_size = 0;
/// Finalize backup (write its metadata).
if (!backup_settings.internal)
{
backup->finalizeWriting();
num_files = backup->getNumFiles();
total_size = backup->getTotalSize();
num_entries = backup->getNumEntries();
uncompressed_size = backup->getUncompressedSize();
compressed_size = backup->getCompressedSize();
}
/// Close the backup.
backup.reset();
LOG_INFO(log, "{} {} was created successfully", (backup_settings.internal ? "Internal backup" : "Backup"), backup_name_for_logging);
/// NOTE: we need to update metadata again after backup->finalizeWriting(), because backup metadata is written there.
setNumFilesAndSize(backup_id, num_files, total_size, num_entries, uncompressed_size, compressed_size, 0, 0);
/// NOTE: setStatus is called after setNumFilesAndSize in order to have actual information in a backup log record
setStatus(backup_id, BackupStatus::BACKUP_CREATED);
backup_query->cluster = context->getMacros()->expand(backup_query->cluster);
cluster = context->getCluster(backup_query->cluster);
backup_settings.cluster_host_ids = cluster->getHostIDs();
}
catch (...)
/// Make a backup coordination.
if (!backup_coordination)
backup_coordination = makeBackupCoordination(context, backup_settings, /* remote= */ on_cluster);
if (!allow_concurrent_backups && backup_coordination->hasConcurrentBackups(std::ref(num_active_backups)))
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'");
/// Opens a backup for writing.
BackupFactory::CreateParams backup_create_params;
backup_create_params.open_mode = IBackup::OpenMode::WRITE;
backup_create_params.context = context;
backup_create_params.backup_info = backup_info;
backup_create_params.base_backup_info = backup_settings.base_backup_info;
backup_create_params.compression_method = backup_settings.compression_method;
backup_create_params.compression_level = backup_settings.compression_level;
backup_create_params.password = backup_settings.password;
backup_create_params.s3_storage_class = backup_settings.s3_storage_class;
backup_create_params.is_internal_backup = backup_settings.internal;
backup_create_params.backup_coordination = backup_coordination;
backup_create_params.backup_uuid = backup_settings.backup_uuid;
backup_create_params.deduplicate_files = backup_settings.deduplicate_files;
backup_create_params.allow_s3_native_copy = backup_settings.allow_s3_native_copy;
backup_create_params.use_same_s3_credentials_for_base_backup = backup_settings.use_same_s3_credentials_for_base_backup;
backup_create_params.read_settings = getReadSettingsForBackup(context, backup_settings);
backup_create_params.write_settings = getWriteSettingsForBackup(context);
backup = BackupFactory::instance().createBackup(backup_create_params);
/// Write the backup.
if (on_cluster)
{
/// Something bad happened, the backup has not built.
if (called_async)
{
tryLogCurrentException(log, fmt::format("Failed to make {} {}", (backup_settings.internal ? "internal backup" : "backup"), backup_name_for_logging));
setStatusSafe(backup_id, getBackupStatusFromCurrentException());
sendCurrentExceptionToCoordination(backup_coordination);
}
else
{
/// setStatus() and sendCurrentExceptionToCoordination() will be called by startMakingBackup().
throw;
}
DDLQueryOnClusterParams params;
params.cluster = cluster;
params.only_shard_num = backup_settings.shard_num;
params.only_replica_num = backup_settings.replica_num;
params.access_to_check = required_access;
backup_settings.copySettingsToQuery(*backup_query);
// executeDDLQueryOnCluster() will return without waiting for completion
mutable_context->setSetting("distributed_ddl_task_timeout", Field{0});
mutable_context->setSetting("distributed_ddl_output_mode", Field{"none"});
executeDDLQueryOnCluster(backup_query, mutable_context, params);
/// Wait until all the hosts have written their backup entries.
backup_coordination->waitForStage(Stage::COMPLETED);
backup_coordination->setStage(Stage::COMPLETED,"");
}
else
{
backup_query->setCurrentDatabase(context->getCurrentDatabase());
/// Prepare backup entries.
BackupEntries backup_entries;
{
BackupEntriesCollector backup_entries_collector(
backup_query->elements, backup_settings, backup_coordination,
backup_create_params.read_settings, context, getThreadPool(ThreadPoolId::BACKUP_MAKE_FILES_LIST));
backup_entries = backup_entries_collector.run();
}
/// Write the backup entries to the backup.
chassert(backup);
chassert(backup_coordination);
chassert(context);
buildFileInfosForBackupEntries(backup, backup_entries, backup_create_params.read_settings, backup_coordination, context->getProcessListElement());
writeBackupEntries(backup, std::move(backup_entries), backup_id, backup_coordination, backup_settings.internal, context->getProcessListElement());
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
backup_coordination->setStage(Stage::COMPLETED,"");
}
size_t num_files = 0;
UInt64 total_size = 0;
size_t num_entries = 0;
UInt64 uncompressed_size = 0;
UInt64 compressed_size = 0;
/// Finalize backup (write its metadata).
if (!backup_settings.internal)
{
backup->finalizeWriting();
num_files = backup->getNumFiles();
total_size = backup->getTotalSize();
num_entries = backup->getNumEntries();
uncompressed_size = backup->getUncompressedSize();
compressed_size = backup->getCompressedSize();
}
/// Close the backup.
backup.reset();
LOG_INFO(log, "{} {} was created successfully", (backup_settings.internal ? "Internal backup" : "Backup"), backup_name_for_logging);
/// NOTE: we need to update metadata again after backup->finalizeWriting(), because backup metadata is written there.
setNumFilesAndSize(backup_id, num_files, total_size, num_entries, uncompressed_size, compressed_size, 0, 0);
/// NOTE: setStatus is called after setNumFilesAndSize in order to have actual information in a backup log record
setStatus(backup_id, BackupStatus::BACKUP_CREATED);
}
@ -800,6 +796,9 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
auto restore_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
auto restore_settings = RestoreSettings::fromRestoreQuery(*restore_query);
auto backup_info = BackupInfo::fromAST(*restore_query->backup_name);
String backup_name_for_logging = backup_info.toStringForLogging();
if (!restore_settings.restore_uuid)
restore_settings.restore_uuid = UUIDHelpers::generateV4();
@ -813,18 +812,19 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
restore_id = toString(*restore_settings.restore_uuid);
std::shared_ptr<IRestoreCoordination> restore_coordination;
if (restore_settings.internal)
/// Called in exception handlers below. This lambda function can be called on a separate thread, so it can't capture local variables by reference.
auto on_exception = [this](const OperationID & restore_id_, const String & backup_name_for_logging_,
const RestoreSettings & restore_settings_, const std::shared_ptr<IRestoreCoordination> & restore_coordination_)
{
/// The following call of makeRestoreCoordination() is not essential because doRestore() will later create a restore coordination
/// if it's not created here. However, it's better to create the coordination here: this way,
/// if an exception is thrown in startRestoring(), the other hosts will know about it.
restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ true);
}
/// Something bad happened, some data were not restored.
tryLogCurrentException(log, fmt::format("Failed to restore from {} {}", (restore_settings_.internal ? "internal backup" : "backup"), backup_name_for_logging_));
setStatusSafe(restore_id_, getRestoreStatusFromCurrentException());
sendCurrentExceptionToCoordination(restore_coordination_);
};
try
{
auto backup_info = BackupInfo::fromAST(*restore_query->backup_name);
String backup_name_for_logging = backup_info.toStringForLogging();
String base_backup_name;
if (restore_settings.base_backup_info)
base_backup_name = restore_settings.base_backup_info->toStringForLogging();
@ -837,6 +837,14 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
context->getProcessListElement(),
BackupStatus::RESTORING);
if (restore_settings.internal)
{
/// The following call of makeRestoreCoordination() is not essential because doRestore() will later create a restore coordination
/// if it's not created here. However, it's better to create the coordination here: this way,
/// if an exception is thrown in startRestoring(), the other hosts will know about it.
restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ true);
}
/// Prepare context to use.
ContextMutablePtr context_in_use = context;
bool on_cluster = !restore_query->cluster.empty();
@ -856,7 +864,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
/// process_list_element_holder is used to make an element in ProcessList live while RESTORE is working asynchronously.
auto process_list_element = context_in_use->getProcessListElement();
thread_pool.scheduleOrThrowOnError(
scheduleFromThreadPool<void>(
[this,
restore_query,
restore_id,
@ -865,20 +873,27 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
restore_settings,
restore_coordination,
context_in_use,
thread_group = CurrentThread::getGroup(),
on_exception,
process_list_element_holder = process_list_element ? process_list_element->getProcessListEntry() : nullptr]
{
doRestore(
restore_query,
restore_id,
backup_name_for_logging,
backup_info,
restore_settings,
restore_coordination,
context_in_use,
thread_group,
/* called_async= */ true);
});
try
{
doRestore(
restore_query,
restore_id,
backup_name_for_logging,
backup_info,
restore_settings,
restore_coordination,
context_in_use);
}
catch (...)
{
on_exception(restore_id, backup_name_for_logging, restore_settings, restore_coordination);
}
},
thread_pool,
"RestoreWorker");
}
else
{
@ -889,18 +904,14 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
backup_info,
restore_settings,
restore_coordination,
context_in_use,
nullptr,
/* called_async= */ false);
context_in_use);
}
return restore_id;
}
catch (...)
{
/// Something bad happened, the backup has not built.
setStatusSafe(restore_id, getRestoreStatusFromCurrentException());
sendCurrentExceptionToCoordination(restore_coordination);
on_exception(restore_id, backup_name_for_logging, restore_settings, restore_coordination);
throw;
}
}
@ -913,133 +924,103 @@ void BackupsWorker::doRestore(
const BackupInfo & backup_info,
RestoreSettings restore_settings,
std::shared_ptr<IRestoreCoordination> restore_coordination,
ContextMutablePtr context,
ThreadGroupPtr thread_group,
bool called_async)
ContextMutablePtr context)
{
SCOPE_EXIT_SAFE(
if (called_async && thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
/// Open the backup for reading.
BackupFactory::CreateParams backup_open_params;
backup_open_params.open_mode = IBackup::OpenMode::READ;
backup_open_params.context = context;
backup_open_params.backup_info = backup_info;
backup_open_params.base_backup_info = restore_settings.base_backup_info;
backup_open_params.password = restore_settings.password;
backup_open_params.allow_s3_native_copy = restore_settings.allow_s3_native_copy;
backup_open_params.use_same_s3_credentials_for_base_backup = restore_settings.use_same_s3_credentials_for_base_backup;
backup_open_params.read_settings = getReadSettingsForRestore(context);
backup_open_params.write_settings = getWriteSettingsForRestore(context);
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
try
String current_database = context->getCurrentDatabase();
/// Checks access rights if this is ON CLUSTER query.
/// (If this isn't ON CLUSTER query RestorerFromBackup will check access rights later.)
ClusterPtr cluster;
bool on_cluster = !restore_query->cluster.empty();
if (on_cluster)
{
if (called_async && thread_group)
CurrentThread::attachToGroup(thread_group);
if (called_async)
setThreadName("RestoreWorker");
/// Open the backup for reading.
BackupFactory::CreateParams backup_open_params;
backup_open_params.open_mode = IBackup::OpenMode::READ;
backup_open_params.context = context;
backup_open_params.backup_info = backup_info;
backup_open_params.base_backup_info = restore_settings.base_backup_info;
backup_open_params.password = restore_settings.password;
backup_open_params.allow_s3_native_copy = restore_settings.allow_s3_native_copy;
backup_open_params.use_same_s3_credentials_for_base_backup = restore_settings.use_same_s3_credentials_for_base_backup;
backup_open_params.read_settings = getReadSettingsForRestore(context);
backup_open_params.write_settings = getWriteSettingsForRestore(context);
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
String current_database = context->getCurrentDatabase();
/// Checks access rights if this is ON CLUSTER query.
/// (If this isn't ON CLUSTER query RestorerFromBackup will check access rights later.)
ClusterPtr cluster;
bool on_cluster = !restore_query->cluster.empty();
if (on_cluster)
{
restore_query->cluster = context->getMacros()->expand(restore_query->cluster);
cluster = context->getCluster(restore_query->cluster);
restore_settings.cluster_host_ids = cluster->getHostIDs();
}
/// Make a restore coordination.
if (!restore_coordination)
restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ on_cluster);
if (!allow_concurrent_restores && restore_coordination->hasConcurrentRestores(std::ref(num_active_restores)))
throw Exception(
ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED,
"Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");
if (on_cluster)
{
/// We cannot just use access checking provided by the function executeDDLQueryOnCluster(): it would be incorrect
/// because different replicas can contain different sets of tables, so the required access rights can differ too.
/// So the right way is to go through the entire cluster and check access for each host.
auto addresses = cluster->filterAddressesByShardOrReplica(restore_settings.shard_num, restore_settings.replica_num);
for (const auto * address : addresses)
{
restore_settings.host_id = address->toString();
auto restore_elements = restore_query->elements;
String addr_database = address->default_database.empty() ? current_database : address->default_database;
for (auto & element : restore_elements)
element.setCurrentDatabase(addr_database);
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context};
dummy_restorer.run(RestorerFromBackup::CHECK_ACCESS_ONLY);
}
}
/// Do RESTORE.
if (on_cluster)
{
DDLQueryOnClusterParams params;
params.cluster = cluster;
params.only_shard_num = restore_settings.shard_num;
params.only_replica_num = restore_settings.replica_num;
restore_settings.copySettingsToQuery(*restore_query);
// executeDDLQueryOnCluster() will return without waiting for completion
context->setSetting("distributed_ddl_task_timeout", Field{0});
context->setSetting("distributed_ddl_output_mode", Field{"none"});
executeDDLQueryOnCluster(restore_query, context, params);
/// Wait until all the hosts have written their backup entries.
restore_coordination->waitForStage(Stage::COMPLETED);
restore_coordination->setStage(Stage::COMPLETED,"");
}
else
{
restore_query->setCurrentDatabase(current_database);
/// Restore metadata and prepare data restoring tasks.
DataRestoreTasks data_restore_tasks;
{
RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
backup, context};
data_restore_tasks = restorer.run(RestorerFromBackup::RESTORE);
}
/// Execute the data restoring tasks.
restoreTablesData(restore_id, backup, std::move(data_restore_tasks), getThreadPool(ThreadPoolId::RESTORE_TABLES_DATA), context->getProcessListElement());
/// We have restored everything, we need to tell other hosts (they could be waiting for it).
restore_coordination->setStage(Stage::COMPLETED, "");
}
LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging);
setStatus(restore_id, BackupStatus::RESTORED);
restore_query->cluster = context->getMacros()->expand(restore_query->cluster);
cluster = context->getCluster(restore_query->cluster);
restore_settings.cluster_host_ids = cluster->getHostIDs();
}
catch (...)
/// Make a restore coordination.
if (!restore_coordination)
restore_coordination = makeRestoreCoordination(context, restore_settings, /* remote= */ on_cluster);
if (!allow_concurrent_restores && restore_coordination->hasConcurrentRestores(std::ref(num_active_restores)))
throw Exception(
ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED,
"Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");
if (on_cluster)
{
/// Something bad happened, the backup has not built.
if (called_async)
/// We cannot just use access checking provided by the function executeDDLQueryOnCluster(): it would be incorrect
/// because different replicas can contain different sets of tables, so the required access rights can differ too.
/// So the right way is to go through the entire cluster and check access for each host.
auto addresses = cluster->filterAddressesByShardOrReplica(restore_settings.shard_num, restore_settings.replica_num);
for (const auto * address : addresses)
{
tryLogCurrentException(log, fmt::format("Failed to restore from {} {}", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging));
setStatusSafe(restore_id, getRestoreStatusFromCurrentException());
sendCurrentExceptionToCoordination(restore_coordination);
}
else
{
/// setStatus() and sendCurrentExceptionToCoordination() will be called by startRestoring().
throw;
restore_settings.host_id = address->toString();
auto restore_elements = restore_query->elements;
String addr_database = address->default_database.empty() ? current_database : address->default_database;
for (auto & element : restore_elements)
element.setCurrentDatabase(addr_database);
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context};
dummy_restorer.run(RestorerFromBackup::CHECK_ACCESS_ONLY);
}
}
/// Do RESTORE.
if (on_cluster)
{
DDLQueryOnClusterParams params;
params.cluster = cluster;
params.only_shard_num = restore_settings.shard_num;
params.only_replica_num = restore_settings.replica_num;
restore_settings.copySettingsToQuery(*restore_query);
// executeDDLQueryOnCluster() will return without waiting for completion
context->setSetting("distributed_ddl_task_timeout", Field{0});
context->setSetting("distributed_ddl_output_mode", Field{"none"});
executeDDLQueryOnCluster(restore_query, context, params);
/// Wait until all the hosts have written their backup entries.
restore_coordination->waitForStage(Stage::COMPLETED);
restore_coordination->setStage(Stage::COMPLETED,"");
}
else
{
restore_query->setCurrentDatabase(current_database);
/// Restore metadata and prepare data restoring tasks.
DataRestoreTasks data_restore_tasks;
{
RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
backup, context};
data_restore_tasks = restorer.run(RestorerFromBackup::RESTORE);
}
/// Execute the data restoring tasks.
restoreTablesData(restore_id, backup, std::move(data_restore_tasks), getThreadPool(ThreadPoolId::RESTORE_TABLES_DATA), context->getProcessListElement());
/// We have restored everything, we need to tell other hosts (they could be waiting for it).
restore_coordination->setStage(Stage::COMPLETED, "");
}
LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging);
setStatus(restore_id, BackupStatus::RESTORED);
}

src/Backups/BackupsWorker.h

@ -38,20 +38,15 @@ class ProcessList;
class BackupsWorker
{
public:
BackupsWorker(
ContextMutablePtr global_context,
size_t num_backup_threads,
size_t num_restore_threads,
bool allow_concurrent_backups_,
bool allow_concurrent_restores_,
bool test_inject_sleep_);
BackupsWorker(ContextMutablePtr global_context, size_t num_backup_threads, size_t num_restore_threads);
~BackupsWorker();
/// Waits until all tasks have been completed.
void shutdown();
/// Starts executing a BACKUP or RESTORE query. Returns ID of the operation.
/// For asynchronous operations the function usually throws no exceptions on failure;
/// call getInfo() on the returned operation id to check for errors.
BackupOperationID start(const ASTPtr & backup_or_restore_query, ContextMutablePtr context);
/// Waits until the specified backup or restore operation finishes or stops.
@ -75,6 +70,7 @@ private:
BackupOperationID startMakingBackup(const ASTPtr & query, const ContextPtr & context);
void doBackup(
BackupMutablePtr & backup,
const std::shared_ptr<ASTBackupQuery> & backup_query,
const BackupOperationID & backup_id,
const String & backup_name_for_logging,
@ -82,9 +78,7 @@ private:
BackupSettings backup_settings,
std::shared_ptr<IBackupCoordination> backup_coordination,
const ContextPtr & context,
ContextMutablePtr mutable_context,
ThreadGroupPtr thread_group,
bool called_async);
ContextMutablePtr mutable_context);
/// Builds file infos for specified backup entries.
void buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, const ReadSettings & read_settings, std::shared_ptr<IBackupCoordination> backup_coordination, QueryStatusPtr process_list_element);
@ -101,9 +95,7 @@ private:
const BackupInfo & backup_info,
RestoreSettings restore_settings,
std::shared_ptr<IRestoreCoordination> restore_coordination,
ContextMutablePtr context,
ThreadGroupPtr thread_group,
bool called_async);
ContextMutablePtr context);
/// Run data restoring tasks which insert data to tables.
void restoreTablesData(const BackupOperationID & restore_id, BackupPtr backup, DataRestoreTasks && tasks, ThreadPool & thread_pool, QueryStatusPtr process_list_element);
@ -126,6 +118,7 @@ private:
const bool allow_concurrent_backups;
const bool allow_concurrent_restores;
const bool remove_backup_files_after_failure;
const bool test_inject_sleep;
LoggerPtr log;

src/Backups/IBackup.h

@ -117,11 +117,14 @@ public:
/// Puts a new entry to the backup.
virtual void writeFile(const BackupFileInfo & file_info, BackupEntryPtr entry) = 0;
/// Whether it's possible to add new entries to the backup in multiple threads.
virtual bool supportsWritingInMultipleThreads() const = 0;
/// Finalizes writing the backup, should be called after all entries have been successfully written.
virtual void finalizeWriting() = 0;
/// Whether it's possible to add new entries to the backup in multiple threads.
virtual bool supportsWritingInMultipleThreads() const = 0;
/// Try to remove all files copied to the backup. Used after an exception or if the backup was cancelled.
virtual void tryRemoveAllFiles() = 0;
};
using BackupPtr = std::shared_ptr<const IBackup>;

src/Interpreters/Context.cpp

@ -2590,15 +2590,11 @@ BackupsWorker & Context::getBackupsWorker() const
{
callOnce(shared->backups_worker_initialized, [&] {
const auto & config = getConfigRef();
const bool allow_concurrent_backups = config.getBool("backups.allow_concurrent_backups", true);
const bool allow_concurrent_restores = config.getBool("backups.allow_concurrent_restores", true);
const bool test_inject_sleep = config.getBool("backups.test_inject_sleep", false);
const auto & settings_ref = getSettingsRef();
UInt64 backup_threads = config.getUInt64("backup_threads", settings_ref.backup_threads);
UInt64 restore_threads = config.getUInt64("restore_threads", settings_ref.restore_threads);
shared->backups_worker.emplace(getGlobalContext(), backup_threads, restore_threads, allow_concurrent_backups, allow_concurrent_restores, test_inject_sleep);
shared->backups_worker.emplace(getGlobalContext(), backup_threads, restore_threads);
});
return *shared->backups_worker;
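
Similarly, the thread-pool sizes above fall back to the backup_threads and restore_threads query settings when the keys are absent from the config. A sketch with illustrative values (the numbers are assumptions; only the key names come from the code above):

    <clickhouse>
        <!-- Sizes of the BACKUP and RESTORE thread pools passed to BackupsWorker. -->
        <backup_threads>16</backup_threads>
        <restore_threads>16</restore_threads>
    </clickhouse>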