diff --git a/dbms/src/Databases/DatabaseOrdinary.cpp b/dbms/src/Databases/DatabaseOrdinary.cpp index 0406d2d2a83..777fba74c33 100644 --- a/dbms/src/Databases/DatabaseOrdinary.cpp +++ b/dbms/src/Databases/DatabaseOrdinary.cpp @@ -165,6 +165,7 @@ void DatabaseOrdinary::loadTables( AtomicStopwatch watch; std::atomic tables_processed {0}; Poco::Event all_tables_processed; + ExceptionHandler exception_handler; auto task_function = [&](const String & table) { @@ -186,7 +187,7 @@ void DatabaseOrdinary::loadTables( for (const auto & filename : file_names) { - auto task = std::bind(task_function, filename); + auto task = createExceptionHandledJob(std::bind(task_function, filename), exception_handler); if (thread_pool) thread_pool->schedule(task); @@ -197,6 +198,8 @@ void DatabaseOrdinary::loadTables( if (thread_pool) all_tables_processed.wait(); + exception_handler.throwIfException(); + /// After all tables was basically initialized, startup them. startupTables(thread_pool); } @@ -210,12 +213,18 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool) std::atomic tables_processed {0}; size_t total_tables = tables.size(); Poco::Event all_tables_processed; + ExceptionHandler exception_handler; if (!total_tables) return; auto task_function = [&](const StoragePtr & table) { + SCOPE_EXIT( + if (++tables_processed == total_tables) + all_tables_processed.set() + ); + if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) { @@ -224,14 +233,11 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool) } table->startup(); - - if (++tables_processed == total_tables) - all_tables_processed.set(); }; for (const auto & name_storage : tables) { - auto task = std::bind(task_function, name_storage.second); + auto task = createExceptionHandledJob(std::bind(task_function, name_storage.second), exception_handler); if (thread_pool) thread_pool->schedule(task); @@ -241,6 +247,8 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool) if (thread_pool) all_tables_processed.wait(); + + exception_handler.throwIfException(); } diff --git a/libs/libcommon/include/common/ThreadPool.h b/libs/libcommon/include/common/ThreadPool.h index 15952042a04..afb969cf51c 100644 --- a/libs/libcommon/include/common/ThreadPool.h +++ b/libs/libcommon/include/common/ThreadPool.h @@ -58,3 +58,39 @@ private: void worker(); }; +class ExceptionHandler +{ +public: + void setException(std::exception_ptr && exception) + { + std::unique_lock lock(mutex); + if (!first_exception) + first_exception = std::move(exception); + } + + void throwIfException() + { + std::unique_lock lock(mutex); + if (first_exception) + std::rethrow_exception(first_exception); + } + +private: + std::exception_ptr first_exception; + std::mutex mutex; +}; + +ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler) +{ + return [job{std::move(job)}, &handler] () + { + try + { + job(); + } + catch (...) + { + handler.setException(std::current_exception()); + } + }; +}