Merge pull request #28373 from ClickHouse/tables_topsort

Resolve table dependencies on metadata loading
This commit is contained in:
tavplubix 2021-09-20 14:46:47 +03:00 committed by GitHub
commit 922cf7ee20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 1040 additions and 253 deletions

View File

@ -306,6 +306,7 @@ try
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
loadMetadata(global_context);
startupSystemTables();
DatabaseCatalog::instance().loadDatabases();
LOG_DEBUG(log, "Loaded metadata.");
}

View File

@ -1116,6 +1116,7 @@ if (ThreadFuzzer::instance().isEffective())
database_catalog.loadMarkedAsDroppedTables();
/// Then, load remaining databases
loadMetadata(global_context, default_database);
startupSystemTables();
database_catalog.loadDatabases();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);

View File

@ -160,26 +160,29 @@ namespace
if (args.size() <= db_name_index)
return;
String db_name = evaluateConstantExpressionForDatabaseName(args[db_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
String name = evaluateConstantExpressionForDatabaseName(args[db_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
String table_name;
size_t table_name_index = static_cast<size_t>(-1);
size_t dot = String::npos;
if (function.name != "Distributed")
dot = db_name.find('.');
if (dot != String::npos)
{
table_name = db_name.substr(dot + 1);
db_name.resize(dot);
}
QualifiedTableName qualified_name;
if (function.name == "Distributed")
qualified_name.table = name;
else
qualified_name = QualifiedTableName::parseFromString(name);
if (qualified_name.database.empty())
{
std::swap(qualified_name.database, qualified_name.table);
table_name_index = 2;
if (args.size() <= table_name_index)
return;
table_name = evaluateConstantExpressionForDatabaseName(args[table_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
qualified_name.table = evaluateConstantExpressionForDatabaseName(args[table_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
}
const String & db_name = qualified_name.database;
const String & table_name = qualified_name.table;
if (db_name.empty() || table_name.empty())
return;

View File

@ -2,11 +2,20 @@
#include <string>
#include <tuple>
#include <optional>
#include <Common/Exception.h>
#include <Common/SipHash.h>
#include <Common/quoteString.h>
#include <fmt/format.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}
//TODO replace with StorageID
struct QualifiedTableName
{
@ -30,6 +39,46 @@ struct QualifiedTableName
hash_state.update(table.data(), table.size());
return hash_state.get64();
}
/// NOTE: It's different from compound identifier parsing and does not support escaping and dots in name.
/// Usually it's better to use ParserIdentifier instead,
/// but we parse DDL dictionary name (and similar things) this way for historical reasons.
static std::optional<QualifiedTableName> tryParseFromString(const String & maybe_qualified_name)
{
if (maybe_qualified_name.empty())
return {};
/// Do not allow dot at the beginning and at the end
auto pos = maybe_qualified_name.find('.');
if (pos == 0 || pos == (maybe_qualified_name.size() - 1))
return {};
QualifiedTableName name;
if (pos == std::string::npos)
{
name.table = std::move(maybe_qualified_name);
}
else if (maybe_qualified_name.find('.', pos + 1) != std::string::npos)
{
/// Do not allow multiple dots
return {};
}
else
{
name.database = maybe_qualified_name.substr(0, pos);
name.table = maybe_qualified_name.substr(pos + 1);
}
return name;
}
static QualifiedTableName parseFromString(const String & maybe_qualified_name)
{
auto name = tryParseFromString(maybe_qualified_name);
if (!name)
throw Exception(ErrorCodes::SYNTAX_ERROR, "Invalid qualified name: {}", maybe_qualified_name);
return *name;
}
};
}
@ -47,5 +96,23 @@ template <> struct hash<DB::QualifiedTableName>
return qualified_table.hash();
}
};
}
namespace fmt
{
template <>
struct formatter<DB::QualifiedTableName>
{
constexpr auto parse(format_parse_context & ctx)
{
return ctx.begin();
}
template <typename FormatContext>
auto format(const DB::QualifiedTableName & name, FormatContext & ctx)
{
return format_to(ctx.out(), "{}.{}", DB::backQuoteIfNeed(name.database), DB::backQuoteIfNeed(name.table));
}
};
}

View File

@ -0,0 +1,103 @@
#include <Databases/DDLDependencyVisitor.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include <Poco/String.h>
namespace DB
{
void DDLDependencyVisitor::visit(const ASTPtr & ast, Data & data)
{
/// Looking for functions in column default expressions and dictionary source definition
if (const auto * function = ast->as<ASTFunction>())
visit(*function, data);
else if (const auto * dict_source = ast->as<ASTFunctionWithKeyValueArguments>())
visit(*dict_source, data);
}
bool DDLDependencyVisitor::needChildVisit(const ASTPtr & node, const ASTPtr & /*child*/)
{
return !node->as<ASTStorage>();
}
void DDLDependencyVisitor::visit(const ASTFunction & function, Data & data)
{
if (function.name == "joinGet" ||
function.name == "dictHas" ||
function.name == "dictIsIn" ||
function.name.starts_with("dictGet"))
{
extractTableNameFromArgument(function, data, 0);
}
else if (Poco::toLower(function.name) == "in")
{
extractTableNameFromArgument(function, data, 1);
}
}
void DDLDependencyVisitor::visit(const ASTFunctionWithKeyValueArguments & dict_source, Data & data)
{
if (dict_source.name != "clickhouse")
return;
if (!dict_source.elements)
return;
auto config = getDictionaryConfigurationFromAST(data.create_query->as<ASTCreateQuery &>(), data.global_context);
auto info = getInfoIfClickHouseDictionarySource(config, data.global_context);
if (!info || !info->is_local)
return;
if (info->table_name.database.empty())
info->table_name.database = data.default_database;
data.dependencies.emplace(std::move(info->table_name));
}
void DDLDependencyVisitor::extractTableNameFromArgument(const ASTFunction & function, Data & data, size_t arg_idx)
{
/// Just ignore incorrect arguments, proper exception will be thrown later
if (!function.arguments || function.arguments->children.size() <= arg_idx)
return;
QualifiedTableName qualified_name;
const auto * arg = function.arguments->as<ASTExpressionList>()->children[arg_idx].get();
if (const auto * literal = arg->as<ASTLiteral>())
{
if (literal->value.getType() != Field::Types::String)
return;
auto maybe_qualified_name = QualifiedTableName::tryParseFromString(literal->value.get<String>());
/// Just return if name if invalid
if (!maybe_qualified_name)
return;
qualified_name = std::move(*maybe_qualified_name);
}
else if (const auto * identifier = arg->as<ASTIdentifier>())
{
auto table_identifier = identifier->createTable();
/// Just return if table identified is invalid
if (!table_identifier)
return;
qualified_name.database = table_identifier->getDatabaseName();
qualified_name.table = table_identifier->shortName();
}
else
{
assert(false);
return;
}
if (qualified_name.database.empty())
qualified_name.database = data.default_database;
data.dependencies.emplace(std::move(qualified_name));
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <Core/QualifiedTableName.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
class ASTFunction;
class ASTFunctionWithKeyValueArguments;
/// Visits ASTCreateQuery and extracts names of table (or dictionary) dependencies
/// from column default expressions (joinGet, dictGet, etc)
/// or dictionary source (for dictionaries from local ClickHouse table).
/// Does not validate AST, works a best-effort way.
class DDLDependencyVisitor
{
public:
struct Data
{
using TableNamesSet = std::set<QualifiedTableName>;
String default_database;
TableNamesSet dependencies;
ContextPtr global_context;
ASTPtr create_query;
};
using Visitor = ConstInDepthNodeVisitor<DDLDependencyVisitor, true>;
static void visit(const ASTPtr & ast, Data & data);
static bool needChildVisit(const ASTPtr & node, const ASTPtr & child);
private:
static void visit(const ASTFunction & function, Data & data);
static void visit(const ASTFunctionWithKeyValueArguments & dict_source, Data & data);
static void extractTableNameFromArgument(const ASTFunction & function, Data & data, size_t arg_idx);
};
using TableLoadingDependenciesVisitor = DDLDependencyVisitor::Visitor;
}

View File

@ -416,40 +416,49 @@ UUID DatabaseAtomic::tryGetTableUUID(const String & table_name) const
return UUIDHelpers::Nil;
}
void DatabaseAtomic::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseAtomic::beforeLoadingMetadata(ContextMutablePtr /*context*/, bool force_restore, bool /*force_attach*/)
{
if (!force_restore)
return;
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
if (has_force_restore_data_flag)
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
{
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
if (!fs::is_symlink(table_path))
{
if (!fs::is_symlink(table_path))
{
throw Exception(ErrorCodes::ABORTED,
"'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path()));
}
fs::remove(table_path);
}
}
DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
if (has_force_restore_data_flag)
{
NameToPathMap table_names;
{
std::lock_guard lock{mutex};
table_names = table_name_to_path;
throw Exception(ErrorCodes::ABORTED,
"'{}' is not a symlink. Atomic database should contains only symlinks.", std::string(table_path.path()));
}
fs::create_directories(path_to_table_symlinks);
for (const auto & table : table_names)
tryCreateSymlink(table.first, table.second, true);
fs::remove(table_path);
}
}
void DatabaseAtomic::loadStoredObjects(
ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
beforeLoadingMetadata(local_context, force_restore, force_attach);
DatabaseOrdinary::loadStoredObjects(local_context, force_restore, force_attach, skip_startup_tables);
}
void DatabaseAtomic::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
DatabaseOrdinary::startupTables(thread_pool, force_restore, force_attach);
if (!force_restore)
return;
NameToPathMap table_names;
{
std::lock_guard lock{mutex};
table_names = table_name_to_path;
}
fs::create_directories(path_to_table_symlinks);
for (const auto & table : table_names)
tryCreateSymlink(table.first, table.second, true);
}
void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist)
{
try

View File

@ -47,7 +47,11 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void beforeLoadingMetadata(ContextMutablePtr context, bool force_restore, bool force_attach) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
/// Atomic database cannot be detached if there is detached table which still in use
void assertCanBeDetached(bool cleanup) override;

View File

@ -36,7 +36,7 @@ DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_,
void DatabaseLazy::loadStoredObjects(
ContextMutablePtr local_context, bool /* has_force_restore_data_flag */, bool /*force_attach*/, bool /* skip_startup_tables */)
ContextMutablePtr local_context, bool /* force_restore */, bool /*force_attach*/, bool /* skip_startup_tables */)
{
iterateMetadataFiles(local_context, [this](const String & file_name)
{

View File

@ -26,7 +26,7 @@ public:
bool canContainDistributedTables() const override { return false; }
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void createTable(
ContextPtr context,

View File

@ -46,7 +46,7 @@ std::pair<String, StoragePtr> createTableFromAST(
const String & database_name,
const String & table_data_path_relative,
ContextMutablePtr context,
bool has_force_restore_data_flag)
bool force_restore)
{
ast_create_query.attach = true;
ast_create_query.database = database_name;
@ -88,7 +88,7 @@ std::pair<String, StoragePtr> createTableFromAST(
context->getGlobalContext(),
columns,
constraints,
has_force_restore_data_flag)
force_restore)
};
}

View File

@ -16,7 +16,7 @@ std::pair<String, StoragePtr> createTableFromAST(
const String & database_name,
const String & table_data_path_relative,
ContextMutablePtr context,
bool has_force_restore_data_flag);
bool force_restore);
/** Get the string with the table definition based on the CREATE query.
* It is an ATTACH query that you can execute to create a table from the correspondent database.

View File

@ -4,6 +4,8 @@
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabasesCommon.h>
#include <Databases/DDLDependencyVisitor.h>
#include <Databases/TablesLoader.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -27,8 +29,6 @@ namespace fs = std::filesystem;
namespace DB
{
static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
namespace
@ -39,7 +39,7 @@ namespace
DatabaseOrdinary & database,
const String & database_name,
const String & metadata_path,
bool has_force_restore_data_flag)
bool force_restore)
{
try
{
@ -48,7 +48,7 @@ namespace
database_name,
database.getTableDataPath(query),
context,
has_force_restore_data_flag);
force_restore);
database.attachTable(table_name, table, database.getTableDataPath(query));
}
@ -60,15 +60,6 @@ namespace
throw;
}
}
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
{
if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, "{}%", processed * 100.0 / total);
watch.restart();
}
}
}
@ -84,20 +75,88 @@ DatabaseOrdinary::DatabaseOrdinary(
}
void DatabaseOrdinary::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool /*force_attach*/, bool skip_startup_tables)
ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
/** Tables load faster if they are loaded in sorted (by name) order.
* Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
* which does not correspond to order tables creation and does not correspond to order of their location on disk.
*/
using FileNames = std::map<std::string, ASTPtr>;
std::mutex file_names_mutex;
FileNames file_names;
size_t total_dictionaries = 0;
ParsedTablesMetadata metadata;
loadTablesMetadata(local_context, metadata);
auto process_metadata = [&file_names, &total_dictionaries, &file_names_mutex, this](
const String & file_name)
size_t total_tables = metadata.parsed_tables.size() - metadata.total_dictionaries;
AtomicStopwatch watch;
std::atomic<size_t> dictionaries_processed{0};
std::atomic<size_t> tables_processed{0};
ThreadPool pool;
/// We must attach dictionaries before attaching tables
/// because while we're attaching tables we may need to have some dictionaries attached
/// (for example, dictionaries can be used in the default expressions for some tables).
/// On the other hand we can attach any dictionary (even sourced from ClickHouse table)
/// without having any tables attached. It is so because attaching of a dictionary means
/// loading of its config only, it doesn't involve loading the dictionary itself.
/// Attach dictionaries.
for (const auto & name_with_path_and_query : metadata.parsed_tables)
{
const auto & name = name_with_path_and_query.first;
const auto & path = name_with_path_and_query.second.path;
const auto & ast = name_with_path_and_query.second.ast;
const auto & create_query = ast->as<const ASTCreateQuery &>();
if (create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
loadTableFromMetadata(local_context, path, name, ast, force_restore);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++dictionaries_processed, metadata.total_dictionaries, watch);
});
}
}
pool.wait();
/// Attach tables.
for (const auto & name_with_path_and_query : metadata.parsed_tables)
{
const auto & name = name_with_path_and_query.first;
const auto & path = name_with_path_and_query.second.path;
const auto & ast = name_with_path_and_query.second.ast;
const auto & create_query = ast->as<const ASTCreateQuery &>();
if (!create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
loadTableFromMetadata(local_context, path, name, ast, force_restore);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
}
}
pool.wait();
if (!skip_startup_tables)
{
/// After all tables was basically initialized, startup them.
startupTables(pool, force_restore, force_attach);
}
}
void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTablesMetadata & metadata)
{
size_t prev_tables_count = metadata.parsed_tables.size();
size_t prev_total_dictionaries = metadata.total_dictionaries;
auto process_metadata = [&metadata, this](const String & file_name)
{
fs::path path(getMetadataPath());
fs::path file_path(file_name);
@ -122,9 +181,29 @@ void DatabaseOrdinary::loadStoredObjects(
return;
}
std::lock_guard lock{file_names_mutex};
file_names[file_name] = ast;
total_dictionaries += create_query->is_dictionary;
TableLoadingDependenciesVisitor::Data data;
data.default_database = metadata.default_database;
data.create_query = ast;
data.global_context = getContext();
TableLoadingDependenciesVisitor visitor{data};
visitor.visit(ast);
QualifiedTableName qualified_name{database_name, create_query->table};
std::lock_guard lock{metadata.mutex};
metadata.parsed_tables[qualified_name] = ParsedTableMetadata{full_path.string(), ast};
if (data.dependencies.empty())
{
metadata.independent_database_objects.emplace_back(std::move(qualified_name));
}
else
{
for (const auto & dependency : data.dependencies)
{
metadata.dependencies_info[dependency].dependent_database_objects.push_back(qualified_name);
++metadata.dependencies_info[qualified_name].dependencies_count;
}
}
metadata.total_dictionaries += create_query->is_dictionary;
}
}
catch (Exception & e)
@ -136,86 +215,29 @@ void DatabaseOrdinary::loadStoredObjects(
iterateMetadataFiles(local_context, process_metadata);
size_t total_tables = file_names.size() - total_dictionaries;
size_t objects_in_database = metadata.parsed_tables.size() - prev_tables_count;
size_t dictionaries_in_database = metadata.total_dictionaries - prev_total_dictionaries;
size_t tables_in_database = objects_in_database - dictionaries_in_database;
LOG_INFO(log, "Total {} tables and {} dictionaries.", total_tables, total_dictionaries);
AtomicStopwatch watch;
std::atomic<size_t> tables_processed{0};
ThreadPool pool;
/// We must attach dictionaries before attaching tables
/// because while we're attaching tables we may need to have some dictionaries attached
/// (for example, dictionaries can be used in the default expressions for some tables).
/// On the other hand we can attach any dictionary (even sourced from ClickHouse table)
/// without having any tables attached. It is so because attaching of a dictionary means
/// loading of its config only, it doesn't involve loading the dictionary itself.
/// Attach dictionaries.
for (const auto & name_with_query : file_names)
{
const auto & create_query = name_with_query.second->as<const ASTCreateQuery &>();
if (create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
tryAttachTable(
local_context,
create_query,
*this,
database_name,
getMetadataPath() + name_with_query.first,
has_force_restore_data_flag);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
}
}
pool.wait();
/// Attach tables.
for (const auto & name_with_query : file_names)
{
const auto & create_query = name_with_query.second->as<const ASTCreateQuery &>();
if (!create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
tryAttachTable(
local_context,
create_query,
*this,
database_name,
getMetadataPath() + name_with_query.first,
has_force_restore_data_flag);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
}
}
pool.wait();
if (!skip_startup_tables)
{
/// After all tables was basically initialized, startup them.
startupTablesImpl(pool);
}
LOG_INFO(log, "Metadata processed, database {} has {} tables and {} dictionaries in total.",
database_name, tables_in_database, dictionaries_in_database);
}
void DatabaseOrdinary::startupTables()
void DatabaseOrdinary::loadTableFromMetadata(ContextMutablePtr local_context, const String & file_path, const QualifiedTableName & name, const ASTPtr & ast, bool force_restore)
{
ThreadPool pool;
startupTablesImpl(pool);
assert(name.database == database_name);
const auto & create_query = ast->as<const ASTCreateQuery &>();
tryAttachTable(
local_context,
create_query,
*this,
name.database,
file_path,
force_restore);
}
void DatabaseOrdinary::startupTablesImpl(ThreadPool & thread_pool)
void DatabaseOrdinary::startupTables(ThreadPool & thread_pool, bool /*force_restore*/, bool /*force_attach*/)
{
LOG_INFO(log, "Starting up tables.");
@ -240,6 +262,7 @@ void DatabaseOrdinary::startupTablesImpl(ThreadPool & thread_pool)
}
catch (...)
{
/// We have to wait for jobs to finish here, because job function has reference to variables on the stack of current thread.
thread_pool.wait();
throw;
}

