Persistent databases in clickhouse-local

This commit is contained in:
Alexey Milovidov 2024-11-11 04:55:21 +01:00
parent eeb25267e5
commit 1902f5306d
5 changed files with 362 additions and 40 deletions

View File

@ -52,6 +52,7 @@
#include <boost/program_options/options_description.hpp> #include <boost/program_options/options_description.hpp>
#include <base/argsToConfig.h> #include <base/argsToConfig.h>
#include <filesystem> #include <filesystem>
#include <Common/filesystemHelpers.h>
#include "config.h" #include "config.h"
@ -261,7 +262,19 @@ static DatabasePtr createMemoryDatabaseIfNotExists(ContextPtr context, const Str
static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context) static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context)
{ {
auto overlay = std::make_shared<DatabasesOverlay>(name_, context); auto overlay = std::make_shared<DatabasesOverlay>(name_, context);
overlay->registerNextDatabase(std::make_shared<DatabaseAtomic>(name_, fs::weakly_canonical(context->getPath()), UUIDHelpers::generateV4(), context));
UUID default_database_uuid;
fs::path existing_path_symlink = fs::weakly_canonical(context->getPath()) / "metadata" / "default";
if (FS::isSymlinkNoThrow(existing_path_symlink))
default_database_uuid = parse<UUID>(FS::readSymlink(existing_path_symlink).parent_path().filename());
else
default_database_uuid = UUIDHelpers::generateV4();
fs::path default_database_metadata_path = fs::weakly_canonical(context->getPath()) / "store"
/ DatabaseCatalog::getPathForUUID(default_database_uuid);
overlay->registerNextDatabase(std::make_shared<DatabaseAtomic>(name_, default_database_metadata_path, default_database_uuid, context));
overlay->registerNextDatabase(std::make_shared<DatabaseFilesystem>(name_, "", context)); overlay->registerNextDatabase(std::make_shared<DatabaseFilesystem>(name_, "", context));
return overlay; return overlay;
} }
@ -273,7 +286,7 @@ void LocalServer::tryInitPath()
if (getClientConfiguration().has("path")) if (getClientConfiguration().has("path"))
{ {
// User-supplied path. /// User-supplied path.
path = getClientConfiguration().getString("path"); path = getClientConfiguration().getString("path");
Poco::trimInPlace(path); Poco::trimInPlace(path);
@ -285,17 +298,17 @@ void LocalServer::tryInitPath()
" correct the --path."); " correct the --path.");
} }
} }
else else if (getClientConfiguration().has("tmp"))
{ {
// The path is not provided explicitly - use a unique path in the system temporary directory /// The user requested to use a temporary path - use a unique path in the system temporary directory
// (or in the current dir if a temporary doesn't exist) /// (or in the current dir if a temporary doesn't exist)
LoggerRawPtr log = &logger(); LoggerRawPtr log = &logger();
std::filesystem::path parent_folder; std::filesystem::path parent_folder;
std::filesystem::path default_path; std::filesystem::path default_path;
try try
{ {
// try to guess a tmp folder name, and check if it's a directory (throw exception otherwise) /// Try to guess a tmp folder name, and check if it's a directory (throw an exception otherwise).
parent_folder = std::filesystem::temp_directory_path(); parent_folder = std::filesystem::temp_directory_path();
} }
@ -323,7 +336,15 @@ void LocalServer::tryInitPath()
temporary_directory_to_delete = default_path; temporary_directory_to_delete = default_path;
path = default_path.string(); path = default_path.string();
LOG_DEBUG(log, "Working directory created: {}", path); LOG_DEBUG(log, "Working directory will be created as needed: {}", path);
}
else
{
/// No explicit path specified. Use a subdirectory in the current directory (it will be created lazily only if needed).
/// The subdirectory is named `clickhouse.local`. This name is to not collide with the possible names
/// of the binary file, `clickhouse` or `clickhouse-local`.
path = "clickhouse.local";
getClientConfiguration().setString("path", path);
} }
global_context->setPath(fs::path(path) / ""); global_context->setPath(fs::path(path) / "");
@ -822,18 +843,24 @@ void LocalServer::processConfig()
if (getClientConfiguration().has("path")) if (getClientConfiguration().has("path"))
{ {
String path = global_context->getPath();
fs::create_directories(fs::path(path));
/// Lock path directory before read
status.emplace(fs::path(path) / "status", StatusFile::write_full_info);
LOG_DEBUG(log, "Loading metadata from {}", path);
auto load_system_metadata_tasks = loadMetadataSystem(global_context);
attachSystemTablesServer(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE), false); attachSystemTablesServer(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE), false);
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA)); attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE)); attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
String path = global_context->getPath();
if (fs::exists(fs::path(path) / "metadata"))
{
/// Lock path directory before read
/// Note: this is slightly unsafe. The first instance of clickhouse-local will not be protected.
status.emplace(fs::path(path) / "status", StatusFile::write_full_info);
LOG_DEBUG(log, "Loading metadata from {}", path);
if (fs::exists(std::filesystem::path(path) / "metadata" / "system.sql"))
{
LoadTaskPtrs load_system_metadata_tasks = loadMetadataSystem(global_context);
waitLoad(TablesLoaderForegroundPoolId, load_system_metadata_tasks); waitLoad(TablesLoaderForegroundPoolId, load_system_metadata_tasks);
}
if (!getClientConfiguration().has("only-system-tables")) if (!getClientConfiguration().has("only-system-tables"))
{ {
@ -847,6 +874,7 @@ void LocalServer::processConfig()
LOG_DEBUG(log, "Loaded metadata."); LOG_DEBUG(log, "Loaded metadata.");
} }
}
else if (!getClientConfiguration().has("no-system-tables")) else if (!getClientConfiguration().has("no-system-tables"))
{ {
attachSystemTablesServer(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE), false); attachSystemTablesServer(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE), false);

