Use multiple threads while reading the metadata of tables to restore.

This commit is contained in:
Vitaly Baranov 2024-02-15 19:47:49 +01:00
parent f0a45970fa
commit f238999095
3 changed files with 348 additions and 216 deletions

View File

@ -271,8 +271,8 @@ enum class BackupsWorker::ThreadPoolId
/// "RESTORE ASYNC" waits in background while the data of all tables are restored.
RESTORE_ASYNC,
/// Restores the data of tables.
RESTORE_TABLES_DATA,
/// Restores from backups.
RESTORE,
};
@ -320,13 +320,13 @@ public:
case ThreadPoolId::RESTORE_ASYNC:
case ThreadPoolId::RESTORE_ASYNC_ON_CLUSTER:
case ThreadPoolId::RESTORE_TABLES_DATA:
case ThreadPoolId::RESTORE:
{
metric_threads = CurrentMetrics::RestoreThreads;
metric_active_threads = CurrentMetrics::RestoreThreadsActive;
metric_active_threads = CurrentMetrics::RestoreThreadsScheduled;
max_threads = num_restore_threads;
use_queue = (thread_pool_id != ThreadPoolId::RESTORE_TABLES_DATA);
use_queue = (thread_pool_id != ThreadPoolId::RESTORE);
break;
}
}
@ -347,7 +347,7 @@ public:
auto wait_sequence = {
ThreadPoolId::RESTORE_ASYNC_ON_CLUSTER,
ThreadPoolId::RESTORE_ASYNC,
ThreadPoolId::RESTORE_TABLES_DATA,
ThreadPoolId::RESTORE,
ThreadPoolId::BACKUP_ASYNC_ON_CLUSTER,
ThreadPoolId::BACKUP_ASYNC,
ThreadPoolId::BACKUP_COPY_FILES,
@ -975,7 +975,7 @@ void BackupsWorker::doRestore(
String addr_database = address->default_database.empty() ? current_database : address->default_database;
for (auto & element : restore_elements)
element.setCurrentDatabase(addr_database);
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context};
RestorerFromBackup dummy_restorer{restore_elements, restore_settings, nullptr, backup, context, getThreadPool(ThreadPoolId::RESTORE), {}};
dummy_restorer.run(RestorerFromBackup::CHECK_ACCESS_ONLY);
}
}
@ -1004,19 +1004,17 @@ void BackupsWorker::doRestore(
{
restore_query->setCurrentDatabase(current_database);
/// Restore metadata and prepare data restoring tasks.
DataRestoreTasks data_restore_tasks;
auto after_each_task_callback = [&]
{
RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
backup, context};
data_restore_tasks = restorer.run(RestorerFromBackup::RESTORE);
}
maybeSleepForTesting();
setNumFilesAndSize(restore_id, backup->getNumFiles(), backup->getTotalSize(), backup->getNumEntries(),
backup->getUncompressedSize(), backup->getCompressedSize(), backup->getNumReadFiles(), backup->getNumReadBytes());
};
/// Execute the data restoring tasks.
restoreTablesData(restore_id, backup, std::move(data_restore_tasks), getThreadPool(ThreadPoolId::RESTORE_TABLES_DATA), context->getProcessListElement());
/// We have restored everything, we need to tell other hosts (they could be waiting for it).
restore_coordination->setStage(Stage::COMPLETED, "");
/// Restore from the backup.
RestorerFromBackup restorer{restore_query->elements, restore_settings, restore_coordination,
backup, context, getThreadPool(ThreadPoolId::RESTORE), after_each_task_callback};
restorer.run(RestorerFromBackup::RESTORE);
}
LOG_INFO(log, "Restored from {} {} successfully", (restore_settings.internal ? "internal backup" : "backup"), backup_name_for_logging);
@ -1024,83 +1022,6 @@ void BackupsWorker::doRestore(
}
void BackupsWorker::restoreTablesData(const OperationID & restore_id, BackupPtr backup, DataRestoreTasks && tasks, ThreadPool & thread_pool, QueryStatusPtr process_list_element)
{
size_t num_active_jobs = 0;
std::mutex mutex;
std::condition_variable event;
std::exception_ptr exception;
auto thread_group = CurrentThread::getGroup();
for (auto & task : tasks)
{
{
std::unique_lock lock{mutex};
if (exception)
break;
++num_active_jobs;
}
auto job = [&]()
{
SCOPE_EXIT_SAFE(
std::lock_guard lock{mutex};
if (!--num_active_jobs)
event.notify_all();
CurrentThread::detachFromGroupIfNotDetached();
);
try
{
if (thread_group)
CurrentThread::attachToGroup(thread_group);
setThreadName("RestoreWorker");
{
std::lock_guard lock{mutex};
if (exception)
return;
}
if (process_list_element)
process_list_element->checkTimeLimit();
std::move(task)();
maybeSleepForTesting();
setNumFilesAndSize(
restore_id,
backup->getNumFiles(),
backup->getTotalSize(),
backup->getNumEntries(),
backup->getUncompressedSize(),
backup->getCompressedSize(),
backup->getNumReadFiles(),
backup->getNumReadBytes());
}
catch (...)
{
std::lock_guard lock{mutex};
if (!exception)
exception = std::current_exception();
}
};
thread_pool.scheduleOrThrowOnError(job);
}
{
std::unique_lock lock{mutex};
event.wait(lock, [&] { return !num_active_jobs; });
if (exception)
std::rethrow_exception(exception);
}
}
void BackupsWorker::addInfo(const OperationID & id, const String & name, const String & base_backup_name, const String & query_id,
bool internal, QueryStatusPtr process_list_element, BackupStatus status)
{

View File

@ -25,6 +25,7 @@
#include <base/insertAtEnd.h>
#include <boost/algorithm/string/join.hpp>
#include <filesystem>
#include <ranges>
namespace fs = std::filesystem;
@ -80,23 +81,34 @@ RestorerFromBackup::RestorerFromBackup(
const RestoreSettings & restore_settings_,
std::shared_ptr<IRestoreCoordination> restore_coordination_,
const BackupPtr & backup_,
const ContextMutablePtr & context_)
const ContextMutablePtr & context_,
ThreadPool & thread_pool_,
const std::function<void()> & after_each_task_callback_)
: restore_query_elements(restore_query_elements_)
, restore_settings(restore_settings_)
, restore_coordination(restore_coordination_)
, backup(backup_)
, context(context_)
, process_list_element(context->getProcessListElement())
, after_each_task_callback(after_each_task_callback_)
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
, create_table_timeout(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000))
, log(getLogger("RestorerFromBackup"))
, tables_dependencies("RestorerFromBackup")
, thread_pool(thread_pool_)
{
}
RestorerFromBackup::~RestorerFromBackup() = default;
RestorerFromBackup::~RestorerFromBackup()
{
if (!futures.empty())
{
LOG_ERROR(log, "RestorerFromBackup must not be destroyed while {} tasks are still running", futures.size());
chassert(false && "RestorerFromBackup must not be destroyed while some tasks are still running");
}
}
RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
void RestorerFromBackup::run(Mode mode)
{
/// run() can be called onle once.
if (!current_stage.empty())
@ -115,31 +127,87 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::run(Mode mode)
/// Find all the databases and tables which we will read from the backup.
setStage(Stage::FINDING_TABLES_IN_BACKUP);
findDatabasesAndTablesInBackup();
waitFutures();
/// Check access rights.
checkAccessForObjectsFoundInBackup();
if (mode == Mode::CHECK_ACCESS_ONLY)
return {};
return;
/// Create databases using the create queries read from the backup.
setStage(Stage::CREATING_DATABASES);
createDatabases();
waitFutures();
/// Create tables using the create queries read from the backup.
setStage(Stage::CREATING_TABLES);
removeUnresolvedDependencies();
createTables();
waitFutures();
/// All what's left is to insert data to tables.
/// No more data restoring tasks are allowed after this point.
setStage(Stage::INSERTING_DATA_TO_TABLES);
return getDataRestoreTasks();
insertDataToTables();
waitFutures();
runDataRestoreTasks();
/// Restored successfully!
setStage(Stage::COMPLETED);
}
void RestorerFromBackup::waitFutures()
{
std::exception_ptr error;
for (;;)
{
std::vector<std::future<void>> futures_to_wait;
{
std::lock_guard lock{mutex};
std::swap(futures_to_wait, futures);
}
if (futures_to_wait.empty())
break;
/// Wait for all tasks.
for (auto & future : futures_to_wait)
future.wait();
/// Check if there is an exception.
for (auto & future : futures_to_wait)
{
try
{
future.get();
}
catch (...)
{
if (!error)
error = std::current_exception();
exception_caught = true;
}
}
}
if (error)
std::rethrow_exception(error);
}
size_t RestorerFromBackup::getNumFutures() const
{
std::lock_guard lock{mutex};
return futures.size();
}
void RestorerFromBackup::setStage(const String & new_stage, const String & message)
{
LOG_TRACE(log, "Setting stage: {}", new_stage);
if (getNumFutures() != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot change the stage while some tasks ({}) are still running", getNumFutures());
checkIsQueryCancelled();
current_stage = new_stage;
@ -154,6 +222,32 @@ void RestorerFromBackup::setStage(const String & new_stage, const String & messa
}
}
void RestorerFromBackup::schedule(std::function<void()> && task_, const char * thread_name_)
{
if (exception_caught)
return;
checkIsQueryCancelled();
auto future = scheduleFromThreadPool<void>(
[this, task = std::move(task_)]() mutable
{
if (exception_caught)
return;
checkIsQueryCancelled();
std::move(task)();
if (after_each_task_callback)
after_each_task_callback();
},
thread_pool,
thread_name_);
std::lock_guard lock{mutex};
futures.push_back(std::move(future));
}
void RestorerFromBackup::checkIsQueryCancelled() const
{
if (process_list_element)
@ -241,8 +335,6 @@ void RestorerFromBackup::findRootPathsInBackup()
void RestorerFromBackup::findDatabasesAndTablesInBackup()
{
database_infos.clear();
table_infos.clear();
for (const auto & element : restore_query_elements)
{
switch (element.type)
@ -270,10 +362,17 @@ void RestorerFromBackup::findDatabasesAndTablesInBackup()
}
}
LOG_INFO(log, "Will restore {} databases and {} tables", database_infos.size(), table_infos.size());
LOG_INFO(log, "Will restore {} databases and {} tables", getNumDatabases(), getNumTables());
}
void RestorerFromBackup::findTableInBackup(const QualifiedTableName & table_name_in_backup, const std::optional<ASTs> & partitions)
{
schedule(
[this, table_name_in_backup, partitions]() { findTableInBackupImpl(table_name_in_backup, partitions); },
"Restore_FindTbl");
}
void RestorerFromBackup::findTableInBackupImpl(const QualifiedTableName & table_name_in_backup, const std::optional<ASTs> & partitions)
{
bool is_temporary_table = (table_name_in_backup.database == DatabaseCatalog::TEMPORARY_DATABASE);
@ -326,30 +425,38 @@ void RestorerFromBackup::findTableInBackup(const QualifiedTableName & table_name
ASTPtr create_table_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
applyCustomStoragePolicy(create_table_query);
renameDatabaseAndTableNameInCreateQuery(create_table_query, renaming_map, context->getGlobalContext());
String create_table_query_str = serializeAST(*create_table_query);
QualifiedTableName table_name = renaming_map.getNewTableName(table_name_in_backup);
bool is_predefined_table = DatabaseCatalog::instance().isPredefinedTable(StorageID{table_name.database, table_name.table});
auto table_dependencies = getDependenciesFromCreateQuery(context, table_name, create_table_query);
bool table_has_data = backup->hasFiles(data_path_in_backup);
std::lock_guard lock{mutex};
if (auto it = table_infos.find(table_name); it != table_infos.end())
{
const TableInfo & table_info = it->second;
if (table_info.create_table_query && (serializeAST(*table_info.create_table_query) != serializeAST(*create_table_query)))
if (table_info.create_table_query && (table_info.create_table_query_str != create_table_query_str))
{
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"Extracted two different create queries for the same {}: {} and {}",
tableNameWithTypeToString(table_name.database, table_name.table, false),
serializeAST(*table_info.create_table_query),
serializeAST(*create_table_query));
table_info.create_table_query_str,
create_table_query_str);
}
}
TableInfo & res_table_info = table_infos[table_name];
res_table_info.create_table_query = create_table_query;
res_table_info.is_predefined_table = DatabaseCatalog::instance().isPredefinedTable(StorageID{table_name.database, table_name.table});
res_table_info.has_data = backup->hasFiles(data_path_in_backup);
res_table_info.create_table_query_str = create_table_query_str;
res_table_info.is_predefined_table = is_predefined_table;
res_table_info.has_data = table_has_data;
res_table_info.data_path_in_backup = data_path_in_backup;
tables_dependencies.addDependencies(table_name, getDependenciesFromCreateQuery(context, table_name, create_table_query));
tables_dependencies.addDependencies(table_name, table_dependencies);
if (partitions)
{
@ -379,6 +486,13 @@ void RestorerFromBackup::findTableInBackup(const QualifiedTableName & table_name
}
void RestorerFromBackup::findDatabaseInBackup(const String & database_name_in_backup, const std::set<DatabaseAndTableName> & except_table_names)
{
schedule(
[this, database_name_in_backup, except_table_names]() { findDatabaseInBackupImpl(database_name_in_backup, except_table_names); },
"Restore_FindDB");
}
void RestorerFromBackup::findDatabaseInBackupImpl(const String & database_name_in_backup, const std::set<DatabaseAndTableName> & except_table_names)
{
std::optional<fs::path> metadata_path;
std::unordered_set<String> table_names_in_backup;
@ -420,22 +534,28 @@ void RestorerFromBackup::findDatabaseInBackup(const String & database_name_in_ba
ParserCreateQuery create_parser;
ASTPtr create_database_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
renameDatabaseAndTableNameInCreateQuery(create_database_query, renaming_map, context->getGlobalContext());
String create_database_query_str = serializeAST(*create_database_query);
String database_name = renaming_map.getNewDatabaseName(database_name_in_backup);
bool is_predefined_database = DatabaseCatalog::isPredefinedDatabase(database_name);
std::lock_guard lock{mutex};
DatabaseInfo & database_info = database_infos[database_name];
if (database_info.create_database_query && (serializeAST(*database_info.create_database_query) != serializeAST(*create_database_query)))
if (database_info.create_database_query && (database_info.create_database_query_str != create_database_query_str))
{
throw Exception(
ErrorCodes::CANNOT_RESTORE_DATABASE,
"Extracted two different create queries for the same database {}: {} and {}",
backQuoteIfNeed(database_name),
serializeAST(*database_info.create_database_query),
serializeAST(*create_database_query));
database_info.create_database_query_str,
create_database_query_str);
}
database_info.create_database_query = create_database_query;
database_info.is_predefined_database = DatabaseCatalog::isPredefinedDatabase(database_name);
database_info.create_database_query_str = create_database_query_str;
database_info.is_predefined_database = is_predefined_database;
}
for (const String & table_name_in_backup : table_names_in_backup)
@ -474,79 +594,95 @@ void RestorerFromBackup::findEverythingInBackup(const std::set<String> & except_
}
}
size_t RestorerFromBackup::getNumDatabases() const
{
std::lock_guard lock{mutex};
return database_infos.size();
}
size_t RestorerFromBackup::getNumTables() const
{
std::lock_guard lock{mutex};
return table_infos.size();
}
void RestorerFromBackup::checkAccessForObjectsFoundInBackup() const
{
AccessRightsElements required_access;
for (const auto & [database_name, database_info] : database_infos)
{
if (database_info.is_predefined_database)
continue;
AccessFlags flags;
if (restore_settings.create_database != RestoreDatabaseCreationMode::kMustExist)
flags |= AccessType::CREATE_DATABASE;
if (!flags)
flags = AccessType::SHOW_DATABASES;
required_access.emplace_back(flags, database_name);
}
for (const auto & [table_name, table_info] : table_infos)
{
if (table_info.is_predefined_table)
std::lock_guard lock{mutex};
for (const auto & [database_name, database_info] : database_infos)
{
if (isSystemFunctionsTableName(table_name))
if (database_info.is_predefined_database)
continue;
AccessFlags flags;
if (restore_settings.create_database != RestoreDatabaseCreationMode::kMustExist)
flags |= AccessType::CREATE_DATABASE;
if (!flags)
flags = AccessType::SHOW_DATABASES;
required_access.emplace_back(flags, database_name);
}
for (const auto & [table_name, table_info] : table_infos)
{
if (table_info.is_predefined_table)
{
/// CREATE_FUNCTION privilege is required to restore the "system.functions" table.
if (!restore_settings.structure_only && table_info.has_data)
required_access.emplace_back(AccessType::CREATE_FUNCTION);
if (isSystemFunctionsTableName(table_name))
{
/// CREATE_FUNCTION privilege is required to restore the "system.functions" table.
if (!restore_settings.structure_only && table_info.has_data)
required_access.emplace_back(AccessType::CREATE_FUNCTION);
}
/// Privileges required to restore ACL system tables are checked separately
/// (see access_restore_task->getRequiredAccess() below).
continue;
}
/// Privileges required to restore ACL system tables are checked separately
/// (see access_restore_task->getRequiredAccess() below).
continue;
}
if (table_name.database == DatabaseCatalog::TEMPORARY_DATABASE)
{
if (table_name.database == DatabaseCatalog::TEMPORARY_DATABASE)
{
if (restore_settings.create_table != RestoreTableCreationMode::kMustExist)
required_access.emplace_back(AccessType::CREATE_TEMPORARY_TABLE);
continue;
}
AccessFlags flags;
const ASTCreateQuery & create = table_info.create_table_query->as<const ASTCreateQuery &>();
if (restore_settings.create_table != RestoreTableCreationMode::kMustExist)
required_access.emplace_back(AccessType::CREATE_TEMPORARY_TABLE);
continue;
{
if (create.is_dictionary)
flags |= AccessType::CREATE_DICTIONARY;
else if (create.is_ordinary_view || create.is_materialized_view || create.is_live_view)
flags |= AccessType::CREATE_VIEW;
else
flags |= AccessType::CREATE_TABLE;
}
if (!restore_settings.structure_only && table_info.has_data)
{
flags |= AccessType::INSERT;
}
if (!flags)
{
if (create.is_dictionary)
flags = AccessType::SHOW_DICTIONARIES;
else
flags = AccessType::SHOW_TABLES;
}
required_access.emplace_back(flags, table_name.database, table_name.table);
}
AccessFlags flags;
const ASTCreateQuery & create = table_info.create_table_query->as<const ASTCreateQuery &>();
if (restore_settings.create_table != RestoreTableCreationMode::kMustExist)
{
if (create.is_dictionary)
flags |= AccessType::CREATE_DICTIONARY;
else if (create.is_ordinary_view || create.is_materialized_view || create.is_live_view)
flags |= AccessType::CREATE_VIEW;
else
flags |= AccessType::CREATE_TABLE;
}
if (!restore_settings.structure_only && table_info.has_data)
{
flags |= AccessType::INSERT;
}
if (!flags)
{
if (create.is_dictionary)
flags = AccessType::SHOW_DICTIONARIES;
else
flags = AccessType::SHOW_TABLES;
}
required_access.emplace_back(flags, table_name.database, table_name.table);
if (access_restorer)
insertAtEnd(required_access, access_restorer->getRequiredAccess());
}
if (access_restorer)
insertAtEnd(required_access, access_restorer->getRequiredAccess());
/// We convert to AccessRights and back to check access rights in a predictable way
/// (some elements could be duplicated or not sorted).
required_access = AccessRights{required_access}.getElements();
@ -556,7 +692,14 @@ void RestorerFromBackup::checkAccessForObjectsFoundInBackup() const
void RestorerFromBackup::createDatabases()
{
for (const auto & database_name : database_infos | boost::adaptors::map_keys)
Strings database_names;
{
std::lock_guard lock{mutex};
database_names.reserve(database_infos.size());
std::ranges::copy(database_infos | boost::adaptors::map_keys, std::back_inserter(database_names));
}
for (const auto & database_name : database_names)
{
createDatabase(database_name);
checkDatabase(database_name);
@ -568,6 +711,8 @@ void RestorerFromBackup::createDatabase(const String & database_name) const
if (restore_settings.create_database == RestoreDatabaseCreationMode::kMustExist)
return;
std::lock_guard lock{mutex};
/// Predefined databases always exist.
const auto & database_info = database_infos.at(database_name);
if (database_info.is_predefined_database)
@ -603,7 +748,9 @@ void RestorerFromBackup::createDatabase(const String & database_name) const
void RestorerFromBackup::checkDatabase(const String & database_name)
{
std::lock_guard lock{mutex};
auto & database_info = database_infos.at(database_name);
try
{
DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name);
@ -653,8 +800,9 @@ void RestorerFromBackup::applyCustomStoragePolicy(ASTPtr query_ptr)
void RestorerFromBackup::removeUnresolvedDependencies()
{
auto need_exclude_dependency = [this](const StorageID & table_id)
{
std::lock_guard lock{mutex};
auto need_exclude_dependency = [&](const StorageID & table_id) TSA_REQUIRES(mutex) -> bool {
/// Table will be restored.
if (table_infos.contains(table_id.getQualifiedName()))
return false;
@ -700,14 +848,18 @@ void RestorerFromBackup::removeUnresolvedDependencies()
void RestorerFromBackup::createTables()
{
/// We need to create tables considering their dependencies.
tables_dependencies.log();
auto tables_to_create = tables_dependencies.getTablesSortedByDependency();
std::vector<StorageID> tables_to_create;
{
std::lock_guard lock{mutex};
tables_dependencies.log();
tables_to_create = tables_dependencies.getTablesSortedByDependency();
}
for (const auto & table_id : tables_to_create)
{
auto table_name = table_id.getQualifiedName();
createTable(table_name);
checkTable(table_name);
insertDataToTable(table_name);
}
}
@ -716,6 +868,8 @@ void RestorerFromBackup::createTable(const QualifiedTableName & table_name)
if (restore_settings.create_table == RestoreTableCreationMode::kMustExist)
return;
std::lock_guard lock{mutex};
/// Predefined tables always exist.
auto & table_info = table_infos.at(table_name);
if (table_info.is_predefined_table)
@ -758,6 +912,7 @@ void RestorerFromBackup::createTable(const QualifiedTableName & table_name)
void RestorerFromBackup::checkTable(const QualifiedTableName & table_name)
{
std::lock_guard lock{mutex};
auto & table_info = table_infos.at(table_name);
try
@ -796,20 +951,44 @@ void RestorerFromBackup::checkTable(const QualifiedTableName & table_name)
}
}
void RestorerFromBackup::insertDataToTables()
{
std::vector<QualifiedTableName> table_names;
{
std::lock_guard lock{mutex};
table_names.reserve(table_infos.size());
std::ranges::copy(table_infos | boost::adaptors::map_keys, std::back_inserter(table_names));
}
for (const auto & table_name : table_names)
insertDataToTable(table_name);
}
void RestorerFromBackup::insertDataToTable(const QualifiedTableName & table_name)
{
if (restore_settings.structure_only)
return;
auto & table_info = table_infos.at(table_name);
auto storage = table_info.storage;
StoragePtr storage;
String data_path_in_backup;
std::optional<ASTs> partitions;
{
std::lock_guard lock{mutex};
auto & table_info = table_infos.at(table_name);
storage = table_info.storage;
data_path_in_backup = table_info.data_path_in_backup;
partitions = table_info.partitions;
}
checkIsQueryCancelled();
schedule(
[this, table_name, storage, data_path_in_backup, partitions]() { insertDataToTableImpl(table_name, storage, data_path_in_backup, partitions); },
"Restore_TblData");
}
void RestorerFromBackup::insertDataToTableImpl(const QualifiedTableName & table_name, StoragePtr storage, const String & data_path_in_backup, const std::optional<ASTs> & partitions)
{
try
{
const auto & data_path_in_backup = table_info.data_path_in_backup;
const auto & partitions = table_info.partitions;
if (partitions && !storage->supportsBackupPartition())
{
throw Exception(
@ -828,45 +1007,47 @@ void RestorerFromBackup::insertDataToTable(const QualifiedTableName & table_name
void RestorerFromBackup::addDataRestoreTask(DataRestoreTask && new_task)
{
if (current_stage == Stage::INSERTING_DATA_TO_TABLES)
if (current_stage != Stage::INSERTING_DATA_TO_TABLES)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of data-restoring tasks is not allowed");
std::lock_guard lock{mutex};
data_restore_tasks.push_back(std::move(new_task));
}
void RestorerFromBackup::addDataRestoreTasks(DataRestoreTasks && new_tasks)
{
if (current_stage == Stage::INSERTING_DATA_TO_TABLES)
if (current_stage != Stage::INSERTING_DATA_TO_TABLES)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding of data-restoring tasks is not allowed");
std::lock_guard lock{mutex};
insertAtEnd(data_restore_tasks, std::move(new_tasks));
}
RestorerFromBackup::DataRestoreTasks RestorerFromBackup::getDataRestoreTasks()
void RestorerFromBackup::runDataRestoreTasks()
{
if (data_restore_tasks.empty())
return {};
LOG_TRACE(log, "Will insert data to tables");
/// Storages and table locks must exist while we're executing data restoring tasks.
auto storages = std::make_shared<std::vector<StoragePtr>>();
auto table_locks = std::make_shared<std::vector<TableLockHolder>>();
storages->reserve(table_infos.size());
table_locks->reserve(table_infos.size());
for (const auto & table_info : table_infos | boost::adaptors::map_values)
/// Iterations are required here because data restore tasks are allowed to call addDataRestoreTask() and add other data restore tasks.
for (;;)
{
storages->push_back(table_info.storage);
table_locks->push_back(table_info.table_lock);
std::vector<DataRestoreTask> tasks_to_run;
{
std::lock_guard lock{mutex};
std::swap(tasks_to_run, data_restore_tasks);
}
if (tasks_to_run.empty())
break;
for (auto & task : tasks_to_run)
schedule(std::move(task), "Restore_TblTask");
waitFutures();
}
DataRestoreTasks res_tasks;
for (const auto & task : data_restore_tasks)
res_tasks.push_back([task, storages, table_locks] { task(); });
return res_tasks;
}
std::vector<std::pair<UUID, AccessEntityPtr>> RestorerFromBackup::getAccessEntitiesToRestore()
{
std::lock_guard lock{mutex};
if (!access_restorer || access_restored)
return {};

View File

@ -7,6 +7,7 @@
#include <Storages/TableLockHolder.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Common/ThreadPool_fwd.h>
#include <filesystem>
@ -34,7 +35,9 @@ public:
const RestoreSettings & restore_settings_,
std::shared_ptr<IRestoreCoordination> restore_coordination_,
const BackupPtr & backup_,
const ContextMutablePtr & context_);
const ContextMutablePtr & context_,
ThreadPool & thread_pool_,
const std::function<void()> & after_each_task_callback_);
~RestorerFromBackup();
@ -51,7 +54,7 @@ public:
using DataRestoreTasks = std::vector<DataRestoreTask>;
/// Restores the metadata of databases and tables and returns tasks to restore the data of tables.
DataRestoreTasks run(Mode mode);
void run(Mode mode);
BackupPtr getBackup() const { return backup; }
const RestoreSettings & getRestoreSettings() const { return restore_settings; }
@ -77,6 +80,7 @@ private:
BackupPtr backup;
ContextMutablePtr context;
QueryStatusPtr process_list_element;
std::function<void()> after_each_task_callback;
std::chrono::milliseconds on_cluster_first_sync_timeout;
std::chrono::milliseconds create_table_timeout;
LoggerPtr log;
@ -89,9 +93,14 @@ private:
void findDatabasesAndTablesInBackup();
void findTableInBackup(const QualifiedTableName & table_name_in_backup, const std::optional<ASTs> & partitions);
void findTableInBackupImpl(const QualifiedTableName & table_name_in_backup, const std::optional<ASTs> & partitions);
void findDatabaseInBackup(const String & database_name_in_backup, const std::set<DatabaseAndTableName> & except_table_names);
void findDatabaseInBackupImpl(const String & database_name_in_backup, const std::set<DatabaseAndTableName> & except_table_names);
void findEverythingInBackup(const std::set<String> & except_database_names, const std::set<DatabaseAndTableName> & except_table_names);
size_t getNumDatabases() const;
size_t getNumTables() const;
void checkAccessForObjectsFoundInBackup() const;
void createDatabases();
@ -104,18 +113,32 @@ private:
void createTables();
void createTable(const QualifiedTableName & table_name);
void checkTable(const QualifiedTableName & table_name);
void insertDataToTable(const QualifiedTableName & table_name);
DataRestoreTasks getDataRestoreTasks();
void insertDataToTables();
void insertDataToTable(const QualifiedTableName & table_name);
void insertDataToTableImpl(const QualifiedTableName & table_name, StoragePtr storage, const String & data_path_in_backup, const std::optional<ASTs> & partitions);
void runDataRestoreTasks();
void setStage(const String & new_stage, const String & message = "");
/// Schedule a task from the thread pool and start executing it.
void schedule(std::function<void()> && task_, const char * thread_name_);
/// Returns the number of currently scheduled or executing tasks.
size_t getNumFutures() const;
/// Waits until all tasks are processed (including the tasks scheduled while we're waiting).
/// Throws an exception if any of the tasks throws an exception.
void waitFutures();
/// Throws an exception if the RESTORE query was cancelled.
void checkIsQueryCancelled() const;
struct DatabaseInfo
{
ASTPtr create_database_query;
String create_database_query_str;
bool is_predefined_database = false;
DatabasePtr database;
};
@ -123,6 +146,7 @@ private:
struct TableInfo
{
ASTPtr create_table_query;
String create_table_query_str;
bool is_predefined_table = false;
bool has_data = false;
std::filesystem::path data_path_in_backup;
@ -133,12 +157,18 @@ private:
};
String current_stage;
std::unordered_map<String, DatabaseInfo> database_infos;
std::map<QualifiedTableName, TableInfo> table_infos;
TablesDependencyGraph tables_dependencies;
std::vector<DataRestoreTask> data_restore_tasks;
std::unique_ptr<AccessRestorerFromBackup> access_restorer;
bool access_restored = false;
std::unordered_map<String, DatabaseInfo> database_infos TSA_GUARDED_BY(mutex);
std::map<QualifiedTableName, TableInfo> table_infos TSA_GUARDED_BY(mutex);
TablesDependencyGraph tables_dependencies TSA_GUARDED_BY(mutex);
std::vector<DataRestoreTask> data_restore_tasks TSA_GUARDED_BY(mutex);
std::unique_ptr<AccessRestorerFromBackup> access_restorer TSA_GUARDED_BY(mutex);
bool access_restored TSA_GUARDED_BY(mutex) = false;
std::vector<std::future<void>> futures TSA_GUARDED_BY(mutex);
std::atomic<bool> exception_caught = false;
ThreadPool & thread_pool;
mutable std::mutex mutex;
};
}