View File

@ -21,9 +21,15 @@ public:
String getEngineName() const override { return "Ordinary"; }
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void startupTables() override;
bool supportsLoadingInTopologicalOrder() const override { return true; }
void loadTablesMetadata(ContextPtr context, ParsedTablesMetadata & metadata) override;
void loadTableFromMetadata(ContextMutablePtr local_context, const String & file_path, const QualifiedTableName & name, const ASTPtr & ast, bool force_restore) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
void alterTable(
ContextPtr context,
@ -37,8 +43,6 @@ protected:
const String & table_metadata_path,
const String & statement,
ContextPtr query_context);
void startupTablesImpl(ThreadPool & thread_pool);
};
}

View File

@ -305,13 +305,21 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
createEmptyLogEntry(current_zookeeper);
}
void DatabaseReplicated::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseReplicated::beforeLoadingMetadata(ContextMutablePtr /*context*/, bool /*force_restore*/, bool force_attach)
{
tryConnectToZooKeeperAndInitDatabase(force_attach);
}
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
void DatabaseReplicated::loadStoredObjects(
ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
beforeLoadingMetadata(local_context, force_restore, force_attach);
DatabaseAtomic::loadStoredObjects(local_context, force_restore, force_attach, skip_startup_tables);
}
void DatabaseReplicated::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
DatabaseAtomic::startupTables(thread_pool, force_restore, force_attach);
ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
ddl_worker->startup();
}

