Improve gathering metadata for backup.

This commit is contained in:
Vitaly Baranov 2022-06-23 00:56:41 +02:00
parent 47ac47350b
commit 64b51a3772
17 changed files with 528 additions and 378 deletions

View File

@ -27,15 +27,16 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
bool BackupEntriesCollector::TableKey::operator ==(const TableKey & right) const
namespace
{
return (name == right.name) && (is_temporary == right.is_temporary);
}
/// Builds a description of a table for log and error messages:
/// "temporary table `name`" for tables of the temporary database, otherwise "table `db`.`name`".
/// If `first_char_uppercase` is true the description starts with a capital 'T'.
String tableNameWithTypeToString(const String & database_name, const String & table_name, bool first_char_uppercase)
{
    const char leading_letter = first_char_uppercase ? 'T' : 't';
    const bool is_temporary = (database_name == DatabaseCatalog::TEMPORARY_DATABASE);

    if (is_temporary)
        return fmt::format("{}emporary table {}", leading_letter, backQuoteIfNeed(table_name));

    return fmt::format("{}able {}.{}", leading_letter, backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));
}
/// Orders table keys by name first; keys with equal names are ordered by `is_temporary`
/// (a non-temporary key goes before a temporary one).
bool BackupEntriesCollector::TableKey::operator <(const TableKey & right) const
{
    if (name < right.name)
        return true;

    if (!(name == right.name))
        return false;

    return is_temporary < right.is_temporary;
}
std::string_view BackupEntriesCollector::toString(Stage stage)
@ -86,7 +87,7 @@ BackupEntries BackupEntriesCollector::getBackupEntries()
/// Find databases and tables which we're going to put to the backup.
setStage(Stage::kFindingTables);
collectDatabasesAndTablesInfo();
gatherMetadataAndCheckConsistency();
/// Make backup entries for the definitions of the found databases.
makeBackupEntriesForDatabasesDefs();
@ -100,7 +101,7 @@ BackupEntries BackupEntriesCollector::getBackupEntries()
/// Run all the tasks added with addPostCollectingTask().
setStage(Stage::kRunningPostTasks);
runPostCollectingTasks();
runPostTasks();
/// No more backup entries or tasks are allowed after this point.
setStage(Stage::kWritingBackup);
@ -156,56 +157,30 @@ void BackupEntriesCollector::calculateRootPathInBackup()
}
/// Finds databases and tables which we will put to the backup.
void BackupEntriesCollector::collectDatabasesAndTablesInfo()
void BackupEntriesCollector::gatherMetadataAndCheckConsistency()
{
bool use_timeout = (timeout.count() >= 0);
auto start_time = std::chrono::steady_clock::now();
int pass = 0;
do
int pass = 1;
for (;;)
{
database_infos.clear();
table_infos.clear();
consistent = true;
consistency = true;
/// Collect information about databases and tables specified in the BACKUP query.
for (const auto & element : backup_query_elements)
{
switch (element.type)
{
case ASTBackupQuery::ElementType::TABLE:
{
collectTableInfo({element.database_name, element.table_name}, false, element.partitions, true);
break;
}
case ASTBackupQuery::ElementType::TEMPORARY_TABLE:
{
collectTableInfo({"", element.table_name}, true, element.partitions, true);
break;
}
case ASTBackupQuery::ElementType::DATABASE:
{
collectDatabaseInfo(element.database_name, element.except_tables, true);
break;
}
case ASTBackupQuery::ElementType::ALL:
{
collectAllDatabasesInfo(element.except_databases, element.except_tables);
break;
}
}
}
gatherDatabasesMetadata();
gatherTablesMetadata();
/// We have to check consistency of collected information to protect from the case when some table or database is
/// renamed during this collecting making the collected information invalid.
checkConsistency();
if (consistency)
break;
/// Two passes is absolute minimum (see `previous_table_names` & `previous_database_names`).
auto elapsed = std::chrono::steady_clock::now() - start_time;
if (!consistent && (pass >= 2) && use_timeout)
if ((pass >= 2) && use_timeout)
{
if (elapsed > timeout)
throw Exception(
@ -218,224 +193,298 @@ void BackupEntriesCollector::collectDatabasesAndTablesInfo()
if (pass >= 2)
LOG_WARNING(log, "Couldn't collect tables and databases to make a backup (pass #{}, elapsed {})", pass, to_string(elapsed));
++pass;
} while (!consistent);
}
LOG_INFO(log, "Will backup {} databases and {} tables", database_infos.size(), table_infos.size());
}
void BackupEntriesCollector::collectTableInfo(
const QualifiedTableName & table_name, bool is_temporary_table, const std::optional<ASTs> & partitions, bool throw_if_not_found)
void BackupEntriesCollector::gatherDatabasesMetadata()
{
/// Gather information about the table.
DatabasePtr database;
StoragePtr storage;
TableLockHolder table_lock;
ASTPtr create_table_query;
database_infos.clear();
TableKey table_key{table_name, is_temporary_table};
if (throw_if_not_found)
/// Collect information about databases and tables specified in the BACKUP query.
for (const auto & element : backup_query_elements)
{
auto resolved_id = is_temporary_table
? context->resolveStorageID(StorageID{"", table_name.table}, Context::ResolveExternal)
: context->resolveStorageID(StorageID{table_name.database, table_name.table}, Context::ResolveGlobal);
std::tie(database, storage) = DatabaseCatalog::instance().getDatabaseAndTable(resolved_id, context);
table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
create_table_query = storage->getCreateQueryForBackup(*this);
switch (element.type)
{
case ASTBackupQuery::ElementType::TABLE:
{
gatherDatabaseMetadata(
element.database_name,
/* throw_if_database_not_found= */ true,
/* backup_create_database_query= */ false,
element.table_name,
/* throw_if_table_not_found= */ true,
element.partitions,
/* all_tables= */ false,
/* except_table_names= */ {});
break;
}
case ASTBackupQuery::ElementType::TEMPORARY_TABLE:
{
gatherDatabaseMetadata(
DatabaseCatalog::TEMPORARY_DATABASE,
/* throw_if_database_not_found= */ true,
/* backup_create_database_query= */ false,
element.table_name,
/* throw_if_table_not_found= */ true,
element.partitions,
/* all_tables= */ false,
/* except_table_names= */ {});
break;
}
case ASTBackupQuery::ElementType::DATABASE:
{
gatherDatabaseMetadata(
element.database_name,
/* throw_if_database_not_found= */ true,
/* backup_create_database_query= */ true,
/* table_name= */ {},
/* throw_if_table_not_found= */ false,
/* partitions= */ {},
/* all_tables= */ true,
/* except_table_names= */ element.except_tables);
break;
}
case ASTBackupQuery::ElementType::ALL:
{
for (const auto & [database_name, database] : DatabaseCatalog::instance().getDatabases())
{
if (!element.except_databases.contains(database_name))
{
gatherDatabaseMetadata(
database_name,
/* throw_if_database_not_found= */ false,
/* backup_create_database_query= */ true,
/* table_name= */ {},
/* throw_if_table_not_found= */ false,
/* partitions= */ {},
/* all_tables= */ true,
/* except_table_names= */ element.except_tables);
if (!consistency)
return;
}
}
break;
}
}
if (!consistency)
return;
}
else
{
auto resolved_id = is_temporary_table
? context->tryResolveStorageID(StorageID{"", table_name.table}, Context::ResolveExternal)
: context->tryResolveStorageID(StorageID{table_name.database, table_name.table}, Context::ResolveGlobal);
if (!resolved_id.empty())
std::tie(database, storage) = DatabaseCatalog::instance().tryGetDatabaseAndTable(resolved_id, context);
}
/// Records in `database_infos` what has to be put to the backup from a single database.
/// This function only registers the request; it doesn't enumerate tables of the database.
///
/// database_name                - name of the database (DatabaseCatalog::TEMPORARY_DATABASE for temporary tables).
/// throw_if_database_not_found  - if true the database is obtained with getDatabase(), otherwise with tryGetDatabase()
///                                and a missing database is silently skipped.
/// backup_create_database_query - if true the CREATE DATABASE query is stored too (never for predefined databases).
/// table_name                   - a single table to add, if set.
/// throw_if_table_not_found     - marks `table_name` as required.
/// partitions                   - partitions of `table_name` to back up, if set.
/// all_tables                   - if true all the tables of the database are requested...
/// except_table_names           - ...except those listed here for this database.
///
/// Sets `consistency` to false and returns if the CREATE DATABASE query can't be obtained
/// or if it names another database.
void BackupEntriesCollector::gatherDatabaseMetadata(
    const String & database_name,
    bool throw_if_database_not_found,
    bool backup_create_database_query,
    const std::optional<String> & table_name,
    bool throw_if_table_not_found,
    const std::optional<ASTs> & partitions,
    bool all_tables,
    const std::set<DatabaseAndTableName> & except_table_names)
{
    auto it = database_infos.find(database_name);
    if (it == database_infos.end())
    {
        /// The database is seen for the first time during this pass.
        DatabasePtr database;
        if (throw_if_database_not_found)
        {
            database = DatabaseCatalog::instance().getDatabase(database_name);
        }
        else
        {
            database = DatabaseCatalog::instance().tryGetDatabase(database_name);
            if (!database)
                return;
        }

        DatabaseInfo new_database_info;
        new_database_info.database = database;
        it = database_infos.emplace(database_name, new_database_info).first;
    }

    DatabaseInfo & database_info = it->second;

    if (backup_create_database_query && !database_info.create_database_query && !DatabaseCatalog::isPredefinedDatabaseName(database_name))
    {
        ASTPtr create_database_query;
        try
        {
            create_database_query = database_info.database->getCreateDatabaseQueryForBackup();
        }
        catch (...)
        {
            /// The database has been dropped recently.
            consistency = false;
            return;
        }

        database_info.create_database_query = create_database_query;
        const auto & create = create_database_query->as<const ASTCreateQuery &>();

        if (create.getDatabase() != database_name)
        {
            /// The database has been renamed recently.
            consistency = false;
            return;
        }
    }

    if (table_name)
    {
        auto & table_params = database_info.tables[*table_name];
        if (throw_if_table_not_found)
            table_params.throw_if_table_not_found = true;

        if (partitions)
        {
            /// The same table can be mentioned in several elements of the BACKUP query, so partitions
            /// are accumulated. Calling emplace() unconditionally would drop the partitions
            /// gathered from the previous elements.
            if (!table_params.partitions)
                table_params.partitions.emplace();
            insertAtEnd(*table_params.partitions, *partitions);
        }

        database_info.except_table_names.emplace(*table_name);
    }

    if (all_tables)
    {
        database_info.all_tables = all_tables;

        /// `except_table_names` can mention tables of other databases, only tables of this database are kept.
        for (const auto & except_table_name : except_table_names)
            if (except_table_name.first == database_name)
                database_info.except_table_names.emplace(except_table_name.second);
    }
}
void BackupEntriesCollector::gatherTablesMetadata()
{
if (!consistency)
return;
table_infos.clear();
for (const auto & [database_name, database_info] : database_infos)
{
const auto & database = database_info.database;
bool is_temporary_database = (database_name == DatabaseCatalog::TEMPORARY_DATABASE);
auto filter_by_table_name = [database_info = &database_info](const String & table_name)
{
/// We skip inner tables of materialized views.
if (table_name.starts_with(".inner_id."))
return false;
if (database_info->tables.contains(table_name))
return true;
if (database_info->all_tables)
return !database_info->except_table_names.contains(table_name);
return false;
};
auto db_tables = database->getTablesForBackup(filter_by_table_name, context, consistency);
if (!consistency)
return;
/// Check that all tables were found.
std::unordered_set<String> found_table_names;
for (const auto & db_table : db_tables)
{
const auto & create_table_query = db_table.first;
const auto & create = create_table_query->as<const ASTCreateQuery &>();
found_table_names.emplace(create.getTable());
if ((is_temporary_database && !create.temporary) || (!is_temporary_database && (create.getDatabase() != database_name)))
{
consistency = false;
return;
}
}
for (const auto & [table_name, table_info] : database_info.tables)
{
if (table_info.throw_if_table_not_found && !found_table_names.contains(table_name))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "{} not found", tableNameWithTypeToString(database_name, table_name, true));
}
for (const auto & db_table : db_tables)
{
const auto & create_table_query = db_table.first;
const auto & create = create_table_query->as<const ASTCreateQuery &>();
String table_name = create.getTable();
fs::path data_path_in_backup;
if (is_temporary_database)
{
auto table_name_in_backup = renaming_map.getNewTemporaryTableName(table_name);
data_path_in_backup = root_path_in_backup / "temporary_tables" / "data" / escapeForFileName(table_name_in_backup);
}
else
{
auto table_name_in_backup = renaming_map.getNewTableName({database_name, table_name});
data_path_in_backup
= root_path_in_backup / "data" / escapeForFileName(table_name_in_backup.database) / escapeForFileName(table_name_in_backup.table);
}
/// Add information to `table_infos`.
auto & res_table_info = table_infos[QualifiedTableName{database_name, table_name}];
res_table_info.database = database;
res_table_info.storage = db_table.second;
res_table_info.create_table_query = create_table_query;
res_table_info.data_path_in_backup = data_path_in_backup;
auto partitions_it = database_info.tables.find(table_name);
if (partitions_it != database_info.tables.end())
res_table_info.partitions = partitions_it->second.partitions;
}
}
}
void BackupEntriesCollector::lockTablesForReading()
{
if (!consistency)
return;
for (auto & table_info : table_infos | boost::adaptors::map_values)
{
auto storage = table_info.storage;
TableLockHolder table_lock;
if (storage)
{
try
{
table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
create_table_query = storage->getCreateQueryForBackup(*this);
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::TABLE_IS_DROPPED)
throw;
consistency = false;
return;
}
}
if (!create_table_query)
{
consistent &= !table_infos.contains(table_key);
return;
}
}
fs::path data_path_in_backup;
if (is_temporary_table)
{
auto table_name_in_backup = renaming_map.getNewTemporaryTableName(table_name.table);
data_path_in_backup = root_path_in_backup / "temporary_tables" / "data" / escapeForFileName(table_name_in_backup);
}
else
{
auto table_name_in_backup = renaming_map.getNewTableName(table_name);
data_path_in_backup
= root_path_in_backup / "data" / escapeForFileName(table_name_in_backup.database) / escapeForFileName(table_name_in_backup.table);
}
/// Check that information is consistent.
const auto & create = create_table_query->as<const ASTCreateQuery &>();
if ((create.getTable() != table_name.table) || (is_temporary_table != create.temporary) || (create.getDatabase() != table_name.database))
{
/// Table was renamed recently.
consistent = false;
return;
}
if (auto it = table_infos.find(table_key); it != table_infos.end())
{
const auto & table_info = it->second;
if ((table_info.database != database) || (table_info.storage != storage))
{
/// Table was renamed recently.
consistent = false;
return;
}
}
/// Add information to `table_infos`.
auto & res_table_info = table_infos[table_key];
res_table_info.database = database;
res_table_info.storage = storage;
res_table_info.table_lock = table_lock;
res_table_info.create_table_query = create_table_query;
res_table_info.data_path_in_backup = data_path_in_backup;
if (partitions)
{
if (!res_table_info.partitions)
res_table_info.partitions.emplace();
insertAtEnd(*res_table_info.partitions, *partitions);
}
}
/// Collects information about one database into `database_infos` and then calls collectTableInfo()
/// for each of its tables which is not listed in `except_table_names`.
/// If `throw_if_not_found` is false a missing database (or a failure to get its CREATE query) is not an error:
/// the function just returns, marking the collected information inconsistent only if this database
/// is already present in `database_infos`.
/// Sets `consistent` to false if the database looks renamed or replaced by another one.
void BackupEntriesCollector::collectDatabaseInfo(const String & database_name, const std::set<DatabaseAndTableName> & except_table_names, bool throw_if_not_found)
{
/// Gather information about the database.
DatabasePtr database;
ASTPtr create_database_query;
if (throw_if_not_found)
{
/// NOTE(review): getDatabase() presumably throws if the database doesn't exist - confirm.
/// Exceptions from getCreateDatabaseQueryForBackup() are not caught in this branch.
database = DatabaseCatalog::instance().getDatabase(database_name);
create_database_query = database->getCreateDatabaseQueryForBackup();
}
else
{
database = DatabaseCatalog::instance().tryGetDatabase(database_name);
if (!database)
{
/// The database doesn't exist now; that's inconsistent only if it has been already recorded.
consistent &= !database_infos.contains(database_name);
return;
}
try
{
create_database_query = database->getCreateDatabaseQueryForBackup();
}
catch (...)
{
/// The database has been dropped recently.
consistent &= !database_infos.contains(database_name);
return;
}
}
/// Check that information is consistent.
const auto & create = create_database_query->as<const ASTCreateQuery &>();
if (create.getDatabase() != database_name)
{
/// Database was renamed recently.
consistent = false;
return;
}
/// If the database has been already recorded it must be the same database object.
if (auto it = database_infos.find(database_name); it != database_infos.end())
{
const auto & database_info = it->second;
if (database_info.database != database)
{
/// Database was renamed recently.
consistent = false;
return;
}
}
/// Add information to `database_infos`.
auto & res_database_info = database_infos[database_name];
res_database_info.database = database;
res_database_info.create_database_query = create_database_query;
/// Add information about tables too.
for (auto it = database->getTablesIteratorForBackup(*this); it->isValid(); it->next())
{
if (except_table_names.contains({database_name, it->name()}))
continue;
collectTableInfo({database_name, it->name()}, /* is_temporary_table= */ false, {}, /* throw_if_not_found= */ false);
/// No need to continue scanning after an inconsistency has been detected.
if (!consistent)
return;
}
}
/// Collects information about every database known to DatabaseCatalog except the ones listed in
/// `except_database_names`. Stops as soon as the collected information becomes inconsistent.
void BackupEntriesCollector::collectAllDatabasesInfo(const std::set<String> & except_database_names, const std::set<DatabaseAndTableName> & except_table_names)
{
    const auto & all_databases = DatabaseCatalog::instance().getDatabases();
    for (const auto & name_and_database : all_databases)
    {
        const auto & current_name = name_and_database.first;
        if (!except_database_names.contains(current_name))
        {
            collectDatabaseInfo(current_name, except_table_names, false);
            if (!consistent)
                break;
        }
    }
}
/// Check for consistency of collected information about databases and tables.
void BackupEntriesCollector::checkConsistency()
{
if (!consistent)
if (!consistency)
return; /// Already inconsistent, no more checks necessary
/// Databases found while we were scanning tables and while we were scanning databases - must be the same.
for (const auto & [key, table_info] : table_infos)
{
auto it = database_infos.find(key.name.database);
if (it != database_infos.end())
{
const auto & database_info = it->second;
if (database_info.database != table_info.database)
{
consistent = false;
return;
}
}
}
/// We need to scan tables at least twice to be sure that we haven't missed any table which could be renamed
/// while we were scanning.
std::set<String> database_names;
std::set<TableKey> table_names;
std::set<QualifiedTableName> table_names;
boost::range::copy(database_infos | boost::adaptors::map_keys, std::inserter(database_names, database_names.end()));
boost::range::copy(table_infos | boost::adaptors::map_keys, std::inserter(table_names, table_names.end()));
if (!previous_database_names || !previous_table_names || (*previous_database_names != database_names)
|| (*previous_table_names != table_names))
if ((previous_database_names != database_names) || (previous_table_names != table_names))
{
previous_database_names = std::move(database_names);
previous_table_names = std::move(table_names);
consistent = false;
consistency = false;
}
}
@ -444,6 +493,9 @@ void BackupEntriesCollector::makeBackupEntriesForDatabasesDefs()
{
for (const auto & [database_name, database_info] : database_infos)
{
if (!database_info.create_database_query)
continue; /// We don't store CREATE queries for predefined databases (see DatabaseCatalog::isPredefinedDatabaseName()).
LOG_TRACE(log, "Adding definition of database {}", backQuoteIfNeed(database_name));
ASTPtr new_create_query = database_info.create_database_query;
@ -459,22 +511,23 @@ void BackupEntriesCollector::makeBackupEntriesForDatabasesDefs()
/// Calls IDatabase::backupTable() for all the tables found to make backup entries for tables.
void BackupEntriesCollector::makeBackupEntriesForTablesDefs()
{
for (const auto & [key, table_info] : table_infos)
for (const auto & [table_name, table_info] : table_infos)
{
LOG_TRACE(log, "Adding definition of {}table {}", (key.is_temporary ? "temporary " : ""), key.name.getFullName());
LOG_TRACE(log, "Adding definition of {}", tableNameWithTypeToString(table_name.database, table_name.table, false));
bool is_temporary_database = (table_name.database == DatabaseCatalog::TEMPORARY_DATABASE);
ASTPtr new_create_query = table_info.create_table_query;
renameDatabaseAndTableNameInCreateQuery(context->getGlobalContext(), renaming_map, new_create_query);
fs::path metadata_path_in_backup;
if (key.is_temporary)
if (is_temporary_database)
{
auto new_name = renaming_map.getNewTemporaryTableName(key.name.table);
auto new_name = renaming_map.getNewTemporaryTableName(table_name.table);
metadata_path_in_backup = root_path_in_backup / "temporary_tables" / "metadata" / (escapeForFileName(new_name) + ".sql");
}
else
{
auto new_name = renaming_map.getNewTableName(key.name);
auto new_name = renaming_map.getNewTableName({table_name.database, table_name.table});
metadata_path_in_backup
= root_path_in_backup / "metadata" / escapeForFileName(new_name.database) / (escapeForFileName(new_name.table) + ".sql");
}
@ -488,10 +541,18 @@ void BackupEntriesCollector::makeBackupEntriesForTablesData()
if (backup_settings.structure_only)
return;
for (const auto & [key, table_info] : table_infos)
for (const auto & [table_name, table_info] : table_infos)
{
LOG_TRACE(log, "Adding data of {}table {}", (key.is_temporary ? "temporary " : ""), key.name.getFullName());
const auto & storage = table_info.storage;
if (!storage)
{
/// This storage exists on other replica and has not been created on this replica yet.
/// We store metadata only for such tables.
/// TODO: Need special processing if it's a ReplicatedMergeTree.
continue;
}
LOG_TRACE(log, "Adding data of {}", tableNameWithTypeToString(table_name.database, table_name.table, false));
const auto & data_path_in_backup = table_info.data_path_in_backup;
const auto & partitions = table_info.partitions;
storage->backupData(*this, data_path_in_backup, partitions);
@ -519,21 +580,21 @@ void BackupEntriesCollector::addBackupEntries(BackupEntries && backup_entries_)
insertAtEnd(backup_entries, std::move(backup_entries_));
}
void BackupEntriesCollector::addPostCollectingTask(std::function<void()> task)
void BackupEntriesCollector::addPostTask(std::function<void()> task)
{
if (current_stage == Stage::kWritingBackup)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding post tasks is not allowed");
post_collecting_tasks.push(std::move(task));
post_tasks.push(std::move(task));
}
/// Runs all the tasks added with addPostTask().
void BackupEntriesCollector::runPostCollectingTasks()
void BackupEntriesCollector::runPostTasks()
{
/// Post collecting tasks can add other post collecting tasks, our code is fine with that.
while (!post_collecting_tasks.empty())
while (!post_tasks.empty())
{
auto task = std::move(post_collecting_tasks.front());
post_collecting_tasks.pop();
auto task = std::move(post_tasks.front());
post_tasks.pop();
std::move(task)();
}
}

View File

@ -49,7 +49,7 @@ public:
/// Adds a function which must be called after all IStorage::backup() have finished their work on all hosts.
/// This function is designed to help make a consistent backup in some complex cases, for example:
/// 1) we need to join (in a backup) the data of replicated tables gathered on different hosts.
void addPostCollectingTask(std::function<void()> task);
void addPostTask(std::function<void()> task);
/// Writing a backup includes a few stages:
enum class Stage
@ -79,16 +79,31 @@ public:
private:
void setStage(Stage new_stage, const String & error_message = {});
void calculateRootPathInBackup();
void collectDatabasesAndTablesInfo();
void collectTableInfo(const QualifiedTableName & table_name, bool is_temporary_table, const std::optional<ASTs> & partitions, bool throw_if_not_found);
void collectDatabaseInfo(const String & database_name, const std::set<DatabaseAndTableName> & except_table_names, bool throw_if_not_found);
void collectAllDatabasesInfo(const std::set<String> & except_database_names, const std::set<DatabaseAndTableName> & except_table_names);
void gatherMetadataAndCheckConsistency();
void gatherDatabasesMetadata();
void gatherDatabaseMetadata(
const String & database_name,
bool throw_if_database_not_found,
bool backup_create_database_query,
const std::optional<String> & table_name,
bool throw_if_table_not_found,
const std::optional<ASTs> & partitions,
bool all_tables,
const std::set<DatabaseAndTableName> & except_table_names);
void gatherTablesMetadata();
void lockTablesForReading();
void checkConsistency();
void makeBackupEntriesForDatabasesDefs();
void makeBackupEntriesForTablesDefs();
void makeBackupEntriesForTablesData();
void runPostCollectingTasks();
void runPostTasks();
const ASTBackupQuery::Elements backup_query_elements;
const BackupSettings backup_settings;
@ -105,6 +120,17 @@ private:
{
DatabasePtr database;
ASTPtr create_database_query;
/// Options of a table which was mentioned by name while gathering metadata.
struct TableParams
{
/// If true the table is required.
bool throw_if_table_not_found = false;
/// Partitions of the table; not set if no partitions were specified for it.
std::optional<ASTs> partitions;
};
std::unordered_map<String, TableParams> tables;
bool all_tables = false;
std::unordered_set<String> except_table_names;
};
struct TableInfo
@ -117,22 +143,14 @@ private:
std::optional<ASTs> partitions;
};
/// Identifies a table: its qualified name plus a flag telling whether it's a temporary table.
struct TableKey
{
QualifiedTableName name;
bool is_temporary = false;
/// Comparison operators; they are defined out of line.
bool operator ==(const TableKey & right) const;
bool operator <(const TableKey & right) const;
};
std::unordered_map<String, DatabaseInfo> database_infos;
std::map<TableKey, TableInfo> table_infos;
std::optional<std::set<String>> previous_database_names;
std::optional<std::set<TableKey>> previous_table_names;
bool consistent = false;
std::map<QualifiedTableName, TableInfo> table_infos;
std::set<String> previous_database_names;
std::set<QualifiedTableName> previous_table_names;
bool consistency = false;
BackupEntries backup_entries;
std::queue<std::function<void()>> post_collecting_tasks;
std::queue<std::function<void()>> post_tasks;
};
}

View File

@ -102,6 +102,7 @@ RestorerFromBackup::RestorerFromBackup(
, backup(backup_)
, context(context_)
, timeout(timeout_)
, create_table_timeout_ms(context->getConfigRef().getUInt64("backups.create_table_timeout", 300000))
, log(&Poco::Logger::get("RestorerFromBackup"))
{
}
@ -674,7 +675,7 @@ void RestorerFromBackup::createTables()
table_key.name.getFullName(),
serializeAST(*create_table_query));
database->createTableRestoredFromBackup(create_table_query, *this);
database->createTableRestoredFromBackup(create_table_query, context, restore_coordination, create_table_timeout_ms);
}
table_info.created = true;
@ -689,7 +690,9 @@ void RestorerFromBackup::createTables()
if (!restore_settings.allow_different_table_def)
{
ASTPtr create_table_query = storage->getCreateQueryForBackup(context, nullptr);
ASTPtr create_table_query = database->getCreateTableQuery(resolved_id.table_name, context);
bool consistency = true;
storage->adjustCreateQueryForBackup(create_table_query, consistency);
ASTPtr expected_create_query = table_info.create_table_query;
if (serializeAST(*create_table_query) != serializeAST(*expected_create_query))
{

View File

@ -94,6 +94,7 @@ private:
BackupPtr backup;
ContextMutablePtr context;
std::chrono::seconds timeout;
UInt64 create_table_timeout_ms;
Poco::Logger * log;
Stage current_stage = Stage::kPreparing;

View File

@ -145,4 +145,52 @@ void DatabaseMemory::alterTable(ContextPtr local_context, const StorageID & tabl
DatabaseCatalog::instance().updateLoadingDependencies(table_id, std::move(new_dependencies));
}
std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseMemory::getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context, bool & consistency) const
{
/// We need a special processing for the temporary database.
if (getDatabaseName() != DatabaseCatalog::TEMPORARY_DATABASE)
return DatabaseWithOwnTablesBase::getTablesForBackup(filter, local_context, consistency);
std::vector<std::pair<ASTPtr, StoragePtr>> res;
/// `this->tables` for the temporary database doesn't contain real names of tables.
/// That's why we need to call Context::getExternalTables() and then resolve those names using tryResolveStorageID() below.
auto external_tables = local_context->getExternalTables();
for (const auto & [table_name, storage] : external_tables)
{
if (!filter(table_name))
continue;
bool ok = false;
if (auto storage_id = local_context->tryResolveStorageID(StorageID{"", table_name}, Context::ResolveExternal))
{
/// Here `storage_id.table_name` looks like looks like "_tmp_ab9b15a3-fb43-4670-abec-14a0e9eb70f1"
/// it's not the real name of the table.
if (auto create_table_query = tryGetCreateTableQuery(storage_id.table_name, local_context))
{
const auto & create = create_table_query->as<const ASTCreateQuery &>();
if (create.getTable() == table_name)
{
storage->adjustCreateQueryForBackup(create_table_query, consistency);
if (consistency)
{
res.emplace_back(create_table_query, storage);
ok = true;
}
}
}
}
if (!ok)
{
consistency = false;
return {};
}
}
return res;
}
}

