Use IDisk in Databases

This commit is contained in:
Tuan Pham Anh 2024-11-18 12:39:53 +00:00
parent 3c0f299148
commit f2e031f1b8
14 changed files with 422 additions and 164 deletions

View File

@ -76,8 +76,10 @@ void DatabaseAtomic::createDirectories()
void DatabaseAtomic::createDirectoriesUnlocked()
{
DatabaseOnDisk::createDirectoriesUnlocked();
fs::create_directories(fs::path(getContext()->getPath()) / "metadata");
fs::create_directories(path_to_table_symlinks);
auto shared_disk = getContext()->getSharedDisk();
shared_disk->createDirectories(fs::path(getContext()->getPath()) / "metadata");
if (shared_disk->isSymlinkSupported())
shared_disk->createDirectories(path_to_table_symlinks);
tryCreateMetadataSymlink();
}
@ -100,18 +102,22 @@ String DatabaseAtomic::getTableDataPath(const ASTCreateQuery & query) const
void DatabaseAtomic::drop(ContextPtr)
{
auto shared_disk = getContext()->getSharedDisk();
waitDatabaseStarted();
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
try
{
(void)fs::remove(path_to_metadata_symlink);
(void)fs::remove_all(path_to_table_symlinks);
if (shared_disk->isSymlinkSupported())
{
(void)shared_disk->removeFileIfExists(path_to_metadata_symlink);
(void)shared_disk->removeRecursive(path_to_table_symlinks);
}
}
catch (...)
{
LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
}
(void)fs::remove_all(getMetadataPath());
(void)shared_disk->removeRecursive(getMetadataPath());
}
void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name, const StoragePtr & table, const String & relative_table_path)
@ -166,6 +172,8 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na
void DatabaseAtomic::dropTableImpl(ContextPtr local_context, const String & table_name, bool sync)
{
auto shared_disk = getContext()->getSharedDisk();
String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop;
StoragePtr table;
@ -174,7 +182,7 @@ void DatabaseAtomic::dropTableImpl(ContextPtr local_context, const String & tabl
table = getTableUnlocked(table_name);
table_metadata_path_drop = DatabaseCatalog::instance().getPathForDroppedMetadata(table->getStorageID());
fs::create_directory(fs::path(table_metadata_path_drop).parent_path());
shared_disk->createDirectories(fs::path(table_metadata_path_drop).parent_path());
auto txn = local_context->getZooKeeperMetadataTransaction();
if (txn && !local_context->isInternalSubquery())
@ -186,7 +194,7 @@ void DatabaseAtomic::dropTableImpl(ContextPtr local_context, const String & tabl
/// (it's more likely to lost connection, than to fail before applying local changes).
/// TODO better detection and recovery
fs::rename(table_metadata_path, table_metadata_path_drop); /// Mark table as dropped
shared_disk->replaceFile(table_metadata_path, table_metadata_path_drop); /// Mark table as dropped
DatabaseOrdinary::detachTableUnlocked(table_name); /// Should never throw
table_name_to_path.erase(table_name);
}
@ -318,11 +326,12 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
/// NOTE: replica will be lost if server crashes before the following rename
/// TODO better detection and recovery
auto shared_disk = getContext()->getSharedDisk();
if (exchange)
renameExchange(old_metadata_path, new_metadata_path);
shared_disk->renameExchange(old_metadata_path, new_metadata_path);
else
renameNoReplace(old_metadata_path, new_metadata_path);
shared_disk->renameNoReplace(old_metadata_path, new_metadata_path);
/// After metadata was successfully moved, the following methods should not throw (if they do, it's a logical error)
table_data_path = detach(*this, table_name, table->storesDataOnDisk());
@ -349,6 +358,8 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
const String & table_metadata_tmp_path, const String & table_metadata_path,
ContextPtr query_context)
{
auto shared_disk = getContext()->getSharedDisk();
createDirectories();
DetachedTables not_in_use;
auto table_data_path = getTableDataPath(query);
@ -377,7 +388,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
}
catch (...)
{
(void)fs::remove(table_metadata_tmp_path);
(void)shared_disk->removeFileIfExists(table_metadata_tmp_path);
throw;
}
if (table->storesDataOnDisk())
@ -387,11 +398,13 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path,
const String & /*statement*/, ContextPtr query_context)
{
auto shared_disk = getContext()->getSharedDisk();
bool check_file_exists = true;
SCOPE_EXIT({
std::error_code code;
if (check_file_exists)
(void)std::filesystem::remove(table_metadata_tmp_path, code);
(void)shared_disk->removeFileIfExists(table_metadata_tmp_path);
});
std::lock_guard lock{mutex};
@ -409,7 +422,7 @@ void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String &
check_file_exists = renameExchangeIfSupported(table_metadata_tmp_path, table_metadata_path);
if (!check_file_exists)
std::filesystem::rename(table_metadata_tmp_path, table_metadata_path);
shared_disk->replaceFile(table_metadata_tmp_path, table_metadata_path);
}
void DatabaseAtomic::assertDetachedTableNotInUse(const UUID & uuid)
@ -483,33 +496,41 @@ UUID DatabaseAtomic::tryGetTableUUID(const String & table_name) const
void DatabaseAtomic::beforeLoadingMetadata(ContextMutablePtr /*context*/, LoadingStrictnessLevel mode)
{
auto shared_disk = getContext()->getSharedDisk();
if (mode < LoadingStrictnessLevel::FORCE_RESTORE)
return;
if (!fs::exists(path_to_table_symlinks))
if (!shared_disk->isSymlinkSupported())
return;
if (!shared_disk->existsFileOrDirectory(path_to_table_symlinks))
return;
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
for (const auto dir_it = shared_disk->iterateDirectory(path_to_table_symlinks); dir_it->isValid(); dir_it->next())
{
if (!FS::isSymlink(table_path))
auto table_path = fs::path(dir_it->path());
if (table_path.filename().empty())
table_path = table_path.parent_path();
if (!shared_disk->isSymlink(table_path))
{
throw Exception(ErrorCodes::ABORTED,
"'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path()));
throw Exception(
ErrorCodes::ABORTED, "'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path));
}
(void)fs::remove(table_path);
(void)shared_disk->removeFileIfExists(table_path);
}
}
LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode)
{
auto shared_disk = getContext()->getSharedDisk();
auto base = DatabaseOrdinary::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
auto job = makeLoadJob(
base->goals(),
TablesLoaderBackgroundStartupPoolId,
fmt::format("startup Atomic database {}", getDatabaseName()),
[this, mode] (AsyncLoader &, const LoadJobPtr &)
[this, mode, shared_disk](AsyncLoader &, const LoadJobPtr &)
{
if (mode < LoadingStrictnessLevel::FORCE_RESTORE)
return;
@ -518,8 +539,8 @@ LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, Loa
std::lock_guard lock{mutex};
table_names = table_name_to_path;
}
fs::create_directories(path_to_table_symlinks);
if (shared_disk->isSymlinkSupported())
shared_disk->createDirectories(path_to_table_symlinks);
for (const auto & table : table_names)
{
/// All tables in database should be loaded at this point
@ -561,6 +582,9 @@ void DatabaseAtomic::stopLoading()
void DatabaseAtomic::tryCreateSymlink(const StoragePtr & table, bool if_data_path_exist)
{
auto shared_disk = getContext()->getSharedDisk();
if (!shared_disk->isSymlinkSupported())
return;
try
{
String table_name = table->getStorageID().getTableName();
@ -572,14 +596,13 @@ void DatabaseAtomic::tryCreateSymlink(const StoragePtr & table, bool if_data_pat
fs::path data = fs::weakly_canonical(table->getDataPaths()[0]);
/// If it already points where needed.
std::error_code ec;
if (fs::equivalent(data, link, ec))
if (shared_disk->equivalentNoThrow(data, link))
return;
if (if_data_path_exist && !fs::exists(data))
if (if_data_path_exist && !shared_disk->existsFileOrDirectory(data))
return;
fs::create_directory_symlink(data, link);
shared_disk->createDirectoriesSymlink(data, link);
}
catch (...)
{
@ -589,10 +612,14 @@ void DatabaseAtomic::tryCreateSymlink(const StoragePtr & table, bool if_data_pat
void DatabaseAtomic::tryRemoveSymlink(const String & table_name)
{
auto shared_disk = getContext()->getSharedDisk();
if (!shared_disk->isSymlinkSupported())
return;
try
{
String path = path_to_table_symlinks + escapeForFileName(table_name);
(void)fs::remove(path);
(void)shared_disk->removeFileIfExists(path);
}
catch (...)
{
@ -602,13 +629,18 @@ void DatabaseAtomic::tryRemoveSymlink(const String & table_name)
void DatabaseAtomic::tryCreateMetadataSymlink()
{
auto shared_disk = getContext()->getSharedDisk();
if (!shared_disk->isSymlinkSupported())
return;
/// Symlinks in data/db_name/ directory and metadata/db_name/ are not used by ClickHouse,
/// it's needed only for convenient introspection.
assert(path_to_metadata_symlink != metadata_path);
fs::path metadata_symlink(path_to_metadata_symlink);
if (fs::exists(metadata_symlink))
if (shared_disk->existsFile(metadata_symlink))
{
if (!FS::isSymlink(metadata_symlink))
if (!shared_disk->isSymlink(metadata_symlink))
throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Directory {} exists", path_to_metadata_symlink);
}
else
@ -616,9 +648,9 @@ void DatabaseAtomic::tryCreateMetadataSymlink()
try
{
/// fs::exists could return false for broken symlink
if (FS::isSymlinkNoThrow(metadata_symlink))
(void)fs::remove(metadata_symlink);
fs::create_directory_symlink(metadata_path, path_to_metadata_symlink);
if (shared_disk->isSymlinkNoThrow(metadata_symlink))
(void)shared_disk->removeFileIfExists(metadata_symlink);
shared_disk->createDirectoriesSymlink(metadata_path, path_to_metadata_symlink);
}
catch (...)
{
@ -630,6 +662,7 @@ void DatabaseAtomic::tryCreateMetadataSymlink()
void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new_name)
{
/// CREATE, ATTACH, DROP, DETACH and RENAME DATABASE must hold DDLGuard
auto shared_disk = getContext()->getSharedDisk();
createDirectories();
waitDatabaseStarted();
@ -645,7 +678,8 @@ void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new
try
{
(void)fs::remove(path_to_metadata_symlink);
if (shared_disk->isSymlinkSupported())
(void)shared_disk->removeFileIfExists(path_to_metadata_symlink);
}
catch (...)
{
@ -682,7 +716,8 @@ void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new
path_to_table_symlinks = getContext()->getPath() + "data/" + new_name_escaped + "/";
}
fs::rename(old_path_to_table_symlinks, path_to_table_symlinks);
if (shared_disk->isSymlinkSupported())
shared_disk->moveDirectory(old_path_to_table_symlinks, path_to_table_symlinks);
tryCreateMetadataSymlink();
}

View File

@ -1,5 +1,6 @@
#include <filesystem>
#include <Core/Settings.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
@ -9,7 +10,6 @@
#include <Parsers/queryToString.h>
#include <Common/Macros.h>
#include <Common/filesystemHelpers.h>
#include <Core/Settings.h>
namespace fs = std::filesystem;
@ -32,16 +32,21 @@ namespace ErrorCodes
void cckMetadataPathForOrdinary(const ASTCreateQuery & create, const String & metadata_path)
{
auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk();
if (!shared_disk->isSymlinkSupported())
return;
const String & engine_name = create.storage->engine->name;
const String & database_name = create.getDatabase();
if (engine_name != "Ordinary")
return;
if (!FS::isSymlink(metadata_path))
if (!shared_disk->isSymlink(metadata_path))
return;
String target_path = FS::readSymlink(metadata_path).string();
String target_path = shared_disk->readSymlink(metadata_path);
fs::path path_to_remove = metadata_path;
if (path_to_remove.filename().empty())
path_to_remove = path_to_remove.parent_path();

View File

@ -3,8 +3,10 @@
#include <filesystem>
#include <iterator>
#include <span>
#include <Core/Settings.h>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOrdinary.h>
#include <Disks/DiskLocal.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
@ -29,7 +31,6 @@
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Core/Settings.h>
namespace fs = std::filesystem;
@ -52,11 +53,12 @@ namespace Setting
extern const SettingsUInt64 max_parser_depth;
}
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
// static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
namespace ErrorCodes
{
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int CANNOT_RMDIR;
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int FILE_DOESNT_EXIST;
@ -171,14 +173,8 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query)
DatabaseOnDisk::DatabaseOnDisk(
const String & name,
const String & metadata_path_,
const String & data_path_,
const String & logger,
ContextPtr local_context)
: DatabaseWithOwnTablesBase(name, logger, local_context)
, metadata_path(metadata_path_)
, data_path(data_path_)
const String & name, const String & metadata_path_, const String & data_path_, const String & logger, ContextPtr local_context)
: DatabaseWithOwnTablesBase(name, logger, local_context), metadata_path(metadata_path_), data_path(data_path_)
{
}
@ -191,8 +187,10 @@ void DatabaseOnDisk::createDirectories()
void DatabaseOnDisk::createDirectoriesUnlocked()
{
fs::create_directories(std::filesystem::path(getContext()->getPath()) / data_path);
fs::create_directories(metadata_path);
auto shared_disk = getContext()->getSharedDisk();
shared_disk->createDirectories(metadata_path);
shared_disk->createDirectories(std::filesystem::path(getContext()->getPath()) / data_path);
}
@ -209,6 +207,8 @@ void DatabaseOnDisk::createTable(
const StoragePtr & table,
const ASTPtr & query)
{
auto shared_disk = getContext()->getSharedDisk();
createDirectories();
const auto & settings = local_context->getSettingsRef();
@ -238,7 +238,7 @@ void DatabaseOnDisk::createTable(
if (create.attach_short_syntax)
{
/// Metadata already exists, table was detached
assert(fs::exists(getObjectMetadataPath(table_name)));
assert(shared_disk->existsFileOrDirectory(getObjectMetadataPath(table_name)));
removeDetachedPermanentlyFlag(local_context, table_name, table_metadata_path, true);
attachTable(local_context, table_name, table, getTableDataPath(create));
return;
@ -247,7 +247,7 @@ void DatabaseOnDisk::createTable(
if (!create.attach)
checkMetadataFilenameAvailability(table_name);
if (create.attach && fs::exists(table_metadata_path))
if (create.attach && shared_disk->existsFileOrDirectory(table_metadata_path))
{
ASTPtr ast_detached = parseQueryFromMetadata(log, local_context, table_metadata_path);
auto & create_detached = ast_detached->as<ASTCreateQuery &>();
@ -285,12 +285,11 @@ void DatabaseOnDisk::createTable(
/// .sql.detached extension, is not needed anymore since we attached the table back
void DatabaseOnDisk::removeDetachedPermanentlyFlag(ContextPtr, const String & table_name, const String & table_metadata_path, bool)
{
auto shared_disk = getContext()->getSharedDisk();
try
{
fs::path detached_permanently_flag(table_metadata_path + detached_suffix);
if (fs::exists(detached_permanently_flag))
(void)fs::remove(detached_permanently_flag);
(void)shared_disk->removeFileIfExists(detached_permanently_flag);
}
catch (Exception & e)
{
@ -303,6 +302,7 @@ void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const Stora
const String & table_metadata_tmp_path, const String & table_metadata_path,
ContextPtr query_context)
{
auto shared_disk = getContext()->getSharedDisk();
try
{
createDirectories();
@ -312,11 +312,11 @@ void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const Stora
/// If it was ATTACH query and file with table metadata already exist
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
fs::rename(table_metadata_tmp_path, table_metadata_path);
shared_disk->replaceFile(table_metadata_tmp_path, table_metadata_path);
}
catch (...)
{
(void)fs::remove(table_metadata_tmp_path);
(void)shared_disk->removeFileIfExists(table_metadata_tmp_path);
throw;
}
}
@ -325,12 +325,14 @@ void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const Stri
{
waitDatabaseStarted();
auto shared_disk = getContext()->getSharedDisk();
auto table = detachTable(query_context, table_name);
fs::path detached_permanently_flag(getObjectMetadataPath(table_name) + detached_suffix);
try
{
FS::createFile(detached_permanently_flag);
shared_disk->createFile(detached_permanently_flag);
std::lock_guard lock(mutex);
const auto it = snapshot_detached_tables.find(table_name);
@ -352,6 +354,8 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
{
waitDatabaseStarted();
auto shared_disk = getContext()->getSharedDisk();
String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop = table_metadata_path + drop_suffix;
String table_data_path_relative = getTableDataPath(table_name);
@ -363,7 +367,7 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
bool renamed = false;
try
{
fs::rename(table_metadata_path, table_metadata_path_drop);
shared_disk->replaceFile(table_metadata_path, table_metadata_path_drop);
renamed = true;
// The table might be not loaded for Lazy database engine.
if (table)
@ -378,7 +382,7 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
if (table)
attachTable(local_context, table_name, table, table_data_path_relative);
if (renamed)
fs::rename(table_metadata_path_drop, table_metadata_path);
shared_disk->replaceFile(table_metadata_path_drop, table_metadata_path);
throw;
}
@ -390,7 +394,7 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
LOG_INFO(log, "Removing data directory from disk {} with path {} for dropped table {} ", disk_name, table_data_path_relative, table_name);
disk->removeRecursive(table_data_path_relative);
}
(void)fs::remove(table_metadata_path_drop);
(void)shared_disk->removeFileIfExists(table_metadata_path_drop);
}
void DatabaseOnDisk::checkMetadataFilenameAvailability(const String & to_table_name) const
@ -401,13 +405,14 @@ void DatabaseOnDisk::checkMetadataFilenameAvailability(const String & to_table_n
void DatabaseOnDisk::checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name) const
{
auto shared_disk = getContext()->getSharedDisk();
String table_metadata_path = getObjectMetadataPath(to_table_name);
if (fs::exists(table_metadata_path))
if (shared_disk->existsFile(table_metadata_path))
{
fs::path detached_permanently_flag(table_metadata_path + detached_suffix);
if (fs::exists(detached_permanently_flag))
if (shared_disk->existsFile(detached_permanently_flag))
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {}.{} already exists (detached permanently)",
backQuote(database_name), backQuote(to_table_name));
throw Exception(
@ -426,6 +431,8 @@ void DatabaseOnDisk::renameTable(
if (exchange)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Tables can be exchanged only in Atomic databases");
auto shared_disk = getContext()->getSharedDisk();
bool from_ordinary_to_atomic = false;
bool from_atomic_to_ordinary = false;
if (typeid(*this) != typeid(to_database))
@ -509,7 +516,7 @@ void DatabaseOnDisk::renameTable(
/// Now table data are moved to new database, so we must add metadata and attach table to new database
to_database.createTable(local_context, to_table_name, table, attach_query);
(void)fs::remove(table_metadata_path);
(void)shared_disk->removeFileIfExists(table_metadata_path);
if (from_atomic_to_ordinary)
{
@ -587,27 +594,32 @@ void DatabaseOnDisk::drop(ContextPtr local_context)
{
waitDatabaseStarted();
auto shared_disk = getContext()->getSharedDisk();
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
if (local_context->getSettingsRef()[Setting::force_remove_data_recursively_on_drop])
{
(void)fs::remove_all(std::filesystem::path(getContext()->getPath()) / data_path);
(void)fs::remove_all(getMetadataPath());
(void)shared_disk->removeRecursive(std::filesystem::path(getContext()->getPath()) / data_path);
(void)shared_disk->removeRecursive(getMetadataPath());
}
else
{
try
{
(void)fs::remove(std::filesystem::path(getContext()->getPath()) / data_path);
(void)fs::remove(getMetadataPath());
(void)shared_disk->removeDirectoryIfExists(std::filesystem::path(getContext()->getPath()) / data_path);
(void)shared_disk->removeDirectoryIfExists(getMetadataPath());
}
catch (const fs::filesystem_error & e)
catch (const Exception & e)
{
if (e.code() != std::errc::directory_not_empty)
throw Exception(Exception::CreateFromSTDTag{}, e);
throw Exception(ErrorCodes::DATABASE_NOT_EMPTY, "Cannot drop: {}. "
if (e.code() != ErrorCodes::CANNOT_RMDIR)
throw;
throw Exception(
ErrorCodes::DATABASE_NOT_EMPTY,
"Cannot drop: {}. "
"Probably database contain some detached tables or metadata leftovers from Ordinary engine. "
"If you want to remove all data anyway, try to attach database back and drop it again "
"with enabled force_remove_data_recursively_on_drop setting", e.what());
"with enabled force_remove_data_recursively_on_drop setting",
e.what());
}
}
}
@ -619,10 +631,19 @@ String DatabaseOnDisk::getObjectMetadataPath(const String & object_name) const
time_t DatabaseOnDisk::getObjectMetadataModificationTime(const String & object_name) const
{
auto shared_disk = getContext()->getSharedDisk();
String table_metadata_path = getObjectMetadataPath(object_name);
if (!shared_disk->existsFileOrDirectory(table_metadata_path))
return static_cast<time_t>(0);
try
{
return FS::getModificationTime(table_metadata_path);
return shared_disk->getLastModified(table_metadata_path).epochTime();
}
catch (const Exception &)
{
throw;
}
catch (const fs::filesystem_error & e)
{
@ -636,7 +657,8 @@ time_t DatabaseOnDisk::getObjectMetadataModificationTime(const String & object_n
void DatabaseOnDisk::iterateMetadataFiles(const IteratingFunction & process_metadata_file) const
{
if (!fs::exists(metadata_path))
auto shared_disk = getContext()->getSharedDisk();
if (!shared_disk->existsDirectory(metadata_path))
return;
auto process_tmp_drop_metadata_file = [&](const String & file_name)
@ -645,26 +667,26 @@ void DatabaseOnDisk::iterateMetadataFiles(const IteratingFunction & process_meta
static const char * tmp_drop_ext = ".sql.tmp_drop";
const std::string object_name = file_name.substr(0, file_name.size() - strlen(tmp_drop_ext));
if (fs::exists(std::filesystem::path(getContext()->getPath()) / data_path / object_name))
if (shared_disk->existsFile(std::filesystem::path(getContext()->getPath()) / data_path / object_name))
{
fs::rename(getMetadataPath() + file_name, getMetadataPath() + object_name + ".sql");
shared_disk->replaceFile(getMetadataPath() + file_name, getMetadataPath() + object_name + ".sql");
LOG_WARNING(log, "Object {} was not dropped previously and will be restored", backQuote(object_name));
process_metadata_file(object_name + ".sql");
}
else
{
LOG_INFO(log, "Removing file {}", getMetadataPath() + file_name);
(void)fs::remove(getMetadataPath() + file_name);
(void)shared_disk->removeFileIfExists(getMetadataPath() + file_name);
}
};
/// Metadata files to load: name and flag for .tmp_drop files
std::vector<std::pair<String, bool>> metadata_files;
fs::directory_iterator dir_end;
for (fs::directory_iterator dir_it(metadata_path); dir_it != dir_end; ++dir_it)
for (const auto dir_it = shared_disk->iterateDirectory(metadata_path); dir_it->isValid(); dir_it->next())
{
String file_name = dir_it->path().filename();
auto dir_path = fs::path(dir_it->path());
String file_name = dir_path.filename();
/// For '.svn', '.gitignore' directory and similar.
if (file_name.at(0) == '.')
continue;
@ -685,8 +707,8 @@ void DatabaseOnDisk::iterateMetadataFiles(const IteratingFunction & process_meta
else if (endsWith(file_name, ".sql.tmp"))
{
/// There are files .sql.tmp - delete
LOG_INFO(log, "Removing file {}", dir_it->path().string());
(void)fs::remove(dir_it->path());
LOG_INFO(log, "Removing file {}", dir_path.string());
(void)shared_disk->removeFileIfExists(dir_path);
}
else if (endsWith(file_name, ".sql"))
{
@ -729,13 +751,11 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata(
bool throw_on_error /*= true*/,
bool remove_empty /*= false*/)
{
String query;
auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk();
int metadata_file_fd = ::open(metadata_file_path.c_str(), O_RDONLY | O_CLOEXEC);
if (metadata_file_fd == -1)
if (!shared_disk->existsFile(metadata_file_path))
{
if (errno == ENOENT && !throw_on_error)
if (!throw_on_error)
return nullptr;
ErrnoException::throwFromPath(
@ -745,8 +765,9 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata(
metadata_file_path);
}
ReadBufferFromFile in(metadata_file_fd, metadata_file_path, METADATA_FILE_BUFFER_SIZE);
readStringUntilEOF(query, in);
auto read_buf = shared_disk->readFile(metadata_file_path, ReadSettings{});
String query;
readStringUntilEOF(query, *read_buf);
/** Empty files with metadata are generated after a rough restart of the server.
* Remove these files to slightly reduce the work of the admins on startup.
@ -755,7 +776,7 @@ ASTPtr DatabaseOnDisk::parseQueryFromMetadata(
{
if (logger)
LOG_ERROR(logger, "File {} is empty. Removing.", metadata_file_path);
(void)fs::remove(metadata_file_path);
(void)shared_disk->removeFileIfExists(metadata_file_path);
return nullptr;
}
@ -893,6 +914,7 @@ void DatabaseOnDisk::modifySettingsMetadata(const SettingsChanges & settings_cha
out.sync();
out.close();
fs::rename(metadata_file_tmp_path, metadata_file_path);
auto shared_disk = getContext()->getSharedDisk();
shared_disk->replaceFile(metadata_file_tmp_path, metadata_file_path);
}
}

View File

@ -1,10 +1,11 @@
#pragma once
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Databases/DatabasesCommon.h>
#include <Disks/IDisk.h>
#include <Parsers/ASTCreateQuery.h>
#include <Storages/IStorage.h>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
namespace DB
@ -68,7 +69,8 @@ public:
String getTableDataPath(const ASTCreateQuery & query) const override { return getTableDataPath(query.getTable()); }
String getMetadataPath() const override { return metadata_path; }
static ASTPtr parseQueryFromMetadata(LoggerPtr log, ContextPtr context, const String & metadata_file_path, bool throw_on_error = true, bool remove_empty = false);
static ASTPtr parseQueryFromMetadata(
LoggerPtr log, ContextPtr context, const String & metadata_file_path, bool throw_on_error = true, bool remove_empty = false);
/// will throw when the table we want to attach already exists (in active / detached / detached permanently form)
void checkMetadataFilenameAvailability(const String & to_table_name) const override;

View File

@ -154,6 +154,8 @@ String DatabaseOrdinary::getConvertToReplicatedFlagPath(const String & name, con
void DatabaseOrdinary::convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const QualifiedTableName & qualified_name, const String & file_name)
{
auto shared_disk = getContext()->getSharedDisk();
fs::path path(getMetadataPath());
fs::path file_path(file_name);
fs::path full_path = path / file_path;
@ -172,7 +174,7 @@ void DatabaseOrdinary::convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const Qu
auto convert_to_replicated_flag_path = getConvertToReplicatedFlagPath(qualified_name.table, policy, false);
if (!fs::exists(convert_to_replicated_flag_path))
if (!shared_disk->existsFile(convert_to_replicated_flag_path))
return;
if (getUUID() == UUIDHelpers::Nil)
@ -195,7 +197,7 @@ void DatabaseOrdinary::convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const Qu
out.sync();
out.close();
}
fs::rename(table_metadata_tmp_path, table_metadata_path);
shared_disk->replaceFile(table_metadata_tmp_path, table_metadata_path);
LOG_INFO(
log,
@ -207,10 +209,12 @@ void DatabaseOrdinary::convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const Qu
void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTablesMetadata & metadata, bool is_startup)
{
auto shared_disk = getContext()->getSharedDisk();
size_t prev_tables_count = metadata.parsed_tables.size();
size_t prev_total_dictionaries = metadata.total_dictionaries;
auto process_metadata = [&metadata, is_startup, local_context, this](const String & file_name)
auto process_metadata = [&metadata, is_startup, local_context, shared_disk, this](const String & file_name)
{
fs::path path(getMetadataPath());
fs::path file_path(file_name);
@ -244,7 +248,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
}
}
if (fs::exists(full_path.string() + detached_suffix))
if (shared_disk->existsFile(full_path.string() + detached_suffix))
{
const std::string table_name = unescapeForFileName(file_name.substr(0, file_name.size() - 4));
LOG_DEBUG(log, "Skipping permanently detached table {}.", backQuote(table_name));
@ -355,11 +359,13 @@ void DatabaseOrdinary::restoreMetadataAfterConvertingToReplicated(StoragePtr tab
if (!rmt)
return;
auto shared_disk = getContext()->getSharedDisk();
auto convert_to_replicated_flag_path = getConvertToReplicatedFlagPath(name.table, table->getStoragePolicy(), true);
if (!fs::exists(convert_to_replicated_flag_path))
if (!shared_disk->existsFile(convert_to_replicated_flag_path))
return;
(void)fs::remove(convert_to_replicated_flag_path);
(void)shared_disk->removeFileIfExists(convert_to_replicated_flag_path);
LOG_INFO
(
log,
@ -595,14 +601,15 @@ void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & ta
void DatabaseOrdinary::commitAlterTable(const StorageID &, const String & table_metadata_tmp_path, const String & table_metadata_path, const String & /*statement*/, ContextPtr /*query_context*/)
{
auto shared_disk = getContext()->getSharedDisk();
try
{
/// rename atomically replaces the old file with the new one.
fs::rename(table_metadata_tmp_path, table_metadata_path);
(void)shared_disk->replaceFile(table_metadata_tmp_path, table_metadata_path);
}
catch (...)
{
(void)fs::remove(table_metadata_tmp_path);
(void)shared_disk->removeFileIfExists(table_metadata_tmp_path);
throw;
}
}

View File

@ -185,11 +185,13 @@ void DatabaseMaterializedMySQL::alterTable(ContextPtr context_, const StorageID
void DatabaseMaterializedMySQL::drop(ContextPtr context_)
{
LOG_TRACE(log, "Dropping MaterializeMySQL database");
auto shared_disk = getContext()->getSharedDisk();
/// Remove metadata info
fs::path metadata(getMetadataPath() + "/.metadata");
if (fs::exists(metadata))
(void)fs::remove(metadata);
if (shared_disk->existsFile(metadata))
(void)shared_disk->removeFileIfExists(metadata);
DatabaseAtomic::drop(context_);
}

View File

@ -13,9 +13,11 @@
#include <Disks/loadLocalDiskConfig.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <filesystem>
#include <fstream>
#include <unistd.h>
#include <system_error>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <Disks/IO/WriteBufferFromTemporaryFile.h>
@ -313,7 +315,7 @@ DirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path) const
void DiskLocal::moveFile(const String & from_path, const String & to_path)
{
renameNoReplace(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
DB::renameNoReplace(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
}
void DiskLocal::replaceFile(const String & from_path, const String & to_path)
@ -324,6 +326,21 @@ void DiskLocal::replaceFile(const String & from_path, const String & to_path)
fs::rename(from_file, to_file);
}
void DiskLocal::renameNoReplace(const std::string & old_path, const std::string & new_path)
{
DB::renameNoReplace(fs::path(disk_path) / old_path, fs::path(disk_path) / new_path);
}
void DiskLocal::renameExchange(const std::string & old_path, const std::string & new_path)
{
DB::renameExchange(fs::path(disk_path) / old_path, fs::path(disk_path) / new_path);
}
bool DiskLocal::renameExchangeIfSupported(const std::string & old_path, const std::string & new_path)
{
return DB::renameExchangeIfSupported(fs::path(disk_path) / old_path, fs::path(disk_path) / new_path);
}
std::unique_ptr<ReadBufferFromFileBase> DiskLocal::readFile(const String & path, const ReadSettings & settings, std::optional<size_t> read_hint, std::optional<size_t> file_size) const
{
if (!file_size.has_value())
@ -363,7 +380,7 @@ void DiskLocal::removeFile(const String & path)
{
auto fs_path = fs::path(disk_path) / path;
if (0 != unlink(fs_path.c_str()))
ErrnoException::throwFromPath(ErrorCodes::CANNOT_UNLINK, fs_path, "Cannot unlink file {}", fs_path);
fs::remove(fs_path);
}
void DiskLocal::removeFileIfExists(const String & path)
@ -383,6 +400,14 @@ void DiskLocal::removeDirectory(const String & path)
ErrnoException::throwFromPath(ErrorCodes::CANNOT_RMDIR, fs_path, "Cannot remove directory {}", fs_path);
}
void DiskLocal::removeDirectoryIfExists(const String & path)
{
auto fs_path = fs::path(disk_path) / path;
if (0 != rmdir(fs_path.c_str()))
if (errno != ENOENT)
ErrnoException::throwFromPath(ErrorCodes::CANNOT_RMDIR, fs_path, "Cannot remove directory {}", fs_path);
}
void DiskLocal::removeRecursive(const String & path)
{
(void)fs::remove_all(fs::path(disk_path) / path);
@ -415,6 +440,37 @@ void DiskLocal::createHardLink(const String & src_path, const String & dst_path)
DB::createHardLink(fs::path(disk_path) / src_path, fs::path(disk_path) / dst_path);
}
bool DiskLocal::isSymlink(const String & path) const
{
return FS::isSymlink(path);
}
bool DiskLocal::isSymlinkNoThrow(const String & path) const
{
return FS::isSymlinkNoThrow(path);
}
void DiskLocal::createDirectoriesSymlink(const String & target, const String & link)
{
fs::create_directory_symlink(target, link);
}
String DiskLocal::readSymlink(const fs::path & path) const
{
return FS::readSymlink(path);
}
bool DiskLocal::equivalent(const String & p1, const String & p2) const
{
return fs::equivalent(p1, p2);
}
bool DiskLocal::equivalentNoThrow(const String & p1, const String & p2) const
{
std::error_code ec;
return fs::equivalent(p1, p2, ec);
}
void DiskLocal::truncateFile(const String & path, size_t size)
{
int res = truncate((fs::path(disk_path) / path).string().data(), size);

View File

@ -64,6 +64,12 @@ public:
void replaceFile(const String & from_path, const String & to_path) override;
void renameNoReplace(const std::string & old_path, const std::string & new_path) override;
void renameExchange(const std::string & old_path, const std::string & new_path) override;
bool renameExchangeIfSupported(const std::string & old_path, const std::string & new_path) override;
void copyDirectoryContent(
const String & from_dir,
const std::shared_ptr<IDisk> & to_disk,
@ -92,6 +98,7 @@ public:
void removeFile(const String & path) override;
void removeFileIfExists(const String & path) override;
void removeDirectory(const String & path) override;
void removeDirectoryIfExists(const String & path) override;
void removeRecursive(const String & path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
@ -104,6 +111,20 @@ public:
void createHardLink(const String & src_path, const String & dst_path) override;
bool isSymlinkSupported() const override { return true; }
bool isSymlink(const String & path) const override;
bool isSymlinkNoThrow(const String & path) const override;
void createDirectoriesSymlink(const String & target, const String & link) override;
String readSymlink(const fs::path & path) const override;
bool equivalent(const String & p1, const String & p2) const override;
bool equivalentNoThrow(const String & p1, const String & p2) const override;
void truncateFile(const String & path, size_t size) override;
DataSourceDescription getDataSourceDescription() const override;

View File

@ -193,6 +193,26 @@ public:
/// If a file with `to_path` path already exists, it will be replaced.
virtual void replaceFile(const String & from_path, const String & to_path) = 0;
virtual void renameNoReplace(const std::string &, const std::string &)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED, "Method `renameNoReplace()` not implemented for disk: {}", getDataSourceDescription().toString());
}
virtual void renameExchange(const std::string &, const std::string &)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED, "Method `renameExchange()` not implemented for disk: {}", getDataSourceDescription().toString());
}
virtual bool renameExchangeIfSupported(const std::string &, const std::string &)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method `renameExchangeIfSupported()` not implemented for disk: {}",
getDataSourceDescription().toString());
}
/// Recursively copy files from from_dir to to_dir. Create to_dir if not exists.
virtual void copyDirectoryContent(
const String & from_dir,
@ -246,6 +266,14 @@ public:
/// Remove directory. Throws exception if it's not a directory or if directory is not empty.
virtual void removeDirectory(const String & path) = 0;
virtual void removeDirectoryIfExists(const String &)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method `removeDirectoryIfExists()` is not implemented for disk: {}",
getDataSourceDescription().toString());
}
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exists.
virtual void removeRecursive(const String & path) = 0;
@ -357,6 +385,49 @@ public:
/// Create hardlink from `src_path` to `dst_path`.
virtual void createHardLink(const String & src_path, const String & dst_path) = 0;
virtual bool isSymlinkSupported() const { return false; }
virtual bool isSymlink(const String &) const
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED, "Method isSymlink() is not implemented for disk type: {}", getDataSourceDescription().toString());
}
virtual bool isSymlinkNoThrow(const String &) const
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method isSymlinkNothrow() is not implemented for disk type: {}",
getDataSourceDescription().toString());
}
virtual void createDirectoriesSymlink(const String &, const String &)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method createDirectoriesSymlink() is not implemented for disk type: {}",
getDataSourceDescription().toString());
}
virtual String readSymlink(const fs::path &) const
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method readSymlink() is not implemented for disk type: {}",
getDataSourceDescription().toString());
}
virtual bool equivalent(const String &, const String &) const
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED, "Method equivalent() is not implemented for disk type: {}", getDataSourceDescription().toString());
}
virtual bool equivalentNoThrow(const String &, const String &) const
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED, "Method equivalent() is not implemented for disk type: {}", getDataSourceDescription().toString());
}
/// Truncate file to specified size.
virtual void truncateFile(const String & path, size_t size);

View File

@ -360,6 +360,8 @@ struct ContextSharedPart : boost::noncopyable
ConfigurationPtr config TSA_GUARDED_BY(mutex); /// Global configuration settings.
String tmp_path TSA_GUARDED_BY(mutex); /// Path to the temporary files that occur when processing the request.
std::shared_ptr<IDisk> shared_disk;
/// All temporary files that occur when processing the requests accounted here.
/// Child scopes for more fine-grained accounting are created per user/query/etc.
/// Initialized once during server startup.
@ -1129,6 +1131,11 @@ String Context::getFilesystemCachesPath() const
return shared->filesystem_caches_path;
}
std::shared_ptr<IDisk> Context::getSharedDisk() const
{
return shared->shared_disk;
}
String Context::getFilesystemCacheUser() const
{
SharedLockGuard lock(shared->mutex);
@ -1217,6 +1224,8 @@ void Context::setPath(const String & path)
shared->path = path;
shared->shared_disk = std::make_shared<DiskLocal>("shared", path);
if (shared->tmp_path.empty() && !shared->root_temp_data_on_disk)
shared->tmp_path = shared->path + "tmp/";
@ -1254,21 +1263,23 @@ try
{
LOG_DEBUG(log, "Setting up {} to store temporary data in it", path);
if (fs::exists(path))
auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk();
if (shared_disk->existsDirectory(path))
{
/// Clearing old temporary files.
fs::directory_iterator dir_end;
for (fs::directory_iterator it(path); it != dir_end; ++it)
for (const auto dir_it = shared_disk->iterateDirectory(path); dir_it->isValid(); dir_it->next())
{
if (it->is_regular_file())
auto file_path = fs::path(dir_it->path());
if (shared_disk->existsFile(file_path))
{
if (startsWith(it->path().filename(), "tmp"))
if (startsWith(file_path.filename(), "tmp"))
{
LOG_DEBUG(log, "Removing old temporary file {}", it->path().string());
fs::remove(it->path());
LOG_DEBUG(log, "Removing old temporary file {}", file_path.string());
shared_disk->removeFileIfExists(file_path);
}
else
LOG_DEBUG(log, "Found unknown file in temporary path {}", it->path().string());
LOG_DEBUG(log, "Found unknown file in temporary path {}", file_path.string());
}
/// We skip directories (for example, 'http_buffers' - it's used for buffering of the results) and all other file types.
}
@ -4781,14 +4792,16 @@ void Context::checkCanBeDropped(const String & database, const String & table, c
if (!max_size_to_drop || size <= max_size_to_drop)
return;
auto shared_disk = getSharedDisk();
fs::path force_file(getFlagsPath() + "force_drop_table");
bool force_file_exists = fs::exists(force_file);
bool force_file_exists = shared_disk->existsFile(force_file);
if (force_file_exists)
{
try
{
fs::remove(force_file);
shared_disk->removeFileIfExists(force_file);
return;
}
catch (...)

View File

@ -584,6 +584,7 @@ public:
String getUserScriptsPath() const;
String getFilesystemCachesPath() const;
String getFilesystemCacheUser() const;
std::shared_ptr<IDisk> getSharedDisk() const;
/// A list of warnings about server configuration to place in `system.warnings` table.
Strings getWarnings() const;

View File

@ -617,6 +617,7 @@ DatabasePtr DatabaseCatalog::detachDatabase(ContextPtr local_context, const Stri
if (drop)
{
auto shared_disk = getContext()->getSharedDisk();
UUID db_uuid = db->getUUID();
/// Delete the database.
@ -625,9 +626,9 @@ DatabasePtr DatabaseCatalog::detachDatabase(ContextPtr local_context, const Stri
/// Old ClickHouse versions did not store database.sql files
/// Remove metadata dir (if exists) to avoid recreation of .sql file on server startup
fs::path database_metadata_dir = fs::path(getContext()->getPath()) / "metadata" / escapeForFileName(database_name);
fs::remove(database_metadata_dir);
shared_disk->removeDirectoryIfExists(database_metadata_dir);
fs::path database_metadata_file = fs::path(getContext()->getPath()) / "metadata" / (escapeForFileName(database_name) + ".sql");
fs::remove(database_metadata_file);
shared_disk->removeFileIfExists(database_metadata_file);
if (db_uuid != UUIDHelpers::Nil)
removeUUIDMappingFinally(db_uuid);
@ -996,6 +997,8 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
{
assert(!cleanup_task);
auto shared_disk = getContext()->getSharedDisk();
/// /clickhouse_root/metadata_dropped/ contains files with metadata of tables,
/// which where marked as dropped by Atomic databases.
/// Data directories of such tables still exists in store/
@ -1006,41 +1009,41 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
std::map<String, StorageID> dropped_metadata;
String path = std::filesystem::path(getContext()->getPath()) / "metadata_dropped" / "";
if (!std::filesystem::exists(path))
{
if (!shared_disk->existsDirectory(path))
return;
}
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(path); it != dir_end; ++it)
for (const auto dir_it = shared_disk->iterateDirectory(path); dir_it->isValid(); dir_it->next())
{
auto sub_path = fs::path(dir_it->path());
/// File name has the following format:
/// database_name.table_name.uuid.sql
auto sub_path_filename = sub_path.filename().string();
/// Ignore unexpected files
if (!it.name().ends_with(".sql"))
if (!sub_path_filename.ends_with(".sql"))
continue;
/// Process .sql files with metadata of tables which were marked as dropped
StorageID dropped_id = StorageID::createEmpty();
size_t dot_pos = it.name().find('.');
size_t dot_pos = sub_path_filename.find('.');
if (dot_pos == std::string::npos)
continue;
dropped_id.database_name = unescapeForFileName(it.name().substr(0, dot_pos));
dropped_id.database_name = unescapeForFileName(sub_path_filename.substr(0, dot_pos));
size_t prev_dot_pos = dot_pos;
dot_pos = it.name().find('.', prev_dot_pos + 1);
dot_pos = sub_path_filename.find('.', prev_dot_pos + 1);
if (dot_pos == std::string::npos)
continue;
dropped_id.table_name = unescapeForFileName(it.name().substr(prev_dot_pos + 1, dot_pos - prev_dot_pos - 1));
dropped_id.table_name = unescapeForFileName(sub_path_filename.substr(prev_dot_pos + 1, dot_pos - prev_dot_pos - 1));
prev_dot_pos = dot_pos;
dot_pos = it.name().find('.', prev_dot_pos + 1);
dot_pos = sub_path_filename.find('.', prev_dot_pos + 1);
if (dot_pos == std::string::npos)
continue;
dropped_id.uuid = parse<UUID>(it.name().substr(prev_dot_pos + 1, dot_pos - prev_dot_pos - 1));
dropped_id.uuid = parse<UUID>(sub_path_filename.substr(prev_dot_pos + 1, dot_pos - prev_dot_pos - 1));
String full_path = path + it.name();
String full_path = path + sub_path_filename;
dropped_metadata.emplace(std::move(full_path), std::move(dropped_id));
}
@ -1084,6 +1087,8 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
assert(!table || table->getStorageID().uuid == table_id.uuid);
assert(dropped_metadata_path == getPathForDroppedMetadata(table_id));
auto shared_disk = getContext()->getSharedDisk();
/// Table was removed from database. Enqueue removal of its data from disk.
time_t drop_time;
if (table)
@ -1128,7 +1133,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
}
addUUIDMapping(table_id.uuid);
drop_time = FS::getModificationTime(dropped_metadata_path);
drop_time = shared_disk->getLastModified(dropped_metadata_path).epochTime();
}
std::lock_guard lock(tables_marked_dropped_mutex);
@ -1161,6 +1166,8 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
void DatabaseCatalog::undropTable(StorageID table_id)
{
auto shared_disk = getContext()->getSharedDisk();
String latest_metadata_dropped_path;
TableMarkedAsDropped dropped_table;
{
@ -1198,7 +1205,7 @@ void DatabaseCatalog::undropTable(StorageID table_id)
/// a table is successfully marked undropped,
/// if and only if its metadata file was moved to a database.
/// This maybe throw exception.
renameNoReplace(latest_metadata_dropped_path, table_metadata_path);
shared_disk->moveFile(latest_metadata_dropped_path, table_metadata_path);
if (first_async_drop_in_queue == it_dropped_table)
++first_async_drop_in_queue;
@ -1381,6 +1388,8 @@ void DatabaseCatalog::dropTableDataTask()
void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)
{
auto shared_disk = getContext()->getSharedDisk();
if (table.table)
{
table.table->drop();
@ -1398,7 +1407,7 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)
}
LOG_INFO(log, "Removing metadata {} of dropped table {}", table.metadata_path, table.table_id.getNameForLogs());
fs::remove(fs::path(table.metadata_path));
shared_disk->removeFileIfExists(fs::path(table.metadata_path));
removeUUIDMappingFinally(table.table_id.uuid);
CurrentMetrics::sub(CurrentMetrics::TablesToDropQueueSize, 1);

View File

@ -228,16 +228,18 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
db_num_limit, db_count);
}
auto shared_disk = getContext()->getSharedDisk();
/// Will write file with database metadata, if needed.
String database_name_escaped = escapeForFileName(database_name);
fs::path metadata_path = fs::weakly_canonical(getContext()->getPath());
fs::create_directories(metadata_path / "metadata");
shared_disk->createDirectories(metadata_path / "metadata");
fs::path metadata_file_tmp_path = metadata_path / "metadata" / (database_name_escaped + ".sql.tmp");
fs::path metadata_file_path = metadata_path / "metadata" / (database_name_escaped + ".sql");
if (!create.storage && create.attach)
{
if (!fs::exists(metadata_file_path))
if (!shared_disk->existsFile(metadata_file_path))
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Database engine must be specified for ATTACH DATABASE query");
/// Short syntax: try read database definition from file
auto ast = DatabaseOnDisk::parseQueryFromMetadata(nullptr, getContext(), metadata_file_path);
@ -284,7 +286,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
metadata_path = metadata_path / "store" / DatabaseCatalog::getPathForUUID(create.uuid);
if (!create.attach && fs::exists(metadata_path) && !fs::is_empty(metadata_path))
if (!create.attach && shared_disk->existsDirectory(metadata_path) && !shared_disk->isDirectoryEmpty(metadata_path))
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists and is not empty", metadata_path.string());
}
else if (create.storage->engine->name == "MaterializeMySQL"
@ -356,7 +358,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
"Enable allow_experimental_database_materialized_postgresql to use it");
}
bool need_write_metadata = !create.attach || !fs::exists(metadata_file_path);
bool need_write_metadata = !create.attach || !shared_disk->existsFile(metadata_file_path);
bool need_lock_uuid = internal || need_write_metadata;
auto mode = getLoadingStrictnessLevel(create.attach, force_attach, has_force_restore_data_flag, /*secondary*/ false);
@ -384,7 +386,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
String statement = statement_buf.str();
/// Needed to make database creation retriable if it fails after the file is created
fs::remove(metadata_file_tmp_path);
shared_disk->removeFileIfExists(metadata_file_tmp_path);
/// Exclusive flag guarantees, that database is not created right now in another thread.
WriteBufferFromFile out(metadata_file_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
@ -422,7 +424,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (need_write_metadata)
{
/// Prevents from overwriting metadata of detached database
renameNoReplace(metadata_file_tmp_path, metadata_file_path);
shared_disk->moveFile(metadata_file_tmp_path, metadata_file_path);
renamed = true;
}
}
@ -430,8 +432,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
{
if (renamed)
{
[[maybe_unused]] bool removed = fs::remove(metadata_file_path);
assert(removed);
assert(shared_disk->existsFile(metadata_file_path));
shared_disk->removeFileIfExists(metadata_file_path);
}
if (added)
DatabaseCatalog::instance().detachDatabase(getContext(), database_name, false, false);
@ -1787,8 +1789,9 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
data_path = database->getTableDataPath(create);
auto full_data_path = fs::path{getContext()->getPath()} / data_path;
auto shared_disk = getContext()->getSharedDisk();
if (!create.attach && !data_path.empty() && fs::exists(full_data_path))
if (!create.attach && !data_path.empty() && shared_disk->existsDirectory(full_data_path))
{
if (getContext()->getZooKeeperMetadataTransaction() &&
!getContext()->getZooKeeperMetadataTransaction()->isInitialQuery() &&
@ -1803,8 +1806,8 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
fs::path trash_path = fs::path{getContext()->getPath()} / "trash" / data_path / getHexUIntLowercase(thread_local_rng());
LOG_WARNING(getLogger("InterpreterCreateQuery"), "Directory for {} data {} already exists. Will move it to {}",
Poco::toLower(storage_name), String(data_path), trash_path);
fs::create_directories(trash_path.parent_path());
renameNoReplace(full_data_path, trash_path);
shared_disk->createDirectories(trash_path.parent_path());
shared_disk->moveFile(full_data_path, trash_path);
}
else
{

View File

@ -1,6 +1,7 @@
#include <Common/thread_local_rng.h>
#include <Common/ThreadPool.h>
#include "Common/tests/gtest_global_context.h"
#include <Common/PoolId.h>
#include <Common/ThreadPool.h>
#include <Common/thread_local_rng.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTCreateQuery.h>
@ -104,7 +105,8 @@ static void loadDatabase(
String database_attach_query;
String database_metadata_file = database_path + ".sql";
if (fs::exists(fs::path(database_metadata_file)))
auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk();
if (shared_disk->existsFile(fs::path(database_metadata_file)))
{
/// There is .sql file with database creation statement.
ReadBufferFromFile in(database_metadata_file, 1024);
@ -130,9 +132,10 @@ static void loadDatabase(
static void checkUnsupportedVersion(ContextMutablePtr context, const String & database_name)
{
auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk();
/// Produce better exception message
String metadata_path = context->getPath() + "metadata/" + database_name;
if (fs::exists(fs::path(metadata_path)))
if (shared_disk->existsDirectory(fs::path(metadata_path)))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Data directory for {} database exists, but metadata file does not. "
"Probably you are trying to upgrade from version older than 20.7. "
"If so, you should upgrade through intermediate version.", database_name);
@ -143,8 +146,10 @@ static void checkIncompleteOrdinaryToAtomicConversion(ContextPtr context, const
if (context->getConfigRef().has("allow_reserved_database_name_tmp_convert"))
return;
auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk();
auto convert_flag_path = fs::path(context->getFlagsPath()) / "convert_ordinary_to_atomic";
if (!fs::exists(convert_flag_path))
if (!shared_disk->existsFile(convert_flag_path))
return;
/// Flag exists. Let's check if we had an unsuccessful conversion attempt previously
@ -171,6 +176,8 @@ static void checkIncompleteOrdinaryToAtomicConversion(ContextPtr context, const
LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_database_name, bool async_load_databases)
{
auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk();
LoggerPtr log = getLogger("loadMetadata");
String path = context->getPath() + "metadata";
@ -179,12 +186,12 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
/// on difference of data parts while initializing tables.
/// This file is immediately deleted i.e. "one-shot".
auto force_restore_data_flag_file = fs::path(context->getFlagsPath()) / "force_restore_data";
bool has_force_restore_data_flag = fs::exists(force_restore_data_flag_file);
bool has_force_restore_data_flag = shared_disk->existsFile(force_restore_data_flag_file);
if (has_force_restore_data_flag)
{
try
{
fs::remove(force_restore_data_flag_file);
shared_disk->removeFileIfExists(force_restore_data_flag_file);
}
catch (...)
{
@ -195,16 +202,16 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
/// Loop over databases.
std::map<String, String> databases;
fs::directory_iterator dir_end;
for (fs::directory_iterator it(path); it != dir_end; ++it)
for (const auto dir_it = shared_disk->iterateDirectory(path); dir_it->isValid(); dir_it->next())
{
if (it->is_symlink())
auto sub_path = fs::path(dir_it->path());
if (shared_disk->isSymlinkSupported() && shared_disk->isSymlink(sub_path))
continue;
if (it->is_directory())
if (shared_disk->existsDirectory(sub_path))
continue;
const auto current_file = it->path().filename().string();
const auto current_file = sub_path.filename().string();
/// TODO: DETACH DATABASE PERMANENTLY ?
if (fs::path(current_file).extension() == ".sql")
@ -217,10 +224,10 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
/// Temporary fails may be left from previous server runs.
if (fs::path(current_file).extension() == ".tmp")
{
LOG_WARNING(log, "Removing temporary file {}", it->path().string());
LOG_WARNING(log, "Removing temporary file {}", sub_path.string());
try
{
fs::remove(it->path());
shared_disk->removeFileIfExists(sub_path);
}
catch (...)
{
@ -281,12 +288,14 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & database_name, const String & default_engine)
{
auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk();
String path = context->getPath() + "metadata/" + database_name;
String metadata_file = path + ".sql";
if (fs::exists(metadata_file + ".tmp"))
fs::remove(metadata_file + ".tmp");
if (shared_disk->existsFile(metadata_file + ".tmp"))
shared_disk->removeFileIfExists(metadata_file + ".tmp");
if (fs::exists(fs::path(metadata_file)))
if (shared_disk->existsFile(fs::path(metadata_file)))
{
/// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted.
loadDatabase(context, database_name, path, true);
@ -481,8 +490,10 @@ void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & load_s
void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMutablePtr context)
{
auto shared_disk = Context::getGlobalContextInstance()->getSharedDisk();
auto convert_flag_path = fs::path(context->getFlagsPath()) / "convert_ordinary_to_atomic";
if (!fs::exists(convert_flag_path))
if (!shared_disk->existsFile(convert_flag_path))
return;
LOG_INFO(getLogger("loadMetadata"), "Found convert_ordinary_to_atomic file in flags directory, "
@ -496,7 +507,7 @@ void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMu
maybeConvertOrdinaryDatabaseToAtomic(context, name);
LOG_INFO(getLogger("loadMetadata"), "Conversion finished, removing convert_ordinary_to_atomic flag");
fs::remove(convert_flag_path);
shared_disk->removeFileIfExists(convert_flag_path);
}
LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context, bool async_load_system_database)