View File

@ -57,7 +57,12 @@ public:
void drop(ContextPtr /*context*/) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void loadStoredObjects(ContextMutablePtr context, bool force_restore, bool force_attach, bool skip_startup_tables) override;
void beforeLoadingMetadata(ContextMutablePtr context, bool force_restore, bool force_attach) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
void shutdown() override;
friend struct DatabaseReplicatedTask;

View File

@ -5,6 +5,7 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <Core/UUID.h>
#include <ctime>
@ -27,11 +28,14 @@ class ASTCreateQuery;
class AlterCommands;
class SettingsChanges;
using DictionariesWithID = std::vector<std::pair<String, UUID>>;
struct ParsedTablesMetadata;
struct QualifiedTableName;
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int LOGICAL_ERROR;
}
class IDatabaseTablesIterator
@ -127,13 +131,32 @@ public:
/// You can call only once, right after the object is created.
virtual void loadStoredObjects(
ContextMutablePtr /*context*/,
bool /*has_force_restore_data_flag*/,
bool /*force_restore*/,
bool /*force_attach*/ = false,
bool /* skip_startup_tables */ = false)
{
}
virtual void startupTables() {}
virtual bool supportsLoadingInTopologicalOrder() const { return false; }
virtual void beforeLoadingMetadata(
ContextMutablePtr /*context*/,
bool /*force_restore*/,
bool /*force_attach*/)
{
}
virtual void loadTablesMetadata(ContextPtr /*local_context*/, ParsedTablesMetadata & /*metadata*/)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented");
}
virtual void loadTableFromMetadata(ContextMutablePtr /*local_context*/, const String & /*file_path*/, const QualifiedTableName & /*name*/, const ASTPtr & /*ast*/, bool /*force_restore*/)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented");
}
virtual void startupTables(ThreadPool & /*thread_pool*/, bool /*force_restore*/, bool /*force_attach*/) {}
/// Check the existence of the table.
virtual bool isTableExist(const String & name, ContextPtr context) const = 0;