View File

@ -76,7 +76,8 @@ void DatabaseAtomic::createDirectories()
void DatabaseAtomic::createDirectoriesUnlocked() void DatabaseAtomic::createDirectoriesUnlocked()
{ {
DatabaseOnDisk::createDirectoriesUnlocked(); DatabaseOnDisk::createDirectoriesUnlocked();
fs::create_directories(fs::path(getContext()->getPath()) / "metadata"); fs::path catalog_path = fs::path(getContext()->getPath()) / "metadata";
fs::create_directories(catalog_path);
fs::create_directories(path_to_table_symlinks); fs::create_directories(path_to_table_symlinks);
tryCreateMetadataSymlink(); tryCreateMetadataSymlink();
} }
@ -604,12 +605,12 @@ void DatabaseAtomic::tryCreateMetadataSymlink()
{ {
/// Symlinks in data/db_name/ directory and metadata/db_name/ are not used by ClickHouse, /// Symlinks in data/db_name/ directory and metadata/db_name/ are not used by ClickHouse,
/// it's needed only for convenient introspection. /// it's needed only for convenient introspection.
assert(path_to_metadata_symlink != metadata_path); chassert(path_to_metadata_symlink != metadata_path);
fs::path metadata_symlink(path_to_metadata_symlink); fs::path metadata_symlink(path_to_metadata_symlink);
if (fs::exists(metadata_symlink)) if (fs::exists(metadata_symlink))
{ {
if (!FS::isSymlink(metadata_symlink)) if (!FS::isSymlink(metadata_symlink))
throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Directory {} exists", path_to_metadata_symlink); throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Directory {} already exists", path_to_metadata_symlink);
} }
else else
{ {

View File

@ -63,7 +63,7 @@ void DatabasesOverlay::createTable(ContextPtr context_, const String & table_nam
} }
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"There is no databases for CREATE TABLE `{}` query in database `{}` (engine {})", "There are no databases for CREATE TABLE `{}` query in database `{}` (engine {})",
table_name, table_name,
getDatabaseName(), getDatabaseName(),
getEngineName()); getEngineName());
@ -81,7 +81,7 @@ void DatabasesOverlay::dropTable(ContextPtr context_, const String & table_name,
} }
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"There is no databases for DROP TABLE `{}` query in database `{}` (engine {})", "There are no databases for DROP TABLE `{}` query in database `{}` (engine {})",
table_name, table_name,
getDatabaseName(), getDatabaseName(),
getEngineName()); getEngineName());
@ -104,7 +104,7 @@ void DatabasesOverlay::attachTable(
} }
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"There is no databases for ATTACH TABLE `{}` query in database `{}` (engine {})", "There are no databases for ATTACH TABLE `{}` query in database `{}` (engine {})",
table_name, table_name,
getDatabaseName(), getDatabaseName(),
getEngineName()); getEngineName());
@ -120,7 +120,7 @@ StoragePtr DatabasesOverlay::detachTable(ContextPtr context_, const String & tab
} }
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"There is no databases for DETACH TABLE `{}` query in database `{}` (engine {})", "There are no databases for DETACH TABLE `{}` query in database `{}` (engine {})",
table_name, table_name,
getDatabaseName(), getDatabaseName(),
getEngineName()); getEngineName());
@ -255,7 +255,7 @@ void DatabasesOverlay::alterTable(ContextPtr local_context, const StorageID & ta
} }
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"There is no databases for ALTER TABLE `{}` query in database `{}` (engine {})", "There are no databases for ALTER TABLE `{}` query in database `{}` (engine {})",
table_id.table_name, table_id.table_name,
getDatabaseName(), getDatabaseName(),
getEngineName()); getEngineName());
@ -313,4 +313,248 @@ DatabaseTablesIteratorPtr DatabasesOverlay::getTablesIterator(ContextPtr context
return std::make_unique<DatabaseTablesSnapshotIterator>(std::move(tables), getDatabaseName()); return std::make_unique<DatabaseTablesSnapshotIterator>(std::move(tables), getDatabaseName());
} }
bool DatabasesOverlay::canContainMergeTreeTables() const
{
for (auto & db : databases)
if (db->canContainMergeTreeTables())
return true;
return false;
}
bool DatabasesOverlay::canContainDistributedTables() const
{
for (auto & db : databases)
if (db->canContainDistributedTables())
return true;
return false;
}
void DatabasesOverlay::loadStoredObjects(ContextMutablePtr local_context, LoadingStrictnessLevel mode)
{
for (auto & db : databases)
if (!db->isReadOnly())
db->loadStoredObjects(local_context, mode);
}
bool DatabasesOverlay::supportsLoadingInTopologicalOrder() const
{
for (auto & db : databases)
if (db->supportsLoadingInTopologicalOrder())
return true;
return false;
}
void DatabasesOverlay::beforeLoadingMetadata(ContextMutablePtr local_context, LoadingStrictnessLevel mode)
{
for (auto & db : databases)
if (!db->isReadOnly())
db->beforeLoadingMetadata(local_context, mode);
}
void DatabasesOverlay::loadTablesMetadata(ContextPtr local_context, ParsedTablesMetadata & metadata, bool is_startup)
{
for (auto & db : databases)
if (!db->isReadOnly())
db->loadTablesMetadata(local_context, metadata, is_startup);
}
void DatabasesOverlay::loadTableFromMetadata(
ContextMutablePtr local_context,
const String & file_path,
const QualifiedTableName & name,
const ASTPtr & ast,
LoadingStrictnessLevel mode)
{
for (auto & db : databases)
{
if (db->isReadOnly())
continue;
try
{
db->loadTableFromMetadata(local_context, file_path, name, ast, mode);
return;
}
catch (...)
{
continue;
}
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"There are no databases capable of loading table `{}` from path `{}` in database `{}` (engine {})",
name,
file_path,
getDatabaseName(),
getEngineName());
}
LoadTaskPtr DatabasesOverlay::loadTableFromMetadataAsync(
AsyncLoader & async_loader,
LoadJobSet load_after,
ContextMutablePtr local_context,
const String & file_path,
const QualifiedTableName & name,
const ASTPtr & ast,
LoadingStrictnessLevel mode)
{
for (auto & db : databases)
{
if (db->isReadOnly())
continue;
try
{
return db->loadTableFromMetadataAsync(async_loader, load_after, local_context, file_path, name, ast, mode);
}
catch (...)
{
continue;
}
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"There are no databases capable of loading table `{}` from path `{}` in database `{}` (engine {})",
name,
file_path,
getDatabaseName(),
getEngineName());
}
LoadTaskPtr DatabasesOverlay::startupTableAsync(
AsyncLoader & async_loader,
LoadJobSet startup_after,
const QualifiedTableName & name,
LoadingStrictnessLevel mode)
{
for (auto & db : databases)
{
if (db->isReadOnly())
continue;
try
{
return db->startupTableAsync(async_loader, startup_after, name, mode);
}
catch (...)
{
continue;
}
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"There are no databases capable of starting up table `{}` in database `{}` (engine {})",
name,
getDatabaseName(),
getEngineName());
}
LoadTaskPtr DatabasesOverlay::startupDatabaseAsync(
AsyncLoader & async_loader,
LoadJobSet startup_after,
LoadingStrictnessLevel mode)
{
for (auto & db : databases)
{
if (db->isReadOnly())
continue;
try
{
return db->startupDatabaseAsync(async_loader, startup_after, mode);
}
catch (...)
{
continue;
}
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"There are no databases capable of starting up asynchronously in database `{}` (engine {})",
getDatabaseName(),
getEngineName());
}
void DatabasesOverlay::waitTableStarted(const String & name) const
{
for (auto & db : databases)
{
if (db->isReadOnly())
continue;
try
{
return db->waitTableStarted(name);
}
catch (...)
{
continue;
}
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"There are no databases capable of waiting for table startup `{}` in database `{}` (engine {})",
name,
getDatabaseName(),
getEngineName());
}
void DatabasesOverlay::waitDatabaseStarted() const
{
for (auto & db : databases)
{
if (db->isReadOnly())
continue;
try
{
return db->waitDatabaseStarted();
}
catch (...)
{
continue;
}
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"There are no databases capable of waiting for startup in database `{}` (engine {})",
getDatabaseName(),
getEngineName());
}
void DatabasesOverlay::stopLoading()
{
for (auto & db : databases)
{
if (db->isReadOnly())
continue;
try
{
return db->stopLoading();
}
catch (...)
{
continue;
}
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"There are no databases capable of stop loading in database `{}` (engine {})",
getDatabaseName(),
getEngineName());
}
void DatabasesOverlay::checkMetadataFilenameAvailability(const String & table_name) const
{
for (auto & db : databases)
{
if (db->isReadOnly())
continue;
db->checkMetadataFilenameAvailability(table_name);
return;
}
}
} }