View File

@ -50,6 +50,8 @@ public:
void alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override;
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context, bool & consistency) const override;
private:
const String data_path;
using NameToASTCreate = std::unordered_map<String, ASTPtr>;

View File

@ -923,7 +923,11 @@ String DatabaseReplicated::readMetadataFile(const String & table_name) const
}
void DatabaseReplicated::createTableRestoredFromBackup(const ASTPtr & create_table_query, const RestorerFromBackup & restorer)
void DatabaseReplicated::createTableRestoredFromBackup(
const ASTPtr & create_table_query,
ContextMutablePtr local_context,
std::shared_ptr<IRestoreCoordination> restore_coordination,
UInt64 timeout_ms)
{
/// Because of replication, multiple nodes can try to restore the same table and fail with "Table already exists",
/// because the table could have already been restored on another node and then replicated to this node.
@ -931,29 +935,25 @@ void DatabaseReplicated::createTableRestoredFromBackup(const ASTPtr & create_tab
/// IRestoreCoordination::acquireCreatingTableInReplicatedDatabase() and then for other nodes this function returns false which means
/// this table is already being created by some other node.
String table_name = create_table_query->as<const ASTCreateQuery &>().getTable();
if (restorer.getRestoreCoordination()->acquireCreatingTableInReplicatedDatabase(getZooKeeperPath(), table_name))
if (restore_coordination->acquireCreatingTableInReplicatedDatabase(getZooKeeperPath(), table_name))
{
restorer.executeCreateQuery(create_table_query);
DatabaseAtomic::createTableRestoredFromBackup(create_table_query, local_context, restore_coordination, timeout_ms);
}
/// Wait until the table is actually created no matter if it's created by the current or another node and replicated to the
/// current node afterwards. We have to wait because `RestorerFromBackup` is going to restore data of the table then.
/// TODO: The following code doesn't look very reliable, probably we need to rewrite it somehow.
auto timeout = restorer.getTimeout();
bool use_timeout = (timeout.count() >= 0);
auto timeout = std::chrono::milliseconds{timeout_ms};
auto start_time = std::chrono::steady_clock::now();
while (!isTableExist(table_name, restorer.getContext()))
while (!isTableExist(table_name, local_context))
{
waitForReplicaToProcessAllEntries(50);
if (use_timeout)
{
auto elapsed = std::chrono::steady_clock::now() - start_time;
if (elapsed > timeout)
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE,
"Couldn't restore table {}.{} on other node or sync it (elapsed {})",
backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(table_name), to_string(elapsed));
}
auto elapsed = std::chrono::steady_clock::now() - start_time;
if (elapsed > timeout)
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE,
"Couldn't restore table {}.{} on other node or sync it (elapsed {})",
backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(table_name), to_string(elapsed));
}
}