View File

@ -94,10 +94,10 @@ void DatabaseMaterializedMySQL<Base>::setException(const std::exception_ptr & ex
}
template <typename Base>
void DatabaseMaterializedMySQL<Base>::loadStoredObjects(
ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseMaterializedMySQL<Base>::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
Base::loadStoredObjects(context_, has_force_restore_data_flag, force_attach, skip_startup_tables);
Base::startupTables(thread_pool, force_restore, force_attach);
if (!force_attach)
materialize_thread.assertMySQLAvailable();

View File

@ -43,7 +43,7 @@ protected:
public:
String getEngineName() const override { return "MaterializedMySQL"; }
void loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
void createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) override;

View File

@ -108,11 +108,9 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
}
void DatabaseMaterializedPostgreSQL::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
void DatabaseMaterializedPostgreSQL::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
{
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
DatabaseAtomic::startupTables(thread_pool, force_restore, force_attach);
try
{
startSynchronization();

View File

@ -42,7 +42,7 @@ public:
String getMetadataPath() const override { return metadata_path; }
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;
void startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach) override;
DatabaseTablesIteratorPtr
getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;

View File

@ -0,0 +1,255 @@
#include <Databases/TablesLoader.h>
#include <Databases/IDatabase.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <numeric>
namespace DB
{
namespace ErrorCodes
{
extern const int INFINITE_LOOP;
extern const int LOGICAL_ERROR;
}
static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
{
if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, "{}%", processed * 100.0 / total);
watch.restart();
}
}
TablesLoader::TablesLoader(ContextMutablePtr global_context_, Databases databases_, bool force_restore_, bool force_attach_)
: global_context(global_context_)
, databases(std::move(databases_))
, force_restore(force_restore_)
, force_attach(force_attach_)
{
metadata.default_database = global_context->getCurrentDatabase();
log = &Poco::Logger::get("TablesLoader");
}
void TablesLoader::loadTables()
{
bool need_resolve_dependencies = !global_context->getConfigRef().has("ignore_table_dependencies_on_metadata_loading");
/// Load all Lazy, MySQl, PostgreSQL, SQLite, etc databases first.
for (auto & database : databases)
{
if (need_resolve_dependencies && database.second->supportsLoadingInTopologicalOrder())
databases_to_load.push_back(database.first);
else
database.second->loadStoredObjects(global_context, force_restore, force_attach, true);
}
if (databases_to_load.empty())
return;
/// Read and parse metadata from Ordinary, Atomic, Materialized*, Replicated, etc databases. Build dependency graph.
for (auto & database_name : databases_to_load)
{
databases[database_name]->beforeLoadingMetadata(global_context, force_restore, force_attach);
databases[database_name]->loadTablesMetadata(global_context, metadata);
}
LOG_INFO(log, "Parsed metadata of {} tables in {} databases in {} sec",
metadata.parsed_tables.size(), databases_to_load.size(), stopwatch.elapsedSeconds());
stopwatch.restart();
logDependencyGraph();
/// Some tables were loaded by database with loadStoredObjects(...). Remove them from graph if necessary.
removeUnresolvableDependencies();
loadTablesInTopologicalOrder(pool);
}
void TablesLoader::startupTables()
{
/// Startup tables after all tables are loaded. Background tasks (merges, mutations, etc) may slow down data parts loading.
for (auto & database : databases)
database.second->startupTables(pool, force_restore, force_attach);
}
void TablesLoader::removeUnresolvableDependencies()
{
auto need_exclude_dependency = [this](const QualifiedTableName & dependency_name, const DependenciesInfo & info)
{
/// Table exists and will be loaded
if (metadata.parsed_tables.contains(dependency_name))
return false;
/// Table exists and it's already loaded
if (DatabaseCatalog::instance().isTableExist(StorageID(dependency_name.database, dependency_name.table), global_context))
return true;
/// It's XML dictionary. It was loaded before tables and DDL dictionaries.
if (dependency_name.database == metadata.default_database &&
global_context->getExternalDictionariesLoader().has(dependency_name.table))
return true;
/// Some tables depends on table "dependency_name", but there is no such table in DatabaseCatalog and we don't have its metadata.
/// We will ignore it and try to load dependent tables without "dependency_name"
/// (but most likely dependent tables will fail to load).
LOG_WARNING(log, "Tables {} depend on {}, but seems like the it does not exist. Will ignore it and try to load existing tables",
fmt::join(info.dependent_database_objects, ", "), dependency_name);
if (info.dependencies_count)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not exist, but we have seen its AST and found {} dependencies."
"It's a bug", dependency_name, info.dependencies_count);
if (info.dependent_database_objects.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not have dependencies and dependent tables as it expected to."
"It's a bug", dependency_name);
return true;
};
auto table_it = metadata.dependencies_info.begin();
while (table_it != metadata.dependencies_info.end())
{
auto & info = table_it->second;
if (need_exclude_dependency(table_it->first, info))
table_it = removeResolvedDependency(table_it, metadata.independent_database_objects);
else
++table_it;
}
}
void TablesLoader::loadTablesInTopologicalOrder(ThreadPool & pool)
{
/// Load independent tables in parallel.
/// Then remove loaded tables from dependency graph, find tables/dictionaries that do not have unresolved dependencies anymore,
/// move them to the list of independent tables and load.
/// Repeat until we have some tables to load.
/// If we do not, then either all objects are loaded or there is cyclic dependency.
/// Complexity: O(V + E)
size_t level = 0;
do
{
assert(metadata.parsed_tables.size() == tables_processed + metadata.independent_database_objects.size() + getNumberOfTablesWithDependencies());
logDependencyGraph();
startLoadingIndependentTables(pool, level);
TableNames new_independent_database_objects;
for (const auto & table_name : metadata.independent_database_objects)
{
auto info_it = metadata.dependencies_info.find(table_name);
if (info_it == metadata.dependencies_info.end())
{
/// No tables depend on table_name and it was not even added to dependencies_info
continue;
}
removeResolvedDependency(info_it, new_independent_database_objects);
}
pool.wait();
metadata.independent_database_objects = std::move(new_independent_database_objects);
++level;
} while (!metadata.independent_database_objects.empty());
checkCyclicDependencies();
}
DependenciesInfosIter TablesLoader::removeResolvedDependency(const DependenciesInfosIter & info_it, TableNames & independent_database_objects)
{
auto & info = info_it->second;
if (info.dependencies_count)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} is in list of independent tables, but dependencies count is {}."
"It's a bug", info_it->first, info.dependencies_count);
if (info.dependent_database_objects.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not have dependent tables. It's a bug", info_it->first);
/// Decrement number of dependencies for each dependent table
for (auto & dependent_table : info.dependent_database_objects)
{
auto & dependent_info = metadata.dependencies_info[dependent_table];
auto & dependencies_count = dependent_info.dependencies_count;
if (dependencies_count == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to decrement 0 dependencies counter for {}. It's a bug", dependent_table);
--dependencies_count;
if (dependencies_count == 0)
{
independent_database_objects.push_back(dependent_table);
if (dependent_info.dependent_database_objects.empty())
metadata.dependencies_info.erase(dependent_table);
}
}
return metadata.dependencies_info.erase(info_it);
}
void TablesLoader::startLoadingIndependentTables(ThreadPool & pool, size_t level)
{
size_t total_tables = metadata.parsed_tables.size();
LOG_INFO(log, "Loading {} tables with {} dependency level", metadata.independent_database_objects.size(), level);
for (const auto & table_name : metadata.independent_database_objects)
{
pool.scheduleOrThrowOnError([this, total_tables, &table_name]()
{
const auto & path_and_query = metadata.parsed_tables[table_name];
databases[table_name.database]->loadTableFromMetadata(global_context, path_and_query.path, table_name, path_and_query.ast, force_restore);
logAboutProgress(log, ++tables_processed, total_tables, stopwatch);
});
}
}
size_t TablesLoader::getNumberOfTablesWithDependencies() const
{
size_t number_of_tables_with_dependencies = 0;
for (const auto & info : metadata.dependencies_info)
if (info.second.dependencies_count)
++number_of_tables_with_dependencies;
return number_of_tables_with_dependencies;
}
void TablesLoader::checkCyclicDependencies() const
{
/// Loading is finished if all dependencies are resolved
if (metadata.dependencies_info.empty())
return;
for (const auto & info : metadata.dependencies_info)
{
LOG_WARNING(log, "Cannot resolve dependencies: Table {} have {} dependencies and {} dependent tables. List of dependent tables: {}",
info.first, info.second.dependencies_count,
info.second.dependent_database_objects.size(), fmt::join(info.second.dependent_database_objects, ", "));
assert(info.second.dependencies_count == 0);
}
throw Exception(ErrorCodes::INFINITE_LOOP, "Cannot attach {} tables due to cyclic dependencies. "
"See server log for details.", metadata.dependencies_info.size());
}
void TablesLoader::logDependencyGraph() const
{
LOG_TEST(log, "Have {} independent tables: {}",
metadata.independent_database_objects.size(),
fmt::join(metadata.independent_database_objects, ", "));
for (const auto & dependencies : metadata.dependencies_info)
{
LOG_TEST(log,
"Table {} have {} dependencies and {} dependent tables. List of dependent tables: {}",
dependencies.first,
dependencies.second.dependencies_count,
dependencies.second.dependent_database_objects.size(),
fmt::join(dependencies.second.dependent_database_objects, ", "));
}
}
}

