dynamic update system.backups

This commit is contained in:
AVMusorin 2023-02-27 15:42:38 +01:00
parent 4303b91ee7
commit 038bfb40ab
No known key found for this signature in database
GPG Key ID: 6D8C8B3C90094A13
4 changed files with 172 additions and 157 deletions

View File

@ -1,10 +1,7 @@
#include <Backups/BackupUtils.h> #include <Backups/BackupUtils.h>
#include <Backups/IBackup.h>
#include <Backups/RestoreSettings.h>
#include <Access/Common/AccessRightsElement.h> #include <Access/Common/AccessRightsElement.h>
#include <Databases/DDLRenamingVisitor.h> #include <Databases/DDLRenamingVisitor.h>
#include <Interpreters/DatabaseCatalog.h> #include <Interpreters/DatabaseCatalog.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
@ -60,140 +57,6 @@ DDLRenamingMap makeRenamingMapFromBackupQuery(const ASTBackupQuery::Elements & e
} }
void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, ThreadPool & thread_pool)
{
size_t num_active_jobs = 0;
std::mutex mutex;
std::condition_variable event;
std::exception_ptr exception;
bool always_single_threaded = !backup->supportsWritingInMultipleThreads();
auto thread_group = CurrentThread::getGroup();
for (auto & name_and_entry : backup_entries)
{
auto & name = name_and_entry.first;
auto & entry = name_and_entry.second;
{
std::unique_lock lock{mutex};
if (exception)
break;
++num_active_jobs;
}
auto job = [&](bool async)
{
SCOPE_EXIT_SAFE(
std::lock_guard lock{mutex};
if (!--num_active_jobs)
event.notify_all();
if (async)
CurrentThread::detachFromGroupIfNotDetached();
);
try
{
if (async && thread_group)
CurrentThread::attachToGroup(thread_group);
if (async)
setThreadName("BackupWorker");
{
std::lock_guard lock{mutex};
if (exception)
return;
}
backup->writeFile(name, std::move(entry));
}
catch (...)
{
std::lock_guard lock{mutex};
if (!exception)
exception = std::current_exception();
}
};
if (always_single_threaded || !thread_pool.trySchedule([job] { job(true); }))
job(false);
}
{
std::unique_lock lock{mutex};
event.wait(lock, [&] { return !num_active_jobs; });
if (exception)
std::rethrow_exception(exception);
}
}
void restoreTablesData(DataRestoreTasks && tasks, ThreadPool & thread_pool)
{
size_t num_active_jobs = 0;
std::mutex mutex;
std::condition_variable event;
std::exception_ptr exception;
auto thread_group = CurrentThread::getGroup();
for (auto & task : tasks)
{
{
std::unique_lock lock{mutex};
if (exception)
break;
++num_active_jobs;
}
auto job = [&](bool async)
{
SCOPE_EXIT_SAFE(
std::lock_guard lock{mutex};
if (!--num_active_jobs)
event.notify_all();
if (async)
CurrentThread::detachFromGroupIfNotDetached();
);
try
{
if (async && thread_group)
CurrentThread::attachToGroup(thread_group);
if (async)
setThreadName("RestoreWorker");
{
std::lock_guard lock{mutex};
if (exception)
return;
}
std::move(task)();
}
catch (...)
{
std::lock_guard lock{mutex};
if (!exception)
exception = std::current_exception();
}
};
if (!thread_pool.trySchedule([job] { job(true); }))
job(false);
}
{
std::unique_lock lock{mutex};
event.wait(lock, [&] { return !num_active_jobs; });
if (exception)
std::rethrow_exception(exception);
}
}
/// Returns access required to execute BACKUP query. /// Returns access required to execute BACKUP query.
AccessRightsElements getRequiredAccessToBackup(const ASTBackupQuery::Elements & elements) AccessRightsElements getRequiredAccessToBackup(const ASTBackupQuery::Elements & elements)
{ {

View File

@ -7,21 +7,12 @@
namespace DB namespace DB
{ {
class IBackup; class IBackup;
using BackupMutablePtr = std::shared_ptr<IBackup>;
class IBackupEntry;
using BackupEntries = std::vector<std::pair<String, std::shared_ptr<const IBackupEntry>>>;
using DataRestoreTasks = std::vector<std::function<void()>>;
class AccessRightsElements; class AccessRightsElements;
class DDLRenamingMap; class DDLRenamingMap;
/// Initializes a DDLRenamingMap from a BACKUP or RESTORE query. /// Initializes a DDLRenamingMap from a BACKUP or RESTORE query.
DDLRenamingMap makeRenamingMapFromBackupQuery(const ASTBackupQuery::Elements & elements); DDLRenamingMap makeRenamingMapFromBackupQuery(const ASTBackupQuery::Elements & elements);
/// Write backup entries to an opened backup.
void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, ThreadPool & thread_pool);
/// Run data restoring tasks which insert data to tables.
void restoreTablesData(DataRestoreTasks && tasks, ThreadPool & thread_pool);
/// Returns access required to execute BACKUP query. /// Returns access required to execute BACKUP query.
AccessRightsElements getRequiredAccessToBackup(const ASTBackupQuery::Elements & elements); AccessRightsElements getRequiredAccessToBackup(const ASTBackupQuery::Elements & elements);

View File

@ -21,6 +21,7 @@
#include <Common/Macros.h> #include <Common/Macros.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
namespace DB namespace DB
@ -346,7 +347,7 @@ void BackupsWorker::doBackup(
} }
/// Write the backup entries to the backup. /// Write the backup entries to the backup.
writeBackupEntries(backup, std::move(backup_entries), backups_thread_pool); writeBackupEntries(backup_id, backup, std::move(backup_entries), backups_thread_pool, backup_settings.internal);
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it). /// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
backup_coordination->setStage(backup_settings.host_id, Stage::COMPLETED, ""); backup_coordination->setStage(backup_settings.host_id, Stage::COMPLETED, "");
@ -374,6 +375,7 @@ void BackupsWorker::doBackup(
LOG_INFO(log, "{} {} was created successfully", (backup_settings.internal ? "Internal backup" : "Backup"), backup_name_for_logging); LOG_INFO(log, "{} {} was created successfully", (backup_settings.internal ? "Internal backup" : "Backup"), backup_name_for_logging);
setStatus(backup_id, BackupStatus::BACKUP_CREATED); setStatus(backup_id, BackupStatus::BACKUP_CREATED);
/// NOTE: we need to update metadata again after backup->finalizeWriting(), because backup metadata is written there.
setNumFilesAndSize(backup_id, num_files, total_size, num_entries, uncompressed_size, compressed_size, 0, 0); setNumFilesAndSize(backup_id, num_files, total_size, num_entries, uncompressed_size, compressed_size, 0, 0);
} }
catch (...) catch (...)
@ -394,6 +396,88 @@ void BackupsWorker::doBackup(
} }
void BackupsWorker::writeBackupEntries(const OperationID & backup_id, BackupMutablePtr backup, BackupEntries && backup_entries, ThreadPool & thread_pool, bool internal)
{
size_t num_active_jobs = 0;
std::mutex mutex;
std::condition_variable event;
std::exception_ptr exception;
bool always_single_threaded = !backup->supportsWritingInMultipleThreads();
auto thread_group = CurrentThread::getGroup();
for (auto & name_and_entry : backup_entries)
{
auto & name = name_and_entry.first;
auto & entry = name_and_entry.second;
{
std::unique_lock lock{mutex};
if (exception)
break;
++num_active_jobs;
}
auto job = [&](bool async)
{
SCOPE_EXIT_SAFE(
std::lock_guard lock{mutex};
if (!--num_active_jobs)
event.notify_all();
if (async)
CurrentThread::detachFromGroupIfNotDetached();
);
try
{
if (async && thread_group)
CurrentThread::attachToGroup(thread_group);
if (async)
setThreadName("BackupWorker");
{
std::lock_guard lock{mutex};
if (exception)
return;
}
backup->writeFile(name, std::move(entry));
// Update metadata
if (!internal)
{
setNumFilesAndSize(
backup_id,
backup->getNumFiles(),
backup->getTotalSize(),
backup->getNumEntries(),
backup->getUncompressedSize(),
backup->getCompressedSize(),
0, 0);
}
}
catch (...)
{
std::lock_guard lock{mutex};
if (!exception)
exception = std::current_exception();
}
};
if (always_single_threaded || !thread_pool.trySchedule([job] { job(true); }))
job(false);
}
{
std::unique_lock lock{mutex};
event.wait(lock, [&] { return !num_active_jobs; });
if (exception)
std::rethrow_exception(exception);
}
}
OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePtr context) OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePtr context)
{ {
auto restore_query = std::static_pointer_cast<ASTBackupQuery>(query->clone()); auto restore_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
@ -578,7 +662,7 @@ void BackupsWorker::doRestore(
} }
/// Execute the data restoring tasks. /// Execute the data restoring tasks.
restoreTablesData(std::move(data_restore_tasks), restores_thread_pool); restoreTablesData(restore_id, backup, std::move(data_restore_tasks), restores_thread_pool);
/// We have restored everything, we need to tell other hosts (they could be waiting for it). /// We have restored everything, we need to tell other hosts (they could be waiting for it).
restore_coordination->setStage(restore_settings.host_id, Stage::COMPLETED, ""); restore_coordination->setStage(restore_settings.host_id, Stage::COMPLETED, "");
@ -586,15 +670,6 @@ void BackupsWorker::doRestore(
LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging); LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging);
setStatus(restore_id, BackupStatus::RESTORED); setStatus(restore_id, BackupStatus::RESTORED);
setNumFilesAndSize(
restore_id,
backup->getNumFiles(),
backup->getTotalSize(),
backup->getNumEntries(),
backup->getUncompressedSize(),
backup->getCompressedSize(),
backup->getNumReadFiles(),
backup->getNumReadBytes());
} }
catch (...) catch (...)
{ {
@ -614,6 +689,80 @@ void BackupsWorker::doRestore(
} }
void BackupsWorker::restoreTablesData(const OperationID & restore_id, BackupPtr backup, DataRestoreTasks && tasks, ThreadPool & thread_pool)
{
size_t num_active_jobs = 0;
std::mutex mutex;
std::condition_variable event;
std::exception_ptr exception;
auto thread_group = CurrentThread::getGroup();
for (auto & task : tasks)
{
{
std::unique_lock lock{mutex};
if (exception)
break;
++num_active_jobs;
}
auto job = [&](bool async)
{
SCOPE_EXIT_SAFE(
std::lock_guard lock{mutex};
if (!--num_active_jobs)
event.notify_all();
if (async)
CurrentThread::detachFromGroupIfNotDetached();
);
try
{
if (async && thread_group)
CurrentThread::attachToGroup(thread_group);
if (async)
setThreadName("RestoreWorker");
{
std::lock_guard lock{mutex};
if (exception)
return;
}
std::move(task)();
setNumFilesAndSize(
restore_id,
backup->getNumFiles(),
backup->getTotalSize(),
backup->getNumEntries(),
backup->getUncompressedSize(),
backup->getCompressedSize(),
backup->getNumReadFiles(),
backup->getNumReadBytes());
}
catch (...)
{
std::lock_guard lock{mutex};
if (!exception)
exception = std::current_exception();
}
};
if (!thread_pool.trySchedule([job] { job(true); }))
job(false);
}
{
std::unique_lock lock{mutex};
event.wait(lock, [&] { return !num_active_jobs; });
if (exception)
std::rethrow_exception(exception);
}
}
void BackupsWorker::addInfo(const OperationID & id, const String & name, bool internal, BackupStatus status) void BackupsWorker::addInfo(const OperationID & id, const String & name, bool internal, BackupStatus status)
{ {
Info info; Info info;

View File

@ -17,6 +17,12 @@ struct RestoreSettings;
struct BackupInfo; struct BackupInfo;
class IBackupCoordination; class IBackupCoordination;
class IRestoreCoordination; class IRestoreCoordination;
class IBackup;
using BackupMutablePtr = std::shared_ptr<IBackup>;
using BackupPtr = std::shared_ptr<const IBackup>;
class IBackupEntry;
using BackupEntries = std::vector<std::pair<String, std::shared_ptr<const IBackupEntry>>>;
using DataRestoreTasks = std::vector<std::function<void()>>;
/// Manager of backups and restores: executes backups and restores' threads in the background. /// Manager of backups and restores: executes backups and restores' threads in the background.
/// Keeps information about backups and restores started in this session. /// Keeps information about backups and restores started in this session.
@ -99,6 +105,9 @@ private:
ContextMutablePtr mutable_context, ContextMutablePtr mutable_context,
bool called_async); bool called_async);
/// Write backup entries to an opened backup.
void writeBackupEntries(const OperationID & backup_id, BackupMutablePtr backup, BackupEntries && backup_entries, ThreadPool & thread_pool, bool internal);
OperationID startRestoring(const ASTPtr & query, ContextMutablePtr context); OperationID startRestoring(const ASTPtr & query, ContextMutablePtr context);
void doRestore( void doRestore(
@ -111,6 +120,9 @@ private:
ContextMutablePtr context, ContextMutablePtr context,
bool called_async); bool called_async);
/// Run data restoring tasks which insert data to tables.
void restoreTablesData(const OperationID & restore_id, BackupPtr backup, DataRestoreTasks && tasks, ThreadPool & thread_pool);
void addInfo(const OperationID & id, const String & name, bool internal, BackupStatus status); void addInfo(const OperationID & id, const String & name, bool internal, BackupStatus status);
void setStatus(const OperationID & id, BackupStatus status, bool throw_if_error = true); void setStatus(const OperationID & id, BackupStatus status, bool throw_if_error = true);
void setStatusSafe(const String & id, BackupStatus status) { setStatus(id, status, false); } void setStatusSafe(const String & id, BackupStatus status) { setStatus(id, status, false); }