2021-08-26 13:19:52 +00:00
|
|
|
#include <Databases/TablesLoader.h>
|
2021-08-31 08:53:48 +00:00
|
|
|
#include <Databases/IDatabase.h>
|
2021-11-01 18:53:07 +00:00
|
|
|
#include <Databases/DDLDependencyVisitor.h>
|
2022-12-02 14:05:46 +00:00
|
|
|
#include <Databases/DDLLoadingDependencyVisitor.h>
|
2021-08-31 08:53:48 +00:00
|
|
|
#include <Interpreters/DatabaseCatalog.h>
|
|
|
|
#include <Interpreters/Context.h>
|
2021-09-01 19:42:49 +00:00
|
|
|
#include <Interpreters/ExternalDictionariesLoader.h>
|
2021-08-31 08:53:48 +00:00
|
|
|
#include <Poco/Util/AbstractConfiguration.h>
|
2022-04-27 15:05:45 +00:00
|
|
|
#include <Common/logger_useful.h>
|
2021-08-31 08:53:48 +00:00
|
|
|
#include <Common/ThreadPool.h>
|
2023-03-22 07:49:22 +00:00
|
|
|
#include <Common/CurrentMetrics.h>
|
2021-09-01 19:42:49 +00:00
|
|
|
#include <numeric>
|
2021-08-26 13:19:52 +00:00
|
|
|
|
2023-04-26 18:25:39 +00:00
|
|
|
|
2023-03-22 07:49:22 +00:00
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
2023-04-26 18:25:39 +00:00
|
|
|
extern const Metric AsyncLoaderThreads;
|
|
|
|
extern const Metric AsyncLoaderThreadsActive;
|
2023-03-22 07:49:22 +00:00
|
|
|
}
|
|
|
|
|
2021-08-26 13:19:52 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2021-08-31 08:53:48 +00:00
|
|
|
/// Error codes used by this file; only declared here, defined outside this file.
namespace ErrorCodes
{
    /// Thrown when the dependency graph violates an internal invariant.
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
2022-07-20 20:54:43 +00:00
|
|
|
/// Creates a loader for the given set of databases.
///
/// `global_context_`  - context used to obtain the config, the current database name and the async loader.
/// `databases_`       - databases whose tables are going to be loaded (taken over by the loader).
/// `strictness_mode_` - forwarded to every database/table loading and startup call.
TablesLoader::TablesLoader(ContextMutablePtr global_context_, Databases databases_, LoadingStrictnessLevel strictness_mode_)
    : global_context(global_context_)
    , databases(std::move(databases_))
    , strictness_mode(strictness_mode_)
    , referential_dependencies("ReferentialDeps")
    , loading_dependencies("LoadingDeps")
    , all_loading_dependencies("LoadingDeps")
    /// NOTE(review): reads the `global_context` member, so that member has to be declared
    /// before `async_loader` in the class - confirm against the header.
    , async_loader(global_context->getAsyncLoader())
    , pool(CurrentMetrics::AsyncLoaderThreads, CurrentMetrics::AsyncLoaderThreadsActive)
{
    log = &Poco::Logger::get("TablesLoader");

    /// The current database of the global context is recorded as the default database of the parsed metadata.
    metadata.default_database = global_context->getCurrentDatabase();
}
|
|
|
|
|
2023-04-26 18:25:39 +00:00
|
|
|
/// Creates load and startup tasks for all databases and their tables and schedules them in `async_loader`.
///
/// Databases that do not support loading in topological order (or all databases when the config key
/// `ignore_table_dependencies_on_metadata_loading` is present) are handed over to `loadStoredObjectsAsync()`
/// as a whole. For the remaining databases the metadata is read with `loadTablesMetadata()`, the dependency
/// graph is built, and then one load task and one startup task is created per table, plus one startup task
/// per database.
///
/// `load_after` - jobs that the created load jobs have to wait for.
///
/// Appends the created tasks to `load_tables`, `startup_tables` and `startup_databases`.
/// Throws LOGICAL_ERROR if a table depends on a table for which no load task has been created.
void TablesLoader::createTasks(LoadJobSet load_after)
{
    bool need_resolve_dependencies = !global_context->getConfigRef().has("ignore_table_dependencies_on_metadata_loading");

    /// Load all Lazy, MySQL, PostgreSQL, SQLite, etc databases first.
    for (auto & database : databases)
    {
        if (need_resolve_dependencies && database.second->supportsLoadingInTopologicalOrder())
            databases_to_load.push_back(database.first);
        else
            load_tables.push_back(database.second->loadStoredObjectsAsync(async_loader, load_after, global_context, strictness_mode, /* skip_startup_tables */ true));
    }

    /// NOTE(review): on this early return the tasks collected above are not passed to `async_loader.schedule()`
    /// by this function - confirm that the caller (or loadStoredObjectsAsync() itself) takes care of scheduling.
    if (databases_to_load.empty())
        return;

    /// Read and parse metadata from Ordinary, Atomic, Materialized*, Replicated, etc databases. Build dependency graph.
    for (auto & database_name : databases_to_load)
    {
        databases[database_name]->beforeLoadingMetadata(global_context, strictness_mode);
        bool is_startup = LoadingStrictnessLevel::FORCE_ATTACH <= strictness_mode;
        databases[database_name]->loadTablesMetadata(global_context, metadata, is_startup);
    }

    LOG_INFO(log, "Parsed metadata of {} tables in {} databases in {} sec",
             metadata.parsed_tables.size(), databases_to_load.size(), stopwatch.elapsedSeconds());

    stopwatch.restart();

    /// Tables without loading dependencies wait for the databases loaded as a whole,
    /// or for `load_after` if there are no such databases.
    LoadJobSet load_databases_without_dependencies;
    for (const auto & task : load_tables)
    {
        /// Call goals() once so that both iterators are guaranteed to belong to the same container.
        const auto & goals = task->goals();
        load_databases_without_dependencies.insert(goals.begin(), goals.end());
    }
    if (load_databases_without_dependencies.empty())
        load_databases_without_dependencies = std::move(load_after);

    buildDependencyGraph();

    /// Update existing info (it's important for ATTACH DATABASE)
    DatabaseCatalog::instance().addDependencies(referential_dependencies, loading_dependencies);

    /// Remove tables that do not exist
    removeUnresolvableDependencies();

    /// Compatibility setting which should be enabled by default on attach
    /// Otherwise server will be unable to start for some old-format of IPv6/IPv4 types of columns
    ContextMutablePtr load_context = Context::createCopy(global_context);
    load_context->setSetting("cast_ipv4_ipv6_default_on_conversion_error", 1);

    /// Load tasks are keyed by database and table name, not by UUID: with a UUID key all tables whose
    /// StorageID carries no distinct UUID would share a single entry and dependencies would be resolved
    /// to the wrong task.
    std::unordered_map<String, std::unordered_map<String, LoadTaskPtr>> load_table; /// database name -> table name -> load task
    std::unordered_map<String, LoadTaskPtrs> startup_database; /// database name -> all its tables startup tasks
    for (const auto & table_id : all_loading_dependencies.getTablesSortedByDependency())
    {
        /// Make set of jobs to load before this table
        LoadJobSet load_dependencies;
        for (const StorageID & dependency_id : all_loading_dependencies.getDependencies(table_id))
        {
            /// Look the task up without inserting: a missing entry must not turn into a null pointer dereference.
            LoadTaskPtr dependency_task;
            if (auto database_it = load_table.find(dependency_id.database_name); database_it != load_table.end())
            {
                if (auto table_it = database_it->second.find(dependency_id.table_name); table_it != database_it->second.end())
                    dependency_task = table_it->second;
            }

            if (!dependency_task)
                throw Exception(ErrorCodes::LOGICAL_ERROR,
                    "No load task was created for table {} which table {} depends on. It's a bug", dependency_id, table_id);

            const auto & goals = dependency_task->goals();
            load_dependencies.insert(goals.begin(), goals.end());
        }
        if (load_dependencies.empty())
            load_dependencies = load_databases_without_dependencies;

        // Make load table task
        auto table_name = table_id.getQualifiedName();
        const auto & path_and_query = metadata.parsed_tables[table_name];
        auto load_task = databases[table_name.database]->loadTableFromMetadataAsync(async_loader, load_dependencies, load_context, path_and_query.path, table_name, path_and_query.ast, strictness_mode);
        load_table[table_id.database_name][table_id.table_name] = load_task;
        load_tables.push_back(load_task);

        // Make startup table task
        auto startup_task = databases[table_name.database]->startupTableAsync(async_loader, load_task->goals(), table_name, strictness_mode);
        startup_database[table_name.database].push_back(startup_task);
        startup_tables.push_back(startup_task);
    }

    /// Make startup database tasks
    for (const auto & [database_name, startup_table_tasks] : startup_database)
    {
        /// A database is started up after all of its tables.
        LoadJobSet startup_after;
        for (const auto & startup_task : startup_table_tasks)
        {
            const auto & goals = startup_task->goals();
            startup_after.insert(goals.begin(), goals.end());
        }
        auto startup_database_task = databases[database_name]->startupDatabaseAsync(async_loader, startup_after, strictness_mode);
        startup_databases.push_back(startup_database_task);
    }

    // Schedule all tasks in right order
    async_loader.schedule(load_tables);
    async_loader.schedule(startup_tables);
    async_loader.schedule(startup_databases);
}
|
|
|
|
|
|
|
|
/// Creates all tasks (see createTasks()) and returns the tasks that load tables.
///
/// `load_after` - jobs that the created load jobs have to wait for.
LoadTaskPtrs TablesLoader::loadTablesAsync(LoadJobSet load_after)
{
    /// `load_after` is our own copy and is not used afterwards, so hand it over instead of copying the set again.
    createTasks(std::move(load_after));
    return load_tables;
}
|
|
|
|
|
2023-04-26 18:34:01 +00:00
|
|
|
/// Returns the table startup tasks collected in `startup_tables`.
/// The list is filled by createTasks(); this function does not create anything itself.
LoadTaskPtrs TablesLoader::startupTablesAsync()
{
    return startup_tables;
}
|
|
|
|
|
2021-08-26 13:19:52 +00:00
|
|
|
|
|
|
|
void TablesLoader::loadTables()
|
|
|
|
{
|
2023-04-26 18:25:39 +00:00
|
|
|
// TODO(serxa): rewrite using loadTablesAsync()
|
2021-08-31 08:53:48 +00:00
|
|
|
bool need_resolve_dependencies = !global_context->getConfigRef().has("ignore_table_dependencies_on_metadata_loading");
|
2021-09-01 19:42:49 +00:00
|
|
|
|
|
|
|
/// Load all Lazy, MySQl, PostgreSQL, SQLite, etc databases first.
|
2021-08-26 13:19:52 +00:00
|
|
|
for (auto & database : databases)
|
|
|
|
{
|
2021-09-01 19:42:49 +00:00
|
|
|
if (need_resolve_dependencies && database.second->supportsLoadingInTopologicalOrder())
|
|
|
|
databases_to_load.push_back(database.first);
|
2021-08-26 13:19:52 +00:00
|
|
|
else
|
2022-07-20 20:54:43 +00:00
|
|
|
database.second->loadStoredObjects(global_context, strictness_mode, /* skip_startup_tables */ true);
|
2021-08-26 13:19:52 +00:00
|
|
|
}
|
|
|
|
|
2021-09-13 19:11:16 +00:00
|
|
|
if (databases_to_load.empty())
|
|
|
|
return;
|
|
|
|
|
2021-09-01 19:42:49 +00:00
|
|
|
/// Read and parse metadata from Ordinary, Atomic, Materialized*, Replicated, etc databases. Build dependency graph.
|
|
|
|
for (auto & database_name : databases_to_load)
|
2021-08-31 08:53:48 +00:00
|
|
|
{
|
2022-07-20 20:54:43 +00:00
|
|
|
databases[database_name]->beforeLoadingMetadata(global_context, strictness_mode);
|
|
|
|
bool is_startup = LoadingStrictnessLevel::FORCE_ATTACH <= strictness_mode;
|
|
|
|
databases[database_name]->loadTablesMetadata(global_context, metadata, is_startup);
|
2021-08-31 08:53:48 +00:00
|
|
|
}
|
2021-08-26 13:19:52 +00:00
|
|
|
|
2021-09-02 13:34:46 +00:00
|
|
|
LOG_INFO(log, "Parsed metadata of {} tables in {} databases in {} sec",
|
2021-09-13 19:11:16 +00:00
|
|
|
metadata.parsed_tables.size(), databases_to_load.size(), stopwatch.elapsedSeconds());
|
|
|
|
|
2021-09-01 19:42:49 +00:00
|
|
|
stopwatch.restart();
|
|
|
|
|
2022-12-02 14:05:46 +00:00
|
|
|
buildDependencyGraph();
|
2021-11-01 18:53:07 +00:00
|
|
|
|
|
|
|
/// Update existing info (it's important for ATTACH DATABASE)
|
2023-02-01 23:30:49 +00:00
|
|
|
DatabaseCatalog::instance().addDependencies(referential_dependencies, loading_dependencies);
|
2021-11-01 18:53:07 +00:00
|
|
|
|
2022-12-02 14:05:46 +00:00
|
|
|
/// Remove tables that do not exist
|
|
|
|
removeUnresolvableDependencies();
|
2021-09-01 19:42:49 +00:00
|
|
|
|
2023-04-26 18:25:39 +00:00
|
|
|
loadTablesInTopologicalOrder();
|
2021-09-01 19:42:49 +00:00
|
|
|
}
|
|
|
|
|
2022-12-02 14:05:46 +00:00
|
|
|
|
2021-09-01 19:42:49 +00:00
|
|
|
void TablesLoader::startupTables()
|
|
|
|
{
|
2023-04-26 18:25:39 +00:00
|
|
|
// TODO(serxa): rewrite using startupTablesAsync()
|
2021-09-01 19:42:49 +00:00
|
|
|
/// Startup tables after all tables are loaded. Background tasks (merges, mutations, etc) may slow down data parts loading.
|
|
|
|
for (auto & database : databases)
|
2022-07-20 20:54:43 +00:00
|
|
|
database.second->startupTables(pool, strictness_mode);
|
2021-09-01 19:42:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2022-12-02 14:05:46 +00:00
|
|
|
void TablesLoader::buildDependencyGraph()
|
2021-09-01 19:42:49 +00:00
|
|
|
{
|
2022-12-02 14:05:46 +00:00
|
|
|
for (const auto & [table_name, table_metadata] : metadata.parsed_tables)
|
|
|
|
{
|
2023-02-01 23:30:49 +00:00
|
|
|
auto new_ref_dependencies = getDependenciesFromCreateQuery(global_context, table_name, table_metadata.ast);
|
2022-12-02 14:05:46 +00:00
|
|
|
auto new_loading_dependencies = getLoadingDependenciesFromCreateQuery(global_context, table_name, table_metadata.ast);
|
|
|
|
|
2023-02-01 23:30:49 +00:00
|
|
|
if (!new_ref_dependencies.empty())
|
|
|
|
referential_dependencies.addDependencies(table_name, new_ref_dependencies);
|
|
|
|
|
2022-12-02 14:05:46 +00:00
|
|
|
if (!new_loading_dependencies.empty())
|
2023-02-01 23:30:49 +00:00
|
|
|
loading_dependencies.addDependencies(table_name, new_loading_dependencies);
|
2022-12-02 14:05:46 +00:00
|
|
|
|
|
|
|
/// We're adding `new_loading_dependencies` to the graph here even if they're empty because
|
|
|
|
/// we need to have all tables from `metadata.parsed_tables` in the graph.
|
2023-02-01 23:30:49 +00:00
|
|
|
all_loading_dependencies.addDependencies(table_name, new_loading_dependencies);
|
2022-12-02 14:05:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
referential_dependencies.log();
|
2023-02-01 23:30:49 +00:00
|
|
|
all_loading_dependencies.log();
|
2022-12-02 14:05:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void TablesLoader::removeUnresolvableDependencies()
|
|
|
|
{
|
|
|
|
auto need_exclude_dependency = [this](const StorageID & table_id)
|
2021-08-31 08:53:48 +00:00
|
|
|
{
|
2021-09-01 19:42:49 +00:00
|
|
|
/// Table exists and will be loaded
|
2022-12-02 14:05:46 +00:00
|
|
|
if (metadata.parsed_tables.contains(table_id.getQualifiedName()))
|
2021-08-26 13:19:52 +00:00
|
|
|
return false;
|
2022-12-02 14:05:46 +00:00
|
|
|
|
|
|
|
if (DatabaseCatalog::instance().isTableExist(table_id, global_context))
|
2021-12-06 13:35:34 +00:00
|
|
|
{
|
2022-12-02 14:05:46 +00:00
|
|
|
/// Table exists and it's already loaded
|
|
|
|
}
|
|
|
|
else if (table_id.database_name == metadata.default_database &&
|
|
|
|
global_context->getExternalDictionariesLoader().has(table_id.table_name))
|
|
|
|
{
|
|
|
|
/// Tables depend on a XML dictionary.
|
|
|
|
LOG_WARNING(
|
|
|
|
log,
|
|
|
|
"Tables {} depend on XML dictionary {}, but XML dictionaries are loaded independently."
|
|
|
|
"Consider converting it to DDL dictionary.",
|
2023-02-01 23:30:49 +00:00
|
|
|
fmt::join(all_loading_dependencies.getDependents(table_id), ", "),
|
2022-12-02 14:05:46 +00:00
|
|
|
table_id);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// Some tables depend on table "table_id", but there is no such table in DatabaseCatalog and we don't have its metadata.
|
|
|
|
/// We will ignore it and try to load dependent tables without "table_id"
|
|
|
|
/// (but most likely dependent tables will fail to load).
|
|
|
|
LOG_WARNING(
|
|
|
|
log,
|
|
|
|
"Tables {} depend on {}, but seems like that does not exist. Will ignore it and try to load existing tables",
|
2023-02-01 23:30:49 +00:00
|
|
|
fmt::join(all_loading_dependencies.getDependents(table_id), ", "),
|
2022-12-02 14:05:46 +00:00
|
|
|
table_id);
|
2021-12-06 13:35:34 +00:00
|
|
|
}
|
2021-08-26 13:19:52 +00:00
|
|
|
|
2022-12-02 14:05:46 +00:00
|
|
|
size_t num_dependencies, num_dependents;
|
2023-02-01 23:30:49 +00:00
|
|
|
all_loading_dependencies.getNumberOfAdjacents(table_id, num_dependencies, num_dependents);
|
2022-12-02 14:05:46 +00:00
|
|
|
if (num_dependencies || !num_dependents)
|
2021-09-01 19:42:49 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not have dependencies and dependent tables as it expected to."
|
2022-12-02 14:05:46 +00:00
|
|
|
"It's a bug", table_id);
|
2021-08-26 13:19:52 +00:00
|
|
|
|
2022-12-02 14:05:46 +00:00
|
|
|
return true; /// Exclude this dependency.
|
2021-09-01 19:42:49 +00:00
|
|
|
};
|
2021-08-26 13:19:52 +00:00
|
|
|
|
2023-02-01 23:30:49 +00:00
|
|
|
all_loading_dependencies.removeTablesIf(need_exclude_dependency);
|
2022-12-02 14:05:46 +00:00
|
|
|
|
2023-02-01 23:30:49 +00:00
|
|
|
if (all_loading_dependencies.getNumberOfTables() != metadata.parsed_tables.size())
|
2022-12-02 14:05:46 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of tables to be loaded is not as expected. It's a bug");
|
|
|
|
|
|
|
|
/// Cannot load tables with cyclic dependencies.
|
2023-02-01 23:30:49 +00:00
|
|
|
all_loading_dependencies.checkNoCyclicDependencies();
|
2021-09-01 19:42:49 +00:00
|
|
|
}
|
2021-08-26 13:19:52 +00:00
|
|
|
|
2022-12-02 14:05:46 +00:00
|
|
|
|
2023-04-26 18:25:39 +00:00
|
|
|
void TablesLoader::loadTablesInTopologicalOrder()
|
2021-09-01 19:42:49 +00:00
|
|
|
{
|
2022-06-29 15:53:08 +00:00
|
|
|
/// Compatibility setting which should be enabled by default on attach
|
|
|
|
/// Otherwise server will be unable to start for some old-format of IPv6/IPv4 types of columns
|
|
|
|
ContextMutablePtr load_context = Context::createCopy(global_context);
|
|
|
|
load_context->setSetting("cast_ipv4_ipv6_default_on_conversion_error", 1);
|
|
|
|
|
2023-04-26 18:25:39 +00:00
|
|
|
// TODO(serxa): remove the following code. Return waitable job or job set instead of sync wait.
|
2022-12-02 14:05:46 +00:00
|
|
|
/// Load tables in parallel.
|
2023-02-01 23:30:49 +00:00
|
|
|
auto tables_to_load = all_loading_dependencies.getTablesSortedByDependencyForParallel();
|
2021-08-26 13:19:52 +00:00
|
|
|
|
2022-12-02 14:05:46 +00:00
|
|
|
for (size_t level = 0; level != tables_to_load.size(); ++level)
|
2021-08-26 13:19:52 +00:00
|
|
|
{
|
2023-04-26 18:25:39 +00:00
|
|
|
startLoadingTables(load_context, tables_to_load[level], level);
|
|
|
|
pool.wait();
|
2021-08-26 13:19:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-04-26 18:25:39 +00:00
|
|
|
/// Schedules one job on `pool` per table of `tables_to_load`. Each job loads the table from its
/// parsed metadata and reports progress. The function itself does not wait for the jobs.
///
/// `load_context`   - context passed to loadTableFromMetadata() for every table.
/// `tables_to_load` - tables of one dependency level.
/// `level`          - the dependency level, used for logging only.
void TablesLoader::startLoadingTables(ContextMutablePtr load_context, const std::vector<StorageID> & tables_to_load, size_t level)
{
    LOG_INFO(log, "Loading {} tables with dependency level {}", tables_to_load.size(), level);

    /// Progress is reported against the number of all parsed tables, not only of this level.
    size_t total_tables = metadata.parsed_tables.size();

    for (const auto & table_id : tables_to_load)
    {
        /// The qualified name is copied into the job, so the job does not refer to `tables_to_load`.
        auto load_one_table = [this, load_context, total_tables, table_name = table_id.getQualifiedName()]()
        {
            const auto & parsed_table = metadata.parsed_tables[table_name];
            databases[table_name.database]->loadTableFromMetadata(load_context, parsed_table.path, table_name, parsed_table.ast, strictness_mode);
            logAboutProgress(log, ++tables_processed, total_tables, stopwatch);
        };

        pool.scheduleOrThrowOnError(std::move(load_one_table));
    }
}
|
|
|
|
|
2021-09-01 19:42:49 +00:00
|
|
|
}
|