View File

@ -0,0 +1,112 @@
#pragma once
#include <Core/Types.h>
#include <Core/QualifiedTableName.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <map>
#include <unordered_map>
#include <unordered_set>
#include <mutex>
namespace Poco
{
class Logger;
}
class AtomicStopwatch;
namespace DB
{
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch);
class IDatabase;
using DatabasePtr = std::shared_ptr<IDatabase>;
struct ParsedTableMetadata
{
String path;
ASTPtr ast;
};
using ParsedMetadata = std::map<QualifiedTableName, ParsedTableMetadata>;
using TableNames = std::vector<QualifiedTableName>;
struct DependenciesInfo
{
/// How many dependencies this table have
size_t dependencies_count = 0;
/// List of tables/dictionaries which depend on this table/dictionary
TableNames dependent_database_objects;
};
using DependenciesInfos = std::unordered_map<QualifiedTableName, DependenciesInfo>;
using DependenciesInfosIter = std::unordered_map<QualifiedTableName, DependenciesInfo>::iterator;
struct ParsedTablesMetadata
{
String default_database;
std::mutex mutex;
ParsedMetadata parsed_tables;
/// For logging
size_t total_dictionaries = 0;
/// List of tables/dictionaries that do not have any dependencies and can be loaded
TableNames independent_database_objects;
/// Actually it contains two different maps (with, probably, intersecting keys):
/// 1. table/dictionary name -> number of dependencies
/// 2. table/dictionary name -> dependent tables/dictionaries list (adjacency list of dependencies graph).
/// If table A depends on table B, then there is an edge B --> A, i.e. dependencies_info[B].dependent_database_objects contains A.
/// And dependencies_info[C].dependencies_count is a number of incoming edges for vertex C (how many tables we have to load before C).
DependenciesInfos dependencies_info;
};
/// Loads tables (and dictionaries) from specified databases
/// taking into account dependencies between them.
class TablesLoader
{
public:
using Databases = std::map<String, DatabasePtr>;
TablesLoader(ContextMutablePtr global_context_, Databases databases_, bool force_restore_ = false, bool force_attach_ = false);
TablesLoader() = delete;
void loadTables();
void startupTables();
private:
ContextMutablePtr global_context;
Databases databases;
bool force_restore;
bool force_attach;
Strings databases_to_load;
ParsedTablesMetadata metadata;
Poco::Logger * log;
std::atomic<size_t> tables_processed{0};
AtomicStopwatch stopwatch;
ThreadPool pool;
void removeUnresolvableDependencies();
void loadTablesInTopologicalOrder(ThreadPool & pool);
DependenciesInfosIter removeResolvedDependency(const DependenciesInfosIter & info_it, TableNames & independent_database_objects);
void startLoadingIndependentTables(ThreadPool & pool, size_t level);
void checkCyclicDependencies() const;
size_t getNumberOfTablesWithDependencies() const;
void logDependencyGraph() const;
};
}

