diff --git a/src/Backups/BackupUtils.cpp b/src/Backups/BackupUtils.cpp index c6a0840964b..89b75a103c2 100644 --- a/src/Backups/BackupUtils.cpp +++ b/src/Backups/BackupUtils.cpp @@ -1,10 +1,7 @@ #include -#include -#include #include #include #include -#include #include @@ -60,140 +57,6 @@ DDLRenamingMap makeRenamingMapFromBackupQuery(const ASTBackupQuery::Elements & e } -void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, ThreadPool & thread_pool) -{ - size_t num_active_jobs = 0; - std::mutex mutex; - std::condition_variable event; - std::exception_ptr exception; - - bool always_single_threaded = !backup->supportsWritingInMultipleThreads(); - auto thread_group = CurrentThread::getGroup(); - - for (auto & name_and_entry : backup_entries) - { - auto & name = name_and_entry.first; - auto & entry = name_and_entry.second; - - { - std::unique_lock lock{mutex}; - if (exception) - break; - ++num_active_jobs; - } - - auto job = [&](bool async) - { - SCOPE_EXIT_SAFE( - std::lock_guard lock{mutex}; - if (!--num_active_jobs) - event.notify_all(); - if (async) - CurrentThread::detachFromGroupIfNotDetached(); - ); - - try - { - if (async && thread_group) - CurrentThread::attachToGroup(thread_group); - - if (async) - setThreadName("BackupWorker"); - - { - std::lock_guard lock{mutex}; - if (exception) - return; - } - - backup->writeFile(name, std::move(entry)); - } - catch (...) 
- { - std::lock_guard lock{mutex}; - if (!exception) - exception = std::current_exception(); - } - }; - - if (always_single_threaded || !thread_pool.trySchedule([job] { job(true); })) - job(false); - } - - { - std::unique_lock lock{mutex}; - event.wait(lock, [&] { return !num_active_jobs; }); - if (exception) - std::rethrow_exception(exception); - } -} - - -void restoreTablesData(DataRestoreTasks && tasks, ThreadPool & thread_pool) -{ - size_t num_active_jobs = 0; - std::mutex mutex; - std::condition_variable event; - std::exception_ptr exception; - - auto thread_group = CurrentThread::getGroup(); - - for (auto & task : tasks) - { - { - std::unique_lock lock{mutex}; - if (exception) - break; - ++num_active_jobs; - } - - auto job = [&](bool async) - { - SCOPE_EXIT_SAFE( - std::lock_guard lock{mutex}; - if (!--num_active_jobs) - event.notify_all(); - if (async) - CurrentThread::detachFromGroupIfNotDetached(); - ); - - try - { - if (async && thread_group) - CurrentThread::attachToGroup(thread_group); - - if (async) - setThreadName("RestoreWorker"); - - { - std::lock_guard lock{mutex}; - if (exception) - return; - } - - std::move(task)(); - } - catch (...) - { - std::lock_guard lock{mutex}; - if (!exception) - exception = std::current_exception(); - } - }; - - if (!thread_pool.trySchedule([job] { job(true); })) - job(false); - } - - { - std::unique_lock lock{mutex}; - event.wait(lock, [&] { return !num_active_jobs; }); - if (exception) - std::rethrow_exception(exception); - } -} - - /// Returns access required to execute BACKUP query. 
AccessRightsElements getRequiredAccessToBackup(const ASTBackupQuery::Elements & elements) { diff --git a/src/Backups/BackupUtils.h b/src/Backups/BackupUtils.h index cda9121b1fa..f451b003652 100644 --- a/src/Backups/BackupUtils.h +++ b/src/Backups/BackupUtils.h @@ -7,21 +7,12 @@ namespace DB { class IBackup; -using BackupMutablePtr = std::shared_ptr; -class IBackupEntry; -using BackupEntries = std::vector>>; -using DataRestoreTasks = std::vector>; class AccessRightsElements; class DDLRenamingMap; /// Initializes a DDLRenamingMap from a BACKUP or RESTORE query. DDLRenamingMap makeRenamingMapFromBackupQuery(const ASTBackupQuery::Elements & elements); -/// Write backup entries to an opened backup. -void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, ThreadPool & thread_pool); - -/// Run data restoring tasks which insert data to tables. -void restoreTablesData(DataRestoreTasks && tasks, ThreadPool & thread_pool); /// Returns access required to execute BACKUP query. AccessRightsElements getRequiredAccessToBackup(const ASTBackupQuery::Elements & elements); diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp index bdcff249e7d..58185053124 100644 --- a/src/Backups/BackupsWorker.cpp +++ b/src/Backups/BackupsWorker.cpp @@ -21,6 +21,7 @@ #include #include #include +#include namespace DB @@ -346,7 +347,7 @@ void BackupsWorker::doBackup( } /// Write the backup entries to the backup. - writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool); + writeBackupEntries(backup_id, backup, std::move(backup_entries), backups_thread_pool, backup_settings.internal); /// We have written our backup entries, we need to tell other hosts (they could be waiting for it). backup_coordination->setStage(backup_settings.host_id, Stage::COMPLETED, ""); @@ -374,6 +375,7 @@ void BackupsWorker::doBackup( LOG_INFO(log, "{} {} was created successfully", (backup_settings.internal ? 
"Internal backup" : "Backup"), backup_name_for_logging); setStatus(backup_id, BackupStatus::BACKUP_CREATED); + /// NOTE: we need to update metadata again after backup->finalizeWriting(), because backup metadata is written there. setNumFilesAndSize(backup_id, num_files, total_size, num_entries, uncompressed_size, compressed_size, 0, 0); } catch (...) @@ -394,6 +396,88 @@ void BackupsWorker::doBackup( } +void BackupsWorker::writeBackupEntries(const OperationID & backup_id, BackupMutablePtr backup, BackupEntries && backup_entries, ThreadPool & thread_pool, bool internal) +{ + size_t num_active_jobs = 0; + std::mutex mutex; + std::condition_variable event; + std::exception_ptr exception; + + bool always_single_threaded = !backup->supportsWritingInMultipleThreads(); + auto thread_group = CurrentThread::getGroup(); + + for (auto & name_and_entry : backup_entries) + { + auto & name = name_and_entry.first; + auto & entry = name_and_entry.second; + + { + std::unique_lock lock{mutex}; + if (exception) + break; + ++num_active_jobs; + } + + auto job = [&](bool async) + { + SCOPE_EXIT_SAFE( + std::lock_guard lock{mutex}; + if (!--num_active_jobs) + event.notify_all(); + if (async) + CurrentThread::detachFromGroupIfNotDetached(); + ); + + try + { + if (async && thread_group) + CurrentThread::attachToGroup(thread_group); + + if (async) + setThreadName("BackupWorker"); + + { + std::lock_guard lock{mutex}; + if (exception) + return; + } + + backup->writeFile(name, std::move(entry)); + // Report the backup's current file/size/entry counters via setNumFilesAndSize() after each written file (skipped when internal is set). + if (!internal) + { + setNumFilesAndSize( + backup_id, + backup->getNumFiles(), + backup->getTotalSize(), + backup->getNumEntries(), + backup->getUncompressedSize(), + backup->getCompressedSize(), + 0, 0); + } + + } + catch (...) 
+ { + std::lock_guard lock{mutex}; + if (!exception) + exception = std::current_exception(); + } + }; + + if (always_single_threaded || !thread_pool.trySchedule([job] { job(true); })) + job(false); + } + + { + std::unique_lock lock{mutex}; + event.wait(lock, [&] { return !num_active_jobs; }); + if (exception) + std::rethrow_exception(exception); + } +} + + OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePtr context) { auto restore_query = std::static_pointer_cast(query->clone()); @@ -578,7 +662,7 @@ void BackupsWorker::doRestore( } /// Execute the data restoring tasks. - restoreTablesData(std::move(data_restore_tasks), restores_thread_pool); + restoreTablesData(restore_id, backup, std::move(data_restore_tasks), restores_thread_pool); /// We have restored everything, we need to tell other hosts (they could be waiting for it). restore_coordination->setStage(restore_settings.host_id, Stage::COMPLETED, ""); @@ -586,15 +670,6 @@ void BackupsWorker::doRestore( LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging); setStatus(restore_id, BackupStatus::RESTORED); - setNumFilesAndSize( - restore_id, - backup->getNumFiles(), - backup->getTotalSize(), - backup->getNumEntries(), - backup->getUncompressedSize(), - backup->getCompressedSize(), - backup->getNumReadFiles(), - backup->getNumReadBytes()); } catch (...) 
{ @@ -614,6 +689,80 @@ void BackupsWorker::doRestore( } +void BackupsWorker::restoreTablesData(const OperationID & restore_id, BackupPtr backup, DataRestoreTasks && tasks, ThreadPool & thread_pool) +{ + size_t num_active_jobs = 0; + std::mutex mutex; + std::condition_variable event; + std::exception_ptr exception; + + auto thread_group = CurrentThread::getGroup(); + + for (auto & task : tasks) + { + { + std::unique_lock lock{mutex}; + if (exception) + break; + ++num_active_jobs; + } + + auto job = [&](bool async) + { + SCOPE_EXIT_SAFE( + std::lock_guard lock{mutex}; + if (!--num_active_jobs) + event.notify_all(); + if (async) + CurrentThread::detachFromGroupIfNotDetached(); + ); + + try + { + if (async && thread_group) + CurrentThread::attachToGroup(thread_group); + + if (async) + setThreadName("RestoreWorker"); + + { + std::lock_guard lock{mutex}; + if (exception) + return; + } + + std::move(task)(); + setNumFilesAndSize( + restore_id, + backup->getNumFiles(), + backup->getTotalSize(), + backup->getNumEntries(), + backup->getUncompressedSize(), + backup->getCompressedSize(), + backup->getNumReadFiles(), + backup->getNumReadBytes()); + } + catch (...) 
+ { + std::lock_guard lock{mutex}; + if (!exception) + exception = std::current_exception(); + } + }; + + if (!thread_pool.trySchedule([job] { job(true); })) + job(false); + } + + { + std::unique_lock lock{mutex}; + event.wait(lock, [&] { return !num_active_jobs; }); + if (exception) + std::rethrow_exception(exception); + } +} + + void BackupsWorker::addInfo(const OperationID & id, const String & name, bool internal, BackupStatus status) { Info info; diff --git a/src/Backups/BackupsWorker.h b/src/Backups/BackupsWorker.h index 0f5c16cd71f..c36b58da14f 100644 --- a/src/Backups/BackupsWorker.h +++ b/src/Backups/BackupsWorker.h @@ -17,6 +17,12 @@ struct RestoreSettings; struct BackupInfo; class IBackupCoordination; class IRestoreCoordination; +class IBackup; +using BackupMutablePtr = std::shared_ptr; +using BackupPtr = std::shared_ptr; +class IBackupEntry; +using BackupEntries = std::vector>>; +using DataRestoreTasks = std::vector>; /// Manager of backups and restores: executes backups and restores' threads in the background. /// Keeps information about backups and restores started in this session. @@ -99,6 +105,9 @@ private: ContextMutablePtr mutable_context, bool called_async); + /// Write backup entries to an opened backup. Unless internal is set, calls setNumFilesAndSize() for backup_id after each file is written. + void writeBackupEntries(const OperationID & backup_id, BackupMutablePtr backup, BackupEntries && backup_entries, ThreadPool & thread_pool, bool internal); + OperationID startRestoring(const ASTPtr & query, ContextMutablePtr context); void doRestore( @@ -111,6 +120,9 @@ private: ContextMutablePtr context, bool called_async); + /// Run data restoring tasks which insert data to tables. Calls setNumFilesAndSize() for restore_id after each task returns. 
+ void restoreTablesData(const OperationID & restore_id, BackupPtr backup, DataRestoreTasks && tasks, ThreadPool & thread_pool); + void addInfo(const OperationID & id, const String & name, bool internal, BackupStatus status); void setStatus(const OperationID & id, BackupStatus status, bool throw_if_error = true); void setStatusSafe(const String & id, BackupStatus status) { setStatus(id, status, false); }