From f2e031f1b849bc24e62fe4c6912ca8e2c0dffc89 Mon Sep 17 00:00:00 2001 From: Tuan Pham Anh Date: Mon, 18 Nov 2024 12:39:53 +0000 Subject: [PATCH] Use IDisk in Databases --- src/Databases/DatabaseAtomic.cpp | 101 +++++++++----- src/Databases/DatabaseFactory.cpp | 11 +- src/Databases/DatabaseOnDisk.cpp | 130 ++++++++++-------- src/Databases/DatabaseOnDisk.h | 8 +- src/Databases/DatabaseOrdinary.cpp | 23 ++-- .../MySQL/DatabaseMaterializedMySQL.cpp | 6 +- src/Disks/DiskLocal.cpp | 62 ++++++++- src/Disks/DiskLocal.h | 21 +++ src/Disks/IDisk.h | 71 ++++++++++ src/Interpreters/Context.cpp | 33 +++-- src/Interpreters/Context.h | 1 + src/Interpreters/DatabaseCatalog.cpp | 45 +++--- src/Interpreters/InterpreterCreateQuery.cpp | 25 ++-- src/Interpreters/loadMetadata.cpp | 49 ++++--- 14 files changed, 422 insertions(+), 164 deletions(-) diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index bd077ccd7b5..bbf8752b305 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -76,8 +76,10 @@ void DatabaseAtomic::createDirectories() void DatabaseAtomic::createDirectoriesUnlocked() { DatabaseOnDisk::createDirectoriesUnlocked(); - fs::create_directories(fs::path(getContext()->getPath()) / "metadata"); - fs::create_directories(path_to_table_symlinks); + auto shared_disk = getContext()->getSharedDisk(); + shared_disk->createDirectories(fs::path(getContext()->getPath()) / "metadata"); + if (shared_disk->isSymlinkSupported()) + shared_disk->createDirectories(path_to_table_symlinks); tryCreateMetadataSymlink(); } @@ -100,18 +102,22 @@ String DatabaseAtomic::getTableDataPath(const ASTCreateQuery & query) const void DatabaseAtomic::drop(ContextPtr) { + auto shared_disk = getContext()->getSharedDisk(); waitDatabaseStarted(); assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty()); try { - (void)fs::remove(path_to_metadata_symlink); - (void)fs::remove_all(path_to_table_symlinks); + if (shared_disk->isSymlinkSupported()) + { + (void)shared_disk->removeFileIfExists(path_to_metadata_symlink); + (void)shared_disk->removeRecursive(path_to_table_symlinks); + } } catch (...) { LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true)); } - (void)fs::remove_all(getMetadataPath()); + (void)shared_disk->removeRecursive(getMetadataPath()); } void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name, const StoragePtr & table, const String & relative_table_path) @@ -166,6 +172,8 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na void DatabaseAtomic::dropTableImpl(ContextPtr local_context, const String & table_name, bool sync) { + auto shared_disk = getContext()->getSharedDisk(); + String table_metadata_path = getObjectMetadataPath(table_name); String table_metadata_path_drop; StoragePtr table; @@ -174,7 +182,7 @@ void DatabaseAtomic::dropTableImpl(ContextPtr local_context, const String & tabl table = getTableUnlocked(table_name); table_metadata_path_drop = DatabaseCatalog::instance().getPathForDroppedMetadata(table->getStorageID()); - fs::create_directory(fs::path(table_metadata_path_drop).parent_path()); + shared_disk->createDirectories(fs::path(table_metadata_path_drop).parent_path()); auto txn = local_context->getZooKeeperMetadataTransaction(); if (txn && !local_context->isInternalSubquery()) @@ -186,7 +194,7 @@ void DatabaseAtomic::dropTableImpl(ContextPtr local_context, const String & tabl /// (it's more likely to lost connection, than to fail before applying local changes). /// TODO better detection and recovery - fs::rename(table_metadata_path, table_metadata_path_drop); /// Mark table as dropped + shared_disk->replaceFile(table_metadata_path, table_metadata_path_drop); /// Mark table as dropped DatabaseOrdinary::detachTableUnlocked(table_name); /// Should never throw table_name_to_path.erase(table_name); } @@ -318,11 +326,12 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_ /// NOTE: replica will be lost if server crashes before the following rename /// TODO better detection and recovery + auto shared_disk = getContext()->getSharedDisk(); if (exchange) - renameExchange(old_metadata_path, new_metadata_path); + shared_disk->renameExchange(old_metadata_path, new_metadata_path); else - renameNoReplace(old_metadata_path, new_metadata_path); + shared_disk->renameNoReplace(old_metadata_path, new_metadata_path); /// After metadata was successfully moved, the following methods should not throw (if they do, it's a logical error) table_data_path = detach(*this, table_name, table->storesDataOnDisk()); @@ -349,6 +358,8 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora const String & table_metadata_tmp_path, const String & table_metadata_path, ContextPtr query_context) { + auto shared_disk = getContext()->getSharedDisk(); + createDirectories(); DetachedTables not_in_use; auto table_data_path = getTableDataPath(query); @@ -377,7 +388,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora } catch (...) { - (void)fs::remove(table_metadata_tmp_path); + (void)shared_disk->removeFileIfExists(table_metadata_tmp_path); throw; } if (table->storesDataOnDisk()) @@ -387,11 +398,13 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path, const String & /*statement*/, ContextPtr query_context) { + auto shared_disk = getContext()->getSharedDisk(); + bool check_file_exists = true; SCOPE_EXIT({ std::error_code code; if (check_file_exists) - (void)std::filesystem::remove(table_metadata_tmp_path, code); + (void)shared_disk->removeFileIfExists(table_metadata_tmp_path); }); std::lock_guard lock{mutex}; @@ -409,7 +422,7 @@ void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String & check_file_exists = renameExchangeIfSupported(table_metadata_tmp_path, table_metadata_path); if (!check_file_exists) - std::filesystem::rename(table_metadata_tmp_path, table_metadata_path); + shared_disk->replaceFile(table_metadata_tmp_path, table_metadata_path); } void DatabaseAtomic::assertDetachedTableNotInUse(const UUID & uuid) @@ -483,33 +496,41 @@ UUID DatabaseAtomic::tryGetTableUUID(const String & table_name) const void DatabaseAtomic::beforeLoadingMetadata(ContextMutablePtr /*context*/, LoadingStrictnessLevel mode) { + auto shared_disk = getContext()->getSharedDisk(); if (mode < LoadingStrictnessLevel::FORCE_RESTORE) return; - if (!fs::exists(path_to_table_symlinks)) + if (!shared_disk->isSymlinkSupported()) + return; + + if (!shared_disk->existsFileOrDirectory(path_to_table_symlinks)) return; /// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken - for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks)) + for (const auto dir_it = shared_disk->iterateDirectory(path_to_table_symlinks); dir_it->isValid(); dir_it->next()) { - if (!FS::isSymlink(table_path)) + auto table_path = fs::path(dir_it->path()); + if (table_path.filename().empty()) + table_path = table_path.parent_path(); + if (!shared_disk->isSymlink(table_path)) { - throw Exception(ErrorCodes::ABORTED, - "'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path())); + throw Exception( + ErrorCodes::ABORTED, "'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path)); } - (void)fs::remove(table_path); + (void)shared_disk->removeFileIfExists(table_path); } } LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) { + auto shared_disk = getContext()->getSharedDisk(); auto base = DatabaseOrdinary::startupDatabaseAsync(async_loader, std::move(startup_after), mode); auto job = makeLoadJob( base->goals(), TablesLoaderBackgroundStartupPoolId, fmt::format("startup Atomic database {}", getDatabaseName()), - [this, mode] (AsyncLoader &, const LoadJobPtr &) + [this, mode, shared_disk](AsyncLoader &, const LoadJobPtr &) { if (mode < LoadingStrictnessLevel::FORCE_RESTORE) return; @@ -518,8 +539,8 @@ LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, Loa std::lock_guard lock{mutex}; table_names = table_name_to_path; } - - fs::create_directories(path_to_table_symlinks); + if (shared_disk->isSymlinkSupported()) + shared_disk->createDirectories(path_to_table_symlinks); for (const auto & table : table_names) { /// All tables in database should be loaded at this point @@ -561,6 +582,9 @@ void DatabaseAtomic::stopLoading() void DatabaseAtomic::tryCreateSymlink(const StoragePtr & table, bool if_data_path_exist) { + auto shared_disk = getContext()->getSharedDisk(); + if (!shared_disk->isSymlinkSupported()) + return; try { String table_name = table->getStorageID().getTableName(); @@ -572,14 +596,13 @@ void DatabaseAtomic::tryCreateSymlink(const StoragePtr & table, bool if_data_pat fs::path data = fs::weakly_canonical(table->getDataPaths()[0]); /// If it already points where needed. - std::error_code ec; - if (fs::equivalent(data, link, ec)) + if (shared_disk->equivalentNoThrow(data, link)) return; - if (if_data_path_exist && !fs::exists(data)) + if (if_data_path_exist && !shared_disk->existsFileOrDirectory(data)) return; - fs::create_directory_symlink(data, link); + shared_disk->createDirectoriesSymlink(data, link); } catch (...) { @@ -589,10 +612,14 @@ void DatabaseAtomic::tryCreateSymlink(const StoragePtr & table, bool if_data_pat void DatabaseAtomic::tryRemoveSymlink(const String & table_name) { + auto shared_disk = getContext()->getSharedDisk(); + if (!shared_disk->isSymlinkSupported()) + return; + try { String path = path_to_table_symlinks + escapeForFileName(table_name); - (void)fs::remove(path); + (void)shared_disk->removeFileIfExists(path); } catch (...) { @@ -602,13 +629,18 @@ void DatabaseAtomic::tryRemoveSymlink(const String & table_name) void DatabaseAtomic::tryCreateMetadataSymlink() { + auto shared_disk = getContext()->getSharedDisk(); + + if (!shared_disk->isSymlinkSupported()) + return; + /// Symlinks in data/db_name/ directory and metadata/db_name/ are not used by ClickHouse, /// it's needed only for convenient introspection. assert(path_to_metadata_symlink != metadata_path); fs::path metadata_symlink(path_to_metadata_symlink); - if (fs::exists(metadata_symlink)) + if (shared_disk->existsFile(metadata_symlink)) { - if (!FS::isSymlink(metadata_symlink)) + if (!shared_disk->isSymlink(metadata_symlink)) throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Directory {} exists", path_to_metadata_symlink); } else @@ -616,9 +648,9 @@ void DatabaseAtomic::tryCreateMetadataSymlink() try { /// fs::exists could return false for broken symlink - if (FS::isSymlinkNoThrow(metadata_symlink)) - (void)fs::remove(metadata_symlink); - fs::create_directory_symlink(metadata_path, path_to_metadata_symlink); + if (shared_disk->isSymlinkNoThrow(metadata_symlink)) + (void)shared_disk->removeFileIfExists(metadata_symlink); + shared_disk->createDirectoriesSymlink(metadata_path, path_to_metadata_symlink); } catch (...) { @@ -630,6 +662,7 @@ void DatabaseAtomic::tryCreateMetadataSymlink() void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new_name) { /// CREATE, ATTACH, DROP, DETACH and RENAME DATABASE must hold DDLGuard + auto shared_disk = getContext()->getSharedDisk(); createDirectories(); waitDatabaseStarted(); @@ -645,7 +678,8 @@ void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new try { - (void)fs::remove(path_to_metadata_symlink); + if (shared_disk->isSymlinkSupported()) + (void)shared_disk->removeFileIfExists(path_to_metadata_symlink); } catch (...) { @@ -682,7 +716,8 @@ void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new path_to_table_symlinks = getContext()->getPath() + "data/" + new_name_escaped + "/"; } - fs::rename(old_path_to_table_symlinks, path_to_table_symlinks); + if (shared_disk->isSymlinkSupported()) + shared_disk->moveDirectory(old_path_to_table_symlinks, path_to_table_symlinks); tryCreateMetadataSymlink(); } diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index 28bb9825614..c835d88ddd3 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -1,5 +1,6 @@ #include +#include #include #include #include @@ -9,7 +10,6 @@ #include #include #include -#include namespace fs = std::filesystem; @@ -32,16 +32,21 @@ namespace ErrorCodes void cckMetadataPathForOrdinary(const ASTCreateQuery & create, const String & metadata_path) { + auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk(); + + if (!shared_disk->isSymlinkSupported()) + return; + const String & engine_name = create.storage->engine->name; const String & database_name = create.getDatabase(); if (engine_name != "Ordinary") return; - if (!FS::isSymlink(metadata_path)) + if (!shared_disk->isSymlink(metadata_path)) return; - String target_path = FS::readSymlink(metadata_path).string(); + String target_path = shared_disk->readSymlink(metadata_path); fs::path path_to_remove = metadata_path; if (path_to_remove.filename().empty()) path_to_remove = path_to_remove.parent_path(); diff --git a/src/Databases/DatabaseOnDisk.cpp b/src/Databases/DatabaseOnDisk.cpp index 93ecf9cf11c..b79ce5c8d06 100644 --- a/src/Databases/DatabaseOnDisk.cpp +++ b/src/Databases/DatabaseOnDisk.cpp @@ -3,8 +3,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -29,7 +31,6 @@ #include #include #include -#include namespace fs = std::filesystem; @@ -52,11 +53,12 @@ namespace Setting extern const SettingsUInt64 max_parser_depth; } -static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768; +// static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768; namespace ErrorCodes { extern const int CANNOT_GET_CREATE_TABLE_QUERY; + extern const int CANNOT_RMDIR; extern const int NOT_IMPLEMENTED; extern const int LOGICAL_ERROR; extern const int FILE_DOESNT_EXIST; @@ -171,14 +173,8 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query) DatabaseOnDisk::DatabaseOnDisk( - const String & name, - const String & metadata_path_, - const String & data_path_, - const String & logger, - ContextPtr local_context) - : DatabaseWithOwnTablesBase(name, logger, local_context) - , metadata_path(metadata_path_) - , data_path(data_path_) + const String & name, const String & metadata_path_, const String & data_path_, const String & logger, ContextPtr local_context) + : DatabaseWithOwnTablesBase(name, logger, local_context), metadata_path(metadata_path_), data_path(data_path_) { } @@ -191,8 +187,10 @@ void DatabaseOnDisk::createDirectories() void DatabaseOnDisk::createDirectoriesUnlocked() { - fs::create_directories(std::filesystem::path(getContext()->getPath()) / data_path); - fs::create_directories(metadata_path); + auto shared_disk = getContext()->getSharedDisk(); + + shared_disk->createDirectories(metadata_path); + shared_disk->createDirectories(std::filesystem::path(getContext()->getPath()) / data_path); } @@ -209,6 +207,8 @@ void DatabaseOnDisk::createTable( const StoragePtr & table, const ASTPtr & query) { + auto shared_disk = getContext()->getSharedDisk(); + createDirectories(); const auto & settings = local_context->getSettingsRef(); @@ -238,7 +238,7 @@ void DatabaseOnDisk::createTable( if (create.attach_short_syntax) { /// Metadata already exists, table was detached - assert(fs::exists(getObjectMetadataPath(table_name))); + assert(shared_disk->existsFileOrDirectory(getObjectMetadataPath(table_name))); removeDetachedPermanentlyFlag(local_context, table_name, table_metadata_path, true); attachTable(local_context, table_name, table, getTableDataPath(create)); return; @@ -247,7 +247,7 @@ void DatabaseOnDisk::createTable( if (!create.attach) checkMetadataFilenameAvailability(table_name); - if (create.attach && fs::exists(table_metadata_path)) + if (create.attach && shared_disk->existsFileOrDirectory(table_metadata_path)) { ASTPtr ast_detached = parseQueryFromMetadata(log, local_context, table_metadata_path); auto & create_detached = ast_detached->as(); @@ -285,12 +285,11 @@ void DatabaseOnDisk::createTable( /// .sql.detached extension, is not needed anymore since we attached the table back void DatabaseOnDisk::removeDetachedPermanentlyFlag(ContextPtr, const String & table_name, const String & table_metadata_path, bool) { + auto shared_disk = getContext()->getSharedDisk(); try { fs::path detached_permanently_flag(table_metadata_path + detached_suffix); - - if (fs::exists(detached_permanently_flag)) - (void)fs::remove(detached_permanently_flag); + (void)shared_disk->removeFileIfExists(detached_permanently_flag); } catch (Exception & e) { @@ -303,6 +302,7 @@ void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const Stora const String & table_metadata_tmp_path, const String & table_metadata_path, ContextPtr query_context) { + auto shared_disk = getContext()->getSharedDisk(); try { createDirectories(); @@ -312,11 +312,11 @@ void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const Stora /// If it was ATTACH query and file with table metadata already exist /// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one. - fs::rename(table_metadata_tmp_path, table_metadata_path); + shared_disk->replaceFile(table_metadata_tmp_path, table_metadata_path); } catch (...) { - (void)fs::remove(table_metadata_tmp_path); + (void)shared_disk->removeFileIfExists(table_metadata_tmp_path); throw; } } @@ -325,12 +325,14 @@ void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const Stri { waitDatabaseStarted(); + auto shared_disk = getContext()->getSharedDisk(); + auto table = detachTable(query_context, table_name); fs::path detached_permanently_flag(getObjectMetadataPath(table_name) + detached_suffix); try { - FS::createFile(detached_permanently_flag); + shared_disk->createFile(detached_permanently_flag); std::lock_guard lock(mutex); const auto it = snapshot_detached_tables.find(table_name); @@ -352,6 +354,8 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na { waitDatabaseStarted(); + auto shared_disk = getContext()->getSharedDisk(); + String table_metadata_path = getObjectMetadataPath(table_name); String table_metadata_path_drop = table_metadata_path + drop_suffix; String table_data_path_relative = getTableDataPath(table_name); @@ -363,7 +367,7 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na bool renamed = false; try { - fs::rename(table_metadata_path, table_metadata_path_drop); + shared_disk->replaceFile(table_metadata_path, table_metadata_path_drop); renamed = true; // The table might be not loaded for Lazy database engine. if (table) @@ -378,7 +382,7 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na if (table) attachTable(local_context, table_name, table, table_data_path_relative); if (renamed) - fs::rename(table_metadata_path_drop, table_metadata_path); + shared_disk->replaceFile(table_metadata_path_drop, table_metadata_path); throw; } @@ -390,7 +394,7 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na LOG_INFO(log, "Removing data directory from disk {} with path {} for dropped table {} ", disk_name, table_data_path_relative, table_name); disk->removeRecursive(table_data_path_relative); } - (void)fs::remove(table_metadata_path_drop); + (void)shared_disk->removeFileIfExists(table_metadata_path_drop); } void DatabaseOnDisk::checkMetadataFilenameAvailability(const String & to_table_name) const @@ -401,13 +405,14 @@ void DatabaseOnDisk::checkMetadataFilenameAvailability(const String & to_table_n void DatabaseOnDisk::checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name) const { + auto shared_disk = getContext()->getSharedDisk(); String table_metadata_path = getObjectMetadataPath(to_table_name); - if (fs::exists(table_metadata_path)) + if (shared_disk->existsFile(table_metadata_path)) { fs::path detached_permanently_flag(table_metadata_path + detached_suffix); - if (fs::exists(detached_permanently_flag)) + if (shared_disk->existsFile(detached_permanently_flag)) throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {}.{} already exists (detached permanently)", backQuote(database_name), backQuote(to_table_name)); throw Exception( @@ -426,6 +431,8 @@ void DatabaseOnDisk::renameTable( if (exchange) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Tables can be exchanged only in Atomic databases"); + auto shared_disk = getContext()->getSharedDisk(); + bool from_ordinary_to_atomic = false; bool from_atomic_to_ordinary = false; if (typeid(*this) != typeid(to_database)) @@ -509,7 +516,7 @@ void DatabaseOnDisk::renameTable( /// Now table data are moved to new database, so we must add metadata and attach table to new database to_database.createTable(local_context, to_table_name, table, attach_query); - (void)fs::remove(table_metadata_path); + (void)shared_disk->removeFileIfExists(table_metadata_path); if (from_atomic_to_ordinary) { @@ -587,27 +594,32 @@ void DatabaseOnDisk::drop(ContextPtr local_context) { waitDatabaseStarted(); + auto shared_disk = getContext()->getSharedDisk(); + assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty()); if (local_context->getSettingsRef()[Setting::force_remove_data_recursively_on_drop]) { - (void)fs::remove_all(std::filesystem::path(getContext()->getPath()) / data_path); - (void)fs::remove_all(getMetadataPath()); + (void)shared_disk->removeRecursive(std::filesystem::path(getContext()->getPath()) / data_path); + (void)shared_disk->removeRecursive(getMetadataPath()); } else { try { - (void)fs::remove(std::filesystem::path(getContext()->getPath()) / data_path); - (void)fs::remove(getMetadataPath()); + (void)shared_disk->removeDirectoryIfExists(std::filesystem::path(getContext()->getPath()) / data_path); + (void)shared_disk->removeDirectoryIfExists(getMetadataPath()); } - catch (const fs::filesystem_error & e) + catch (const Exception & e) { - if (e.code() != std::errc::directory_not_empty) - throw Exception(Exception::CreateFromSTDTag{}, e); - throw Exception(ErrorCodes::DATABASE_NOT_EMPTY, "Cannot drop: {}. " + if (e.code() != ErrorCodes::CANNOT_RMDIR) + throw; + throw Exception( + ErrorCodes::DATABASE_NOT_EMPTY, + "Cannot drop: {}. " "Probably database contain some detached tables or metadata leftovers from Ordinary engine. " "If you want to remove all data anyway, try to attach database back and drop it again " - "with enabled force_remove_data_recursively_on_drop setting", e.what()); + "with enabled force_remove_data_recursively_on_drop setting", + e.what()); } } } @@ -619,10 +631,19 @@ String DatabaseOnDisk::getObjectMetadataPath(const String & object_name) const time_t DatabaseOnDisk::getObjectMetadataModificationTime(const String & object_name) const { + auto shared_disk = getContext()->getSharedDisk(); + String table_metadata_path = getObjectMetadataPath(object_name); + if (!shared_disk->existsFileOrDirectory(table_metadata_path)) + return static_cast(0); + try { - return FS::getModificationTime(table_metadata_path); + return shared_disk->getLastModified(table_metadata_path).epochTime(); + } + catch (const Exception &) + { + throw; } catch (const fs::filesystem_error & e) { @@ -636,7 +657,8 @@ time_t DatabaseOnDisk::getObjectMetadataModificationTime(const String & object_n void DatabaseOnDisk::iterateMetadataFiles(const IteratingFunction & process_metadata_file) const { - if (!fs::exists(metadata_path)) + auto shared_disk = getContext()->getSharedDisk(); + if (!shared_disk->existsDirectory(metadata_path)) return; auto process_tmp_drop_metadata_file = [&](const String & file_name) @@ -645,26 +667,26 @@ void DatabaseOnDisk::iterateMetadataFiles(const IteratingFunction & process_meta static const char * tmp_drop_ext = ".sql.tmp_drop"; const std::string object_name = file_name.substr(0, file_name.size() - strlen(tmp_drop_ext)); - if (fs::exists(std::filesystem::path(getContext()->getPath()) / data_path / object_name)) + if (shared_disk->existsFile(std::filesystem::path(getContext()->getPath()) / data_path / object_name)) { - fs::rename(getMetadataPath() + file_name, getMetadataPath() + object_name + ".sql"); + shared_disk->replaceFile(getMetadataPath() + file_name, getMetadataPath() + object_name + ".sql"); LOG_WARNING(log, "Object {} was not dropped previously and will be restored", backQuote(object_name)); process_metadata_file(object_name + ".sql"); } else { LOG_INFO(log, "Removing file {}", getMetadataPath() + file_name); - (void)fs::remove(getMetadataPath() + file_name); + (void)shared_disk->removeFileIfExists(getMetadataPath() + file_name); } }; /// Metadata files to load: name and flag for .tmp_drop files std::vector> metadata_files; - fs::directory_iterator dir_end; - for (fs::directory_iterator dir_it(metadata_path); dir_it != dir_end; ++dir_it) + for (const auto dir_it = shared_disk->iterateDirectory(metadata_path); dir_it->isValid(); dir_it->next()) { - String file_name = dir_it->path().filename(); + auto dir_path = fs::path(dir_it->path()); + String file_name = dir_path.filename(); /// For '.svn', '.gitignore' directory and similar. if (file_name.at(0) == '.') continue; @@ -685,8 +707,8 @@ void DatabaseOnDisk::iterateMetadataFiles(const IteratingFunction & process_meta else if (endsWith(file_name, ".sql.tmp")) { /// There are files .sql.tmp - delete - LOG_INFO(log, "Removing file {}", dir_it->path().string()); - (void)fs::remove(dir_it->path()); + LOG_INFO(log, "Removing file {}", dir_path.string()); + (void)shared_disk->removeFileIfExists(dir_path); } else if (endsWith(file_name, ".sql")) { @@ -729,13 +751,11 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata( bool throw_on_error /*= true*/, bool remove_empty /*= false*/) { - String query; + auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk(); - int metadata_file_fd = ::open(metadata_file_path.c_str(), O_RDONLY | O_CLOEXEC); - - if (metadata_file_fd == -1) + if (!shared_disk->existsFile(metadata_file_path)) { - if (errno == ENOENT && !throw_on_error) + if (!throw_on_error) return nullptr; ErrnoException::throwFromPath( @@ -745,8 +765,9 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata( metadata_file_path); } - ReadBufferFromFile in(metadata_file_fd, metadata_file_path, METADATA_FILE_BUFFER_SIZE); - readStringUntilEOF(query, in); + auto read_buf = shared_disk->readFile(metadata_file_path, ReadSettings{}); + String query; + readStringUntilEOF(query, *read_buf); /** Empty files with metadata are generated after a rough restart of the server. * Remove these files to slightly reduce the work of the admins on startup. @@ -755,7 +776,7 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata( { if (logger) LOG_ERROR(logger, "File {} is empty. Removing.", metadata_file_path); - (void)fs::remove(metadata_file_path); + (void)shared_disk->removeFileIfExists(metadata_file_path); return nullptr; } @@ -893,6 +914,7 @@ void DatabaseOnDisk::modifySettingsMetadata(const SettingsChanges & settings_cha out.sync(); out.close(); - fs::rename(metadata_file_tmp_path, metadata_file_path); + auto shared_disk = getContext()->getSharedDisk(); + shared_disk->replaceFile(metadata_file_tmp_path, metadata_file_path); } } diff --git a/src/Databases/DatabaseOnDisk.h b/src/Databases/DatabaseOnDisk.h index 1e11d21cc87..afbe0c35eda 100644 --- a/src/Databases/DatabaseOnDisk.h +++ b/src/Databases/DatabaseOnDisk.h @@ -1,10 +1,11 @@ #pragma once -#include -#include #include +#include #include #include +#include +#include namespace DB @@ -68,7 +69,8 @@ public: String getTableDataPath(const ASTCreateQuery & query) const override { return getTableDataPath(query.getTable()); } String getMetadataPath() const override { return metadata_path; } - static ASTPtr parseQueryFromMetadata(LoggerPtr log, ContextPtr context, const String & metadata_file_path, bool throw_on_error = true, bool remove_empty = false); + static ASTPtr parseQueryFromMetadata( + LoggerPtr log, ContextPtr context, const String & metadata_file_path, bool throw_on_error = true, bool remove_empty = false); /// will throw when the table we want to attach already exists (in active / detached / detached permanently form) void checkMetadataFilenameAvailability(const String & to_table_name) const override; diff --git a/src/Databases/DatabaseOrdinary.cpp b/src/Databases/DatabaseOrdinary.cpp index ad12f1264f8..ef53b5caa71 100644 --- a/src/Databases/DatabaseOrdinary.cpp +++ b/src/Databases/DatabaseOrdinary.cpp @@ -154,6 +154,8 @@ String DatabaseOrdinary::getConvertToReplicatedFlagPath(const String & name, con void DatabaseOrdinary::convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const QualifiedTableName & qualified_name, const String & file_name) { + auto shared_disk = getContext()->getSharedDisk(); + fs::path path(getMetadataPath()); fs::path file_path(file_name); fs::path full_path = path / file_path; @@ -172,7 +174,7 @@ void DatabaseOrdinary::convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const Qu auto convert_to_replicated_flag_path = getConvertToReplicatedFlagPath(qualified_name.table, policy, false); - if (!fs::exists(convert_to_replicated_flag_path)) + if (!shared_disk->existsFile(convert_to_replicated_flag_path)) return; if (getUUID() == UUIDHelpers::Nil) @@ -195,7 +197,7 @@ void DatabaseOrdinary::convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const Qu out.sync(); out.close(); } - fs::rename(table_metadata_tmp_path, table_metadata_path); + shared_disk->replaceFile(table_metadata_tmp_path, table_metadata_path); LOG_INFO( log, @@ -207,10 +209,12 @@ void DatabaseOrdinary::convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const Qu void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTablesMetadata & metadata, bool is_startup) { + auto shared_disk = getContext()->getSharedDisk(); + size_t prev_tables_count = metadata.parsed_tables.size(); size_t prev_total_dictionaries = metadata.total_dictionaries; - auto process_metadata = [&metadata, is_startup, local_context, this](const String & file_name) + auto process_metadata = [&metadata, is_startup, local_context, shared_disk, this](const String & file_name) { fs::path path(getMetadataPath()); fs::path file_path(file_name); @@ -244,7 +248,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables } } - if (fs::exists(full_path.string() + detached_suffix)) + if (shared_disk->existsFile(full_path.string() + detached_suffix)) { const std::string table_name = unescapeForFileName(file_name.substr(0, file_name.size() - 4)); LOG_DEBUG(log, "Skipping permanently detached table {}.", backQuote(table_name)); @@ -355,11 +359,13 @@ void DatabaseOrdinary::restoreMetadataAfterConvertingToReplicated(StoragePtr tab if (!rmt) return; + auto shared_disk = getContext()->getSharedDisk(); + auto convert_to_replicated_flag_path = getConvertToReplicatedFlagPath(name.table, table->getStoragePolicy(), true); - if (!fs::exists(convert_to_replicated_flag_path)) + if (!shared_disk->existsFile(convert_to_replicated_flag_path)) return; - (void)fs::remove(convert_to_replicated_flag_path); + (void)shared_disk->removeFileIfExists(convert_to_replicated_flag_path); LOG_INFO ( log, @@ -595,14 +601,15 @@ void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & ta void DatabaseOrdinary::commitAlterTable(const StorageID &, const String & table_metadata_tmp_path, const String & table_metadata_path, const String & /*statement*/, ContextPtr /*query_context*/) { + auto shared_disk = getContext()->getSharedDisk(); try { /// rename atomically replaces the old file with the new one. - fs::rename(table_metadata_tmp_path, table_metadata_path); + (void)shared_disk->replaceFile(table_metadata_tmp_path, table_metadata_path); } catch (...) { - (void)fs::remove(table_metadata_tmp_path); + (void)shared_disk->removeFileIfExists(table_metadata_tmp_path); throw; } } diff --git a/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp index 3d5fcc153fd..932d6aed895 100644 --- a/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializedMySQL.cpp @@ -185,11 +185,13 @@ void DatabaseMaterializedMySQL::alterTable(ContextPtr context_, const StorageID void DatabaseMaterializedMySQL::drop(ContextPtr context_) { LOG_TRACE(log, "Dropping MaterializeMySQL database"); + + auto shared_disk = getContext()->getSharedDisk(); /// Remove metadata info fs::path metadata(getMetadataPath() + "/.metadata"); - if (fs::exists(metadata)) - (void)fs::remove(metadata); + if (shared_disk->existsFile(metadata)) + (void)shared_disk->removeFileIfExists(metadata); DatabaseAtomic::drop(context_); } diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index 732d2552628..e2c4b12366c 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -13,9 +13,11 @@ #include #include +#include #include -#include +#include #include +#include #include #include @@ -313,7 +315,7 @@ DirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path) const void DiskLocal::moveFile(const String & from_path, const String & to_path) { - renameNoReplace(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path); + DB::renameNoReplace(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path); } void DiskLocal::replaceFile(const String & from_path, const String & to_path) @@ -324,6 +326,21 @@ void DiskLocal::replaceFile(const String & from_path, const String & to_path) fs::rename(from_file, to_file); } +void DiskLocal::renameNoReplace(const std::string & old_path, const std::string & new_path) +{ + DB::renameNoReplace(fs::path(disk_path) / old_path, fs::path(disk_path) / new_path); +} + +void DiskLocal::renameExchange(const std::string & old_path, const std::string & new_path) +{ + DB::renameExchange(fs::path(disk_path) / old_path, fs::path(disk_path) / new_path); +} + +bool DiskLocal::renameExchangeIfSupported(const std::string & old_path, const std::string & new_path) +{ + return DB::renameExchangeIfSupported(fs::path(disk_path) / old_path, fs::path(disk_path) / new_path); +} + std::unique_ptr DiskLocal::readFile(const String & path, const ReadSettings & settings, std::optional read_hint, std::optional file_size) const { if (!file_size.has_value()) @@ -363,7 +380,7 @@ void DiskLocal::removeFile(const String & path) { auto fs_path = fs::path(disk_path) / path; if (0 != unlink(fs_path.c_str())) - ErrnoException::throwFromPath(ErrorCodes::CANNOT_UNLINK, fs_path, "Cannot unlink file {}", fs_path); + fs::remove(fs_path); } void DiskLocal::removeFileIfExists(const String & path) @@ -383,6 +400,14 @@ void DiskLocal::removeDirectory(const String & path) ErrnoException::throwFromPath(ErrorCodes::CANNOT_RMDIR, fs_path, "Cannot remove directory {}", fs_path); } +void DiskLocal::removeDirectoryIfExists(const String & path) +{ + auto fs_path = fs::path(disk_path) / path; + if (0 != rmdir(fs_path.c_str())) + if (errno != ENOENT) + ErrnoException::throwFromPath(ErrorCodes::CANNOT_RMDIR, fs_path, "Cannot remove directory {}", fs_path); +} + void DiskLocal::removeRecursive(const String & path) { (void)fs::remove_all(fs::path(disk_path) / path); @@ -415,6 +440,37 @@ void DiskLocal::createHardLink(const String & src_path, const String & dst_path) DB::createHardLink(fs::path(disk_path) / src_path, fs::path(disk_path) / dst_path); } +bool DiskLocal::isSymlink(const String & path) const +{ + return FS::isSymlink(path); +} + +bool DiskLocal::isSymlinkNoThrow(const String & path) const +{ + return FS::isSymlinkNoThrow(path); +} + +void DiskLocal::createDirectoriesSymlink(const String & target, const String & link) +{ + fs::create_directory_symlink(target, link); +} + +String DiskLocal::readSymlink(const fs::path & path) const +{ + return FS::readSymlink(path); +} + +bool DiskLocal::equivalent(const String & p1, const String & p2) const +{ + return fs::equivalent(p1, p2); +} + +bool DiskLocal::equivalentNoThrow(const String & p1, const String & p2) const +{ + std::error_code ec; + return fs::equivalent(p1, p2, ec); +} + void DiskLocal::truncateFile(const String & path, size_t size) { int res = truncate((fs::path(disk_path) / path).string().data(), size); diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h index 1edff4cec34..07f887c77ec 100644 --- a/src/Disks/DiskLocal.h +++ b/src/Disks/DiskLocal.h @@ -64,6 +64,12 @@ public: void replaceFile(const String & from_path, const String & to_path) override; + void renameNoReplace(const std::string & old_path, const std::string & new_path) override; + + void renameExchange(const std::string & old_path, const std::string & new_path) override; + + bool renameExchangeIfSupported(const std::string & old_path, const std::string & new_path) override; + void copyDirectoryContent( const String & from_dir, const std::shared_ptr & to_disk, @@ -92,6 +98,7 @@ public: void removeFile(const String & path) override; void removeFileIfExists(const String & path) override; void removeDirectory(const String & path) override; + void removeDirectoryIfExists(const String & path) override; void removeRecursive(const String & path) override; void setLastModified(const String & path, const Poco::Timestamp & timestamp) override; @@ -104,6 +111,20 @@ public: void createHardLink(const String & src_path, const String & dst_path) override; + bool isSymlinkSupported() const override { return true; } + + bool isSymlink(const String & path) const override; + + bool isSymlinkNoThrow(const String & path) const override; + + void createDirectoriesSymlink(const String & target, const String & link) override; + + String readSymlink(const fs::path & path) const override; + + bool equivalent(const String & p1, const String & p2) const override; + + bool equivalentNoThrow(const String & p1, const String & p2) const override; + void truncateFile(const String & path, size_t size) override; DataSourceDescription getDataSourceDescription() const override; diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h index 692020c86a6..681fb241030 100644 --- a/src/Disks/IDisk.h +++ b/src/Disks/IDisk.h @@ -193,6 +193,26 @@ public: /// If a file with `to_path` path already exists, it will be replaced. virtual void replaceFile(const String & from_path, const String & to_path) = 0; + virtual void renameNoReplace(const std::string &, const std::string &) + { + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, "Method `renameNoReplace()` not implemented for disk: {}", getDataSourceDescription().toString()); + } + + virtual void renameExchange(const std::string &, const std::string &) + { + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, "Method `renameExchange()` not implemented for disk: {}", getDataSourceDescription().toString()); + } + + virtual bool renameExchangeIfSupported(const std::string &, const std::string &) + { + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + "Method `renameExchangeIfSupported()` not implemented for disk: {}", + getDataSourceDescription().toString()); + } + /// Recursively copy files from from_dir to to_dir. Create to_dir if not exists. virtual void copyDirectoryContent( const String & from_dir, @@ -246,6 +266,14 @@ public: /// Remove directory. Throws exception if it's not a directory or if directory is not empty. virtual void removeDirectory(const String & path) = 0; + virtual void removeDirectoryIfExists(const String &) + { + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + "Method `removeDirectoryIfExists()` is not implemented for disk: {}", + getDataSourceDescription().toString()); + } + /// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exists. virtual void removeRecursive(const String & path) = 0; @@ -357,6 +385,49 @@ public: /// Create hardlink from `src_path` to `dst_path`. virtual void createHardLink(const String & src_path, const String & dst_path) = 0; + virtual bool isSymlinkSupported() const { return false; } + virtual bool isSymlink(const String &) const + { + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, "Method isSymlink() is not implemented for disk type: {}", getDataSourceDescription().toString()); + } + + virtual bool isSymlinkNoThrow(const String &) const + { + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + "Method isSymlinkNothrow() is not implemented for disk type: {}", + getDataSourceDescription().toString()); + } + + virtual void createDirectoriesSymlink(const String &, const String &) + { + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + "Method createDirectoriesSymlink() is not implemented for disk type: {}", + getDataSourceDescription().toString()); + } + + virtual String readSymlink(const fs::path &) const + { + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + "Method readSymlink() is not implemented for disk type: {}", + getDataSourceDescription().toString()); + } + + virtual bool equivalent(const String &, const String &) const + { + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, "Method equivalent() is not implemented for disk type: {}", getDataSourceDescription().toString()); + } + + virtual bool equivalentNoThrow(const String &, const String &) const + { + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, "Method equivalent() is not implemented for disk type: {}", getDataSourceDescription().toString()); + } + /// Truncate file to specified size. virtual void truncateFile(const String & path, size_t size); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index d2aad0a52d8..e907e655656 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -360,6 +360,8 @@ struct ContextSharedPart : boost::noncopyable ConfigurationPtr config TSA_GUARDED_BY(mutex); /// Global configuration settings. String tmp_path TSA_GUARDED_BY(mutex); /// Path to the temporary files that occur when processing the request. + std::shared_ptr shared_disk; + /// All temporary files that occur when processing the requests accounted here. /// Child scopes for more fine-grained accounting are created per user/query/etc. /// Initialized once during server startup. @@ -1129,6 +1131,11 @@ String Context::getFilesystemCachesPath() const return shared->filesystem_caches_path; } +std::shared_ptr Context::getSharedDisk() const +{ + return shared->shared_disk; +} + String Context::getFilesystemCacheUser() const { SharedLockGuard lock(shared->mutex); @@ -1217,6 +1224,8 @@ void Context::setPath(const String & path) shared->path = path; + shared->shared_disk = std::make_shared("shared", path); + if (shared->tmp_path.empty() && !shared->root_temp_data_on_disk) shared->tmp_path = shared->path + "tmp/"; @@ -1254,21 +1263,23 @@ try { LOG_DEBUG(log, "Setting up {} to store temporary data in it", path); - if (fs::exists(path)) + auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk(); + + if (shared_disk->existsDirectory(path)) { /// Clearing old temporary files. - fs::directory_iterator dir_end; - for (fs::directory_iterator it(path); it != dir_end; ++it) + for (const auto dir_it = shared_disk->iterateDirectory(path); dir_it->isValid(); dir_it->next()) { - if (it->is_regular_file()) + auto file_path = fs::path(dir_it->path()); + if (shared_disk->existsFile(file_path)) { - if (startsWith(it->path().filename(), "tmp")) + if (startsWith(file_path.filename(), "tmp")) { - LOG_DEBUG(log, "Removing old temporary file {}", it->path().string()); - fs::remove(it->path()); + LOG_DEBUG(log, "Removing old temporary file {}", file_path.string()); + shared_disk->removeFileIfExists(file_path); } else - LOG_DEBUG(log, "Found unknown file in temporary path {}", it->path().string()); + LOG_DEBUG(log, "Found unknown file in temporary path {}", file_path.string()); } /// We skip directories (for example, 'http_buffers' - it's used for buffering of the results) and all other file types. } @@ -4781,14 +4792,16 @@ void Context::checkCanBeDropped(const String & database, const String & table, c if (!max_size_to_drop || size <= max_size_to_drop) return; + auto shared_disk = getSharedDisk(); + fs::path force_file(getFlagsPath() + "force_drop_table"); - bool force_file_exists = fs::exists(force_file); + bool force_file_exists = shared_disk->existsFile(force_file); if (force_file_exists) { try { - fs::remove(force_file); + shared_disk->removeFileIfExists(force_file); return; } catch (...) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index e8ccc31f597..db3084a374e 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -584,6 +584,7 @@ public: String getUserScriptsPath() const; String getFilesystemCachesPath() const; String getFilesystemCacheUser() const; + std::shared_ptr getSharedDisk() const; /// A list of warnings about server configuration to place in `system.warnings` table. Strings getWarnings() const; diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index dc9ce23ddb9..6014f509e41 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -617,6 +617,7 @@ DatabasePtr DatabaseCatalog::detachDatabase(ContextPtr local_context, const Stri if (drop) { + auto shared_disk = getContext()->getSharedDisk(); UUID db_uuid = db->getUUID(); /// Delete the database. @@ -625,9 +626,9 @@ DatabasePtr DatabaseCatalog::detachDatabase(ContextPtr local_context, const Stri /// Old ClickHouse versions did not store database.sql files /// Remove metadata dir (if exists) to avoid recreation of .sql file on server startup fs::path database_metadata_dir = fs::path(getContext()->getPath()) / "metadata" / escapeForFileName(database_name); - fs::remove(database_metadata_dir); + shared_disk->removeDirectoryIfExists(database_metadata_dir); fs::path database_metadata_file = fs::path(getContext()->getPath()) / "metadata" / (escapeForFileName(database_name) + ".sql"); - fs::remove(database_metadata_file); + shared_disk->removeFileIfExists(database_metadata_file); if (db_uuid != UUIDHelpers::Nil) removeUUIDMappingFinally(db_uuid); @@ -996,6 +997,8 @@ void DatabaseCatalog::loadMarkedAsDroppedTables() { assert(!cleanup_task); + auto shared_disk = getContext()->getSharedDisk(); + /// /clickhouse_root/metadata_dropped/ contains files with metadata of tables, /// which where marked as dropped by Atomic databases. /// Data directories of such tables still exists in store/ @@ -1006,41 +1009,41 @@ void DatabaseCatalog::loadMarkedAsDroppedTables() std::map dropped_metadata; String path = std::filesystem::path(getContext()->getPath()) / "metadata_dropped" / ""; - if (!std::filesystem::exists(path)) - { + if (!shared_disk->existsDirectory(path)) return; - } - Poco::DirectoryIterator dir_end; - for (Poco::DirectoryIterator it(path); it != dir_end; ++it) + for (const auto dir_it = shared_disk->iterateDirectory(path); dir_it->isValid(); dir_it->next()) { + auto sub_path = fs::path(dir_it->path()); /// File name has the following format: /// database_name.table_name.uuid.sql + auto sub_path_filename = sub_path.filename().string(); + /// Ignore unexpected files - if (!it.name().ends_with(".sql")) + if (!sub_path_filename.ends_with(".sql")) continue; /// Process .sql files with metadata of tables which were marked as dropped StorageID dropped_id = StorageID::createEmpty(); - size_t dot_pos = it.name().find('.'); + size_t dot_pos = sub_path_filename.find('.'); if (dot_pos == std::string::npos) continue; - dropped_id.database_name = unescapeForFileName(it.name().substr(0, dot_pos)); + dropped_id.database_name = unescapeForFileName(sub_path_filename.substr(0, dot_pos)); size_t prev_dot_pos = dot_pos; - dot_pos = it.name().find('.', prev_dot_pos + 1); + dot_pos = sub_path_filename.find('.', prev_dot_pos + 1); if (dot_pos == std::string::npos) continue; - dropped_id.table_name = unescapeForFileName(it.name().substr(prev_dot_pos + 1, dot_pos - prev_dot_pos - 1)); + dropped_id.table_name = unescapeForFileName(sub_path_filename.substr(prev_dot_pos + 1, dot_pos - prev_dot_pos - 1)); prev_dot_pos = dot_pos; - dot_pos = it.name().find('.', prev_dot_pos + 1); + dot_pos = sub_path_filename.find('.', prev_dot_pos + 1); if (dot_pos == std::string::npos) continue; - dropped_id.uuid = parse(it.name().substr(prev_dot_pos + 1, dot_pos - prev_dot_pos - 1)); + dropped_id.uuid = parse(sub_path_filename.substr(prev_dot_pos + 1, dot_pos - prev_dot_pos - 1)); - String full_path = path + it.name(); + String full_path = path + sub_path_filename; dropped_metadata.emplace(std::move(full_path), std::move(dropped_id)); } @@ -1084,6 +1087,8 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr assert(!table || table->getStorageID().uuid == table_id.uuid); assert(dropped_metadata_path == getPathForDroppedMetadata(table_id)); + auto shared_disk = getContext()->getSharedDisk(); + /// Table was removed from database. Enqueue removal of its data from disk. time_t drop_time; if (table) @@ -1128,7 +1133,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr } addUUIDMapping(table_id.uuid); - drop_time = FS::getModificationTime(dropped_metadata_path); + drop_time = shared_disk->getLastModified(dropped_metadata_path).epochTime(); } std::lock_guard lock(tables_marked_dropped_mutex); @@ -1161,6 +1166,8 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr void DatabaseCatalog::undropTable(StorageID table_id) { + auto shared_disk = getContext()->getSharedDisk(); + String latest_metadata_dropped_path; TableMarkedAsDropped dropped_table; { @@ -1198,7 +1205,7 @@ void DatabaseCatalog::undropTable(StorageID table_id) /// a table is successfully marked undropped, /// if and only if its metadata file was moved to a database. /// This maybe throw exception. - renameNoReplace(latest_metadata_dropped_path, table_metadata_path); + shared_disk->moveFile(latest_metadata_dropped_path, table_metadata_path); if (first_async_drop_in_queue == it_dropped_table) ++first_async_drop_in_queue; @@ -1381,6 +1388,8 @@ void DatabaseCatalog::dropTableDataTask() void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) { + auto shared_disk = getContext()->getSharedDisk(); + if (table.table) { table.table->drop(); @@ -1398,7 +1407,7 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) } LOG_INFO(log, "Removing metadata {} of dropped table {}", table.metadata_path, table.table_id.getNameForLogs()); - fs::remove(fs::path(table.metadata_path)); + shared_disk->removeFileIfExists(fs::path(table.metadata_path)); removeUUIDMappingFinally(table.table_id.uuid); CurrentMetrics::sub(CurrentMetrics::TablesToDropQueueSize, 1); diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index b027f6aad61..0522cccf73a 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -228,16 +228,18 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) db_num_limit, db_count); } + auto shared_disk = getContext()->getSharedDisk(); + /// Will write file with database metadata, if needed. String database_name_escaped = escapeForFileName(database_name); fs::path metadata_path = fs::weakly_canonical(getContext()->getPath()); - fs::create_directories(metadata_path / "metadata"); + shared_disk->createDirectories(metadata_path / "metadata"); fs::path metadata_file_tmp_path = metadata_path / "metadata" / (database_name_escaped + ".sql.tmp"); fs::path metadata_file_path = metadata_path / "metadata" / (database_name_escaped + ".sql"); if (!create.storage && create.attach) { - if (!fs::exists(metadata_file_path)) + if (!shared_disk->existsFile(metadata_file_path)) throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Database engine must be specified for ATTACH DATABASE query"); /// Short syntax: try read database definition from file auto ast = DatabaseOnDisk::parseQueryFromMetadata(nullptr, getContext(), metadata_file_path); @@ -284,7 +286,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) metadata_path = metadata_path / "store" / DatabaseCatalog::getPathForUUID(create.uuid); - if (!create.attach && fs::exists(metadata_path) && !fs::is_empty(metadata_path)) + if (!create.attach && shared_disk->existsDirectory(metadata_path) && !shared_disk->isDirectoryEmpty(metadata_path)) throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists and is not empty", metadata_path.string()); } else if (create.storage->engine->name == "MaterializeMySQL" @@ -356,7 +358,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) "Enable allow_experimental_database_materialized_postgresql to use it"); } - bool need_write_metadata = !create.attach || !fs::exists(metadata_file_path); + bool need_write_metadata = !create.attach || !shared_disk->existsFile(metadata_file_path); bool need_lock_uuid = internal || need_write_metadata; auto mode = getLoadingStrictnessLevel(create.attach, force_attach, has_force_restore_data_flag, /*secondary*/ false); @@ -384,7 +386,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) String statement = statement_buf.str(); /// Needed to make database creation retriable if it fails after the file is created - fs::remove(metadata_file_tmp_path); + shared_disk->removeFileIfExists(metadata_file_tmp_path); /// Exclusive flag guarantees, that database is not created right now in another thread. WriteBufferFromFile out(metadata_file_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL); @@ -422,7 +424,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) if (need_write_metadata) { /// Prevents from overwriting metadata of detached database - renameNoReplace(metadata_file_tmp_path, metadata_file_path); + shared_disk->moveFile(metadata_file_tmp_path, metadata_file_path); renamed = true; } } @@ -430,8 +432,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) { if (renamed) { - [[maybe_unused]] bool removed = fs::remove(metadata_file_path); - assert(removed); + assert(shared_disk->existsFile(metadata_file_path)); + shared_disk->removeFileIfExists(metadata_file_path); } if (added) DatabaseCatalog::instance().detachDatabase(getContext(), database_name, false, false); @@ -1787,8 +1789,9 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create, data_path = database->getTableDataPath(create); auto full_data_path = fs::path{getContext()->getPath()} / data_path; + auto shared_disk = getContext()->getSharedDisk(); - if (!create.attach && !data_path.empty() && fs::exists(full_data_path)) + if (!create.attach && !data_path.empty() && shared_disk->existsDirectory(full_data_path)) { if (getContext()->getZooKeeperMetadataTransaction() && !getContext()->getZooKeeperMetadataTransaction()->isInitialQuery() && @@ -1803,8 +1806,8 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create, fs::path trash_path = fs::path{getContext()->getPath()} / "trash" / data_path / getHexUIntLowercase(thread_local_rng()); LOG_WARNING(getLogger("InterpreterCreateQuery"), "Directory for {} data {} already exists. Will move it to {}", Poco::toLower(storage_name), String(data_path), trash_path); - fs::create_directories(trash_path.parent_path()); - renameNoReplace(full_data_path, trash_path); + shared_disk->createDirectories(trash_path.parent_path()); + shared_disk->moveFile(full_data_path, trash_path); } else { diff --git a/src/Interpreters/loadMetadata.cpp b/src/Interpreters/loadMetadata.cpp index 12de96ef03a..c761be790f1 100644 --- a/src/Interpreters/loadMetadata.cpp +++ b/src/Interpreters/loadMetadata.cpp @@ -1,6 +1,7 @@ -#include -#include +#include "Common/tests/gtest_global_context.h" #include +#include +#include #include #include @@ -104,7 +105,8 @@ static void loadDatabase( String database_attach_query; String database_metadata_file = database_path + ".sql"; - if (fs::exists(fs::path(database_metadata_file))) + auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk(); + if (shared_disk->existsFile(fs::path(database_metadata_file))) { /// There is .sql file with database creation statement. ReadBufferFromFile in(database_metadata_file, 1024); @@ -130,9 +132,10 @@ static void loadDatabase( static void checkUnsupportedVersion(ContextMutablePtr context, const String & database_name) { + auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk(); /// Produce better exception message String metadata_path = context->getPath() + "metadata/" + database_name; - if (fs::exists(fs::path(metadata_path))) + if (shared_disk->existsDirectory(fs::path(metadata_path))) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Data directory for {} database exists, but metadata file does not. " "Probably you are trying to upgrade from version older than 20.7. " "If so, you should upgrade through intermediate version.", database_name); @@ -143,8 +146,10 @@ static void checkIncompleteOrdinaryToAtomicConversion(ContextPtr context, const if (context->getConfigRef().has("allow_reserved_database_name_tmp_convert")) return; + auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk(); + auto convert_flag_path = fs::path(context->getFlagsPath()) / "convert_ordinary_to_atomic"; - if (!fs::exists(convert_flag_path)) + if (!shared_disk->existsFile(convert_flag_path)) return; /// Flag exists. Let's check if we had an unsuccessful conversion attempt previously @@ -171,6 +176,8 @@ static void checkIncompleteOrdinaryToAtomicConversion(ContextPtr context, const LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_database_name, bool async_load_databases) { + auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk(); + LoggerPtr log = getLogger("loadMetadata"); String path = context->getPath() + "metadata"; @@ -179,12 +186,12 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data /// on difference of data parts while initializing tables. /// This file is immediately deleted i.e. "one-shot". auto force_restore_data_flag_file = fs::path(context->getFlagsPath()) / "force_restore_data"; - bool has_force_restore_data_flag = fs::exists(force_restore_data_flag_file); + bool has_force_restore_data_flag = shared_disk->existsFile(force_restore_data_flag_file); if (has_force_restore_data_flag) { try { - fs::remove(force_restore_data_flag_file); + shared_disk->removeFileIfExists(force_restore_data_flag_file); } catch (...) { @@ -195,16 +202,16 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data /// Loop over databases. std::map databases; - fs::directory_iterator dir_end; - for (fs::directory_iterator it(path); it != dir_end; ++it) + for (const auto dir_it = shared_disk->iterateDirectory(path); dir_it->isValid(); dir_it->next()) { - if (it->is_symlink()) + auto sub_path = fs::path(dir_it->path()); + if (shared_disk->isSymlinkSupported() && shared_disk->isSymlink(sub_path)) continue; - if (it->is_directory()) + if (shared_disk->existsDirectory(sub_path)) continue; - const auto current_file = it->path().filename().string(); + const auto current_file = sub_path.filename().string(); /// TODO: DETACH DATABASE PERMANENTLY ? if (fs::path(current_file).extension() == ".sql") @@ -217,10 +224,10 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data /// Temporary fails may be left from previous server runs. if (fs::path(current_file).extension() == ".tmp") { - LOG_WARNING(log, "Removing temporary file {}", it->path().string()); + LOG_WARNING(log, "Removing temporary file {}", sub_path.string()); try { - fs::remove(it->path()); + shared_disk->removeFileIfExists(sub_path); } catch (...) { @@ -281,12 +288,14 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & database_name, const String & default_engine) { + auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk(); + String path = context->getPath() + "metadata/" + database_name; String metadata_file = path + ".sql"; - if (fs::exists(metadata_file + ".tmp")) - fs::remove(metadata_file + ".tmp"); + if (shared_disk->existsFile(metadata_file + ".tmp")) + shared_disk->removeFileIfExists(metadata_file + ".tmp"); - if (fs::exists(fs::path(metadata_file))) + if (shared_disk->existsFile(fs::path(metadata_file))) { /// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted. loadDatabase(context, database_name, path, true); @@ -481,8 +490,10 @@ void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & load_s void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMutablePtr context) { + auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk(); + auto convert_flag_path = fs::path(context->getFlagsPath()) / "convert_ordinary_to_atomic"; - if (!fs::exists(convert_flag_path)) + if (!shared_disk->existsFile(convert_flag_path)) return; LOG_INFO(getLogger("loadMetadata"), "Found convert_ordinary_to_atomic file in flags directory, " @@ -496,7 +507,7 @@ void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMu maybeConvertOrdinaryDatabaseToAtomic(context, name); LOG_INFO(getLogger("loadMetadata"), "Conversion finished, removing convert_ordinary_to_atomic flag"); - fs::remove(convert_flag_path); + shared_disk->removeFileIfExists(convert_flag_path); } LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context, bool async_load_system_database)