This commit is contained in:
serxa 2023-05-12 20:27:41 +00:00
parent 9bf5292415
commit 646bc6b443
5 changed files with 11 additions and 31 deletions

View File

@ -199,20 +199,6 @@ LoadTaskPtr DatabaseOrdinary::loadTableFromMetadataAsync(
return load_table[name.table] = makeLoadTask(async_loader, {job});
}
void DatabaseOrdinary::startupTablesAndDatabase(AsyncLoader & async_loader, LoadingStrictnessLevel mode)
{
LOG_INFO(log, "Starting up tables.");
LoadTaskPtrs tasks;
for (const auto & table : TSA_SUPPRESS_WARNING_FOR_READ(tables))
tasks.push_back(startupTableAsync(
async_loader,
{},
QualifiedTableName{.database = getDatabaseName(), .table = table.first},
mode));
scheduleAndWaitLoad(tasks);
scheduleAndWaitLoad(startupDatabaseAsync(async_loader, {}, mode));
}
LoadTaskPtr DatabaseOrdinary::startupTableAsync(
AsyncLoader & async_loader,
LoadJobSet startup_after,

View File

@ -43,8 +43,6 @@ public:
const ASTPtr & ast,
LoadingStrictnessLevel mode) override;
void startupTablesAndDatabase(AsyncLoader & async_loader, LoadingStrictnessLevel mode) override;
LoadTaskPtr startupTableAsync(
AsyncLoader & async_loader,
LoadJobSet startup_after,

View File

@ -182,9 +182,6 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented");
}
/// TODO(serxa): remove this method. Start all tables and the database itself
virtual void startupTablesAndDatabase(AsyncLoader & /*async_loader*/, LoadingStrictnessLevel /*mode*/) {}
/// Create a task to startup table `name` after specified dependencies `startup_after` using `async_loader`.
/// The returned task is also stored inside the database for cancellation on destruction.
[[nodiscard]] virtual LoadTaskPtr startupTableAsync(

View File

@ -239,13 +239,11 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
if (!async_load_databases) {
// First, load all tables
scheduleLoad(load_tasks);
waitLoad(load_tasks);
scheduleAndWaitLoad(load_tasks);
// Then, startup all tables. This is done to postpone merges and mutations
// Note that with async loader it would be a total barrier, which is unacceptable for the purpose of waiting.
scheduleLoad(startup_tasks);
waitLoad(startup_tasks);
scheduleAndWaitLoad(startup_tasks);
return {};
} else {
// Schedule all the jobs.
@ -463,7 +461,7 @@ void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & system
if (context->getSettingsRef().allow_deprecated_database_ordinary)
return;
maybeConvertOrdinaryDatabaseToAtomic(context, DatabaseCatalog::SYSTEM_DATABASE, *system_startup_tasks);
maybeConvertOrdinaryDatabaseToAtomic(context, DatabaseCatalog::SYSTEM_DATABASE, &system_startup_tasks);
}
void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMutablePtr context)

View File

@ -8,15 +8,16 @@ namespace DB
/// Load tables from system database. Only real tables like query_log, part_log.
/// You should first load system database, then attach system tables that you need into it, then load other databases.
void loadMetadataSystem(ContextMutablePtr context);
/// Load tables from databases and add them to context. Database 'system' and 'information_schema' is ignored.
/// Use separate function to load system tables.
[[nodiscard]] LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_database_name = {}, bool async_load_databases = false);
/// It returns tasks to startup system tables.
/// Background operations in system tables may slowdown loading of the rest tables,
/// so we startup system tables after all databases are loaded.
void startupSystemTables(ContextMutablePtr context);
[[nodiscard]] LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context);
/// Load tables from databases and add them to context. Databases 'system' and 'information_schema' are ignored.
/// Use separate function to load system tables.
/// If `async_load_databases = true` returns tasks for asynchronous load and startup of all tables
/// Note that returned tasks are already scheduled.
[[nodiscard]] LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_database_name = {}, bool async_load_databases = false);
/// Converts `system` database from Ordinary to Atomic (if needed)
void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & system_startup_tasks);