2021-08-26 13:19:52 +00:00
# include <Databases/TablesLoader.h>
2021-08-31 08:53:48 +00:00
# include <Databases/IDatabase.h>
2021-11-01 18:53:07 +00:00
# include <Databases/DDLDependencyVisitor.h>
2022-12-02 14:05:46 +00:00
# include <Databases/DDLLoadingDependencyVisitor.h>
2021-08-31 08:53:48 +00:00
# include <Interpreters/DatabaseCatalog.h>
# include <Interpreters/Context.h>
2021-09-01 19:42:49 +00:00
# include <Interpreters/ExternalDictionariesLoader.h>
2021-08-31 08:53:48 +00:00
# include <Poco/Util/AbstractConfiguration.h>
2022-04-27 15:05:45 +00:00
# include <Common/logger_useful.h>
2021-08-31 08:53:48 +00:00
# include <Common/ThreadPool.h>
2023-03-22 07:49:22 +00:00
# include <Common/CurrentMetrics.h>
2021-09-01 19:42:49 +00:00
# include <numeric>
2021-08-26 13:19:52 +00:00
2023-04-26 18:25:39 +00:00
2023-03-22 07:49:22 +00:00
/// Metrics defined elsewhere; in this file they are handed to the constructor of the loader's thread pool.
namespace CurrentMetrics
{
    extern const Metric AsyncLoaderThreads;
    extern const Metric AsyncLoaderThreadsActive;
}
2021-08-26 13:19:52 +00:00
namespace DB
{
2021-08-31 08:53:48 +00:00
/// Error codes defined elsewhere and referenced by the exceptions thrown in this file.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
2021-08-26 13:19:52 +00:00
static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256 ;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5 ;
void logAboutProgress ( Poco : : Logger * log , size_t processed , size_t total , AtomicStopwatch & watch )
{
if ( processed % PRINT_MESSAGE_EACH_N_OBJECTS = = 0 | | watch . compareAndRestart ( PRINT_MESSAGE_EACH_N_SECONDS ) )
{
LOG_INFO ( log , " {}% " , processed * 100.0 / total ) ;
watch . restart ( ) ;
}
}
2022-07-20 20:54:43 +00:00
/// Constructs a loader for the given databases.
/// `global_context_`  - context used for configuration, the async loader and the current database name
/// `databases_`       - databases to load; moved into the loader
/// `strictness_mode_` - strictness level forwarded to the databases on every load/startup call
TablesLoader::TablesLoader(ContextMutablePtr global_context_, Databases databases_, LoadingStrictnessLevel strictness_mode_)
    : global_context(global_context_)
    , databases(std::move(databases_))
    , strictness_mode(strictness_mode_)
    , referential_dependencies(" ReferentialDeps ")
    , loading_dependencies(" LoadingDeps ")
    , all_loading_dependencies(" LoadingDeps ")
    , async_loader(global_context->getAsyncLoader())
    , pool(CurrentMetrics::AsyncLoaderThreads, CurrentMetrics::AsyncLoaderThreadsActive)
{
    log = &Poco::Logger::get(" TablesLoader ");

    /// Remembered so that dependencies on objects of the default database can be recognised later.
    metadata.default_database = global_context->getCurrentDatabase();
}
2023-04-26 18:25:39 +00:00
void TablesLoader : : createTasks ( LoadJobSet load_after )
{
bool need_resolve_dependencies = ! global_context - > getConfigRef ( ) . has ( " ignore_table_dependencies_on_metadata_loading " ) ;
/// Load all Lazy, MySQl, PostgreSQL, SQLite, etc databases first.
for ( auto & database : databases )
{
if ( need_resolve_dependencies & & database . second - > supportsLoadingInTopologicalOrder ( ) )
databases_to_load . push_back ( database . first ) ;
else
load_tables . push_back ( database . second - > loadStoredObjectsAsync ( async_loader , load_after , global_context , strictness_mode , /* skip_startup_tables */ true ) ) ;
}
if ( databases_to_load . empty ( ) )
return ;
/// Read and parse metadata from Ordinary, Atomic, Materialized*, Replicated, etc databases. Build dependency graph.
for ( auto & database_name : databases_to_load )
{
databases [ database_name ] - > beforeLoadingMetadata ( global_context , strictness_mode ) ;
bool is_startup = LoadingStrictnessLevel : : FORCE_ATTACH < = strictness_mode ;
databases [ database_name ] - > loadTablesMetadata ( global_context , metadata , is_startup ) ;
}
LOG_INFO ( log , " Parsed metadata of {} tables in {} databases in {} sec " ,
metadata . parsed_tables . size ( ) , databases_to_load . size ( ) , stopwatch . elapsedSeconds ( ) ) ;
stopwatch . restart ( ) ;
LoadJobSet load_databases_without_dependencies ;
for ( const auto task : load_tables )
load_databases_without_dependencies . insert ( task - > goals ( ) . begin ( ) , task - > goals ( ) . end ( ) ) ;
if ( load_databases_without_dependencies . empty ( ) )
load_databases_without_dependencies = std : : move ( load_after ) ;
buildDependencyGraph ( ) ;
/// Update existing info (it's important for ATTACH DATABASE)
DatabaseCatalog : : instance ( ) . addDependencies ( referential_dependencies , loading_dependencies ) ;
/// Remove tables that do not exist
removeUnresolvableDependencies ( ) ;
std : : unordered_map < UUID , LoadTaskPtr > load_table ; /// table uuid -> load task
std : : unordered_map < String , LoadTaskPtrs > startup_database ; /// database name -> all its tables startup tasks
for ( const auto & table_id : all_loading_dependencies . getTablesSortedByDependency ( ) )
{
/// Make set of jobs to load before this table
LoadJobSet load_before ;
for ( StorageID dependency_id : all_loading_dependencies . getDependencies ( table_id ) )
{
const auto & goals = load_table [ dependency_id . uuid ] - > goals ( ) ;
load_before . insert ( goals . begin ( ) , goals . end ( ) ) ;
}
if ( load_before . empty ( ) )
load_before = load_databases_without_dependencies ;
// Make load table task
auto table_name = table_id . getQualifiedName ( ) ;
const auto & path_and_query = metadata . parsed_tables [ table_name ] ;
auto load_task = databases [ table_name . database ] - > loadTableFromMetadataAsync ( async_loader , load_before , load_context , path_and_query . path , table_name , path_and_query . ast , strictness_mode ) ;
load_table [ table_id . uuid ] = load_task ;
load_tables . push_back ( load_task ) ;
// Make startup table task
auto startup_task = databases [ table_name . database ] - > startupTableAsync ( async_loader , load_task - > goals ( ) , table_name , strictness_mode ) ;
startup_database [ table_name . database ] = startup_task ;
startup_tables . push_back ( startup_task ) ;
// TODO(serxa): we should report progress, a job should be attached to task.goals() here to report it. But what task should contain that job is unclear yet
// logAboutProgress(log, ++tables_processed, total_tables, stopwatch);
}
// TODO(serxa): make startup database tasks
// for (auto [database_name, startup_tables] : startup_database)
// {
// }
}
/// Creates the load tasks (see createTasks()) and returns the accumulated `load_tables`.
/// `load_after` - jobs that must finish before the created load jobs may run.
LoadTaskPtrs TablesLoader::loadTablesAsync(LoadJobSet load_after)
{
    /// The parameter is not used afterwards, so hand it over instead of copying the whole job set.
    createTasks(std::move(load_after));
    return load_tables;
}
/// Returns the startup tasks accumulated in the member `startup_tables`.
/// NOTE(review): elsewhere in this file `startup_tables` is filled with `push_back`, i.e. it looks like a
/// container of tasks, while the return type declared here is a single `LoadTaskPtr` - confirm against the
/// header whether the return type should be `LoadTaskPtrs`.
LoadTaskPtr TablesLoader : : startupTablesAsync ( )
{
return startup_tables ;
}
2021-08-26 13:19:52 +00:00
void TablesLoader : : loadTables ( )
{
2023-04-26 18:25:39 +00:00
// TODO(serxa): rewrite using loadTablesAsync()
2021-08-31 08:53:48 +00:00
bool need_resolve_dependencies = ! global_context - > getConfigRef ( ) . has ( " ignore_table_dependencies_on_metadata_loading " ) ;
2021-09-01 19:42:49 +00:00
/// Load all Lazy, MySQl, PostgreSQL, SQLite, etc databases first.
2021-08-26 13:19:52 +00:00
for ( auto & database : databases )
{
2021-09-01 19:42:49 +00:00
if ( need_resolve_dependencies & & database . second - > supportsLoadingInTopologicalOrder ( ) )
databases_to_load . push_back ( database . first ) ;
2021-08-26 13:19:52 +00:00
else
2022-07-20 20:54:43 +00:00
database . second - > loadStoredObjects ( global_context , strictness_mode , /* skip_startup_tables */ true ) ;
2021-08-26 13:19:52 +00:00
}
2021-09-13 19:11:16 +00:00
if ( databases_to_load . empty ( ) )
return ;
2021-09-01 19:42:49 +00:00
/// Read and parse metadata from Ordinary, Atomic, Materialized*, Replicated, etc databases. Build dependency graph.
for ( auto & database_name : databases_to_load )
2021-08-31 08:53:48 +00:00
{
2022-07-20 20:54:43 +00:00
databases [ database_name ] - > beforeLoadingMetadata ( global_context , strictness_mode ) ;
bool is_startup = LoadingStrictnessLevel : : FORCE_ATTACH < = strictness_mode ;
databases [ database_name ] - > loadTablesMetadata ( global_context , metadata , is_startup ) ;
2021-08-31 08:53:48 +00:00
}
2021-08-26 13:19:52 +00:00
2021-09-02 13:34:46 +00:00
LOG_INFO ( log , " Parsed metadata of {} tables in {} databases in {} sec " ,
2021-09-13 19:11:16 +00:00
metadata . parsed_tables . size ( ) , databases_to_load . size ( ) , stopwatch . elapsedSeconds ( ) ) ;
2021-09-01 19:42:49 +00:00
stopwatch . restart ( ) ;
2022-12-02 14:05:46 +00:00
buildDependencyGraph ( ) ;
2021-11-01 18:53:07 +00:00
/// Update existing info (it's important for ATTACH DATABASE)
2023-02-01 23:30:49 +00:00
DatabaseCatalog : : instance ( ) . addDependencies ( referential_dependencies , loading_dependencies ) ;
2021-11-01 18:53:07 +00:00
2022-12-02 14:05:46 +00:00
/// Remove tables that do not exist
removeUnresolvableDependencies ( ) ;
2021-09-01 19:42:49 +00:00
2023-04-26 18:25:39 +00:00
loadTablesInTopologicalOrder ( ) ;
2021-09-01 19:42:49 +00:00
}
2022-12-02 14:05:46 +00:00
2021-09-01 19:42:49 +00:00
void TablesLoader : : startupTables ( )
{
2023-04-26 18:25:39 +00:00
// TODO(serxa): rewrite using startupTablesAsync()
2021-09-01 19:42:49 +00:00
/// Startup tables after all tables are loaded. Background tasks (merges, mutations, etc) may slow down data parts loading.
for ( auto & database : databases )
2022-07-20 20:54:43 +00:00
database . second - > startupTables ( pool , strictness_mode ) ;
2021-09-01 19:42:49 +00:00
}
2022-12-02 14:05:46 +00:00
void TablesLoader : : buildDependencyGraph ( )
2021-09-01 19:42:49 +00:00
{
2022-12-02 14:05:46 +00:00
for ( const auto & [ table_name , table_metadata ] : metadata . parsed_tables )
{
2023-02-01 23:30:49 +00:00
auto new_ref_dependencies = getDependenciesFromCreateQuery ( global_context , table_name , table_metadata . ast ) ;
2022-12-02 14:05:46 +00:00
auto new_loading_dependencies = getLoadingDependenciesFromCreateQuery ( global_context , table_name , table_metadata . ast ) ;
2023-02-01 23:30:49 +00:00
if ( ! new_ref_dependencies . empty ( ) )
referential_dependencies . addDependencies ( table_name , new_ref_dependencies ) ;
2022-12-02 14:05:46 +00:00
if ( ! new_loading_dependencies . empty ( ) )
2023-02-01 23:30:49 +00:00
loading_dependencies . addDependencies ( table_name , new_loading_dependencies ) ;
2022-12-02 14:05:46 +00:00
/// We're adding `new_loading_dependencies` to the graph here even if they're empty because
/// we need to have all tables from `metadata.parsed_tables` in the graph.
2023-02-01 23:30:49 +00:00
all_loading_dependencies . addDependencies ( table_name , new_loading_dependencies ) ;
2022-12-02 14:05:46 +00:00
}
referential_dependencies . log ( ) ;
2023-02-01 23:30:49 +00:00
all_loading_dependencies . log ( ) ;
2022-12-02 14:05:46 +00:00
}
void TablesLoader : : removeUnresolvableDependencies ( )
{
auto need_exclude_dependency = [ this ] ( const StorageID & table_id )
2021-08-31 08:53:48 +00:00
{
2021-09-01 19:42:49 +00:00
/// Table exists and will be loaded
2022-12-02 14:05:46 +00:00
if ( metadata . parsed_tables . contains ( table_id . getQualifiedName ( ) ) )
2021-08-26 13:19:52 +00:00
return false ;
2022-12-02 14:05:46 +00:00
if ( DatabaseCatalog : : instance ( ) . isTableExist ( table_id , global_context ) )
2021-12-06 13:35:34 +00:00
{
2022-12-02 14:05:46 +00:00
/// Table exists and it's already loaded
}
else if ( table_id . database_name = = metadata . default_database & &
global_context - > getExternalDictionariesLoader ( ) . has ( table_id . table_name ) )
{
/// Tables depend on a XML dictionary.
LOG_WARNING (
log ,
" Tables {} depend on XML dictionary {}, but XML dictionaries are loaded independently. "
" Consider converting it to DDL dictionary. " ,
2023-02-01 23:30:49 +00:00
fmt : : join ( all_loading_dependencies . getDependents ( table_id ) , " , " ) ,
2022-12-02 14:05:46 +00:00
table_id ) ;
}
else
{
/// Some tables depend on table "table_id", but there is no such table in DatabaseCatalog and we don't have its metadata.
/// We will ignore it and try to load dependent tables without "table_id"
/// (but most likely dependent tables will fail to load).
LOG_WARNING (
log ,
" Tables {} depend on {}, but seems like that does not exist. Will ignore it and try to load existing tables " ,
2023-02-01 23:30:49 +00:00
fmt : : join ( all_loading_dependencies . getDependents ( table_id ) , " , " ) ,
2022-12-02 14:05:46 +00:00
table_id ) ;
2021-12-06 13:35:34 +00:00
}
2021-08-26 13:19:52 +00:00
2022-12-02 14:05:46 +00:00
size_t num_dependencies , num_dependents ;
2023-02-01 23:30:49 +00:00
all_loading_dependencies . getNumberOfAdjacents ( table_id , num_dependencies , num_dependents ) ;
2022-12-02 14:05:46 +00:00
if ( num_dependencies | | ! num_dependents )
2021-09-01 19:42:49 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Table {} does not have dependencies and dependent tables as it expected to. "
2022-12-02 14:05:46 +00:00
" It's a bug " , table_id ) ;
2021-08-26 13:19:52 +00:00
2022-12-02 14:05:46 +00:00
return true ; /// Exclude this dependency.
2021-09-01 19:42:49 +00:00
} ;
2021-08-26 13:19:52 +00:00
2023-02-01 23:30:49 +00:00
all_loading_dependencies . removeTablesIf ( need_exclude_dependency ) ;
2022-12-02 14:05:46 +00:00
2023-02-01 23:30:49 +00:00
if ( all_loading_dependencies . getNumberOfTables ( ) ! = metadata . parsed_tables . size ( ) )
2022-12-02 14:05:46 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Number of tables to be loaded is not as expected. It's a bug " ) ;
/// Cannot load tables with cyclic dependencies.
2023-02-01 23:30:49 +00:00
all_loading_dependencies . checkNoCyclicDependencies ( ) ;
2021-09-01 19:42:49 +00:00
}
2021-08-26 13:19:52 +00:00
2022-12-02 14:05:46 +00:00
2023-04-26 18:25:39 +00:00
void TablesLoader : : loadTablesInTopologicalOrder ( )
2021-09-01 19:42:49 +00:00
{
2022-06-29 15:53:08 +00:00
/// Compatibility setting which should be enabled by default on attach
/// Otherwise server will be unable to start for some old-format of IPv6/IPv4 types of columns
ContextMutablePtr load_context = Context : : createCopy ( global_context ) ;
load_context - > setSetting ( " cast_ipv4_ipv6_default_on_conversion_error " , 1 ) ;
2023-04-26 18:25:39 +00:00
// TODO(serxa): remove the following code. Return waitable job or job set instead of sync wait.
2022-12-02 14:05:46 +00:00
/// Load tables in parallel.
2023-02-01 23:30:49 +00:00
auto tables_to_load = all_loading_dependencies . getTablesSortedByDependencyForParallel ( ) ;
2021-08-26 13:19:52 +00:00
2022-12-02 14:05:46 +00:00
for ( size_t level = 0 ; level ! = tables_to_load . size ( ) ; + + level )
2021-08-26 13:19:52 +00:00
{
2023-04-26 18:25:39 +00:00
startLoadingTables ( load_context , tables_to_load [ level ] , level ) ;
pool . wait ( ) ;
2021-08-26 13:19:52 +00:00
}
}
2023-04-26 18:25:39 +00:00
void TablesLoader : : startLoadingTables ( ContextMutablePtr load_context , const std : : vector < StorageID > & tables_to_load , size_t level )
2021-08-26 13:19:52 +00:00
{
2021-09-13 19:11:16 +00:00
size_t total_tables = metadata . parsed_tables . size ( ) ;
2021-08-26 13:19:52 +00:00
2022-12-02 14:05:46 +00:00
LOG_INFO ( log , " Loading {} tables with dependency level {} " , tables_to_load . size ( ) , level ) ;
2021-08-26 13:19:52 +00:00
2022-12-02 14:05:46 +00:00
for ( const auto & table_id : tables_to_load )
2021-08-26 13:19:52 +00:00
{
2023-04-26 18:25:39 +00:00
pool . scheduleOrThrowOnError ( [ this , load_context , total_tables , table_name = table_id . getQualifiedName ( ) ] ( )
2021-08-26 13:19:52 +00:00
{
2021-09-13 19:11:16 +00:00
const auto & path_and_query = metadata . parsed_tables [ table_name ] ;
2022-07-20 20:54:43 +00:00
databases [ table_name . database ] - > loadTableFromMetadata ( load_context , path_and_query . path , table_name , path_and_query . ast , strictness_mode ) ;
2021-09-01 19:42:49 +00:00
logAboutProgress ( log , + + tables_processed , total_tables , stopwatch ) ;
2021-08-26 13:19:52 +00:00
} ) ;
}
}
2021-09-01 19:42:49 +00:00
}