mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
better code, moar logging
This commit is contained in:
parent
c8d8f0a38c
commit
024a24aaf7
@ -303,6 +303,7 @@ try
|
|||||||
loadMetadataSystem(global_context);
|
loadMetadataSystem(global_context);
|
||||||
attachSystemTables(global_context);
|
attachSystemTables(global_context);
|
||||||
loadMetadata(global_context);
|
loadMetadata(global_context);
|
||||||
|
startupSystemTables();
|
||||||
DatabaseCatalog::instance().loadDatabases();
|
DatabaseCatalog::instance().loadDatabases();
|
||||||
LOG_DEBUG(log, "Loaded metadata.");
|
LOG_DEBUG(log, "Loaded metadata.");
|
||||||
}
|
}
|
||||||
|
@ -1127,6 +1127,7 @@ if (ThreadFuzzer::instance().isEffective())
|
|||||||
attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper);
|
attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper);
|
||||||
/// Then, load remaining databases
|
/// Then, load remaining databases
|
||||||
loadMetadata(global_context, default_database);
|
loadMetadata(global_context, default_database);
|
||||||
|
startupSystemTables();
|
||||||
database_catalog.loadDatabases();
|
database_catalog.loadDatabases();
|
||||||
/// After loading validate that default database exists
|
/// After loading validate that default database exists
|
||||||
database_catalog.assertDatabaseExists(default_database);
|
database_catalog.assertDatabaseExists(default_database);
|
||||||
|
@ -78,7 +78,7 @@ void DDLDependencyVisitor::extractTableNameFromArgument(const ASTFunction & func
|
|||||||
|
|
||||||
if (database_name.empty())
|
if (database_name.empty())
|
||||||
database_name = data.default_database;
|
database_name = data.default_database;
|
||||||
data.dependencies.push_back(QualifiedTableName{std::move(database_name), std::move(table_name)});
|
data.dependencies.emplace(QualifiedTableName{std::move(database_name), std::move(table_name)});
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -14,9 +14,9 @@ class DDLDependencyVisitor
|
|||||||
public:
|
public:
|
||||||
struct Data
|
struct Data
|
||||||
{
|
{
|
||||||
using TableDependencies = std::vector<QualifiedTableName>;
|
using TableNamesSet = std::set<QualifiedTableName>;
|
||||||
String default_database;
|
String default_database;
|
||||||
TableDependencies dependencies;
|
TableNamesSet dependencies;
|
||||||
};
|
};
|
||||||
|
|
||||||
using Visitor = ConstInDepthNodeVisitor<DDLDependencyVisitor, true>;
|
using Visitor = ConstInDepthNodeVisitor<DDLDependencyVisitor, true>;
|
||||||
|
@ -608,7 +608,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Read and parse metadata in parallel
|
/// Read and parse metadata in parallel
|
||||||
ThreadPool pool{1};
|
ThreadPool pool;
|
||||||
for (const auto & file : metadata_files)
|
for (const auto & file : metadata_files)
|
||||||
{
|
{
|
||||||
pool.scheduleOrThrowOnError([&]()
|
pool.scheduleOrThrowOnError([&]()
|
||||||
|
@ -190,9 +190,17 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
|
|||||||
std::lock_guard lock{metadata.mutex};
|
std::lock_guard lock{metadata.mutex};
|
||||||
metadata.metadata[qualified_name] = std::make_pair(full_path.string(), std::move(ast));
|
metadata.metadata[qualified_name] = std::make_pair(full_path.string(), std::move(ast));
|
||||||
if (data.dependencies.empty())
|
if (data.dependencies.empty())
|
||||||
metadata.independent_tables.insert(std::move(qualified_name));
|
{
|
||||||
|
metadata.independent_tables.emplace_back(std::move(qualified_name));
|
||||||
|
}
|
||||||
else
|
else
|
||||||
metadata.table_dependencies.insert({std::move(qualified_name), std::move(data.dependencies)});
|
{
|
||||||
|
for (const auto & dependency : data.dependencies)
|
||||||
|
{
|
||||||
|
metadata.dependencies_info[dependency].dependent_tables.push_back(qualified_name);
|
||||||
|
++metadata.dependencies_info[qualified_name].dependencies_count;
|
||||||
|
}
|
||||||
|
}
|
||||||
metadata.total_dictionaries += create_query->is_dictionary;
|
metadata.total_dictionaries += create_query->is_dictionary;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -96,7 +96,7 @@ void DatabaseMaterializedMySQL<Base>::setException(const std::exception_ptr & ex
|
|||||||
template <typename Base>
|
template <typename Base>
|
||||||
void DatabaseMaterializedMySQL<Base>::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
|
void DatabaseMaterializedMySQL<Base>::startupTables(ThreadPool & thread_pool, bool force_restore, bool force_attach)
|
||||||
{
|
{
|
||||||
Base::startupTables(thread_pool, force_attach, force_restore);
|
Base::startupTables(thread_pool, force_restore, force_attach);
|
||||||
|
|
||||||
if (!force_attach)
|
if (!force_attach)
|
||||||
materialize_thread.assertMySQLAvailable();
|
materialize_thread.assertMySQLAvailable();
|
||||||
|
@ -2,10 +2,11 @@
|
|||||||
#include <Databases/IDatabase.h>
|
#include <Databases/IDatabase.h>
|
||||||
#include <Interpreters/DatabaseCatalog.h>
|
#include <Interpreters/DatabaseCatalog.h>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
|
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||||
#include <Poco/Util/AbstractConfiguration.h>
|
#include <Poco/Util/AbstractConfiguration.h>
|
||||||
#include <common/logger_useful.h>
|
#include <common/logger_useful.h>
|
||||||
#include <Common/Stopwatch.h>
|
|
||||||
#include <Common/ThreadPool.h>
|
#include <Common/ThreadPool.h>
|
||||||
|
#include <numeric>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -42,94 +43,146 @@ TablesLoader::TablesLoader(ContextMutablePtr global_context_, Databases database
|
|||||||
void TablesLoader::loadTables()
|
void TablesLoader::loadTables()
|
||||||
{
|
{
|
||||||
bool need_resolve_dependencies = !global_context->getConfigRef().has("ignore_table_dependencies_on_metadata_loading");
|
bool need_resolve_dependencies = !global_context->getConfigRef().has("ignore_table_dependencies_on_metadata_loading");
|
||||||
|
|
||||||
|
/// Load all Lazy, MySQL, PostgreSQL, SQLite, etc. databases first.
|
||||||
for (auto & database : databases)
|
for (auto & database : databases)
|
||||||
{
|
{
|
||||||
if (need_resolve_dependencies && database->supportsLoadingInTopologicalOrder())
|
if (need_resolve_dependencies && database.second->supportsLoadingInTopologicalOrder())
|
||||||
databases_to_load.emplace(database->getDatabaseName(), database);
|
databases_to_load.push_back(database.first);
|
||||||
else
|
else
|
||||||
database->loadStoredObjects(global_context, force_restore, force_attach, true);
|
database.second->loadStoredObjects(global_context, force_restore, force_attach, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto & database : databases_to_load)
|
/// Read and parse metadata from Ordinary, Atomic, Materialized*, Replicated, etc databases. Build dependency graph.
|
||||||
|
for (auto & database_name : databases_to_load)
|
||||||
{
|
{
|
||||||
database.second->beforeLoadingMetadata(global_context, force_restore, force_attach);
|
databases[database_name]->beforeLoadingMetadata(global_context, force_restore, force_attach);
|
||||||
database.second->loadTablesMetadata(global_context, all_tables);
|
databases[database_name]->loadTablesMetadata(global_context, all_tables);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto table_does_not_exist = [this](const QualifiedTableName & table_name, const QualifiedTableName & dependency_name)
|
LOG_INFO(log, "Parsed metadata of {} tables in {} sec", all_tables.metadata.size(), stopwatch.elapsedSeconds());
|
||||||
|
stopwatch.restart();
|
||||||
|
|
||||||
|
logDependencyGraph();
|
||||||
|
|
||||||
|
/// Some tables were loaded by database with loadStoredObjects(...). Remove them from graph if necessary.
|
||||||
|
removeUnresolvableDependencies();
|
||||||
|
|
||||||
|
loadTablesInTopologicalOrder(pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
void TablesLoader::startupTables()
|
||||||
{
|
{
|
||||||
|
/// Startup tables after all tables are loaded. Background tasks (merges, mutations, etc) may slow down data parts loading.
|
||||||
|
for (auto & database : databases)
|
||||||
|
database.second->startupTables(pool, force_restore, force_attach);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void TablesLoader::removeUnresolvableDependencies()
|
||||||
|
{
|
||||||
|
auto need_exclude_dependency = [this](const QualifiedTableName & dependency_name, const DependenciesInfo & info)
|
||||||
|
{
|
||||||
|
/// Table exists and will be loaded
|
||||||
if (all_tables.metadata.contains(dependency_name))
|
if (all_tables.metadata.contains(dependency_name))
|
||||||
return false;
|
return false;
|
||||||
|
/// Table exists and it's already loaded
|
||||||
if (DatabaseCatalog::instance().isTableExist(StorageID(dependency_name.database, dependency_name.table), global_context))
|
if (DatabaseCatalog::instance().isTableExist(StorageID(dependency_name.database, dependency_name.table), global_context))
|
||||||
return false;
|
return false;
|
||||||
/// FIXME if XML dict
|
/// It's XML dictionary. It was loaded before tables and DDL dictionaries.
|
||||||
|
if (dependency_name.database == all_tables.default_database &&
|
||||||
|
global_context->getExternalDictionariesLoader().has(dependency_name.table))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
/// Some tables depend on table "dependency_name", but there is no such table in DatabaseCatalog and we don't have its metadata.
|
||||||
|
/// We will ignore it and try to load dependent tables without "dependency_name"
|
||||||
|
/// (but most likely dependent tables will fail to load).
|
||||||
|
LOG_WARNING(log, "Tables {} depend on {}, but seems like the it does not exist. Will ignore it and try to load existing tables",
|
||||||
|
fmt::join(info.dependent_tables, ", "), dependency_name);
|
||||||
|
|
||||||
|
if (info.dependencies_count)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not exist, but we have seen its AST and found {} dependencies."
|
||||||
|
"It's a bug", dependency_name, info.dependencies_count);
|
||||||
|
if (info.dependent_tables.empty())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not have dependencies and dependent tables as it expected to."
|
||||||
|
"It's a bug", dependency_name);
|
||||||
|
|
||||||
LOG_WARNING(log, "Table {} depends on {}, but seems like the second one does not exist", table_name, dependency_name);
|
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
removeDependencies(table_does_not_exist, all_tables.independent_tables);
|
auto table_it = all_tables.dependencies_info.begin();
|
||||||
|
while (table_it != all_tables.dependencies_info.end())
|
||||||
//LOG_TRACE(log, "Independent database objects: {}", fmt::join(all_tables.independent_tables, ", "));
|
|
||||||
//for (const auto & dependencies : all_tables.table_dependencies)
|
|
||||||
// LOG_TRACE(log, "Database object {} depends on: {}", dependencies.first, fmt::join(dependencies.second, ", "));
|
|
||||||
|
|
||||||
auto is_dependency_loaded = [this](const QualifiedTableName & /*table_name*/, const QualifiedTableName & dependency_name)
|
|
||||||
{
|
{
|
||||||
return all_tables.independent_tables.contains(dependency_name);
|
auto & info = table_it->second;
|
||||||
};
|
if (need_exclude_dependency(table_it->first, info))
|
||||||
|
table_it = removeResolvedDependency(table_it, all_tables.independent_tables);
|
||||||
AtomicStopwatch watch;
|
|
||||||
ThreadPool pool;
|
|
||||||
size_t level = 0;
|
|
||||||
do
|
|
||||||
{
|
|
||||||
assert(all_tables.metadata.size() == tables_processed + all_tables.independent_tables.size() + all_tables.table_dependencies.size());
|
|
||||||
startLoadingIndependentTables(pool, watch, level);
|
|
||||||
std::unordered_set<QualifiedTableName> new_independent_tables;
|
|
||||||
removeDependencies(is_dependency_loaded, new_independent_tables);
|
|
||||||
pool.wait();
|
|
||||||
all_tables.independent_tables = std::move(new_independent_tables);
|
|
||||||
checkCyclicDependencies();
|
|
||||||
++level;
|
|
||||||
assert(all_tables.metadata.size() == tables_processed + all_tables.independent_tables.size() + all_tables.table_dependencies.size());
|
|
||||||
} while (!all_tables.independent_tables.empty());
|
|
||||||
|
|
||||||
for (auto & database : databases_to_load)
|
|
||||||
{
|
|
||||||
database.second->startupTables(pool, force_restore, force_attach);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void TablesLoader::removeDependencies(RemoveDependencyPredicate need_remove_dependency, std::unordered_set<QualifiedTableName> & independent_tables)
|
|
||||||
{
|
|
||||||
auto table_it = all_tables.table_dependencies.begin();
|
|
||||||
while (table_it != all_tables.table_dependencies.end())
|
|
||||||
{
|
|
||||||
auto & dependencies = table_it->second;
|
|
||||||
assert(!dependencies.empty());
|
|
||||||
auto dependency_it = dependencies.begin();
|
|
||||||
while (dependency_it != dependencies.end())
|
|
||||||
{
|
|
||||||
if (need_remove_dependency(table_it->first, *dependency_it))
|
|
||||||
dependency_it = dependencies.erase(dependency_it);
|
|
||||||
else
|
else
|
||||||
++dependency_it;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dependencies.empty())
|
|
||||||
{
|
|
||||||
independent_tables.emplace(std::move(table_it->first));
|
|
||||||
table_it = all_tables.table_dependencies.erase(table_it);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
++table_it;
|
++table_it;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void TablesLoader::loadTablesInTopologicalOrder(ThreadPool & pool)
|
||||||
|
{
|
||||||
|
/// While we have some independent tables to load, load them in parallel.
|
||||||
|
/// Then remove independent tables from graph and find new ones.
|
||||||
|
size_t level = 0;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
assert(all_tables.metadata.size() == tables_processed + all_tables.independent_tables.size() + getNumberOfTablesWithDependencies());
|
||||||
|
logDependencyGraph();
|
||||||
|
|
||||||
|
startLoadingIndependentTables(pool, level);
|
||||||
|
|
||||||
|
TableNames new_independent_tables;
|
||||||
|
for (const auto & table_name : all_tables.independent_tables)
|
||||||
|
{
|
||||||
|
auto info_it = all_tables.dependencies_info.find(table_name);
|
||||||
|
if (info_it == all_tables.dependencies_info.end())
|
||||||
|
{
|
||||||
|
/// No tables depend on table_name and it was not even added to dependencies_info
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
removeResolvedDependency(info_it, new_independent_tables);
|
||||||
}
|
}
|
||||||
|
|
||||||
void TablesLoader::startLoadingIndependentTables(ThreadPool & pool, AtomicStopwatch & watch, size_t level)
|
pool.wait();
|
||||||
|
|
||||||
|
all_tables.independent_tables = std::move(new_independent_tables);
|
||||||
|
++level;
|
||||||
|
} while (!all_tables.independent_tables.empty());
|
||||||
|
|
||||||
|
checkCyclicDependencies();
|
||||||
|
}
|
||||||
|
|
||||||
|
DependenciesInfosIter TablesLoader::removeResolvedDependency(const DependenciesInfosIter & info_it, TableNames & independent_tables)
|
||||||
|
{
|
||||||
|
auto & info = info_it->second;
|
||||||
|
if (info.dependencies_count)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} is in list of independent tables, but dependencies count is {}."
|
||||||
|
"It's a bug", info_it->first, info.dependencies_count);
|
||||||
|
if (info.dependent_tables.empty())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not have dependent tables. It's a bug", info_it->first);
|
||||||
|
|
||||||
|
/// Decrement number of dependencies for each dependent table
|
||||||
|
for (auto & dependent_table : info.dependent_tables)
|
||||||
|
{
|
||||||
|
auto & dependent_info = all_tables.dependencies_info[dependent_table];
|
||||||
|
auto & dependencies_count = dependent_info.dependencies_count;
|
||||||
|
if (dependencies_count == 0)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to decrement 0 dependencies counter for {}. It's a bug", dependent_table);
|
||||||
|
--dependencies_count;
|
||||||
|
if (dependencies_count == 0)
|
||||||
|
{
|
||||||
|
independent_tables.push_back(dependent_table);
|
||||||
|
if (dependent_info.dependent_tables.empty())
|
||||||
|
all_tables.dependencies_info.erase(dependent_table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return all_tables.dependencies_info.erase(info_it);
|
||||||
|
}
|
||||||
|
|
||||||
|
void TablesLoader::startLoadingIndependentTables(ThreadPool & pool, size_t level)
|
||||||
{
|
{
|
||||||
size_t total_tables = all_tables.metadata.size();
|
size_t total_tables = all_tables.metadata.size();
|
||||||
|
|
||||||
@ -137,32 +190,56 @@ void TablesLoader::startLoadingIndependentTables(ThreadPool & pool, AtomicStopwa
|
|||||||
|
|
||||||
for (const auto & table_name : all_tables.independent_tables)
|
for (const auto & table_name : all_tables.independent_tables)
|
||||||
{
|
{
|
||||||
pool.scheduleOrThrowOnError([this, total_tables, &table_name, &watch]()
|
pool.scheduleOrThrowOnError([this, total_tables, &table_name]()
|
||||||
{
|
{
|
||||||
const auto & path_and_query = all_tables.metadata[table_name];
|
const auto & path_and_query = all_tables.metadata[table_name];
|
||||||
const auto & path = path_and_query.first;
|
const auto & path = path_and_query.first;
|
||||||
const auto & ast = path_and_query.second;
|
const auto & ast = path_and_query.second;
|
||||||
databases_to_load[table_name.database]->loadTableFromMetadata(global_context, path, table_name, ast, force_restore);
|
databases[table_name.database]->loadTableFromMetadata(global_context, path, table_name, ast, force_restore);
|
||||||
logAboutProgress(log, ++tables_processed, total_tables, watch);
|
logAboutProgress(log, ++tables_processed, total_tables, stopwatch);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t TablesLoader::getNumberOfTablesWithDependencies() const
|
||||||
|
{
|
||||||
|
size_t number_of_tables_with_dependencies = 0;
|
||||||
|
for (const auto & info : all_tables.dependencies_info)
|
||||||
|
if (info.second.dependencies_count)
|
||||||
|
++number_of_tables_with_dependencies;
|
||||||
|
return number_of_tables_with_dependencies;
|
||||||
|
}
|
||||||
|
|
||||||
void TablesLoader::checkCyclicDependencies() const
|
void TablesLoader::checkCyclicDependencies() const
|
||||||
{
|
{
|
||||||
if (!all_tables.independent_tables.empty())
|
/// Loading is finished if all dependencies are resolved
|
||||||
return;
|
if (all_tables.dependencies_info.empty())
|
||||||
if (all_tables.table_dependencies.empty())
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
for (const auto & dependencies : all_tables.table_dependencies)
|
for (const auto & info : all_tables.dependencies_info)
|
||||||
{
|
{
|
||||||
LOG_WARNING(log, "Cannot resolve dependencies: Table {} depends on {}", dependencies.first, fmt::join(dependencies.second, ", "));
|
LOG_WARNING(log, "Cannot resolve dependencies: Table {} have {} dependencies and {} dependent tables. List of dependent tables: {}",
|
||||||
|
info.first, info.second.dependencies_count,
|
||||||
|
info.second.dependent_tables.size(), fmt::join(info.second.dependent_tables, ", "));
|
||||||
|
assert(info.second.dependencies_count == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
throw Exception(ErrorCodes::INFINITE_LOOP, "Cannot attach {} tables due to cyclic dependencies. "
|
throw Exception(ErrorCodes::INFINITE_LOOP, "Cannot attach {} tables due to cyclic dependencies. "
|
||||||
"See server log for details.", all_tables.table_dependencies.size());
|
"See server log for details.", all_tables.dependencies_info.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
void TablesLoader::logDependencyGraph() const
|
||||||
|
{
|
||||||
|
LOG_TRACE(log, "Have {} independent tables: {}", all_tables.independent_tables.size(), fmt::join(all_tables.independent_tables, ", "));
|
||||||
|
for (const auto & dependencies : all_tables.dependencies_info)
|
||||||
|
{
|
||||||
|
LOG_TRACE(log,
|
||||||
|
"Table {} have {} dependencies and {} dependent tables. List of dependent tables: {}",
|
||||||
|
dependencies.first,
|
||||||
|
dependencies.second.dependencies_count,
|
||||||
|
dependencies.second.dependent_tables.size(),
|
||||||
|
fmt::join(dependencies.second.dependent_tables, ", "));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
#include <Parsers/IAST_fwd.h>
|
#include <Parsers/IAST_fwd.h>
|
||||||
#include <Interpreters/Context_fwd.h>
|
#include <Interpreters/Context_fwd.h>
|
||||||
#include <Common/ThreadPool.h>
|
#include <Common/ThreadPool.h>
|
||||||
|
#include <Common/Stopwatch.h>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <unordered_set>
|
#include <unordered_set>
|
||||||
@ -19,32 +20,59 @@ class AtomicStopwatch;
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch);
|
||||||
|
|
||||||
|
|
||||||
class IDatabase;
|
class IDatabase;
|
||||||
using DatabasePtr = std::shared_ptr<IDatabase>;
|
using DatabasePtr = std::shared_ptr<IDatabase>;
|
||||||
|
|
||||||
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch);
|
using ParsedMetadata = std::map<QualifiedTableName, std::pair<String, ASTPtr>>;
|
||||||
|
using TableNames = std::vector<QualifiedTableName>;
|
||||||
|
|
||||||
|
struct DependenciesInfo
|
||||||
|
{
|
||||||
|
/// How many dependencies this table has
|
||||||
|
size_t dependencies_count = 0;
|
||||||
|
/// List of tables which depend on this table
|
||||||
|
TableNames dependent_tables;
|
||||||
|
};
|
||||||
|
|
||||||
|
using DependenciesInfos = std::unordered_map<QualifiedTableName, DependenciesInfo>;
|
||||||
|
using DependenciesInfosIter = std::unordered_map<QualifiedTableName, DependenciesInfo>::iterator;
|
||||||
|
|
||||||
struct ParsedTablesMetadata
|
struct ParsedTablesMetadata
|
||||||
{
|
{
|
||||||
String default_database;
|
String default_database;
|
||||||
|
|
||||||
using ParsedMetadata = std::map<QualifiedTableName, std::pair<String, ASTPtr>>;
|
|
||||||
std::mutex mutex;
|
std::mutex mutex;
|
||||||
ParsedMetadata metadata;
|
ParsedMetadata metadata;
|
||||||
|
|
||||||
|
/// For logging
|
||||||
size_t total_dictionaries = 0;
|
size_t total_dictionaries = 0;
|
||||||
std::unordered_set<QualifiedTableName> independent_tables;
|
|
||||||
std::unordered_map<QualifiedTableName, std::vector<QualifiedTableName>> table_dependencies;
|
/// List of tables that do not have any dependencies and can be loaded
|
||||||
|
TableNames independent_tables;
|
||||||
|
|
||||||
|
/// Actually it contains two different maps (with, probably, intersecting keys):
|
||||||
|
/// 1. table name -> number of dependencies
|
||||||
|
/// 2. table name -> dependent tables list (adjacency list of dependencies graph).
|
||||||
|
/// If table A depends on table B, then there is an edge B --> A, i.e. dependencies_info[B].dependent_tables contains A.
|
||||||
|
/// And dependencies_info[C].dependencies_count is a number of incoming edges for vertex C (how many tables we have to load before C).
|
||||||
|
DependenciesInfos dependencies_info;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Loads tables (and dictionaries) from specified databases
|
||||||
|
/// taking into account dependencies between them.
|
||||||
class TablesLoader
|
class TablesLoader
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
using Databases = std::map<String, DatabasePtr>;
|
||||||
using Databases = std::vector<DatabasePtr>;
|
|
||||||
|
|
||||||
TablesLoader(ContextMutablePtr global_context_, Databases databases_, bool force_restore_ = false, bool force_attach_ = false);
|
TablesLoader(ContextMutablePtr global_context_, Databases databases_, bool force_restore_ = false, bool force_attach_ = false);
|
||||||
|
TablesLoader() = delete;
|
||||||
|
|
||||||
void loadTables();
|
void loadTables();
|
||||||
|
void startupTables();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ContextMutablePtr global_context;
|
ContextMutablePtr global_context;
|
||||||
@ -52,19 +80,27 @@ private:
|
|||||||
bool force_restore;
|
bool force_restore;
|
||||||
bool force_attach;
|
bool force_attach;
|
||||||
|
|
||||||
std::map<String, DatabasePtr> databases_to_load;
|
Strings databases_to_load;
|
||||||
ParsedTablesMetadata all_tables;
|
ParsedTablesMetadata all_tables;
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
std::atomic<size_t> tables_processed{0};
|
std::atomic<size_t> tables_processed{0};
|
||||||
|
AtomicStopwatch stopwatch;
|
||||||
|
|
||||||
|
ThreadPool pool;
|
||||||
|
|
||||||
using RemoveDependencyPredicate = std::function<bool(const QualifiedTableName &, const QualifiedTableName &)>;
|
void removeUnresolvableDependencies();
|
||||||
void removeDependencies(RemoveDependencyPredicate need_remove_dependency, std::unordered_set<QualifiedTableName> & independent_tables);
|
|
||||||
|
|
||||||
void startLoadingIndependentTables(ThreadPool & pool, AtomicStopwatch & watch, size_t level);
|
void loadTablesInTopologicalOrder(ThreadPool & pool);
|
||||||
|
|
||||||
|
DependenciesInfosIter removeResolvedDependency(const DependenciesInfosIter & info_it, TableNames & independent_tables);
|
||||||
|
|
||||||
|
void startLoadingIndependentTables(ThreadPool & pool, size_t level);
|
||||||
|
|
||||||
void checkCyclicDependencies() const;
|
void checkCyclicDependencies() const;
|
||||||
|
|
||||||
|
size_t getNumberOfTablesWithDependencies() const;
|
||||||
|
|
||||||
|
void logDependencyGraph() const;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
#include <Dictionaries/DictionaryStructure.h>
|
#include <Dictionaries/DictionaryStructure.h>
|
||||||
#include <Databases/IDatabase.h>
|
#include <Databases/IDatabase.h>
|
||||||
#include <Storages/IStorage.h>
|
#include <Storages/IStorage.h>
|
||||||
|
#include <Poco/Logger.h>
|
||||||
|
|
||||||
#if !defined(ARCADIA_BUILD)
|
#if !defined(ARCADIA_BUILD)
|
||||||
# include "config_core.h"
|
# include "config_core.h"
|
||||||
@ -30,6 +31,7 @@ ExternalDictionariesLoader::ExternalDictionariesLoader(ContextPtr global_context
|
|||||||
setConfigSettings({"dictionary", "name", "database", "uuid"});
|
setConfigSettings({"dictionary", "name", "database", "uuid"});
|
||||||
enableAsyncLoading(true);
|
enableAsyncLoading(true);
|
||||||
enablePeriodicUpdates(true);
|
enablePeriodicUpdates(true);
|
||||||
|
log = &Poco::Logger::get("EDL");
|
||||||
}
|
}
|
||||||
|
|
||||||
ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
|
ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
|
||||||
@ -89,12 +91,14 @@ DictionaryStructure ExternalDictionariesLoader::getDictionaryStructure(const std
|
|||||||
|
|
||||||
std::string ExternalDictionariesLoader::resolveDictionaryName(const std::string & dictionary_name, const std::string & current_database_name) const
|
std::string ExternalDictionariesLoader::resolveDictionaryName(const std::string & dictionary_name, const std::string & current_database_name) const
|
||||||
{
|
{
|
||||||
|
LOG_INFO(log, "Looking for {} ({})", dictionary_name, current_database_name);
|
||||||
bool has_dictionary = has(dictionary_name);
|
bool has_dictionary = has(dictionary_name);
|
||||||
if (has_dictionary)
|
if (has_dictionary)
|
||||||
return dictionary_name;
|
return dictionary_name;
|
||||||
|
|
||||||
std::string resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name);
|
std::string resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name);
|
||||||
has_dictionary = has(resolved_name);
|
has_dictionary = has(resolved_name);
|
||||||
|
LOG_INFO(log, "Got resolved name {}, {}", resolved_name, has_dictionary);
|
||||||
|
|
||||||
if (!has_dictionary)
|
if (!has_dictionary)
|
||||||
{
|
{
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
#include <Interpreters/Context_fwd.h>
|
#include <Interpreters/Context_fwd.h>
|
||||||
#include <Interpreters/ExternalLoader.h>
|
#include <Interpreters/ExternalLoader.h>
|
||||||
#include <Common/quoteString.h>
|
#include <Common/quoteString.h>
|
||||||
|
#include <common/logger_useful.h>
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
@ -46,6 +47,7 @@ protected:
|
|||||||
|
|
||||||
friend class StorageSystemDictionaries;
|
friend class StorageSystemDictionaries;
|
||||||
friend class DatabaseDictionary;
|
friend class DatabaseDictionary;
|
||||||
|
Poco::Logger * log;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
#include <boost/range/adaptor/map.hpp>
|
#include <boost/range/adaptor/map.hpp>
|
||||||
#include <boost/range/algorithm/copy.hpp>
|
#include <boost/range/algorithm/copy.hpp>
|
||||||
#include <unordered_set>
|
#include <unordered_set>
|
||||||
|
#include <Poco/Logger.h>
|
||||||
|
|
||||||
|
|
||||||
namespace CurrentStatusInfo
|
namespace CurrentStatusInfo
|
||||||
@ -467,6 +468,7 @@ public:
|
|||||||
if (infos.find(name) == infos.end())
|
if (infos.find(name) == infos.end())
|
||||||
{
|
{
|
||||||
Info & info = infos.emplace(name, Info{name, config}).first->second;
|
Info & info = infos.emplace(name, Info{name, config}).first->second;
|
||||||
|
LOG_TRACE(log, "Inserted {} into infos", name);
|
||||||
if (always_load_everything)
|
if (always_load_everything)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Will load '{}' because always_load_everything flag is set.", name);
|
LOG_TRACE(log, "Will load '{}' because always_load_everything flag is set.", name);
|
||||||
|
@ -275,8 +275,9 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
|
|||||||
if (!load_database_without_tables)
|
if (!load_database_without_tables)
|
||||||
{
|
{
|
||||||
/// We use global context here, because storages lifetime is bigger than query context lifetime
|
/// We use global context here, because storages lifetime is bigger than query context lifetime
|
||||||
TablesLoader loader{getContext()->getGlobalContext(), {database}, has_force_restore_data_flag, create.attach && force_attach}; //-V560
|
TablesLoader loader{getContext()->getGlobalContext(), {{database_name, database}}, has_force_restore_data_flag, create.attach && force_attach}; //-V560
|
||||||
loader.loadTables();
|
loader.loadTables();
|
||||||
|
loader.startupTables();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
|
@ -52,11 +52,6 @@ public:
|
|||||||
force_attach = force_attach_;
|
force_attach = force_attach_;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setSkipStartupTables(bool skip_startup_tables_)
|
|
||||||
{
|
|
||||||
skip_startup_tables = skip_startup_tables_;
|
|
||||||
}
|
|
||||||
|
|
||||||
void setLoadDatabaseWithoutTables(bool load_database_without_tables_)
|
void setLoadDatabaseWithoutTables(bool load_database_without_tables_)
|
||||||
{
|
{
|
||||||
load_database_without_tables = load_database_without_tables_;
|
load_database_without_tables = load_database_without_tables_;
|
||||||
@ -104,7 +99,6 @@ private:
|
|||||||
/// Is this an internal query - not from the user.
|
/// Is this an internal query - not from the user.
|
||||||
bool internal = false;
|
bool internal = false;
|
||||||
bool force_attach = false;
|
bool force_attach = false;
|
||||||
bool skip_startup_tables = false;
|
|
||||||
bool load_database_without_tables = false;
|
bool load_database_without_tables = false;
|
||||||
|
|
||||||
mutable String as_database_saved;
|
mutable String as_database_saved;
|
||||||
|
@ -44,8 +44,7 @@ static void executeCreateQuery(
|
|||||||
interpreter.setInternal(true);
|
interpreter.setInternal(true);
|
||||||
interpreter.setForceAttach(true);
|
interpreter.setForceAttach(true);
|
||||||
interpreter.setForceRestoreData(has_force_restore_data_flag);
|
interpreter.setForceRestoreData(has_force_restore_data_flag);
|
||||||
interpreter.setSkipStartupTables(true);
|
interpreter.setLoadDatabaseWithoutTables(true);
|
||||||
interpreter.setLoadDatabaseWithoutTables(database != DatabaseCatalog::SYSTEM_DATABASE);
|
|
||||||
interpreter.execute();
|
interpreter.execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -161,11 +160,12 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
|
|||||||
for (const auto & [name, db_path] : databases)
|
for (const auto & [name, db_path] : databases)
|
||||||
{
|
{
|
||||||
loadDatabase(context, name, db_path, has_force_restore_data_flag);
|
loadDatabase(context, name, db_path, has_force_restore_data_flag);
|
||||||
loaded_databases.emplace_back(DatabaseCatalog::instance().getDatabase(name));
|
loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)});
|
||||||
}
|
}
|
||||||
|
|
||||||
TablesLoader loader{context, std::move(loaded_databases), has_force_restore_data_flag, /* force_attach */ true};
|
TablesLoader loader{context, std::move(loaded_databases), has_force_restore_data_flag, /* force_attach */ true};
|
||||||
loader.loadTables();
|
loader.loadTables();
|
||||||
|
loader.startupTables();
|
||||||
|
|
||||||
if (has_force_restore_data_flag)
|
if (has_force_restore_data_flag)
|
||||||
{
|
{
|
||||||
@ -199,6 +199,17 @@ void loadMetadataSystem(ContextMutablePtr context)
|
|||||||
executeCreateQuery(database_create_query, context, DatabaseCatalog::SYSTEM_DATABASE, "<no file>", true);
|
executeCreateQuery(database_create_query, context, DatabaseCatalog::SYSTEM_DATABASE, "<no file>", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TablesLoader loader{context, {{DatabaseCatalog::SYSTEM_DATABASE, DatabaseCatalog::instance().getSystemDatabase()}},
|
||||||
|
/* force_restore */true, /* force_attach */ true};
|
||||||
|
loader.loadTables();
|
||||||
|
/// Will startup tables in system database after all databases are loaded.
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void startupSystemTables()
|
||||||
|
{
|
||||||
|
ThreadPool pool;
|
||||||
|
DatabaseCatalog::instance().getSystemDatabase()->startupTables(pool, /* force_restore */true, /* force_attach */ true);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <Interpreters/Context_fwd.h>
|
#include <Interpreters/Context_fwd.h>
|
||||||
|
#include <Databases/TablesLoader.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -13,4 +14,6 @@ void loadMetadataSystem(ContextMutablePtr context);
|
|||||||
/// Load tables from databases and add them to context. Database 'system' is ignored. Use separate function to load system tables.
|
/// Load tables from databases and add them to context. Database 'system' is ignored. Use separate function to load system tables.
|
||||||
void loadMetadata(ContextMutablePtr context, const String & default_database_name = {});
|
void loadMetadata(ContextMutablePtr context, const String & default_database_name = {});
|
||||||
|
|
||||||
|
void startupSystemTables();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user