View File

@ -1,6 +1,7 @@
#include "PostgreSQLDictionarySource.h"
#include <Poco/Util/AbstractConfiguration.h>
#include <Core/QualifiedTableName.h>
#include "DictionarySourceFactory.h"
#include "registerDictionaries.h"
@ -29,19 +30,13 @@ namespace
{
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct, const String & schema, const String & table, const String & query, const String & where)
{
auto schema_value = schema;
auto table_value = table;
QualifiedTableName qualified_name{schema, table};
if (qualified_name.database.empty() && !qualified_name.table.empty())
qualified_name = QualifiedTableName::parseFromString(qualified_name.table);
if (schema_value.empty())
{
if (auto pos = table_value.find('.'); pos != std::string::npos)
{
schema_value = table_value.substr(0, pos);
table_value = table_value.substr(pos + 1);
}
}
/// Do not need db because it is already in a connection string.
return {dict_struct, "", schema_value, table_value, query, where, IdentifierQuotingStyle::DoubleQuotes};
return {dict_struct, "", qualified_name.database, qualified_name.table, query, where, IdentifierQuotingStyle::DoubleQuotes};
}
}

View File

@ -38,29 +38,22 @@ namespace
const std::string & where_,
IXDBCBridgeHelper & bridge_)
{
std::string schema = schema_;
std::string table = table_;
QualifiedTableName qualified_name{schema_, table_};
if (bridge_.isSchemaAllowed())
{
if (schema.empty())
{
if (auto pos = table.find('.'); pos != std::string::npos)
{
schema = table.substr(0, pos);
table = table.substr(pos + 1);
}
}
if (qualified_name.database.empty())
qualified_name = QualifiedTableName::parseFromString(qualified_name.table);
}
else
{
if (!schema.empty())
if (!qualified_name.database.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Dictionary source of type {} specifies a schema but schema is not supported by {}-driver",
bridge_.getName());
}
return {dict_struct_, db_, schema, table, query_, where_, bridge_.getIdentifierQuotingStyle()};
return {dict_struct_, db_, qualified_name.database, qualified_name.table, query_, where_, bridge_.getIdentifierQuotingStyle()};
}
}

View File

@ -4,7 +4,6 @@
#include <Poco/DOM/Document.h>
#include <Poco/DOM/Element.h>
#include <Poco/DOM/Text.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/XMLConfiguration.h>
#include <IO/WriteHelpers.h>
#include <Parsers/queryToString.h>
@ -16,6 +15,8 @@
#include <Parsers/ASTDictionaryAttributeDeclaration.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Functions/FunctionFactory.h>
#include <Common/isLocalAddress.h>
#include <Interpreters/Context.h>
namespace DB
@ -576,4 +577,28 @@ getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr conte
return conf;
}
std::optional<ClickHouseDictionarySourceInfo>
getInfoIfClickHouseDictionarySource(DictionaryConfigurationPtr & config, ContextPtr global_context)
{
ClickHouseDictionarySourceInfo info;
String host = config->getString("dictionary.source.clickhouse.host", "");
UInt16 port = config->getUInt("dictionary.source.clickhouse.port", 0);
String database = config->getString("dictionary.source.clickhouse.db", "");
String table = config->getString("dictionary.source.clickhouse.table", "");
bool secure = config->getBool("dictionary.source.clickhouse.secure", false);
if (host.empty() || port == 0 || table.empty())
return {};
info.table_name = {database, table};
UInt16 default_port = secure ? global_context->getTCPPortSecure().value_or(0) : global_context->getTCPPort();
if (!isLocalAddress({host, port}, default_port))
return info;
info.is_local = true;
return info;
}
}

View File

@ -15,4 +15,13 @@ using DictionaryConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfigurati
DictionaryConfigurationPtr
getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr context, const std::string & database_ = "");
struct ClickHouseDictionarySourceInfo
{
QualifiedTableName table_name;
bool is_local = false;
};
std::optional<ClickHouseDictionarySourceInfo>
getInfoIfClickHouseDictionarySource(DictionaryConfigurationPtr & config, ContextPtr global_context);
}

View File