View File

@ -66,6 +66,40 @@ public:
void shutdown() override; void shutdown() override;
bool canContainMergeTreeTables() const override;
bool canContainDistributedTables() const override;
void loadStoredObjects(ContextMutablePtr local_context, LoadingStrictnessLevel mode) override;
bool supportsLoadingInTopologicalOrder() const override;
void beforeLoadingMetadata(ContextMutablePtr local_context, LoadingStrictnessLevel mode) override;
void loadTablesMetadata(ContextPtr local_context, ParsedTablesMetadata & metadata, bool is_startup) override;
void loadTableFromMetadata(
ContextMutablePtr local_context,
const String & file_path,
const QualifiedTableName & name,
const ASTPtr & ast,
LoadingStrictnessLevel mode) override;
LoadTaskPtr loadTableFromMetadataAsync(
AsyncLoader & async_loader,
LoadJobSet load_after,
ContextMutablePtr local_context,
const String & file_path,
const QualifiedTableName & name,
const ASTPtr & ast,
LoadingStrictnessLevel mode) override;
[[nodiscard]] LoadTaskPtr startupTableAsync(
AsyncLoader & async_loader,
LoadJobSet startup_after,
const QualifiedTableName & name,
LoadingStrictnessLevel mode) override;
[[nodiscard]] LoadTaskPtr startupDatabaseAsync(
AsyncLoader & async_loader,
LoadJobSet startup_after,
LoadingStrictnessLevel mode) override;
void waitTableStarted(const String & name) const override;
void waitDatabaseStarted() const override;
void stopLoading() override;
void checkMetadataFilenameAvailability(const String & table_name) const override;
protected: protected:
std::vector<DatabasePtr> databases; std::vector<DatabasePtr> databases;
LoggerPtr log; LoggerPtr log;