View File

@ -72,7 +72,7 @@ public:
void shutdown() override;
void createTableRestoredFromBackup(const ASTPtr & create_table_query, const RestorerFromBackup & restorer) override;
void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr<IRestoreCoordination> restore_coordination, UInt64 timeout_ms) override;
friend struct DatabaseReplicatedTask;
friend class DatabaseReplicatedDDLWorker;

View File

@ -322,22 +322,45 @@ StoragePtr DatabaseWithOwnTablesBase::getTableUnlocked(const String & table_name
backQuote(database_name), backQuote(table_name));
}
/// NOTE(review): this span shows two signatures back to back and a `return` followed by further statements;
/// it looks like the removed and the added lines of a diff hunk rendered without +/- markers — confirm against the real file.
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIteratorForBackup(const BackupEntriesCollector & backup_entries_collector) const
std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseWithOwnTablesBase::getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context, bool & consistency) const
{
/// Backup all the tables in this database.
/// Here we skip inner tables of materialized views.
auto skip_internal_tables = [](const String & table_name) { return !table_name.starts_with(".inner_id."); };
return getTablesIterator(backup_entries_collector.getContext(), skip_internal_tables);
/// Collects a (CREATE query, storage) pair for every table the iterator yields under `filter`.
/// On the first table that fails a check below, `consistency` is set to false and an empty vector is returned.
std::vector<std::pair<ASTPtr, StoragePtr>> res;
for (auto it = getTablesIterator(local_context, filter); it->isValid(); it->next())
{
bool ok = false;
/// A table is accepted only if its CREATE query is available, the query names the same table as the iterator,
/// and `consistency` is still true after the storage has adjusted the query.
if (auto create_table_query = tryGetCreateTableQuery(it->name(), local_context))
{
const auto & create = create_table_query->as<const ASTCreateQuery &>();
if (create.getTable() == it->name())
{
auto storage = it->table();
/// The storage may replace `create_table_query` and may change `consistency` (both are passed by reference).
storage->adjustCreateQueryForBackup(create_table_query, consistency);
if (consistency)
{
res.emplace_back(create_table_query, storage);
ok = true;
}
}
}
if (!ok)
{
/// Report the failure through the out-parameter and discard everything gathered so far.
consistency = false;
return {};
}
}
return res;
}
/// Empty body: both parameters are unnamed and unused, so no check is performed here.
void DatabaseWithOwnTablesBase::checkCreateTableQueryForBackup(const ASTPtr &, const BackupEntriesCollector &) const
{
}
/// NOTE(review): two signatures appear back to back and the body mixes `restorer` with `local_context`;
/// this looks like the removed and the added lines of a diff hunk shown together — confirm against the real file.
void DatabaseWithOwnTablesBase::createTableRestoredFromBackup(const ASTPtr & create_table_query, const RestorerFromBackup & restorer)
void DatabaseWithOwnTablesBase::createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr<IRestoreCoordination>, UInt64)
{
/// Creates a table by executing a "CREATE TABLE" query.
restorer.executeCreateQuery(create_table_query);
/// The query is run through InterpreterCreateQuery with the "internal" flag set;
/// the unnamed coordination and timeout parameters are not used in this body.
InterpreterCreateQuery interpreter{create_table_query, local_context};
interpreter.setInternal(true);
interpreter.execute();
}
}