@ -48,22 +48,11 @@ getJoin(const ColumnsWithTypeAndName & arguments, ContextPtr context)
"Illegal type " + arguments[0].type->getName() + " of first argument of function joinGet, expected a const string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
size_t dot = join_name.find('.');
String database_name;
if (dot == String::npos)
{
database_name = context->getCurrentDatabase();
dot = 0;
}
else
{
database_name = join_name.substr(0, dot);
++dot;
}
String table_name = join_name.substr(dot);
if (table_name.empty())
throw Exception("joinGet does not allow empty table name", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto table = DatabaseCatalog::instance().getTable({database_name, table_name}, std::const_pointer_cast<Context>(context));
auto qualified_name = QualifiedTableName::parseFromString(join_name);
if (qualified_name.database.empty())
qualified_name.database = context->getCurrentDatabase();
auto table = DatabaseCatalog::instance().getTable({qualified_name.database, qualified_name.table}, std::const_pointer_cast<Context>(context));
auto storage_join = std::dynamic_pointer_cast<StorageJoin>(table);
if (!storage_join)
throw Exception("Table " + join_name + " should have engine StorageJoin", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

View File

@ -157,15 +157,6 @@ void DatabaseCatalog::loadDatabases()
/// Another background thread which drops temporary LiveViews.
/// We should start it after loadMarkedAsDroppedTables() to avoid race condition.
TemporaryLiveViewCleaner::instance().startup();
/// Start up tables after all databases are loaded.
for (const auto & [database_name, database] : databases)
{
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
continue;
database->startupTables();
}
}
void DatabaseCatalog::shutdownImpl()

View File

@ -89,47 +89,40 @@ DictionaryStructure ExternalDictionariesLoader::getDictionaryStructure(const std
std::string ExternalDictionariesLoader::resolveDictionaryName(const std::string & dictionary_name, const std::string & current_database_name) const
{
bool has_dictionary = has(dictionary_name);
if (has_dictionary)
if (has(dictionary_name))
return dictionary_name;
std::string resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name);
has_dictionary = has(resolved_name);
std::string resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name, current_database_name);
if (!has_dictionary)
{
/// If dictionary not found. And database was not implicitly specified
/// we can qualify dictionary name with current database name.
/// It will help if dictionary is created with DDL and is in current database.
if (dictionary_name.find('.') == std::string::npos)
{
String dictionary_name_with_database = current_database_name + '.' + dictionary_name;
resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name_with_database);
has_dictionary = has(resolved_name);
}
}
if (has(resolved_name))
return resolved_name;
if (!has_dictionary)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Dictionary ({}) not found", backQuote(dictionary_name));
return resolved_name;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Dictionary ({}) not found", backQuote(dictionary_name));
}
std::string ExternalDictionariesLoader::resolveDictionaryNameFromDatabaseCatalog(const std::string & name) const
std::string ExternalDictionariesLoader::resolveDictionaryNameFromDatabaseCatalog(const std::string & name, const std::string & current_database_name) const
{
/// If it's dictionary from Atomic database, then we need to convert qualified name to UUID.
/// Try to split name and get id from associated StorageDictionary.
/// If something went wrong, return name as is.
auto pos = name.find('.');
if (pos == std::string::npos || name.find('.', pos + 1) != std::string::npos)
auto qualified_name = QualifiedTableName::tryParseFromString(name);
if (!qualified_name)
return name;
std::string maybe_database_name = name.substr(0, pos);
std::string maybe_table_name = name.substr(pos + 1);
if (qualified_name->database.empty())
{
/// Ether database name is not specified and we should use current one
/// or it's an XML dictionary.
bool is_xml_dictionary = has(name);
if (is_xml_dictionary)
return name;
else
qualified_name->database = current_database_name;
}
auto [db, table] = DatabaseCatalog::instance().tryGetDatabaseAndTable(
{maybe_database_name, maybe_table_name},
{qualified_name->database, qualified_name->table},
const_pointer_cast<Context>(getContext()));
if (!db)

View File

@ -42,7 +42,7 @@ protected:
std::string resolveDictionaryName(const std::string & dictionary_name, const std::string & current_database_name) const;
/// Try convert qualified dictionary name to persistent UUID
std::string resolveDictionaryNameFromDatabaseCatalog(const std::string & name) const;
std::string resolveDictionaryNameFromDatabaseCatalog(const std::string & name, const std::string & current_database_name) const;
friend class StorageSystemDictionaries;
friend class DatabaseDictionary;

View File

@ -53,6 +53,7 @@
#include <Databases/DatabaseReplicated.h>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/TablesLoader.h>
#include <Compression/CompressionFactory.h>
@ -271,9 +272,13 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
renamed = true;
}
/// We use global context here, because storages lifetime is bigger than query context lifetime
database->loadStoredObjects(
getContext()->getGlobalContext(), has_force_restore_data_flag, create.attach && force_attach, skip_startup_tables); //-V560
if (!load_database_without_tables)
{
/// We use global context here, because storages lifetime is bigger than query context lifetime
TablesLoader loader{getContext()->getGlobalContext(), {{database_name, database}}, has_force_restore_data_flag, create.attach && force_attach}; //-V560
loader.loadTables();
loader.startupTables();
}
}
catch (...)
{

View File

@ -52,9 +52,9 @@ public:
force_attach = force_attach_;
}
void setSkipStartupTables(bool skip_startup_tables_)
void setLoadDatabaseWithoutTables(bool load_database_without_tables_)
{
skip_startup_tables = skip_startup_tables_;
load_database_without_tables = load_database_without_tables_;
}
/// Obtain information about columns, their types, default values and column comments,
@ -99,7 +99,7 @@ private:
/// Is this an internal query - not from the user.
bool internal = false;
bool force_attach = false;
bool skip_startup_tables = false;
bool load_database_without_tables = false;
mutable String as_database_saved;
mutable String as_table_saved;

View File

@ -355,6 +355,13 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
}
}
if (!drop && query.no_delay)
{
/// Avoid "some tables are still in use" when sync mode is enabled
for (const auto & table_uuid : uuids_to_wait)
database->waitDetachedTableNotInUse(table_uuid);
}
/// Protects from concurrent CREATE TABLE queries
auto db_guard = DatabaseCatalog::instance().getExclusiveDDLGuardForDatabase(database_name);

View File

@ -11,6 +11,7 @@
#include <Interpreters/loadMetadata.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/TablesLoader.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
@ -43,7 +44,7 @@ static void executeCreateQuery(
interpreter.setInternal(true);
interpreter.setForceAttach(true);
interpreter.setForceRestoreData(has_force_restore_data_flag);
interpreter.setSkipStartupTables(true);
interpreter.setLoadDatabaseWithoutTables(true);
interpreter.execute();
}
@ -161,8 +162,16 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
if (create_default_db_if_not_exists && !metadata_dir_for_default_db_already_exists)
databases.emplace(default_database_name, path + "/" + escapeForFileName(default_database_name));
TablesLoader::Databases loaded_databases;
for (const auto & [name, db_path] : databases)
{
loadDatabase(context, name, db_path, has_force_restore_data_flag);
loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)});
}
TablesLoader loader{context, std::move(loaded_databases), has_force_restore_data_flag, /* force_attach */ true};
loader.loadTables();
loader.startupTables();
if (has_force_restore_data_flag)
{
@ -197,11 +206,28 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
}
}
void startupSystemTables()
{
ThreadPool pool;
DatabaseCatalog::instance().getSystemDatabase()->startupTables(pool, /* force_restore */ true, /* force_attach */ true);
}
void loadMetadataSystem(ContextMutablePtr context)
{
loadSystemDatabaseImpl(context, DatabaseCatalog::SYSTEM_DATABASE, "Atomic");
loadSystemDatabaseImpl(context, DatabaseCatalog::INFORMATION_SCHEMA, "Memory");
loadSystemDatabaseImpl(context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE, "Memory");
TablesLoader::Databases databases =
{
{DatabaseCatalog::SYSTEM_DATABASE, DatabaseCatalog::instance().getSystemDatabase()},
{DatabaseCatalog::INFORMATION_SCHEMA, DatabaseCatalog::instance().getDatabase(DatabaseCatalog::INFORMATION_SCHEMA)},
{DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE, DatabaseCatalog::instance().getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE)},
};
TablesLoader loader{context, databases, /* force_restore */ true, /* force_attach */ true};
loader.loadTables();
/// Will startup tables in system database after all databases are loaded.
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Databases/TablesLoader.h>
namespace DB
@ -14,4 +15,8 @@ void loadMetadataSystem(ContextMutablePtr context);
/// Use separate function to load system tables.
void loadMetadata(ContextMutablePtr context, const String & default_database_name = {});
/// Background operations in system tables may slowdown loading of the rest tables,
/// so we startup system tables after all databases are loaded.
void startupSystemTables();
}

