Fix storing temporary tables and skipping system tables while making a backup.

This commit is contained in:
Vitaly Baranov 2022-06-09 18:19:54 +02:00
parent 6590f82ec4
commit 1198e86295
18 changed files with 458 additions and 197 deletions

View File

@ -28,6 +28,31 @@ namespace ErrorCodes
}
/// Two keys are equal only if both the qualified table name and the "temporary" flag match.
bool BackupEntriesCollector::TableKey::operator ==(const TableKey & right) const
{
    if (!(name == right.name))
        return false;
    return is_temporary == right.is_temporary;
}
/// Lexicographic ordering: the qualified name is compared first,
/// the "temporary" flag only breaks ties between equal names.
bool BackupEntriesCollector::TableKey::operator <(const TableKey & right) const
{
    if (name < right.name)
        return true;
    if (!(name == right.name))
        return false;
    return is_temporary < right.is_temporary;
}
/// Converts a stage of making a backup into its human-readable name.
/// Throws BAD_ARGUMENTS if `stage` doesn't match any of the enumerators handled below.
std::string_view BackupEntriesCollector::toString(Stage stage)
{
    std::string_view stage_name;
    switch (stage)
    {
        case Stage::kPreparing:
            stage_name = "Preparing";
            break;
        case Stage::kFindingTables:
            stage_name = "Finding tables";
            break;
        case Stage::kExtractingDataFromTables:
            stage_name = "Extracting data from tables";
            break;
        case Stage::kRunningPostTasks:
            stage_name = "Running post tasks";
            break;
        case Stage::kWritingBackup:
            stage_name = "Writing backup";
            break;
        case Stage::kError:
            stage_name = "Error";
            break;
    }

    /// Every handled stage has a non-empty name, so an empty name means that no case matched.
    if (stage_name.empty())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown backup stage: {}", static_cast<int>(stage));
    return stage_name;
}
BackupEntriesCollector::BackupEntriesCollector(
const ASTBackupQuery::Elements & backup_query_elements_,
const BackupSettings & backup_settings_,
@ -116,20 +141,6 @@ void BackupEntriesCollector::setStage(Stage new_stage, const String & error_mess
}
}
/// Converts a stage of making a backup into its human-readable name.
/// Throws BAD_ARGUMENTS if `stage` doesn't match any of the enumerators handled below.
std::string_view BackupEntriesCollector::toString(Stage stage)
{
    std::string_view stage_name;
    switch (stage)
    {
        case Stage::kPreparing:
            stage_name = "Preparing";
            break;
        case Stage::kFindingTables:
            stage_name = "Finding tables";
            break;
        case Stage::kExtractingDataFromTables:
            stage_name = "Extracting data from tables";
            break;
        case Stage::kRunningPostTasks:
            stage_name = "Running post tasks";
            break;
        case Stage::kWritingBackup:
            stage_name = "Writing backup";
            break;
        case Stage::kError:
            stage_name = "Error";
            break;
    }

    /// Every handled stage has a non-empty name, so an empty name means that no case matched.
    if (stage_name.empty())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown backup stage: {}", static_cast<int>(stage));
    return stage_name;
}
/// Calculates the root path for collecting backup entries,
/// it's either empty or has the format "shards/<shard_num>/replicas/<replica_num>/".
void BackupEntriesCollector::calculateRootPathInBackup()
@ -164,10 +175,11 @@ void BackupEntriesCollector::collectDatabasesAndTablesInfo()
{
case ASTBackupQuery::ElementType::TABLE:
{
QualifiedTableName table_name{element.database_name, element.table_name};
if (element.is_temporary_database)
table_name.database = DatabaseCatalog::TEMPORARY_DATABASE;
collectTableInfo(table_name, element.partitions, true);
collectTableInfo(
QualifiedTableName{element.database_name, element.table_name},
element.is_temporary_table,
element.partitions,
true);
break;
}
@ -190,9 +202,9 @@ void BackupEntriesCollector::collectDatabasesAndTablesInfo()
checkConsistency();
/// Two passes is absolute minimum (see `previous_table_names` & `previous_database_names`).
auto elapsed = std::chrono::steady_clock::now() - start_time;
if (!consistent && (pass >= 2) && use_timeout)
{
auto elapsed = std::chrono::steady_clock::now() - start_time;
if (elapsed > timeout)
throw Exception(
ErrorCodes::CANNOT_COLLECT_OBJECTS_FOR_BACKUP,
@ -201,6 +213,8 @@ void BackupEntriesCollector::collectDatabasesAndTablesInfo()
to_string(elapsed));
}
if (pass >= 2)
LOG_WARNING(log, "Couldn't collect tables and databases to make a backup (pass #{}, elapsed {})", pass, to_string(elapsed));
++pass;
} while (!consistent);
@ -208,7 +222,7 @@ void BackupEntriesCollector::collectDatabasesAndTablesInfo()
}
void BackupEntriesCollector::collectTableInfo(
const QualifiedTableName & table_name, const std::optional<ASTs> & partitions, bool throw_if_not_found)
const QualifiedTableName & table_name, bool is_temporary_table, const std::optional<ASTs> & partitions, bool throw_if_not_found)
{
/// Gather information about the table.
DatabasePtr database;
@ -216,67 +230,73 @@ void BackupEntriesCollector::collectTableInfo(
TableLockHolder table_lock;
ASTPtr create_table_query;
TableKey table_key{table_name, is_temporary_table};
if (throw_if_not_found)
{
std::tie(database, storage)
= DatabaseCatalog::instance().getDatabaseAndTable(StorageID{table_name.database, table_name.table}, context);
auto resolved_id = is_temporary_table
? context->resolveStorageID(StorageID{"", table_name.table}, Context::ResolveExternal)
: context->resolveStorageID(StorageID{table_name.database, table_name.table}, Context::ResolveGlobal);
std::tie(database, storage) = DatabaseCatalog::instance().getDatabaseAndTable(resolved_id, context);
table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
create_table_query = database->getCreateTableQuery(table_name.table, context);
create_table_query = database->getCreateTableQuery(resolved_id.table_name, context);
}
else
{
std::tie(database, storage)
= DatabaseCatalog::instance().tryGetDatabaseAndTable(StorageID{table_name.database, table_name.table}, context);
if (!storage)
auto resolved_id = is_temporary_table
? context->tryResolveStorageID(StorageID{"", table_name.table}, Context::ResolveExternal)
: context->tryResolveStorageID(StorageID{table_name.database, table_name.table}, Context::ResolveGlobal);
if (!resolved_id.empty())
std::tie(database, storage) = DatabaseCatalog::instance().tryGetDatabaseAndTable(resolved_id, context);
if (storage)
{
consistent &= !table_infos.contains(table_name);
return;
}
try
{
table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
try
{
consistent &= !table_infos.contains(table_name);
return;
table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::TABLE_IS_DROPPED)
throw;
}
throw;
}
create_table_query = database->tryGetCreateTableQuery(table_name.table, context);
if (table_lock)
create_table_query = database->tryGetCreateTableQuery(resolved_id.table_name, context);
if (!create_table_query)
{
consistent &= !table_infos.contains(table_name);
consistent &= !table_infos.contains(table_key);
return;
}
}
storage->adjustCreateQueryForBackup(create_table_query);
auto new_table_name = renaming_map.getNewTableName(table_name);
fs::path data_path_in_backup
= root_path_in_backup / "data" / escapeForFileName(new_table_name.database) / escapeForFileName(new_table_name.table);
fs::path data_path_in_backup;
if (is_temporary_table)
{
auto table_name_in_backup = renaming_map.getNewTemporaryTableName(table_name.table);
data_path_in_backup = root_path_in_backup / "temporary_tables" / "data" / escapeForFileName(table_name_in_backup);
}
else
{
auto table_name_in_backup = renaming_map.getNewTableName(table_name);
data_path_in_backup
= root_path_in_backup / "data" / escapeForFileName(table_name_in_backup.database) / escapeForFileName(table_name_in_backup.table);
}
/// Check that information is consistent.
const auto & create = create_table_query->as<const ASTCreateQuery &>();
if (create.getTable() != table_name.table)
if ((create.getTable() != table_name.table) || (is_temporary_table != create.temporary) || (create.getDatabase() != table_name.database))
{
/// Table was renamed recently.
consistent = false;
return;
}
if ((create.getDatabase() != table_name.database) || (create.temporary && (table_name.database == DatabaseCatalog::TEMPORARY_DATABASE)))
{
/// Table was renamed recently.
consistent = false;
return;
}
if (auto it = table_infos.find(table_name); it != table_infos.end())
if (auto it = table_infos.find(table_key); it != table_infos.end())
{
const auto & table_info = it->second;
if ((table_info.database != database) || (table_info.storage != storage))
@ -288,7 +308,7 @@ void BackupEntriesCollector::collectTableInfo(
}
/// Add information to `table_infos`.
auto & res_table_info = table_infos[table_name];
auto & res_table_info = table_infos[table_key];
res_table_info.database = database;
res_table_info.storage = storage;
res_table_info.table_lock = table_lock;
@ -360,12 +380,14 @@ void BackupEntriesCollector::collectDatabaseInfo(const String & database_name, c
res_database_info.database = database;
res_database_info.create_database_query = create_database_query;
/// Add information about tables too.
for (auto it = database->getTablesIteratorForBackup(*this); it->isValid(); it->next())
{
if (except_table_names.contains(it->name()))
continue;
collectTableInfo(QualifiedTableName{database_name, it->name()}, {}, false);
collectTableInfo(
QualifiedTableName{database_name, it->name()}, /* is_temporary_table= */ false, {}, /* throw_if_not_found= */ false);
if (!consistent)
return;
}
@ -390,9 +412,9 @@ void BackupEntriesCollector::checkConsistency()
return; /// Already inconsistent, no more checks necessary
/// Databases found while we were scanning tables and while we were scanning databases - must be the same.
for (const auto & [table_name, table_info] : table_infos)
for (const auto & [key, table_info] : table_infos)
{
auto it = database_infos.find(table_name.database);
auto it = database_infos.find(key.name.database);
if (it != database_infos.end())
{
const auto & database_info = it->second;
@ -407,7 +429,7 @@ void BackupEntriesCollector::checkConsistency()
/// We need to scan tables at least twice to be sure that we haven't missed any table which could be renamed
/// while we were scanning.
std::set<String> database_names;
std::set<QualifiedTableName> table_names;
std::set<TableKey> table_names;
boost::range::copy(database_infos | boost::adaptors::map_keys, std::inserter(database_names, database_names.end()));
boost::range::copy(table_infos | boost::adaptors::map_keys, std::inserter(table_names, table_names.end()));
@ -441,9 +463,9 @@ void BackupEntriesCollector::makeBackupEntriesForDatabasesDefs()
/// Calls IDatabase::backupTable() for all the tables found to make backup entries for tables.
void BackupEntriesCollector::makeBackupEntriesForTablesDefs()
{
for (const auto & [table_name, table_info] : table_infos)
for (const auto & [key, table_info] : table_infos)
{
LOG_TRACE(log, "Adding definition of table {}", table_name.getFullName());
LOG_TRACE(log, "Adding definition of {}table {}", (key.is_temporary ? "temporary " : ""), key.name.getFullName());
const auto & database = table_info.database;
const auto & storage = table_info.storage;
database->backupCreateTableQuery(*this, storage, table_info.create_table_query);
@ -455,9 +477,9 @@ void BackupEntriesCollector::makeBackupEntriesForTablesData()
if (backup_settings.structure_only)
return;
for (const auto & [table_name, table_info] : table_infos)
for (const auto & [key, table_info] : table_infos)
{
LOG_TRACE(log, "Adding data of table {}", table_name.getFullName());
LOG_TRACE(log, "Adding data of {}table {}", (key.is_temporary ? "temporary " : ""), key.name.getFullName());
const auto & storage = table_info.storage;
const auto & data_path_in_backup = table_info.data_path_in_backup;
const auto & partitions = table_info.partitions;
@ -490,12 +512,18 @@ void BackupEntriesCollector::addBackupEntryForCreateQuery(const ASTPtr & create_
{
ASTPtr new_create_query = create_query;
renameDatabaseAndTableNameInCreateQuery(context->getGlobalContext(), renaming_map, new_create_query);
const auto & create = new_create_query->as<const ASTCreateQuery &>();
String new_table_name = create.getTable();
String new_database_name = create.getDatabase();
auto metadata_path_in_backup
= root_path_in_backup / "metadata" / escapeForFileName(new_database_name) / (escapeForFileName(new_table_name) + ".sql");
fs::path metadata_path_in_backup;
if (create.temporary)
{
metadata_path_in_backup = root_path_in_backup / "temporary_tables" / "metadata" / (escapeForFileName(create.getTable()) + ".sql");
}
else
{
metadata_path_in_backup
= root_path_in_backup / "metadata" / escapeForFileName(create.getDatabase()) / (escapeForFileName(create.getTable()) + ".sql");
}
addBackupEntry(metadata_path_in_backup, std::make_shared<BackupEntryFromMemory>(serializeAST(*create_query)));
}

View File

@ -84,7 +84,7 @@ private:
void setStage(Stage new_stage, const String & error_message = {});
void calculateRootPathInBackup();
void collectDatabasesAndTablesInfo();
void collectTableInfo(const QualifiedTableName & table_name, const std::optional<ASTs> & partitions, bool throw_if_not_found);
void collectTableInfo(const QualifiedTableName & table_name, bool is_temporary_table, const std::optional<ASTs> & partitions, bool throw_if_not_found);
void collectDatabaseInfo(const String & database_name, const std::set<String> & except_table_names, bool throw_if_not_found);
void collectAllDatabasesInfo(const std::set<String> & except_database_names);
void checkConsistency();
@ -120,10 +120,18 @@ private:
std::optional<ASTs> partitions;
};
/// Key of the ordered map `table_infos`: a qualified table name together with a flag
/// telling whether the table is temporary, so a temporary table and an ordinary table
/// with the same name are kept as separate entries.
struct TableKey
{
/// Database and table name of the table.
QualifiedTableName name;
/// True if the key refers to a temporary table.
bool is_temporary = false;
/// Keys are equal if both `name` and `is_temporary` are equal.
bool operator ==(const TableKey & right) const;
/// Orders keys by `name` first, then by `is_temporary`.
bool operator <(const TableKey & right) const;
};
std::unordered_map<String, DatabaseInfo> database_infos;
std::unordered_map<QualifiedTableName, TableInfo> table_infos;
std::map<TableKey, TableInfo> table_infos;
std::optional<std::set<String>> previous_database_names;
std::optional<std::set<QualifiedTableName>> previous_table_names;
std::optional<std::set<TableKey>> previous_table_names;
bool consistent = false;
BackupEntries backup_entries;

View File

@ -22,18 +22,20 @@ DDLRenamingMap makeRenamingMapFromBackupQuery(const ASTBackupQuery::Elements & e
{
const String & table_name = element.table_name;
const String & new_table_name = element.new_table_name;
String database_name = element.database_name;
String new_database_name = element.new_database_name;
if (element.is_temporary_database)
{
database_name = DatabaseCatalog::TEMPORARY_DATABASE;
new_database_name = DatabaseCatalog::TEMPORARY_DATABASE;
}
assert(!table_name.empty());
assert(!database_name.empty());
assert(!new_table_name.empty());
assert(!new_database_name.empty());
map.setNewTableName({database_name, table_name}, {new_database_name, new_table_name});
if (element.is_temporary_table)
{
map.setNewTemporaryTableName(table_name, new_table_name);
}
else
{
const String & database_name = element.database_name;
const String & new_database_name = element.new_database_name;
assert(!database_name.empty());
assert(!new_database_name.empty());
map.setNewTableName({database_name, table_name}, {new_database_name, new_table_name});
}
break;
}
@ -193,7 +195,7 @@ AccessRightsElements getRequiredAccessToBackup(const ASTBackupQuery::Elements &
{
case ASTBackupQuery::TABLE:
{
if (element.is_temporary_database)
if (element.is_temporary_table)
break;
AccessFlags flags = AccessType::SHOW_TABLES;
if (!backup_settings.structure_only)
@ -235,7 +237,7 @@ AccessRightsElements getRequiredAccessToRestore(const ASTBackupQuery::Elements &
{
case ASTBackupQuery::TABLE:
{
if (element.is_temporary_database)
if (element.is_temporary_table)
{
if (restore_settings.create_table != RestoreTableCreationMode::kMustExist)
required_access.emplace_back(AccessType::CREATE_TEMPORARY_TABLE);

View File

@ -8,6 +8,7 @@
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
@ -33,6 +34,44 @@ namespace ErrorCodes
}
namespace
{
bool hasSystemTableEngine(const IAST & ast)
{
const ASTCreateQuery * create = ast.as<ASTCreateQuery>();
if (!create)
return false;
if (!create->storage || !create->storage->engine)
return false;
return create->storage->engine->name.starts_with("System");
}
}
/// Keys match when they have the same qualified name and the same "temporary" flag.
bool RestorerFromBackup::TableKey::operator ==(const TableKey & right) const
{
    const bool same_name = (name == right.name);
    if (!same_name)
        return false;
    return is_temporary == right.is_temporary;
}
/// Orders keys by qualified name; for equal names a non-temporary key goes before a temporary one.
bool RestorerFromBackup::TableKey::operator <(const TableKey & right) const
{
    if (name < right.name)
        return true;
    if (!(name == right.name))
        return false;
    return is_temporary < right.is_temporary;
}
/// Converts a stage of restoring from a backup into its human-readable name.
/// Throws BAD_ARGUMENTS if `stage` doesn't match any of the enumerators handled below.
std::string_view RestorerFromBackup::toString(Stage stage)
{
    switch (stage)
    {
        case Stage::kPreparing: return "Preparing";
        case Stage::kFindingTablesInBackup: return "Finding tables in backup";
        case Stage::kCreatingDatabases: return "Creating databases";
        case Stage::kCreatingTables: return "Creating tables";
        case Stage::kInsertingDataToTables: return "Inserting data to tables";
        case Stage::kError: return "Error";
    }
    /// These are stages of restoring, not of making a backup, so the message must say "restore"
    /// (otherwise the error is indistinguishable from the one thrown by BackupEntriesCollector::toString()).
    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown restore stage: {}", static_cast<int>(stage));
}
RestorerFromBackup::RestorerFromBackup(
const ASTBackupQuery::Elements & restore_query_elements_,
const RestoreSettings & restore_settings_,
@ -101,21 +140,26 @@ RestorerFromBackup::DataRestoreTasks RestorerFromBackup::getDataRestoreTasks()
if (current_stage != Stage::kInsertingDataToTables)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Metadata wasn't restored");
/// Storages and table locks must exist while we're executing data restoring tasks.
DataRestoreTasks res_tasks;
for (auto & [storage, tasks] : data_restore_tasks)
{
if (!tasks.empty())
LOG_TRACE(log, "Will insert data to table {}", storage->getStorageID().getFullTableName());
if (data_restore_tasks.empty())
return {};
TableLockHolder table_lock;
auto it_table_lock = table_locks.find(storage);
if (it_table_lock != table_locks.end())
table_lock = it_table_lock->second;
for (auto & task : tasks)
res_tasks.push_back([task, storage = storage, table_lock] { task(); });
LOG_TRACE(log, "Will insert data to tables");
/// Storages and table locks must exist while we're executing data restoring tasks.
auto storages = std::make_shared<std::vector<StoragePtr>>();
auto table_locks = std::make_shared<std::vector<TableLockHolder>>();
storages->reserve(table_infos.size());
table_locks->reserve(table_infos.size());
for (const auto & table_info : table_infos | boost::adaptors::map_values)
{
storages->push_back(table_info.storage);
table_locks->push_back(table_info.table_lock);
}
DataRestoreTasks res_tasks;
for (const auto & task : data_restore_tasks)
res_tasks.push_back([task, storages, table_locks] { task(); });
return res_tasks;
}
@ -140,21 +184,6 @@ void RestorerFromBackup::setStage(Stage new_stage, const String & error_message)
}
}
/// Converts a stage of restoring from a backup into its human-readable name.
/// Throws BAD_ARGUMENTS if `stage` doesn't match any of the enumerators handled below.
std::string_view RestorerFromBackup::toString(Stage stage)
{
    switch (stage)
    {
        case Stage::kPreparing: return "Preparing";
        case Stage::kFindingTablesInBackup: return "Finding tables in backup";
        case Stage::kCreatingDatabases: return "Creating databases";
        case Stage::kCreatingTables: return "Creating tables";
        case Stage::kInsertingDataToTables: return "Inserting data to tables";
        case Stage::kError: return "Error";
    }
    /// These are stages of restoring, not of making a backup, so the message must say "restore"
    /// (otherwise the error is indistinguishable from the one thrown by BackupEntriesCollector::toString()).
    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown restore stage: {}", static_cast<int>(stage));
}
void RestorerFromBackup::findRootPathsInBackup()
{
size_t shard_num = 1;
@ -244,10 +273,7 @@ void RestorerFromBackup::collectDatabaseAndTableInfos()
{
case ASTBackupQuery::ElementType::TABLE:
{
QualifiedTableName table_name{element.database_name, element.table_name};
if (element.is_temporary_database)
table_name.database = DatabaseCatalog::TEMPORARY_DATABASE;
collectTableInfo(table_name, element.partitions);
collectTableInfo(QualifiedTableName{element.database_name, element.table_name}, element.is_temporary_table, element.partitions);
break;
}
case ASTBackupQuery::ElementType::DATABASE:
@ -266,14 +292,26 @@ void RestorerFromBackup::collectDatabaseAndTableInfos()
LOG_INFO(log, "Will restore {} databases and {} tables", database_infos.size(), table_infos.size());
}
void RestorerFromBackup::collectTableInfo(const QualifiedTableName & table_name_in_backup, const std::optional<ASTs> & partitions)
void RestorerFromBackup::collectTableInfo(const QualifiedTableName & table_name_in_backup, bool is_temporary_table, const std::optional<ASTs> & partitions)
{
String database_name_in_backup = is_temporary_table ? DatabaseCatalog::TEMPORARY_DATABASE : table_name_in_backup.database;
std::optional<fs::path> metadata_path;
std::optional<fs::path> root_path_in_use;
for (const auto & root_path_in_backup : root_paths_in_backup)
{
fs::path try_metadata_path = root_path_in_backup / "metadata" / escapeForFileName(table_name_in_backup.database)
/ (escapeForFileName(table_name_in_backup.table) + ".sql");
fs::path try_metadata_path;
if (is_temporary_table)
{
try_metadata_path
= root_path_in_backup / "temporary_tables" / "metadata" / (escapeForFileName(table_name_in_backup.table) + ".sql");
}
else
{
try_metadata_path = root_path_in_backup / "metadata" / escapeForFileName(table_name_in_backup.database)
/ (escapeForFileName(table_name_in_backup.table) + ".sql");
}
if (backup->fileExists(try_metadata_path))
{
metadata_path = try_metadata_path;
@ -285,9 +323,20 @@ void RestorerFromBackup::collectTableInfo(const QualifiedTableName & table_name_
if (!metadata_path)
throw Exception(ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Table {} not found in backup", table_name_in_backup.getFullName());
auto table_name = renaming_map.getNewTableName(table_name_in_backup);
fs::path data_path_in_backup
= *root_path_in_use / "data" / escapeForFileName(table_name_in_backup.database) / escapeForFileName(table_name_in_backup.table);
TableKey table_key;
fs::path data_path_in_backup;
if (is_temporary_table)
{
data_path_in_backup = *root_path_in_use / "temporary_tables" / "data" / escapeForFileName(table_name_in_backup.table);
table_key.name.table = renaming_map.getNewTemporaryTableName(table_name_in_backup.table);
table_key.is_temporary = true;
}
else
{
data_path_in_backup
= *root_path_in_use / "data" / escapeForFileName(table_name_in_backup.database) / escapeForFileName(table_name_in_backup.table);
table_key.name = renaming_map.getNewTableName(table_name_in_backup);
}
auto read_buffer = backup->readFile(*metadata_path)->getReadBuffer();
String create_query_str;
@ -297,24 +346,25 @@ void RestorerFromBackup::collectTableInfo(const QualifiedTableName & table_name_
ASTPtr create_table_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
renameDatabaseAndTableNameInCreateQuery(context->getGlobalContext(), renaming_map, create_table_query);
if (auto it = table_infos.find(table_name); it != table_infos.end())
if (auto it = table_infos.find(table_key); it != table_infos.end())
{
const TableInfo & table_info = it->second;
if (table_info.create_table_query && (serializeAST(*table_info.create_table_query) != serializeAST(*create_table_query)))
{
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"Extracted two different create queries for the same table {}: {} and {}",
table_name.getFullName(),
"Extracted two different create queries for the same {}table {}: {} and {}",
(is_temporary_table ? "temporary " : ""),
table_key.name.getFullName(),
serializeAST(*table_info.create_table_query),
serializeAST(*create_table_query));
}
}
TableInfo & res_table_info = table_infos[table_name];
TableInfo & res_table_info = table_infos[table_key];
res_table_info.create_table_query = create_table_query;
res_table_info.data_path_in_backup = data_path_in_backup;
res_table_info.dependencies = getDependenciesSetFromCreateQuery(context->getGlobalContext(), table_name, create_table_query);
res_table_info.dependencies = getDependenciesSetFromCreateQuery(context->getGlobalContext(), table_key.name, create_table_query);
if (partitions)
{
@ -387,7 +437,7 @@ void RestorerFromBackup::collectDatabaseInfo(const String & database_name_in_bac
if (except_table_names.contains(table_name_in_backup))
continue;
collectTableInfo(QualifiedTableName{database_name_in_backup, table_name_in_backup}, {});
collectTableInfo(QualifiedTableName{database_name_in_backup, table_name_in_backup}, /* is_temporary_table= */ false, {});
}
}
@ -419,7 +469,11 @@ void RestorerFromBackup::createDatabases()
{
for (const auto & [database_name, database_info] : database_infos)
{
if (restore_settings.create_database != RestoreDatabaseCreationMode::kMustExist)
bool need_create_database = (restore_settings.create_database != RestoreDatabaseCreationMode::kMustExist);
if (need_create_database && DatabaseCatalog::isPredefinedDatabaseName(database_name))
need_create_database = false; /// Predefined databases always exist.
if (need_create_database)
{
/// Execute CREATE DATABASE query.
auto create_database_query = database_info.create_database_query;
@ -464,11 +518,21 @@ void RestorerFromBackup::createTables()
if (tables_to_create.empty())
break; /// We've already created all the tables.
for (const auto & table_name : tables_to_create)
for (const auto & table_key : tables_to_create)
{
auto & table_info = table_infos.at(table_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_name.database);
if (restore_settings.create_table != RestoreTableCreationMode::kMustExist)
auto & table_info = table_infos.at(table_key);
DatabasePtr database;
if (table_key.is_temporary)
database = DatabaseCatalog::instance().getDatabaseForTemporaryTables();
else
database = DatabaseCatalog::instance().getDatabase(table_key.name.database);
bool need_create_table = (restore_settings.create_table != RestoreTableCreationMode::kMustExist);
if (need_create_table && hasSystemTableEngine(*table_info.create_table_query))
need_create_table = false; /// Tables with System* table engine already exist or can't be created by SQL anyway.
if (need_create_table)
{
/// Execute CREATE TABLE query (we call IDatabase::createTableRestoredFromBackup() to allow the database to do some
/// database-specific things).
@ -478,17 +542,29 @@ void RestorerFromBackup::createTables()
create_table_query = create_table_query->clone();
create_table_query->as<ASTCreateQuery &>().if_not_exists = true;
}
LOG_TRACE(log, "Creating table {}: {}", table_name.getFullName(), serializeAST(*create_table_query));
LOG_TRACE(
log,
"Creating {}table {}: {}",
(table_key.is_temporary ? "temporary " : ""),
table_key.name.getFullName(),
serializeAST(*create_table_query));
database->createTableRestoredFromBackup(*this, create_table_query);
}
table_info.created = true;
auto storage = database->getTable(table_name.table, context);
table_locks[storage] = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
auto resolved_id = table_key.is_temporary
? context->resolveStorageID(StorageID{"", table_key.name.table}, Context::ResolveExternal)
: context->resolveStorageID(StorageID{table_key.name.database, table_key.name.table}, Context::ResolveGlobal);
auto storage = database->getTable(resolved_id.table_name, context);
table_info.storage = storage;
table_info.table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
if (!restore_settings.allow_different_table_def)
{
ASTPtr create_table_query = database->getCreateTableQuery(table_name.table, context);
ASTPtr create_table_query = database->getCreateTableQuery(resolved_id.table_name, context);
ASTPtr expected_create_query = table_info.create_table_query;
storage->adjustCreateQueryForBackup(create_table_query);
storage->adjustCreateQueryForBackup(expected_create_query);
@ -496,9 +572,10 @@ void RestorerFromBackup::createTables()
{
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"The table {} has a different definition: {} "
"The {}table {} has a different definition: {} "
"comparing to its definition in the backup: {}",
table_name.getFullName(),
(table_key.is_temporary ? "temporary " : ""),
table_key.name.getFullName(),
serializeAST(*create_table_query),
serializeAST(*expected_create_query));
}
@ -515,12 +592,12 @@ void RestorerFromBackup::createTables()
}
/// Returns the list of tables without dependencies or those which dependencies have been created before.
std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencies() const
std::vector<RestorerFromBackup::TableKey> RestorerFromBackup::findTablesWithoutDependencies() const
{
std::vector<QualifiedTableName> tables_without_dependencies;
std::vector<TableKey> tables_without_dependencies;
bool all_tables_created = true;
for (const auto & [table_name, table_info] : table_infos)
for (const auto & [key, table_info] : table_infos)
{
if (table_info.created)
continue;
@ -532,7 +609,7 @@ std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencie
bool all_dependencies_met = true;
for (const auto & dependency : table_info.dependencies)
{
auto it = table_infos.find(dependency);
auto it = table_infos.find(TableKey{dependency, false});
if ((it != table_infos.end()) && !it->second.created)
{
all_dependencies_met = false;
@ -541,7 +618,7 @@ std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencie
}
if (all_dependencies_met)
tables_without_dependencies.push_back(table_name);
tables_without_dependencies.push_back(key);
}
if (!tables_without_dependencies.empty())
@ -551,11 +628,11 @@ std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencie
return {};
/// Cyclic dependency? We'll try to create those tables anyway but probably it's going to fail.
std::vector<QualifiedTableName> tables_with_cyclic_dependencies;
for (const auto & [table_name, table_info] : table_infos)
std::vector<TableKey> tables_with_cyclic_dependencies;
for (const auto & [key, table_info] : table_infos)
{
if (!table_info.created)
tables_with_cyclic_dependencies.push_back(table_name);
tables_with_cyclic_dependencies.push_back(key);
}
/// Only show a warning here, proper exception will be thrown later on creating those tables.
@ -564,24 +641,24 @@ std::vector<QualifiedTableName> RestorerFromBackup::findTablesWithoutDependencie
"Some tables have cyclic dependency from each other: {}",
boost::algorithm::join(
tables_with_cyclic_dependencies
| boost::adaptors::transformed([](const QualifiedTableName & table_name) -> String { return table_name.getFullName(); }),
| boost::adaptors::transformed([](const TableKey & key) -> String { return key.name.getFullName(); }),
", "));
return tables_with_cyclic_dependencies;
}
void RestorerFromBackup::addDataRestoreTask(StoragePtr storage, DataRestoreTask && new_task)
void RestorerFromBackup::addDataRestoreTask(DataRestoreTask && new_task)
{
if (current_stage == Stage::kInsertingDataToTables)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding data-restoring tasks is not allowed");
data_restore_tasks[storage].push_back(std::move(new_task));
data_restore_tasks.push_back(std::move(new_task));
}
void RestorerFromBackup::addDataRestoreTasks(StoragePtr storage, DataRestoreTasks && new_tasks)
void RestorerFromBackup::addDataRestoreTasks(DataRestoreTasks && new_tasks)
{
if (current_stage == Stage::kInsertingDataToTables)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Adding data-restoring tasks is not allowed");
insertAtEnd(data_restore_tasks[storage], std::move(new_tasks));
insertAtEnd(data_restore_tasks, std::move(new_tasks));
}
void RestorerFromBackup::executeCreateQuery(const ASTPtr & create_query) const

View File

@ -47,8 +47,8 @@ public:
/// Adds a data restore task which will be later returned by getDataRestoreTasks().
/// This function can be called by implementations of IStorage::restoreFromBackup() in inherited storage classes.
void addDataRestoreTask(StoragePtr storage, DataRestoreTask && data_restore_task);
void addDataRestoreTasks(StoragePtr storage, DataRestoreTasks && data_restore_task);
void addDataRestoreTask(DataRestoreTask && data_restore_task);
void addDataRestoreTasks(DataRestoreTasks && data_restore_task);
/// Reading a backup includes a few stages:
enum class Stage
@ -95,20 +95,17 @@ private:
void setStage(Stage new_stage, const String & error_message = {});
void findRootPathsInBackup();
void collectDatabaseAndTableInfos();
void collectTableInfo(const QualifiedTableName & table_name_in_backup, const std::optional<ASTs> & partitions);
void collectTableInfo(const QualifiedTableName & table_name_in_backup, bool is_temporary, const std::optional<ASTs> & partitions);
void collectDatabaseInfo(const String & database_name_in_backup, const std::set<String> & except_table_names);
void collectAllDatabasesInfo(const std::set<String> & except_database_names);
void createDatabases();
void createTables();
std::vector<QualifiedTableName> findTablesWithoutDependencies() const;
/// Per-database entry kept in `database_infos` (keyed by database name).
struct DatabaseInfo
{
/// NOTE(review): presumably the CREATE DATABASE query read from the backup — confirm in collectDatabaseInfo().
ASTPtr create_database_query;
};
std::unordered_map<String, DatabaseInfo> database_infos;
struct TableInfo
{
ASTPtr create_table_query;
@ -116,12 +113,23 @@ private:
std::filesystem::path data_path_in_backup;
std::unordered_set<QualifiedTableName> dependencies;
bool created = false;
StoragePtr storage;
TableLockHolder table_lock;
};
std::unordered_map<QualifiedTableName, TableInfo> table_infos;
/// Identifies a table handled by the restorer: its qualified name plus the `is_temporary` flag.
/// It is used as the key of the ordered map `table_infos`, which is why `operator <` is declared.
struct TableKey
{
QualifiedTableName name;
/// Set for temporary tables; defaults to an ordinary (non-temporary) table.
bool is_temporary = false;
/// NOTE(review): both operators are only declared here; presumably they compare `name` first
/// and then `is_temporary` — confirm against the definitions in the .cpp file.
bool operator ==(const TableKey & right) const;
bool operator <(const TableKey & right) const;
};
std::unordered_map<StoragePtr, TableLockHolder> table_locks;
std::unordered_map<StoragePtr, std::vector<DataRestoreTask>> data_restore_tasks;
std::vector<TableKey> findTablesWithoutDependencies() const;
std::unordered_map<String, DatabaseInfo> database_infos;
std::map<TableKey, TableInfo> table_infos;
std::vector<DataRestoreTask> data_restore_tasks;
};
}

View File

@ -27,17 +27,20 @@ namespace
/// CREATE TABLE or CREATE DICTIONARY or CREATE VIEW or CREATE TEMPORARY TABLE or CREATE DATABASE query.
void visitCreateQuery(ASTCreateQuery & create, const DDLRenamingVisitor::Data & data)
{
if (create.table)
if (create.temporary)
{
/// CREATE TABLE or CREATE DICTIONARY or CREATE VIEW or CREATE TEMPORARY TABLE
/// CREATE TEMPORARY TABLE
String table_name = create.getTable();
const auto & new_table_name = data.renaming_map.getNewTemporaryTableName(table_name);
if (new_table_name != table_name)
create.setTable(new_table_name);
}
else if (create.table)
{
/// CREATE TABLE or CREATE DICTIONARY or CREATE VIEW
QualifiedTableName qualified_name;
qualified_name.table = create.getTable();
if (create.database)
qualified_name.database = create.getDatabase();
else if (create.temporary)
qualified_name.database = DatabaseCatalog::TEMPORARY_DATABASE;
else
return;
qualified_name.database = create.getDatabase();
if (!qualified_name.database.empty() && !qualified_name.table.empty())
{
@ -45,16 +48,7 @@ namespace
if (new_qualified_name != qualified_name)
{
create.setTable(new_qualified_name.table);
if (new_qualified_name.database == DatabaseCatalog::TEMPORARY_DATABASE)
{
create.temporary = true;
create.setDatabase("");
}
else
{
create.temporary = false;
create.setDatabase(new_qualified_name.database);
}
create.setDatabase(new_qualified_name.database);
}
}
}
@ -358,4 +352,28 @@ QualifiedTableName DDLRenamingMap::getNewTableName(const QualifiedTableName & ol
return {getNewDatabaseName(old_table_name.database), old_table_name.table};
}
/// Registers that the temporary table `old_table_name` has to be renamed to `new_table_name`.
/// Registering the very same pair again is a no-op.
/// Throws LOGICAL_ERROR if either name is empty, and WRONG_DDL_RENAMING_SETTINGS if
/// `old_table_name` has already been mapped to a different new name.
void DDLRenamingMap::setNewTemporaryTableName(const String & old_table_name, const String & new_table_name)
{
    if (old_table_name.empty() || new_table_name.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty names are not allowed");

    /// One lookup does both jobs: either the pair is stored right now,
    /// or `existing` points to the mapping registered earlier.
    const auto [existing, inserted] = old_to_new_temporary_table_names.try_emplace(old_table_name, new_table_name);
    if (inserted || (existing->second == new_table_name))
        return;

    throw Exception(ErrorCodes::WRONG_DDL_RENAMING_SETTINGS, "Wrong renaming: it's specified that temporary table {} should be renamed to {} and to {} at the same time",
        backQuoteIfNeed(old_table_name), backQuoteIfNeed(existing->second), backQuoteIfNeed(new_table_name));
}
/// Returns the new name registered for the temporary table `old_table_name`.
/// If no renaming was registered, a reference to the passed `old_table_name` itself is returned,
/// so the result must not outlive the argument.
const String & DDLRenamingMap::getNewTemporaryTableName(const String & old_table_name) const
{
    const auto found = old_to_new_temporary_table_names.find(old_table_name);
    const bool has_renaming = (found != old_to_new_temporary_table_names.end());
    return has_renaming ? found->second : old_table_name;
}
}

View File

@ -25,13 +25,16 @@ class DDLRenamingMap
public:
void setNewTableName(const QualifiedTableName & old_table_name, const QualifiedTableName & new_table_name);
void setNewDatabaseName(const String & old_database_name, const String & new_database_name);
void setNewTemporaryTableName(const String & old_table_name, const String & new_table_name);
QualifiedTableName getNewTableName(const QualifiedTableName & old_table_name) const;
const String & getNewDatabaseName(const String & old_database_name) const;
const String & getNewTemporaryTableName(const String & old_table_name) const;
private:
std::unordered_map<QualifiedTableName, QualifiedTableName> old_to_new_table_names;
std::unordered_map<String, String> old_to_new_database_names;
std::unordered_map<String, String> old_to_new_temporary_table_names;
};
/// Visits ASTCreateQuery and changes names of databases or tables.

View File

@ -205,6 +205,12 @@ void DatabaseCatalog::shutdownImpl()
view_dependencies.clear();
}
/// Returns true if `database_name` is exactly one of the predefined database names:
/// TEMPORARY_DATABASE, SYSTEM_DATABASE, INFORMATION_SCHEMA or INFORMATION_SCHEMA_UPPERCASE.
bool DatabaseCatalog::isPredefinedDatabaseName(const std::string_view & database_name)
{
    if (database_name == TEMPORARY_DATABASE)
        return true;

    if (database_name == SYSTEM_DATABASE)
        return true;

    /// The comparison is exact, so both spellings of the information schema are checked separately.
    return (database_name == INFORMATION_SCHEMA) || (database_name == INFORMATION_SCHEMA_UPPERCASE);
}
DatabaseAndTable DatabaseCatalog::tryGetByUUID(const UUID & uuid) const
{
assert(uuid != UUIDHelpers::Nil && getFirstLevelIdx(uuid) < uuid_map.size());

View File

@ -122,11 +122,15 @@ class BackgroundSchedulePoolTaskHolder;
class DatabaseCatalog : boost::noncopyable, WithMutableContext
{
public:
/// Names of predefined databases.
static constexpr const char * TEMPORARY_DATABASE = "_temporary_and_external_tables";
static constexpr const char * SYSTEM_DATABASE = "system";
static constexpr const char * INFORMATION_SCHEMA = "information_schema";
static constexpr const char * INFORMATION_SCHEMA_UPPERCASE = "INFORMATION_SCHEMA";
/// Returns true if a passed string is one of the predefined databases' names
static bool isPredefinedDatabaseName(const std::string_view & database_name);
static DatabaseCatalog & init(ContextMutablePtr global_context_);
static DatabaseCatalog & instance();
static void shutdown();

View File

@ -51,7 +51,7 @@ namespace
case ElementType::TABLE:
{
format.ostr << (format.hilite ? IAST::hilite_keyword : "");
if (element.is_temporary_database)
if (element.is_temporary_table)
format.ostr << "TEMPORARY TABLE ";
else
format.ostr << "TABLE ";
@ -175,7 +175,7 @@ namespace
void ASTBackupQuery::Element::setCurrentDatabase(const String & current_database)
{
if ((type == ASTBackupQuery::TABLE) && !is_temporary_database)
if ((type == ASTBackupQuery::TABLE) && !is_temporary_table)
{
if (database_name.empty())
database_name = current_database;

View File

@ -59,7 +59,7 @@ public:
ElementType type;
String table_name;
String database_name;
bool is_temporary_database = false;
bool is_temporary_table = false;
String new_table_name; /// usually the same as `table_name`, can be different in case of using AS <new_name>
String new_database_name; /// usually the same as `database_name`, can be different in case of using AS <new_name>
std::optional<ASTs> partitions;

View File

@ -91,7 +91,7 @@ namespace
if (ParserKeyword{"TEMPORARY TABLE"}.ignore(pos, expected))
{
element.type = ElementType::TABLE;
element.is_temporary_database = true;
element.is_temporary_table = true;
ASTPtr ast;
if (!ParserIdentifier{}.parse(pos, ast, expected))

View File

@ -250,10 +250,14 @@ bool IStorage::isStaticStorage() const
/// Adjusts, in place, the CREATE query of this storage before it's written to a backup:
/// both UUIDs are reset to Nil, and for system storages the table comment is dropped.
void IStorage::adjustCreateQueryForBackup(ASTPtr & create_query) const
{
    auto & create_ast = create_query->as<ASTCreateQuery &>();

    /// We don't want to see any UUIDs in backup (after RESTORE the table will have another UUID anyway).
    create_ast.uuid = UUIDHelpers::Nil;
    create_ast.to_inner_uuid = UUIDHelpers::Nil;

    /// Remove the comment "SYSTEM TABLE is built on the fly" from the definition of system tables (it would look excessive for backups).
    if (isSystemStorage())
        create_ast.reset(create_ast.comment);
}
void IStorage::backupCreateQuery(BackupEntriesCollector & backup_entries_collector, const ASTPtr & create_query)

View File

@ -4196,7 +4196,6 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
continue;
restorer.addDataRestoreTask(
shared_from_this(),
[storage = std::static_pointer_cast<MergeTreeData>(shared_from_this()),
backup,
part_path_in_backup = data_path_in_backup_fs / part_name,

View File

@ -1001,7 +1001,6 @@ void StorageLog::restoreDataFromBackup(RestorerFromBackup & restorer, const Stri
auto lock_timeout = getLockTimeout(restorer.getContext());
restorer.addDataRestoreTask(
shared_from_this(),
[storage = std::static_pointer_cast<StorageLog>(shared_from_this()), backup, data_path_in_backup, lock_timeout]
{ storage->restoreDataImpl(backup, data_path_in_backup, lock_timeout); });
}

View File

@ -501,7 +501,6 @@ void StorageMemory::restoreDataFromBackup(RestorerFromBackup & restorer, const S
RestorerFromBackup::throwTableIsNotEmpty(getStorageID());
restorer.addDataRestoreTask(
shared_from_this(),
[storage = std::static_pointer_cast<StorageMemory>(shared_from_this()), backup, data_path_in_backup]
{ storage->restoreDataImpl(backup, data_path_in_backup); });
}

View File

@ -601,7 +601,6 @@ void StorageStripeLog::restoreDataFromBackup(RestorerFromBackup & restorer, cons
auto lock_timeout = getLockTimeout(restorer.getContext());
restorer.addDataRestoreTask(
shared_from_this(),
[storage = std::static_pointer_cast<StorageStripeLog>(shared_from_this()), backup, data_path_in_backup, lock_timeout]
{ storage->restoreDataImpl(backup, data_path_in_backup, lock_timeout); });
}

View File

@ -53,6 +53,15 @@ def get_path_to_backup(backup_name):
return os.path.join(instance.cluster.instances_dir, "backups", name)
# Number of session ids handed out so far by new_session_id().
session_id_counter = 0


def new_session_id():
    """Return a fresh session id of the form ``Session #<n>``.

    Every call increments the module-level counter, so no id is returned twice.
    """
    global session_id_counter
    session_id_counter = session_id_counter + 1
    return f"Session #{session_id_counter}"
@pytest.mark.parametrize(
"engine", ["MergeTree", "Log", "TinyLog", "StripeLog", "Memory"]
)
@ -340,12 +349,18 @@ def test_materialized_view():
backup_name = new_backup_name()
instance.query(f"BACKUP DATABASE test TO {backup_name}")
assert sorted(os.listdir(os.path.join(get_path_to_backup(backup_name), 'metadata/test'))) == ['table.sql', 'view.sql']
assert sorted(os.listdir(os.path.join(get_path_to_backup(backup_name), 'data/test'))) == ['table', 'view']
view_create_query = open(os.path.join(get_path_to_backup(backup_name), 'metadata/test/view.sql')).read()
assert view_create_query.startswith('CREATE MATERIALIZED VIEW test.view')
assert 'POPULATE' not in view_create_query
assert sorted(
os.listdir(os.path.join(get_path_to_backup(backup_name), "metadata/test"))
) == ["table.sql", "view.sql"]
assert sorted(
os.listdir(os.path.join(get_path_to_backup(backup_name), "data/test"))
) == ["table", "view"]
view_create_query = open(
os.path.join(get_path_to_backup(backup_name), "metadata/test/view.sql")
).read()
assert view_create_query.startswith("CREATE MATERIALIZED VIEW test.view")
assert "POPULATE" not in view_create_query
instance.query("DROP DATABASE test")
@ -353,7 +368,9 @@ def test_materialized_view():
instance.query("INSERT INTO test.table VALUES (991, 'b')")
assert instance.query("SELECT * FROM test.view ORDER BY x") == TSV([['0', 0], ['1', 1], ['2', 2], ['3', 3], ['4', 4], ['a', 990], ['b', 991]])
assert instance.query("SELECT * FROM test.view ORDER BY x") == TSV(
[["0", 0], ["1", 1], ["2", 2], ["3", 3], ["4", 4], ["a", 990], ["b", 991]]
)
def test_materialized_view_with_target_table():
@ -368,9 +385,13 @@ def test_materialized_view_with_target_table():
backup_name = new_backup_name()
instance.query(f"BACKUP DATABASE test TO {backup_name}")
assert sorted(os.listdir(os.path.join(get_path_to_backup(backup_name), 'metadata/test'))) == ['table.sql', 'target.sql', 'view.sql']
assert sorted(os.listdir(os.path.join(get_path_to_backup(backup_name), 'data/test'))) == ['table', 'target']
assert sorted(
os.listdir(os.path.join(get_path_to_backup(backup_name), "metadata/test"))
) == ["table.sql", "target.sql", "view.sql"]
assert sorted(
os.listdir(os.path.join(get_path_to_backup(backup_name), "data/test"))
) == ["table", "target"]
instance.query("DROP DATABASE test")
@ -378,4 +399,90 @@ def test_materialized_view_with_target_table():
instance.query("INSERT INTO test.table VALUES (991, 'b')")
assert instance.query("SELECT * FROM test.view ORDER BY x") == TSV([['a', 990], ['b', 991]])
assert instance.query("SELECT * FROM test.view ORDER BY x") == TSV(
[["a", 990], ["b", 991]]
)
def test_temporary_table():
    """Back up a temporary table in one HTTP session and restore it in another one."""

    def query_in_session(session, sql):
        # A fresh params dict is built for every request, exactly one session id per call.
        return instance.http_query(sql, params={"session_id": session})

    # Fill the temporary table and back it up within the first session.
    source_session = new_session_id()
    query_in_session(source_session, "CREATE TEMPORARY TABLE temp_tbl(s String)")
    query_in_session(source_session, "INSERT INTO temp_tbl VALUES ('q')")
    query_in_session(source_session, "INSERT INTO temp_tbl VALUES ('w'), ('e')")

    backup_name = new_backup_name()
    query_in_session(source_session, f"BACKUP TEMPORARY TABLE temp_tbl TO {backup_name}")

    # Restore under a different session id and check that all three rows are there.
    target_session = new_session_id()
    query_in_session(target_session, f"RESTORE TEMPORARY TABLE temp_tbl FROM {backup_name}")

    restored_rows = query_in_session(target_session, "SELECT * FROM temp_tbl ORDER BY s")
    assert restored_rows == TSV([["e"], ["q"], ["w"]])
# We allow BACKUP DATABASE _temporary_and_external_tables only if the backup doesn't contain any table.
def test_temporary_tables_database():
    """Backing up the database of temporary tables stores the database's metadata file only."""
    session_id = new_session_id()
    session_params = {"session_id": session_id}
    instance.http_query("CREATE TEMPORARY TABLE temp_tbl(s String)", params=session_params)

    backup_name = new_backup_name()
    instance.query(f"BACKUP DATABASE _temporary_and_external_tables TO {backup_name}")

    metadata_dir = os.path.join(get_path_to_backup(backup_name), "metadata/")
    # database metadata only
    expected_entries = ["_temporary_and_external_tables.sql"]
    assert os.listdir(metadata_dir) == expected_entries
def test_system_table():
    """Back up `system.numbers`: only its definition is stored (no data directory).

    Afterwards RESTORE is run for the same table from that backup.
    """
    backup_name = new_backup_name()
    instance.query(f"BACKUP TABLE system.numbers TO {backup_name}")

    backup_path = get_path_to_backup(backup_name)
    assert os.listdir(os.path.join(backup_path, "metadata/system")) == ["numbers.sql"]
    # No data directory must be written for a system table.
    assert not os.path.isdir(os.path.join(backup_path, "data"))

    # Use a context manager so the file handle is closed deterministically
    # (a bare `open(...).read()` leaves closing to the garbage collector).
    with open(
        os.path.join(backup_path, "metadata/system/numbers.sql")
    ) as create_query_file:
        create_query = create_query_file.read()

    assert (
        create_query
        == "CREATE TABLE system.numbers (`number` UInt64) ENGINE = SystemNumbers"
    )

    instance.query(f"RESTORE TABLE system.numbers FROM {backup_name}")
def test_system_database():
    """Back up the whole `system` database and check the stored definition of `system.numbers`."""
    backup_name = new_backup_name()
    instance.query(f"BACKUP DATABASE system TO {backup_name}")

    backup_path = get_path_to_backup(backup_name)
    assert "numbers.sql" in os.listdir(os.path.join(backup_path, "metadata/system"))

    # Use a context manager so the file handle is closed deterministically
    # (a bare `open(...).read()` leaves closing to the garbage collector).
    with open(
        os.path.join(backup_path, "metadata/system/numbers.sql")
    ) as create_query_file:
        create_query = create_query_file.read()

    assert (
        create_query
        == "CREATE TABLE system.numbers (`number` UInt64) ENGINE = SystemNumbers"
    )