View File

@ -36,9 +36,8 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIteratorForBackup(const BackupEntriesCollector & backup_entries_collector) const override;
void checkCreateTableQueryForBackup(const ASTPtr & create_table_query, const BackupEntriesCollector & backup_entries_collector) const override;
void createTableRestoredFromBackup(const ASTPtr & create_table_query, const RestorerFromBackup & restorer) override;
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context, bool & consistency) const override;
void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr<IRestoreCoordination> restore_coordination, UInt64 timeout_ms) override;
void shutdown() override;

View File

@ -1,6 +1,5 @@
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Backups/BackupEntriesCollector.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/quoteString.h>
@ -26,29 +25,22 @@ ASTPtr IDatabase::getCreateDatabaseQueryForBackup() const
{
auto query = getCreateDatabaseQuery();
/// We don't want to see any UUIDs in backup (after RESTORE the table will have another UUID anyway).
/// We don't want to see any UUIDs in backup (after RESTORE the database will have another UUID anyway).
auto & create = query->as<ASTCreateQuery &>();
create.uuid = UUIDHelpers::Nil;
return query;
}
/// Returns a snapshot iterator built from an empty `Tables{}` set and this database's name.
/// The unnamed collector parameter is not used.
DatabaseTablesIteratorPtr IDatabase::getTablesIteratorForBackup(const BackupEntriesCollector &) const
{
/// IDatabase doesn't own any tables.
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}
/// NOTE(review): two signatures and two variants of the exception arguments appear here;
/// this looks like the removed and the added lines of a diff hunk shown together — confirm against the real file.
/// Always throws CANNOT_BACKUP_TABLE; there is no non-throwing path in this body.
void IDatabase::checkCreateTableQueryForBackup(const ASTPtr & create_table_query, const BackupEntriesCollector &) const
std::vector<std::pair<ASTPtr, StoragePtr>> IDatabase::getTablesForBackup(const FilterByNameFunction &, const ContextPtr &, bool &) const
{
/// Cannot back up any table because IDatabase doesn't own any tables.
throw Exception(ErrorCodes::CANNOT_BACKUP_TABLE,
"Database engine {} does not support backups, cannot backup table {}.{}",
getEngineName(), backQuoteIfNeed(getDatabaseName()),
backQuoteIfNeed(create_table_query->as<const ASTCreateQuery &>().getTable()));
"Database engine {} does not support backups, cannot backup tables in database {}",
getEngineName(), backQuoteIfNeed(getDatabaseName()));
}
void IDatabase::createTableRestoredFromBackup(const ASTPtr & create_table_query, const RestorerFromBackup &)
void IDatabase::createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr, std::shared_ptr<IRestoreCoordination>, UInt64)
{
/// Cannot restore any table because IDatabase doesn't own any tables.
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE,

View File

@ -30,8 +30,7 @@ class SettingsChanges;
using DictionariesWithID = std::vector<std::pair<String, UUID>>;
struct ParsedTablesMetadata;
struct QualifiedTableName;
class BackupEntriesCollector;
class RestorerFromBackup;
class IRestoreCoordination;
namespace ErrorCodes
{
@ -333,17 +332,14 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database engine {} does not run a replication thread!", getEngineName());
}
/// Returns a slightly changed version of the CREATE DATABASE query which must be written to a backup.
/// Returns a CREATE DATABASE query prepared for writing to a backup.
virtual ASTPtr getCreateDatabaseQueryForBackup() const;
/// Returns an iterator that passes through all the tables when a user wants to backup the whole database.
virtual DatabaseTablesIteratorPtr getTablesIteratorForBackup(const BackupEntriesCollector & restorer) const;
/// Checks a CREATE TABLE query before it will be written to a backup. Called by IStorage::getCreateQueryForBackup().
virtual void checkCreateTableQueryForBackup(const ASTPtr & create_table_query, const BackupEntriesCollector & backup_entries_collector) const;
/// Returns CREATE TABLE queries and corresponding tables prepared for writing to a backup.
virtual std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & context, bool & consistency) const;
/// Creates a table restored from backup.
virtual void createTableRestoredFromBackup(const ASTPtr & create_table_query, const RestorerFromBackup & restorer);
virtual void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr context, std::shared_ptr<IRestoreCoordination> restore_coordination, UInt64 timeout_ms);
virtual ~IDatabase() = default;