View File

@ -101,6 +101,10 @@ static void loadDatabase(
const String & database_path, const String & database_path,
bool force_restore_data) bool force_restore_data)
{ {
/// If it is already loaded.
if (DatabaseCatalog::instance().isDatabaseExist(database))
return;
String database_attach_query; String database_attach_query;
String database_metadata_file = database_path + ".sql"; String database_metadata_file = database_path + ".sql";
@ -195,21 +199,25 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
/// Loop over databases. /// Loop over databases.
std::map<String, String> databases; std::map<String, String> databases;
/// Some databases don't have an .sql metadata file.
std::map<String, String> orphan_directories_and_symlinks;
fs::directory_iterator dir_end; fs::directory_iterator dir_end;
for (fs::directory_iterator it(path); it != dir_end; ++it) for (fs::directory_iterator it(path); it != dir_end; ++it)
{ {
if (it->is_symlink()) if (it->is_symlink() || it->is_directory())
continue; {
String db_name = it->path().filename().string();
if (it->is_directory()) orphan_directories_and_symlinks.emplace(unescapeForFileName(db_name), fs::path(path) / db_name);
continue; continue;
}
const auto current_file = it->path().filename().string(); const auto current_file = it->path().filename().string();
/// TODO: DETACH DATABASE PERMANENTLY ?
if (fs::path(current_file).extension() == ".sql") if (fs::path(current_file).extension() == ".sql")
{ {
String db_name = fs::path(current_file).stem(); String db_name = fs::path(current_file).stem();
orphan_directories_and_symlinks.erase(db_name);
if (!isSystemOrInformationSchema(db_name)) if (!isSystemOrInformationSchema(db_name))
databases.emplace(unescapeForFileName(db_name), fs::path(path) / db_name); databases.emplace(unescapeForFileName(db_name), fs::path(path) / db_name);
} }
@ -249,6 +257,12 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)}); loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)});
} }
for (const auto & [name, db_path] : orphan_directories_and_symlinks)
{
loadDatabase(context, name, db_path, has_force_restore_data_flag);
loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)});
}
auto mode = getLoadingStrictnessLevel(/* attach */ true, /* force_attach */ true, has_force_restore_data_flag, /* secondary */ false); auto mode = getLoadingStrictnessLevel(/* attach */ true, /* force_attach */ true, has_force_restore_data_flag, /* secondary */ false);
TablesLoader loader{context, std::move(loaded_databases), mode}; TablesLoader loader{context, std::move(loaded_databases), mode};
auto load_tasks = loader.loadTablesAsync(); auto load_tasks = loader.loadTablesAsync();
@ -501,7 +515,8 @@ void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMu
LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context, bool async_load_system_database) LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context, bool async_load_system_database)
{ {
loadSystemDatabaseImpl(context, DatabaseCatalog::SYSTEM_DATABASE, "Atomic"); loadSystemDatabaseImpl(context, DatabaseCatalog::SYSTEM_DATABASE,
context->getApplicationType() == Context::ApplicationType::SERVER ? "Atomic" : "Memory");
loadSystemDatabaseImpl(context, DatabaseCatalog::INFORMATION_SCHEMA, "Memory"); loadSystemDatabaseImpl(context, DatabaseCatalog::INFORMATION_SCHEMA, "Memory");
loadSystemDatabaseImpl(context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE, "Memory"); loadSystemDatabaseImpl(context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE, "Memory");