View File

@ -93,14 +93,8 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr
++arg_num;
size_t dot = remote_database.find('.');
if (dot != String::npos)
{
/// NOTE Bad - do not support identifiers in backquotes.
remote_table = remote_database.substr(dot + 1);
remote_database = remote_database.substr(0, dot);
}
else
auto qualified_name = QualifiedTableName::parseFromString(remote_database);
if (qualified_name.database.empty())
{
if (arg_num >= args.size())
{
@ -108,11 +102,15 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr
}
else
{
std::swap(qualified_name.database, qualified_name.table);
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
remote_table = args[arg_num]->as<ASTLiteral &>().value.safeGet<String>();
qualified_name.table = args[arg_num]->as<ASTLiteral &>().value.safeGet<String>();
++arg_num;
}
}
remote_database = std::move(qualified_name.database);
remote_table = std::move(qualified_name.table);
}
/// Cluster function may have sharding key for insert

View File

@ -6,7 +6,7 @@ DICTIONARY_FILES = ['configs/dictionaries/dep_x.xml', 'configs/dictionaries/dep_
'configs/dictionaries/dep_z.xml']
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', dictionaries=DICTIONARY_FILES)
instance = cluster.add_instance('instance', dictionaries=DICTIONARY_FILES, stay_alive=True)
@pytest.fixture(scope="module")
@ -73,3 +73,45 @@ def test_get_data(started_cluster):
assert query("SELECT dictGetString('dep_x', 'a', toUInt64(4))") == "ether\n"
assert query("SELECT dictGetString('dep_y', 'a', toUInt64(4))") == "ether\n"
assert query("SELECT dictGetString('dep_z', 'a', toUInt64(4))") == "ether\n"
def dependent_tables_assert():
res = instance.query("select database || '.' || name from system.tables")
assert "system.join" in res
assert "default.src" in res
assert "dict.dep_y" in res
assert "lazy.log" in res
assert "test.d" in res
assert "default.join" in res
assert "a.t" in res
def test_dependent_tables(started_cluster):
query = instance.query
query("create database lazy engine=Lazy(10)")
query("create database a")
query("create table lazy.src (n int, m int) engine=Log")
query("create dictionary a.d (n int default 0, m int default 42) primary key n "
"source(clickhouse(host 'localhost' port tcpPort() user 'default' table 'src' password '' db 'lazy'))"
"lifetime(min 1 max 10) layout(flat())")
query("create table system.join (n int, m int) engine=Join(any, left, n)")
query("insert into system.join values (1, 1)")
query("create table src (n int, m default joinGet('system.join', 'm', 1::int),"
"t default dictGetOrNull('a.d', 'm', toUInt64(3)),"
"k default dictGet('a.d', 'm', toUInt64(4))) engine=MergeTree order by n")
query("create dictionary test.d (n int default 0, m int default 42) primary key n "
"source(clickhouse(host 'localhost' port tcpPort() user 'default' table 'src' password '' db 'default'))"
"lifetime(min 1 max 10) layout(flat())")
query("create table join (n int, m default dictGet('a.d', 'm', toUInt64(3)),"
"k default dictGet('test.d', 'm', toUInt64(0))) engine=Join(any, left, n)")
query("create table lazy.log (n default dictGet(test.d, 'm', toUInt64(0))) engine=Log")
query("create table a.t (n default joinGet('system.join', 'm', 1::int),"
"m default dictGet('test.d', 'm', toUInt64(3)),"
"k default joinGet(join, 'm', 1::int)) engine=MergeTree order by n")
dependent_tables_assert()
instance.restart_clickhouse()
dependent_tables_assert()
query("drop database a")
query("drop database lazy")
query("drop table src")
query("drop table join")
query("drop table system.join")

View File

@ -0,0 +1,6 @@
dict1
dict2
dict_src
join
s
t

View File

@ -0,0 +1,45 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists dict_src;"
$CLICKHOUSE_CLIENT -q "drop dictionary if exists dict1;"
$CLICKHOUSE_CLIENT -q "drop dictionary if exists dict2;"
$CLICKHOUSE_CLIENT -q "drop table if exists join;"
$CLICKHOUSE_CLIENT -q "drop table if exists t;"
$CLICKHOUSE_CLIENT -q "create table dict_src (n int, m int, s String) engine=MergeTree order by n;"
$CLICKHOUSE_CLIENT -q "create dictionary dict1 (n int default 0, m int default 1, s String default 'qqq')
PRIMARY KEY n
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'dict_src' PASSWORD '' DB '$CLICKHOUSE_DATABASE'))
LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT());"
$CLICKHOUSE_CLIENT -q "create table join(n int, m int default dictGet('$CLICKHOUSE_DATABASE.dict1', 'm', 42::UInt64)) engine=Join(any, left, n);"
$CLICKHOUSE_CLIENT -q "create dictionary dict2 (n int default 0, m int DEFAULT 2, s String default 'asd')
PRIMARY KEY n
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'join' PASSWORD '' DB '$CLICKHOUSE_DATABASE'))
LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT());"
$CLICKHOUSE_CLIENT -q "create table s (x default joinGet($CLICKHOUSE_DATABASE.join, 'm', 42::int)) engine=Set"
$CLICKHOUSE_CLIENT -q "create table t (n int, m int default joinGet($CLICKHOUSE_DATABASE.join, 'm', 42::int),
s String default dictGet($CLICKHOUSE_DATABASE.dict1, 's', 42::UInt64), x default in(1, $CLICKHOUSE_DATABASE.s)) engine=MergeTree order by n;"
CLICKHOUSE_CLIENT_DEFAULT_DB=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--database=${CLICKHOUSE_DATABASE}"'/--database=default/g')
for _ in {1..10}; do
$CLICKHOUSE_CLIENT_DEFAULT_DB -q "detach database $CLICKHOUSE_DATABASE;"
$CLICKHOUSE_CLIENT_DEFAULT_DB -q "attach database $CLICKHOUSE_DATABASE;"
done
$CLICKHOUSE_CLIENT -q "show tables from $CLICKHOUSE_DATABASE;"
$CLICKHOUSE_CLIENT -q "drop table dict_src;"
$CLICKHOUSE_CLIENT -q "drop dictionary dict1;"
$CLICKHOUSE_CLIENT -q "drop dictionary dict2;"
$CLICKHOUSE_CLIENT -q "drop table join;"
$CLICKHOUSE_CLIENT -q "drop table t;"

View File

@ -1,4 +1,4 @@
SELECT * FROM remote('127..2', 'a.'); -- { serverError 36 }
SELECT * FROM remote('127..2', 'a.'); -- { serverError 62 }
-- Clear cache to avoid future errors in the logs
SYSTEM DROP DNS CACHE