View File

@ -248,40 +248,21 @@ bool IStorage::isStaticStorage() const
return false;
}
/// NOTE(review): two signatures and two declarations of `create` appear in this span;
/// it looks like the removed and the added lines of a diff hunk shown together — confirm against the real file.
ASTPtr IStorage::getCreateQueryForBackup(const ContextPtr & context, DatabasePtr * database) const
void IStorage::adjustCreateQueryForBackup(ASTPtr & create_query, bool &) const
{
/// Look up the owning database and the stored CREATE query; either one missing is reported as TABLE_IS_DROPPED.
auto table_id = getStorageID();
auto db = DatabaseCatalog::instance().tryGetDatabase(table_id.getDatabaseName());
if (!db)
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {}.{} is dropped", table_id.database_name, table_id.table_name);
ASTPtr query = db->tryGetCreateTableQuery(table_id.getTableName(), context);
if (!query)
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {}.{} is dropped", table_id.database_name, table_id.table_name);
/// Replace the caller's pointer with a clone so the edits below are applied to the copy.
create_query = create_query->clone();
/// We don't want to see any UUIDs in backup (after RESTORE the table will have another UUID anyway).
auto & create = query->as<ASTCreateQuery &>();
auto & create = create_query->as<ASTCreateQuery &>();
create.uuid = UUIDHelpers::Nil;
create.to_inner_uuid = UUIDHelpers::Nil;
/// If this is a definition of a system table we'll remove columns and comment because they're excessive for backups.
if (create.storage && create.storage->engine && create.storage->engine->name.starts_with("System"))
/// If this is a definition of a system table we'll remove columns and comment because they're redundant for backups.
if (isSystemStorage())
{
create.reset(create.columns_list);
create.reset(create.comment);
}
/// `database` is an optional out-parameter: it is filled in only when the caller passed a non-null pointer.
if (database)
*database = db;
return query;
}
/// Convenience overload: obtains the CREATE query through the (context, database) overload using the collector's context,
/// then lets the owning database check the query before returning it.
ASTPtr IStorage::getCreateQueryForBackup(const BackupEntriesCollector & backup_entries_collector) const
{
DatabasePtr database;
auto query = getCreateQueryForBackup(backup_entries_collector.getContext(), &database);
database->checkCreateTableQueryForBackup(query, backup_entries_collector);
return query;
}
void IStorage::backupData(BackupEntriesCollector &, const String &, const std::optional<ASTs> &)

