diff --git a/dbms/programs/server/Server.cpp b/dbms/programs/server/Server.cpp index 75c29435036..f965bf58eaa 100644 --- a/dbms/programs/server/Server.cpp +++ b/dbms/programs/server/Server.cpp @@ -376,13 +376,21 @@ int Server::main(const std::vector & /*args*/) format_schema_path.createDirectories(); LOG_INFO(log, "Loading metadata."); - loadMetadataSystem(*global_context); - /// After attaching system databases we can initialize system log. - global_context->initializeSystemLogs(); - /// After the system database is created, attach virtual system tables (in addition to query_log and part_log) - attachSystemTablesServer(*global_context->getDatabase("system"), has_zookeeper); - /// Then, load remaining databases - loadMetadata(*global_context); + try + { + loadMetadataSystem(*global_context); + /// After attaching system databases we can initialize system log. + global_context->initializeSystemLogs(); + /// After the system database is created, attach virtual system tables (in addition to query_log and part_log) + attachSystemTablesServer(*global_context->getDatabase("system"), has_zookeeper); + /// Then, load remaining databases + loadMetadata(*global_context); + } + catch (...) + { + tryLogCurrentException(log, "Caught exception while loading metadata"); + throw; + } LOG_DEBUG(log, "Loaded metadata."); global_context->setCurrentDatabase(default_database); diff --git a/dbms/src/Databases/DatabaseOrdinary.cpp b/dbms/src/Databases/DatabaseOrdinary.cpp index e0fe4294e7d..7c4adbe9f67 100644 --- a/dbms/src/Databases/DatabaseOrdinary.cpp +++ b/dbms/src/Databases/DatabaseOrdinary.cpp @@ -21,6 +21,7 @@ #include #include #include +#include namespace DB @@ -164,9 +165,15 @@ void DatabaseOrdinary::loadTables( AtomicStopwatch watch; std::atomic tables_processed {0}; Poco::Event all_tables_processed; + ExceptionHandler exception_handler; auto task_function = [&](const String & table) { + SCOPE_EXIT( + if (++tables_processed == total_tables) + all_tables_processed.set() + ); + /// Messages, so that it's not boring to wait for the server to load for a long time. if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) @@ -176,14 +183,11 @@ void DatabaseOrdinary::loadTables( } loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag); - - if (++tables_processed == total_tables) - all_tables_processed.set(); }; for (const auto & filename : file_names) { - auto task = std::bind(task_function, filename); + auto task = createExceptionHandledJob(std::bind(task_function, filename), exception_handler); if (thread_pool) thread_pool->schedule(task); @@ -194,6 +198,8 @@ void DatabaseOrdinary::loadTables( if (thread_pool) all_tables_processed.wait(); + exception_handler.throwIfException(); + /// After all tables was basically initialized, startup them. startupTables(thread_pool); } @@ -207,12 +213,18 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool) std::atomic tables_processed {0}; size_t total_tables = tables.size(); Poco::Event all_tables_processed; + ExceptionHandler exception_handler; if (!total_tables) return; auto task_function = [&](const StoragePtr & table) { + SCOPE_EXIT( + if (++tables_processed == total_tables) + all_tables_processed.set() + ); + if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) { @@ -221,14 +233,11 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool) } table->startup(); - - if (++tables_processed == total_tables) - all_tables_processed.set(); }; for (const auto & name_storage : tables) { - auto task = std::bind(task_function, name_storage.second); + auto task = createExceptionHandledJob(std::bind(task_function, name_storage.second), exception_handler); if (thread_pool) thread_pool->schedule(task); @@ -238,6 +247,8 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool) if (thread_pool) all_tables_processed.wait(); + + exception_handler.throwIfException(); } diff --git a/libs/libcommon/include/common/ThreadPool.h b/libs/libcommon/include/common/ThreadPool.h index 15952042a04..980fdcba355 100644 --- a/libs/libcommon/include/common/ThreadPool.h +++ b/libs/libcommon/include/common/ThreadPool.h @@ -58,3 +58,17 @@ private: void worker(); }; + +/// Allows to save first catched exception in jobs and postpone its rethrow. +class ExceptionHandler +{ +public: + void setException(std::exception_ptr && exception); + void throwIfException(); + +private: + std::exception_ptr first_exception; + std::mutex mutex; +}; + +ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler); diff --git a/libs/libcommon/src/ThreadPool.cpp b/libs/libcommon/src/ThreadPool.cpp index 06f563bc55a..4da7c9689b8 100644 --- a/libs/libcommon/src/ThreadPool.cpp +++ b/libs/libcommon/src/ThreadPool.cpp @@ -112,3 +112,34 @@ void ThreadPool::worker() } } + +void ExceptionHandler::setException(std::exception_ptr && exception) +{ + std::unique_lock lock(mutex); + if (!first_exception) + first_exception = std::move(exception); +} + +void ExceptionHandler::throwIfException() +{ + std::unique_lock lock(mutex); + if (first_exception) + std::rethrow_exception(first_exception); +} + + +ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler) +{ + return [job{std::move(job)}, &handler] () + { + try + { + job(); + } + catch (...) + { + handler.setException(std::current_exception()); + } + }; +} +