Fix server shutdown due to exception while loading metadata

This commit is contained in:
serxa 2024-01-22 19:38:26 +00:00
parent 7cd53c8f0d
commit 1d6d1182fd
5 changed files with 28 additions and 9 deletions

View File

@ -1740,6 +1740,15 @@ try
LOG_INFO(log, "Loading metadata from {}", path_str); LOG_INFO(log, "Loading metadata from {}", path_str);
LoadTaskPtrs load_metadata_tasks; LoadTaskPtrs load_metadata_tasks;
// Make sure that if exception is thrown during startup async, new async loading jobs are not going to be called.
// This is important for the case when exception is thrown from loading of metadata with `async_load_databases = false`
// to avoid simultaneously running table startups and destructing databases.
SCOPE_EXIT_SAFE(
LOG_INFO(log, "Stopping AsyncLoader.");
global_context->getAsyncLoader().stopAndDoNotWait(); // Note that currently running jobs will proceed
);
try try
{ {
auto & database_catalog = DatabaseCatalog::instance(); auto & database_catalog = DatabaseCatalog::instance();

View File

@ -13,6 +13,7 @@
#include <Common/getNumberOfPhysicalCPUCores.h> #include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include "base/types.h"
namespace ProfileEvents namespace ProfileEvents
@ -38,7 +39,7 @@ void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, Atomic
{ {
if (total && (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))) if (total && (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)))
{ {
LOG_INFO(log, "Processed: {}%", processed * 100.0 / total); LOG_INFO(log, "Processed: {}%", static_cast<Int64>(processed * 1000.0 / total) * 0.1);
watch.restart(); watch.restart();
} }
} }
@ -241,14 +242,17 @@ void AsyncLoader::wait()
void AsyncLoader::stop() void AsyncLoader::stop()
{ {
{ stopAndDoNotWait();
std::unique_lock lock{mutex};
is_running = false;
// NOTE: there is no need to notify because workers never wait
}
wait(); wait();
} }
void AsyncLoader::stopAndDoNotWait()
{
std::unique_lock lock{mutex};
is_running = false;
// NOTE: there is no need to notify because workers never wait
}
void AsyncLoader::schedule(LoadTask & task) void AsyncLoader::schedule(LoadTask & task)
{ {
chassert(this == &task.loader); chassert(this == &task.loader);

View File

@ -348,6 +348,9 @@ public:
// - or canceled using ~Task() or remove() later. // - or canceled using ~Task() or remove() later.
void stop(); void stop();
// Do not run new jobs
void stopAndDoNotWait();
// Schedule all jobs of given `task` and their dependencies (even if they are not in task). // Schedule all jobs of given `task` and their dependencies (even if they are not in task).
// All dependencies of a scheduled job inherit its pool if it has higher priority. This way higher priority job // All dependencies of a scheduled job inherit its pool if it has higher priority. This way higher priority job
// never waits for (blocked by) lower priority jobs. No priority inversion is possible. // never waits for (blocked by) lower priority jobs. No priority inversion is possible.

View File

@ -215,8 +215,11 @@ LoadTaskPtr DatabaseOrdinary::startupTableAsync(
logAboutProgress(log, ++tables_started, total_tables_to_startup, startup_watch); logAboutProgress(log, ++tables_started, total_tables_to_startup, startup_watch);
} }
else else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {}.{} doesn't exist during startup", {
// This might happen if synchronous metadata loading failed and server is going to shutdown.
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist during startup",
backQuote(name.database), backQuote(name.table)); backQuote(name.database), backQuote(name.table));
}
}); });
return startup_table[name.table] = makeLoadTask(async_loader, {job}); return startup_table[name.table] = makeLoadTask(async_loader, {job});

View File

@ -257,10 +257,10 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
} }
else else
{ {
// NOTE: some tables can still be started up in the "loading" phase if they are required by dependencies during loading of other tables
LOG_INFO(log, "Start synchronous loading of databases"); LOG_INFO(log, "Start synchronous loading of databases");
// Note that wait implicitly calls schedule
waitLoad(TablesLoaderForegroundPoolId, load_tasks); // First prioritize, schedule and wait all the load table tasks waitLoad(TablesLoaderForegroundPoolId, load_tasks); // First prioritize, schedule and wait all the load table tasks
LOG_INFO(log, "Start synchronous startup of databases");
waitLoad(TablesLoaderForegroundPoolId, startup_tasks); // Only then prioritize, schedule and wait all the startup tasks waitLoad(TablesLoaderForegroundPoolId, startup_tasks); // Only then prioritize, schedule and wait all the startup tasks
return {}; return {};
} }