View File

@ -223,10 +223,8 @@ public:
/// Initially reserved virtual column name may be shadowed by real column.
bool isVirtualColumn(const String & column_name, const StorageMetadataPtr & metadata_snapshot) const;
/// Returns a slightly changed version of the CREATE TABLE query which must be written to a backup.
/// The function can throw `TABLE_IS_DROPPED` if this storage is not attached to a database.
virtual ASTPtr getCreateQueryForBackup(const ContextPtr & context, DatabasePtr * database) const;
virtual ASTPtr getCreateQueryForBackup(const BackupEntriesCollector & backup_entries_collector) const;
/// Modify a CREATE TABLE query to make a variant which must be written to a backup.
virtual void adjustCreateQueryForBackup(ASTPtr & create_query, bool & consistency) const;
/// Makes backup entries to backup the data of this storage.
virtual void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions);

View File

@ -8253,44 +8253,58 @@ void StorageReplicatedMergeTree::createAndStoreFreezeMetadata(DiskPtr disk, Data
}
/// NOTE(review): this span contains two signatures, two declarations of `create` and two copies of the
/// zookeeper-path handling; it looks like the removed and the added lines of a diff hunk shown together —
/// confirm against the real file before editing.
ASTPtr StorageReplicatedMergeTree::getCreateQueryForBackup(const ContextPtr & local_context, DatabasePtr * database) const
void StorageReplicatedMergeTree::adjustCreateQueryForBackup(ASTPtr & create_query, bool & consistency) const
{
ASTPtr query = MergeTreeData::getCreateQueryForBackup(local_context, database);
/// Let the base class adjust the query first; the engine-specific edits below are applied on top of that.
MergeTreeData::adjustCreateQueryForBackup(create_query, consistency);
/// Before storing the metadata in a backup we have to find a zookeeper path in its definition and turn the table's UUID in there
/// back into "{uuid}", and also we probably can remove the zookeeper path and replica name if they're default.
/// So we're kind of reverting what we had done to the table's definition in registerStorageMergeTree.cpp before we created this table.
auto & create = query->as<ASTCreateQuery &>();
if (create.storage && create.storage->engine && (create.uuid != UUIDHelpers::Nil))
auto & create = create_query->as<ASTCreateQuery &>();
if (!create.storage || !create.storage->engine)
{
auto & engine = *(create.storage->engine);
if (auto * engine_args_ast = typeid_cast<ASTExpressionList *>(engine.arguments.get()))
{
auto & engine_args = engine_args_ast->children;
if (engine_args.size() >= 2)
{
auto * zookeeper_path_ast = typeid_cast<ASTLiteral *>(engine_args[0].get());
auto * replica_name_ast = typeid_cast<ASTLiteral *>(engine_args[1].get());
if (zookeeper_path_ast && (zookeeper_path_ast->value.getType() == Field::Types::String) &&
replica_name_ast && (replica_name_ast->value.getType() == Field::Types::String))
{
String & zookeeper_path_arg = zookeeper_path_ast->value.get<String>();
String & replica_name_arg = replica_name_ast->value.get<String>();
String table_uuid_str = toString(create.uuid);
if (size_t uuid_pos = zookeeper_path_arg.find(table_uuid_str); uuid_pos != String::npos)
zookeeper_path_arg.replace(uuid_pos, table_uuid_str.size(), "{uuid}");
const auto & config = getContext()->getConfigRef();
if ((zookeeper_path_arg == getDefaultZooKeeperPath(config)) && (replica_name_arg == getDefaultReplicaName(config))
&& ((engine_args.size() == 2) || !engine_args[2]->as<ASTLiteral>()))
{
engine_args.erase(engine_args.begin(), engine_args.begin() + 2);
}
}
}
}
/// The CREATE query doesn't correspond to this storage.
consistency = false;
return;
}
return query;
auto & engine = *(create.storage->engine);
/// The engine name must start with "Replicated" and end with "MergeTree"; anything else is reported as inconsistent.
if (!engine.name.starts_with("Replicated") || !engine.name.ends_with("MergeTree"))
{
/// The CREATE query doesn't correspond to this storage.
consistency = false;
return;
}
/// NOTE(review): IStorage::adjustCreateQueryForBackup() sets `create.uuid` to Nil. If the
/// MergeTreeData::adjustCreateQueryForBackup() call above reaches that implementation, this early return always
/// fires and the "{uuid}" substitution below never runs — confirm; the UUID from getStorageID() may be what is intended.
if (create.uuid == UUIDHelpers::Nil)
return;
/// From here on every unexpected shape of the engine arguments leaves the query unchanged
/// (plain return, `consistency` untouched).
auto * engine_args_ast = typeid_cast<ASTExpressionList *>(engine.arguments.get());
if (!engine_args_ast)
return;
auto & engine_args = engine_args_ast->children;
if (engine_args.size() < 2)
return;
/// The first two engine arguments are treated as the zookeeper path and the replica name; both must be string literals.
auto * zookeeper_path_ast = typeid_cast<ASTLiteral *>(engine_args[0].get());
auto * replica_name_ast = typeid_cast<ASTLiteral *>(engine_args[1].get());
if (zookeeper_path_ast && (zookeeper_path_ast->value.getType() == Field::Types::String) &&
replica_name_ast && (replica_name_ast->value.getType() == Field::Types::String))
{
String & zookeeper_path_arg = zookeeper_path_ast->value.get<String>();
String & replica_name_arg = replica_name_ast->value.get<String>();
String table_uuid_str = toString(create.uuid);
/// Only the first occurrence of the UUID text in the path is replaced with the "{uuid}" placeholder.
if (size_t uuid_pos = zookeeper_path_arg.find(table_uuid_str); uuid_pos != String::npos)
zookeeper_path_arg.replace(uuid_pos, table_uuid_str.size(), "{uuid}");
const auto & config = getContext()->getConfigRef();
/// Drop the two leading arguments when both equal the configured defaults and the third argument,
/// if there is one, is not a literal.
if ((zookeeper_path_arg == getDefaultZooKeeperPath(config)) && (replica_name_arg == getDefaultReplicaName(config))
&& ((engine_args.size() == 2) || !engine_args[2]->as<ASTLiteral>()))
{
engine_args.erase(engine_args.begin(), engine_args.begin() + 2);
}
}
}
void StorageReplicatedMergeTree::backupData(
@ -8370,7 +8384,7 @@ void StorageReplicatedMergeTree::backupData(
backup_entries_collector.addBackupEntry(data_path / relative_path, backup_entry);
}
};
backup_entries_collector.addPostCollectingTask(post_collecting_task);
backup_entries_collector.addPostTask(post_collecting_task);
}
void StorageReplicatedMergeTree::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions)

