Make BACKUP more consistent with RESTORE.

This commit is contained in:
Vitaly Baranov 2022-01-18 16:44:42 +07:00 committed by Vitaly Baranov
parent 7a63feb3f7
commit c160494f43
3 changed files with 256 additions and 356 deletions

View File

@ -10,18 +10,15 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Storages/IStorage.h>
#include <base/insertAtEnd.h>
#include <base/sort.h>
#include <boost/range/adaptor/reversed.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int BACKUP_ELEMENT_DUPLICATE;
extern const int CANNOT_BACKUP_TABLE;
extern const int CANNOT_BACKUP_DATABASE;
extern const int BACKUP_IS_EMPTY;
extern const int LOGICAL_ERROR;
}
namespace
@ -31,33 +28,6 @@ namespace
using Elements = ASTBackupQuery::Elements;
using ElementType = ASTBackupQuery::ElementType;
/// Replace elements of types DICTIONARY or EVERYTHING with elements of other types.
void replaceElementTypesWithBaseElementTypes(Elements & elements)
{
for (size_t i = 0; i != elements.size(); ++i)
{
auto & element = elements[i];
switch (element.type)
{
case ElementType::DICTIONARY:
{
element.type = ElementType::TABLE;
break;
}
case ElementType::EVERYTHING:
{
element.type = ElementType::ALL_DATABASES;
auto & new_element = elements.emplace_back();
new_element.type = ElementType::ALL_TEMPORARY_TABLES;
break;
}
default:
break;
}
}
}
/// Replaces an empty database with the current database.
void replaceEmptyDatabaseWithCurrentDatabase(Elements & elements, const String & current_database)
@ -74,352 +44,286 @@ namespace
}
}
/// Replaces elements of types TEMPORARY_TABLE or ALL_TEMPORARY_TABLES with elements of type TABLE or DATABASE.
void replaceTemporaryTablesWithTemporaryDatabase(Elements & elements)
/// Makes backup entries to backup databases and tables according to the elements of ASTBackupQuery.
/// Keep this class consistent with RestoreTasksBuilder.
class BackupEntriesBuilder
{
for (auto & element : elements)
public:
BackupEntriesBuilder(ContextPtr context_) : context(context_) { }
/// Prepares internal structures for making backup entries.
void prepare(const ASTBackupQuery::Elements & elements)
{
switch (element.type)
auto elements2 = elements;
replaceEmptyDatabaseWithCurrentDatabase(elements2, context->getCurrentDatabase());
auto new_renaming_config = std::make_shared<BackupRenamingConfig>();
new_renaming_config->setFromBackupQueryElements(elements2);
renaming_config = new_renaming_config;
for (const auto & element : elements2)
{
case ElementType::TEMPORARY_TABLE:
switch (element.type)
{
element.type = ElementType::TABLE;
element.name.first = DatabaseCatalog::TEMPORARY_DATABASE;
if (element.new_name.first.empty() && !element.new_name.second.empty())
element.new_name.first = DatabaseCatalog::TEMPORARY_DATABASE;
break;
}
case ElementType::TABLE: [[fallthrough]];
case ElementType::DICTIONARY:
{
const String & database_name = element.name.first;
const String & table_name = element.name.second;
prepareToBackupTable(DatabaseAndTableName{database_name, table_name}, element.partitions);
break;
}
case ElementType::ALL_TEMPORARY_TABLES:
{
element.type = ElementType::DATABASE;
element.name.first = DatabaseCatalog::TEMPORARY_DATABASE;
break;
}
case ElementType::TEMPORARY_TABLE:
{
String database_name = DatabaseCatalog::TEMPORARY_DATABASE;
const String & table_name = element.name.second;
prepareToBackupTable(DatabaseAndTableName{database_name, table_name}, element.partitions);
break;
}
default:
break;
case ElementType::DATABASE:
{
const String & database_name = element.name.first;
prepareToBackupDatabase(database_name, element.except_list);
break;
}
case ElementType::ALL_TEMPORARY_TABLES:
{
prepareToBackupDatabase(DatabaseCatalog::TEMPORARY_DATABASE, element.except_list);
break;
}
case ElementType::ALL_DATABASES:
{
prepareToBackupAllDatabases(element.except_list);
break;
}
case ElementType::EVERYTHING:
{
prepareToBackupAllDatabases({});
prepareToBackupDatabase(DatabaseCatalog::TEMPORARY_DATABASE, {});
break;
}
}
}
}
}
/// Set new names if they are not specified.
void setNewNamesIfNotSet(Elements & elements)
{
for (auto & element : elements)
/// Makes backup entries, should be called after prepare().
BackupEntries makeBackupEntries() const
{
switch (element.type)
/// Check that there are not `different_create_query`. (If it's set it means error.)
for (auto & info : databases | boost::adaptors::map_values)
{
case ElementType::TABLE:
{
if (element.new_name.second.empty())
element.new_name = element.name;
break;
}
if (info.different_create_query)
throw Exception(ErrorCodes::CANNOT_BACKUP_DATABASE,
"Couldn't backup a database because two different create queries were generated for it: {} and {}",
serializeAST(*info.create_query), serializeAST(*info.different_create_query));
}
case ElementType::DATABASE:
{
if (element.new_name.first.empty())
element.new_name = element.name;
break;
}
BackupEntries res;
for (auto & info : databases | boost::adaptors::map_values)
res.push_back(makeBackupEntryForMetadata(*info.create_query));
default:
break;
for (auto & info : tables | boost::adaptors::map_values)
{
res.push_back(makeBackupEntryForMetadata(*info.create_query));
auto data_backup = info.storage->backup(info.partitions, context);
if (!data_backup.empty())
{
String data_path = getDataPathInBackup(*info.create_query);
for (auto & [path_in_backup, backup_entry] : data_backup)
res.emplace_back(data_path + path_in_backup, std::move(backup_entry));
}
}
/// A backup cannot be empty.
if (res.empty())
throw Exception("Backup must not be empty", ErrorCodes::BACKUP_IS_EMPTY);
return res;
}
private:
/// Prepares to backup a single table and probably its database's definition.
void prepareToBackupTable(const DatabaseAndTableName & table_name_, const ASTs & partitions_)
{
auto [database, storage] = DatabaseCatalog::instance().getDatabaseAndTable({table_name_.first, table_name_.second}, context);
prepareToBackupTable(table_name_, {database, storage}, partitions_);
}
void prepareToBackupTable(const DatabaseAndTableName & table_name_, const DatabaseAndTable & table_, const ASTs & partitions_)
{
context->checkAccess(AccessType::SELECT, table_name_.first, table_name_.second);
/// Check that we are not trying to backup the same table again.
DatabaseAndTableName new_table_name = renaming_config->getNewTableName(table_name_);
if (tables.contains(new_table_name))
{
String message;
if (new_table_name.first == DatabaseCatalog::TEMPORARY_DATABASE)
message = fmt::format("Couldn't backup temporary table {} twice", backQuoteIfNeed(new_table_name.second));
else
message = fmt::format("Couldn't backup table {}.{} twice", backQuoteIfNeed(new_table_name.first), backQuoteIfNeed(new_table_name.second));
throw Exception(ErrorCodes::CANNOT_BACKUP_TABLE, message);
}
/// Make a create query for this table.
auto create_query = renameInCreateQuery(table_.first->getCreateTableQuery(table_name_.second, context));
CreateTableInfo info;
info.create_query = create_query;
info.storage = table_.second;
info.name_in_backup = new_table_name;
info.partitions = partitions_;
tables[new_table_name] = std::move(info);
/// If it's not system or temporary database then probably we need to backup the database's definition too.
if (!isSystemOrTemporaryDatabase(table_name_.first))
{
if (!databases.contains(new_table_name.first))
{
/// Add a create query to backup the database if we haven't done it yet.
auto create_db_query = renameInCreateQuery(table_.first->getCreateDatabaseQuery());
create_db_query->setDatabase(new_table_name.first);
CreateDatabaseInfo info_db;
info_db.create_query = create_db_query;
info_db.original_name = table_name_.first;
info_db.is_explicit = false;
databases[new_table_name.first] = std::move(info_db);
}
else
{
/// We already have added a create query to backup the database,
/// set `different_create_query` if it's not the same.
auto & info_db = databases[new_table_name.first];
if (!info_db.is_explicit && (info_db.original_name != table_name_.first) && !info_db.different_create_query)
{
auto create_db_query = renameInCreateQuery(table_.first->getCreateDatabaseQuery());
create_db_query->setDatabase(new_table_name.first);
if (serializeAST(*info_db.create_query) != serializeAST(*create_db_query))
info_db.different_create_query = create_db_query;
}
}
}
}
}
/// Removes duplications in the elements of a backup query by removing some excessive elements and by updating except_lists.
/// This function helps deduplicate elements in queries like "BACKUP ALL DATABASES, DATABASE xxx USING NAME yyy"
/// (we need a deduplication for that query because `ALL DATABASES` includes `xxx` however we don't want
/// to backup/restore the same database twice while executing the same query).
/// Also this function slightly reorders elements: it puts databases before tables and dictionaries they contain.
void deduplicateAndReorderElements(Elements & elements)
{
std::set<size_t> skip_indices; /// Indices of elements which should be removed in the end of this function.
size_t index_all_databases = static_cast<size_t>(-1); /// Index of the first element of type ALL_DATABASES or -1 if not found.
struct DatabaseInfo
/// Prepares to restore a database and all tables in it.
void prepareToBackupDatabase(const String & database_name_, const std::set<String> & except_list_)
{
size_t index = static_cast<size_t>(-1);
std::unordered_map<std::string_view, size_t> tables;
auto database = DatabaseCatalog::instance().getDatabase(database_name_, context);
prepareToBackupDatabase(database_name_, database, except_list_);
}
void prepareToBackupDatabase(const String & database_name_, const DatabasePtr & database_, const std::set<String> & except_list_)
{
context->checkAccess(AccessType::SHOW_DATABASES, database_name_);
/// Check that we are not trying to restore the same database again.
String new_database_name = renaming_config->getNewDatabaseName(database_name_);
if (databases.contains(new_database_name) && databases[new_database_name].is_explicit)
throw Exception(ErrorCodes::CANNOT_BACKUP_DATABASE, "Couldn't backup database {} twice", backQuoteIfNeed(new_database_name));
/// Of course we're not going to backup the definition of the system or the temporary database.
if (!isSystemOrTemporaryDatabase(database_name_))
{
/// Make a create query for this database.
auto create_db_query = renameInCreateQuery(database_->getCreateDatabaseQuery());
CreateDatabaseInfo info_db;
info_db.create_query = create_db_query;
info_db.original_name = database_name_;
info_db.is_explicit = true;
databases[new_database_name] = std::move(info_db);
}
/// Backup tables in this database.
for (auto it = database_->getTablesIteratorForBackup(context); it->isValid(); it->next())
{
if (except_list_.contains(it->name()))
continue;
prepareToBackupTable({database_name_, it->name()}, {database_, it->table()}, {});
}
}
/// Prepares to backup all the databases contained in the backup.
void prepareToBackupAllDatabases(const std::set<String> & except_list_)
{
for (const auto & [database_name, database] : DatabaseCatalog::instance().getDatabases())
{
if (except_list_.contains(database_name))
continue;
if (isSystemOrTemporaryDatabase(database_name))
continue;
prepareToBackupDatabase(database_name, database, {});
}
}
/// Do renaming in the create query according to the renaming config.
std::shared_ptr<ASTCreateQuery> renameInCreateQuery(const ASTPtr & ast) const
{
return typeid_cast<std::shared_ptr<ASTCreateQuery>>(::DB::renameInCreateQuery(ast, renaming_config, context));
}
static bool isSystemOrTemporaryDatabase(const String & database_name)
{
return (database_name == DatabaseCatalog::SYSTEM_DATABASE) || (database_name == DatabaseCatalog::TEMPORARY_DATABASE);
}
static std::pair<String, BackupEntryPtr> makeBackupEntryForMetadata(const IAST & create_query)
{
auto metadata_entry = std::make_unique<BackupEntryFromMemory>(serializeAST(create_query));
String metadata_path = getMetadataPathInBackup(create_query);
return {metadata_path, std::move(metadata_entry)};
}
/// Information which is used to make an instance of RestoreTableFromBackupTask.
struct CreateTableInfo
{
ASTPtr create_query;
StoragePtr storage;
DatabaseAndTableName name_in_backup;
ASTs partitions;
};
std::unordered_map<std::string_view, DatabaseInfo> databases; /// Found databases and tables.
for (size_t i = 0; i != elements.size(); ++i)
/// Information which is used to make an instance of RestoreDatabaseFromBackupTask.
struct CreateDatabaseInfo
{
auto & element = elements[i];
switch (element.type)
{
case ElementType::TABLE:
{
auto & tables = databases.emplace(element.name.first, DatabaseInfo{}).first->second.tables;
auto it = tables.find(element.name.second);
if (it == tables.end())
{
tables.emplace(element.name.second, i);
}
else
{
size_t prev_index = it->second;
if ((elements[i].new_name == elements[prev_index].new_name)
&& (elements[i].partitions.empty() == elements[prev_index].partitions.empty()))
{
insertAtEnd(elements[prev_index].partitions, elements[i].partitions);
skip_indices.emplace(i);
}
else
{
throw Exception(
"Table " + backQuote(element.name.first) + "." + backQuote(element.name.second) + " was specified twice",
ErrorCodes::BACKUP_ELEMENT_DUPLICATE);
}
}
break;
}
ASTPtr create_query;
String original_name;
case ElementType::DATABASE:
{
auto it = databases.find(element.name.first);
if (it == databases.end())
{
DatabaseInfo new_db_info;
new_db_info.index = i;
databases.emplace(element.name.first, new_db_info);
}
else if (it->second.index == static_cast<size_t>(-1))
{
it->second.index = i;
}
else
{
size_t prev_index = it->second.index;
if ((elements[i].new_name == elements[prev_index].new_name)
&& (elements[i].except_list == elements[prev_index].except_list))
{
skip_indices.emplace(i);
}
else
{
throw Exception("Database " + backQuote(element.name.first) + " was specified twice", ErrorCodes::BACKUP_ELEMENT_DUPLICATE);
}
/// Whether the creation of this database is specified explicitly, via RESTORE DATABASE or
/// RESTORE ALL DATABASES.
/// It's false if the creation of this database is caused by creating a table contained in it.
bool is_explicit = false;
}
break;
}
/// If this is set it means the following error:
/// it means that for implicitly created database there were two different create query
/// generated so we cannot restore the database.
ASTPtr different_create_query;
};
case ElementType::ALL_DATABASES:
{
if (index_all_databases == static_cast<size_t>(-1))
{
index_all_databases = i;
}
else
{
size_t prev_index = index_all_databases;
if (elements[i].except_list == elements[prev_index].except_list)
skip_indices.emplace(i);
else
throw Exception("The tag ALL DATABASES was specified twice", ErrorCodes::BACKUP_ELEMENT_DUPLICATE);
}
break;
}
default:
/// replaceElementTypesWithBaseElementTypes() and replaceTemporaryTablesWithTemporaryDatabase() should have removed all other element types.
throw Exception("Unexpected element type: " + std::to_string(static_cast<int>(element.type)), ErrorCodes::LOGICAL_ERROR);
}
}
if (index_all_databases != static_cast<size_t>(-1))
{
for (auto & [database_name, database] : databases)
{
elements[index_all_databases].except_list.emplace(database_name);
if (database.index == static_cast<size_t>(-1))
{
auto & new_element = elements.emplace_back();
new_element.type = ElementType::DATABASE;
new_element.name.first = database_name;
new_element.new_name = new_element.name;
database.index = elements.size() - 1;
}
}
}
for (auto & [database_name, database] : databases)
{
if (database.index == static_cast<size_t>(-1))
continue;
for (const auto & [table_name, table_index] : database.tables)
elements[database.index].except_list.emplace(table_name);
}
for (auto skip_index : skip_indices | boost::adaptors::reversed)
elements.erase(elements.begin() + skip_index);
}
Elements adjustElements(const Elements & elements, const String & current_database)
{
auto res = elements;
replaceElementTypesWithBaseElementTypes(res);
replaceEmptyDatabaseWithCurrentDatabase(res, current_database);
replaceTemporaryTablesWithTemporaryDatabase(res);
setNewNamesIfNotSet(res);
deduplicateAndReorderElements(res);
return res;
}
void backupCreateQuery(const IAST & create_query, BackupEntries & backup_entries)
{
auto metadata_entry = std::make_unique<BackupEntryFromMemory>(serializeAST(create_query));
String metadata_path = getMetadataPathInBackup(create_query);
backup_entries.emplace_back(metadata_path, std::move(metadata_entry));
}
void backupTable(
const DatabaseAndTable & database_and_table,
const String & table_name,
const ASTs & partitions,
const ContextPtr & context,
const BackupRenamingConfigPtr & renaming_config,
BackupEntries & backup_entries)
{
const auto & database = database_and_table.first;
const auto & storage = database_and_table.second;
context->checkAccess(AccessType::SELECT, database->getDatabaseName(), table_name);
auto create_query = database->getCreateTableQuery(table_name, context);
ASTPtr new_create_query = renameInCreateQuery(create_query, renaming_config, context);
backupCreateQuery(*new_create_query, backup_entries);
auto data_backup = storage->backup(partitions, context);
if (!data_backup.empty())
{
String data_path = getDataPathInBackup(*new_create_query);
for (auto & [path_in_backup, backup_entry] : data_backup)
backup_entries.emplace_back(data_path + path_in_backup, std::move(backup_entry));
}
}
void backupDatabase(
const DatabasePtr & database,
const std::set<String> & except_list,
const ContextPtr & context,
const BackupRenamingConfigPtr & renaming_config,
BackupEntries & backup_entries)
{
context->checkAccess(AccessType::SHOW_TABLES, database->getDatabaseName());
auto create_query = database->getCreateDatabaseQuery();
ASTPtr new_create_query = renameInCreateQuery(create_query, renaming_config, context);
backupCreateQuery(*new_create_query, backup_entries);
for (auto it = database->getTablesIteratorForBackup(context); it->isValid(); it->next())
{
if (except_list.contains(it->name()))
continue;
backupTable({database, it->table()}, it->name(), {}, context, renaming_config, backup_entries);
}
}
void backupAllDatabases(
const std::set<String> & except_list,
const ContextPtr & context,
const BackupRenamingConfigPtr & renaming_config,
BackupEntries & backup_entries)
{
for (const auto & [database_name, database] : DatabaseCatalog::instance().getDatabases())
{
if (except_list.contains(database_name))
continue;
if (database_name == DatabaseCatalog::SYSTEM_DATABASE || database_name == DatabaseCatalog::TEMPORARY_DATABASE)
continue;
backupDatabase(database, {}, context, renaming_config, backup_entries);
}
}
ContextPtr context;
BackupMutablePtr backup;
BackupRenamingConfigPtr renaming_config;
std::map<String, CreateDatabaseInfo> databases;
std::map<DatabaseAndTableName, CreateTableInfo> tables;
};
}
BackupEntries makeBackupEntries(const Elements & elements, const ContextPtr & context)
{
BackupEntries backup_entries;
auto elements2 = adjustElements(elements, context->getCurrentDatabase());
auto renaming_config = std::make_shared<BackupRenamingConfig>();
renaming_config->setFromBackupQueryElements(elements2);
for (const auto & element : elements2)
{
switch (element.type)
{
case ElementType::TABLE:
{
const String & database_name = element.name.first;
const String & table_name = element.name.second;
auto [database, storage] = DatabaseCatalog::instance().getDatabaseAndTable({database_name, table_name}, context);
backupTable({database, storage}, table_name, element.partitions, context, renaming_config, backup_entries);
break;
}
case ElementType::DATABASE:
{
const String & database_name = element.name.first;
auto database = DatabaseCatalog::instance().getDatabase(database_name, context);
backupDatabase(database, element.except_list, context, renaming_config, backup_entries);
break;
}
case ElementType::ALL_DATABASES:
{
backupAllDatabases(element.except_list, context, renaming_config, backup_entries);
break;
}
default:
throw Exception("Unexpected element type", ErrorCodes::LOGICAL_ERROR); /// other element types have been removed in deduplicateElements()
}
}
/// A backup cannot be empty.
if (backup_entries.empty())
throw Exception("Backup must not be empty", ErrorCodes::BACKUP_IS_EMPTY);
/// Check that all backup entries are unique.
::sort(
backup_entries.begin(),
backup_entries.end(),
[](const std::pair<String, std::unique_ptr<IBackupEntry>> & lhs, const std::pair<String, std::unique_ptr<IBackupEntry>> & rhs)
{
return lhs.first < rhs.first;
});
auto adjacent = std::adjacent_find(backup_entries.begin(), backup_entries.end());
if (adjacent != backup_entries.end())
throw Exception("Cannot write multiple entries with the same name " + quoteString(adjacent->first), ErrorCodes::BACKUP_ELEMENT_DUPLICATE);
return backup_entries;
BackupEntriesBuilder builder{context};
builder.prepare(elements);
return builder.makeBackupEntries();
}
UInt64 estimateBackupSize(const BackupEntries & backup_entries, const BackupPtr & base_backup)
{
UInt64 total_size = 0;
for (const auto & [name, entry] : backup_entries)
{
UInt64 data_size = entry->getSize();
if (base_backup)
{
if (base_backup->fileExists(name) && (data_size == base_backup->getFileSize(name)))
{
auto checksum = entry->getChecksum();
if (checksum && (*checksum == base_backup->getFileChecksum(name)))
continue;
}
}
total_size += data_size;
}
return total_size;
}
void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, size_t num_threads)
{
@ -482,6 +386,7 @@ void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries
backup->finalizeWriting();
}
String getDataPathInBackup(const DatabaseAndTableName & table_name)
{
if (table_name.first.empty() || table_name.second.empty())

View File

@ -5,9 +5,7 @@
namespace DB
{
class IBackup;
using BackupPtr = std::shared_ptr<const IBackup>;
using BackupMutablePtr = std::shared_ptr<IBackup>;
class IBackupEntry;
using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
@ -15,13 +13,9 @@ using BackupEntries = std::vector<std::pair<String, BackupEntryPtr>>;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
/// Prepares backup entries.
BackupEntries makeBackupEntries(const ASTBackupQuery::Elements & elements, const ContextPtr & context);
/// Estimate total size of the backup which would be written from the specified entries.
UInt64 estimateBackupSize(const BackupEntries & backup_entries, const BackupPtr & base_backup);
/// Write backup entries to an opened backup.
void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, size_t num_threads);

View File

@ -575,7 +575,7 @@
M(604, BACKUP_ENTRY_ALREADY_EXISTS) \
M(605, BACKUP_ENTRY_NOT_FOUND) \
M(606, BACKUP_IS_EMPTY) \
M(607, BACKUP_ELEMENT_DUPLICATE) \
M(607, CANNOT_RESTORE_DATABASE) \
M(608, CANNOT_RESTORE_TABLE) \
M(609, FUNCTION_ALREADY_EXISTS) \
M(610, CANNOT_DROP_FUNCTION) \
@ -615,7 +615,8 @@
M(644, REMOTE_FS_OBJECT_CACHE_ERROR) \
M(645, NUMBER_OF_DIMENSIONS_MISMATHED) \
M(646, RBAC_VERSION_IS_TOO_NEW) \
M(647, CANNOT_RESTORE_DATABASE) \
M(647, CANNOT_BACKUP_DATABASE) \
M(648, CANNOT_BACKUP_TABLE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \