add symlinks

This commit is contained in:
Alexander Tokmakov 2020-04-10 03:08:43 +03:00
parent 58067438cd
commit 03ed9d59b0
2 changed files with 72 additions and 15 deletions

View File

@ -29,7 +29,9 @@ public:
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, Context & context_)
: DatabaseOrdinary(name_, metadata_path_, "store/", "DatabaseAtomic (" + name_ + ")", context_)
, path_to_table_symlinks(context_.getPath() + "data/" + escapeForFileName(name_) + "/")
{
Poco::File(path_to_table_symlinks).createDirectories();
}
String DatabaseAtomic::getTableDataPath(const String & table_name) const
@ -52,6 +54,7 @@ String DatabaseAtomic::getTableDataPath(const ASTCreateQuery & query) const
void DatabaseAtomic::drop(const Context &)
{
Poco::File(path_to_table_symlinks).remove(true);
Poco::File(getMetadataPath()).remove(true);
}
@ -64,6 +67,7 @@ void DatabaseAtomic::attachTable(const String & name, const StoragePtr & table,
assertDetachedTableNotInUse(table->getStorageID().uuid);
DatabaseWithDictionaries::attachTableUnlocked(name, table, relative_table_path);
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
tryCreateSymlink(name, relative_table_path);
}
StoragePtr DatabaseAtomic::detachTable(const String & name)
@ -74,6 +78,7 @@ StoragePtr DatabaseAtomic::detachTable(const String & name)
table_name_to_path.erase(name);
detached_tables.emplace(table->getStorageID().uuid, table);
not_in_use = cleenupDetachedTables();
tryRemoveSymlink(name);
return table;
}
@ -90,6 +95,7 @@ void DatabaseAtomic::dropTable(const Context &, const String & table_name, bool
DatabaseWithDictionaries::detachTableUnlocked(table_name); /// Should never throw
table_name_to_path.erase(table_name);
}
tryRemoveSymlink(table_name);
DatabaseCatalog::instance().enqueueDroppedTableCleanup(table->getStorageID(), table, table_metadata_path_drop, no_delay);
}
@ -106,31 +112,23 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
}
auto & other_db = dynamic_cast<DatabaseAtomic &>(to_database);
StoragePtr table = tryGetTable(context, table_name);
StoragePtr other_table;
if (exchange)
other_table = other_db.tryGetTable(context, to_table_name);
if (!table)
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
if (exchange && !other_table)
throw Exception("Table " + backQuote(other_db.getDatabaseName()) + "." + backQuote(to_table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
String old_metadata_path = getObjectMetadataPath(table_name);
String new_metadata_path = to_database.getObjectMetadataPath(to_table_name);
auto detach = [](DatabaseAtomic & db, const String & table_name_)
auto detach = [this](DatabaseAtomic & db, const String & table_name_)
{
auto table_data_path_saved = db.table_name_to_path.find(table_name_)->second;
db.tables.erase(table_name_);
db.table_name_to_path.erase(table_name_);
tryRemoveSymlink(table_name_);
return table_data_path_saved;
};
auto attach = [](DatabaseAtomic & db, const String & table_name_, const String & table_data_path_, const StoragePtr & table_)
auto attach = [this](DatabaseAtomic & db, const String & table_name_, const String & table_data_path_, const StoragePtr & table_)
{
db.tables.emplace(table_name_, table_);
db.table_name_to_path.emplace(table_name_, table_data_path_);
tryCreateSymlink(table_name_, table_data_path_);
};
String table_data_path;
@ -155,6 +153,11 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
db_lock = std::unique_lock{mutex};
}
StoragePtr table = getTableUnlocked(table_name);
StoragePtr other_table;
if (exchange)
other_table = other_db.getTableUnlocked(to_table_name);
if (exchange)
renameExchange(old_metadata_path, new_metadata_path);
else
@ -199,7 +202,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
Poco::File(table_metadata_tmp_path).remove();
throw;
}
tryCreateSymlink(query.table, table_data_path);
}
void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path)
@ -251,5 +254,52 @@ DatabaseTablesIteratorPtr DatabaseAtomic::getTablesWithDictionaryTablesIterator(
return std::make_unique<AtomicDatabaseTablesSnapshotIterator>(std::move(typeid_cast<DatabaseTablesSnapshotIterator &>(*base_iter)));
}
void DatabaseAtomic::loadStoredObjects(Context & context, bool has_force_restore_data_flag)
{
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
if (has_force_restore_data_flag)
Poco::File(path_to_table_symlinks).remove(true);
DatabaseOrdinary::loadStoredObjects(context, has_force_restore_data_flag);
if (has_force_restore_data_flag)
{
NameToPathMap table_names;
{
std::lock_guard lock{mutex};
table_names = table_name_to_path;
}
for (const auto & table : table_names)
tryCreateSymlink(table.first, table.second);
}
}
void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path)
{
try
{
String link = path_to_table_symlinks + escapeForFileName(table_name);
String data = global_context.getPath() + actual_data_path;
Poco::File{data}.linkTo(link, Poco::File::LINK_SYMBOLIC);
}
catch (...)
{
tryLogCurrentException(log);
}
}
void DatabaseAtomic::tryRemoveSymlink(const String & table_name)
{
try
{
String path = path_to_table_symlinks + escapeForFileName(table_name);
Poco::File{path}.remove();
}
catch (...)
{
tryLogCurrentException(log);
}
}
}

View File

@ -36,19 +36,26 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override;
DatabaseTablesIteratorPtr getTablesWithDictionaryTablesIterator(const FilterByNameFunction & filter_by_dictionary_name) override;
void loadStoredObjects(Context & context, bool has_force_restore_data_flag) override;
private:
void commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path) override;
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
const String & table_metadata_tmp_path, const String & table_metadata_path) override;
void assertDetachedTableNotInUse(const UUID & uuid);
typedef std::map<UUID, StoragePtr> DetachedTables;
typedef std::unordered_map<UUID, StoragePtr> DetachedTables;
DetachedTables cleenupDetachedTables();
void tryCreateSymlink(const String & table_name, const String & actual_data_path);
void tryRemoveSymlink(const String & table_name);
//TODO store path in DatabaseWithOwnTables::tables
std::map<String, String> table_name_to_path;
typedef std::unordered_map<String, String> NameToPathMap;
NameToPathMap table_name_to_path;
DetachedTables detached_tables;
const String path_to_table_symlinks;
};
}