View File

@ -232,8 +232,8 @@ public:
int getMetadataVersion() const { return metadata_version; }
/// Returns a slightly changed version of the CREATE TABLE query which must be written to a backup.
ASTPtr getCreateQueryForBackup(const ContextPtr & context, DatabasePtr * database) const override;
/// Modify a CREATE TABLE query to make a variant which must be written to a backup.
void adjustCreateQueryForBackup(ASTPtr & create_query, bool & consistency) const override;
/// Makes backup entries to backup the data of the storage.
void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;

View File

@ -138,7 +138,7 @@ def test_backup_table_under_another_name():
assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n"
def test_materialized_view():
def test_materialized_view_select_1():
backup_name = new_backup_name()
instance.query(
"CREATE MATERIALIZED VIEW mv_1(x UInt8) ENGINE=MergeTree ORDER BY tuple() POPULATE AS SELECT 1 AS x"
@ -456,18 +456,32 @@ def test_temporary_table():
) == TSV([["e"], ["q"], ["w"]])
# "BACKUP DATABASE _temporary_and_external_tables" is allowed but the backup must not contain these tables.
def test_temporary_tables_database():
# The backup created by "BACKUP DATABASE _temporary_and_external_tables" must not contain tables from other sessions.
def test_temporary_database():
session_id = new_session_id()
instance.http_query(
"CREATE TEMPORARY TABLE temp_tbl(s String)", params={"session_id": session_id}
)
backup_name = new_backup_name()
instance.query(f"BACKUP DATABASE _temporary_and_external_tables TO {backup_name}")
other_session_id = new_session_id()
instance.http_query(
"CREATE TEMPORARY TABLE other_temp_tbl(s String)",
params={"session_id": other_session_id},
)
assert os.listdir(os.path.join(get_path_to_backup(backup_name), "metadata/")) == [
"_temporary_and_external_tables.sql" # database metadata only
backup_name = new_backup_name()
instance.http_query(
f"BACKUP DATABASE _temporary_and_external_tables TO {backup_name}",
params={"session_id": session_id},
)
assert os.listdir(
os.path.join(get_path_to_backup(backup_name), "temporary_tables/metadata")
) == ["temp_tbl.sql"]
assert sorted(os.listdir(get_path_to_backup(backup_name))) == [
".backup",
"temporary_tables",
]