mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Fix deadlock in DatabaseOrdinary in case of exception while loading tables.
This commit is contained in:
parent
a451a9282b
commit
715d120ed9
@ -165,6 +165,7 @@ void DatabaseOrdinary::loadTables(
|
||||
AtomicStopwatch watch;
|
||||
std::atomic<size_t> tables_processed {0};
|
||||
Poco::Event all_tables_processed;
|
||||
ExceptionHandler exception_handler;
|
||||
|
||||
auto task_function = [&](const String & table)
|
||||
{
|
||||
@ -186,7 +187,7 @@ void DatabaseOrdinary::loadTables(
|
||||
|
||||
for (const auto & filename : file_names)
|
||||
{
|
||||
auto task = std::bind(task_function, filename);
|
||||
auto task = createExceptionHandledJob(std::bind(task_function, filename), exception_handler);
|
||||
|
||||
if (thread_pool)
|
||||
thread_pool->schedule(task);
|
||||
@ -197,6 +198,8 @@ void DatabaseOrdinary::loadTables(
|
||||
if (thread_pool)
|
||||
all_tables_processed.wait();
|
||||
|
||||
exception_handler.throwIfException();
|
||||
|
||||
/// After all tables was basically initialized, startup them.
|
||||
startupTables(thread_pool);
|
||||
}
|
||||
@ -210,12 +213,18 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
|
||||
std::atomic<size_t> tables_processed {0};
|
||||
size_t total_tables = tables.size();
|
||||
Poco::Event all_tables_processed;
|
||||
ExceptionHandler exception_handler;
|
||||
|
||||
if (!total_tables)
|
||||
return;
|
||||
|
||||
auto task_function = [&](const StoragePtr & table)
|
||||
{
|
||||
SCOPE_EXIT(
|
||||
if (++tables_processed == total_tables)
|
||||
all_tables_processed.set()
|
||||
);
|
||||
|
||||
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
|
||||
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
|
||||
{
|
||||
@ -224,14 +233,11 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
|
||||
}
|
||||
|
||||
table->startup();
|
||||
|
||||
if (++tables_processed == total_tables)
|
||||
all_tables_processed.set();
|
||||
};
|
||||
|
||||
for (const auto & name_storage : tables)
|
||||
{
|
||||
auto task = std::bind(task_function, name_storage.second);
|
||||
auto task = createExceptionHandledJob(std::bind(task_function, name_storage.second), exception_handler);
|
||||
|
||||
if (thread_pool)
|
||||
thread_pool->schedule(task);
|
||||
@ -241,6 +247,8 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
|
||||
|
||||
if (thread_pool)
|
||||
all_tables_processed.wait();
|
||||
|
||||
exception_handler.throwIfException();
|
||||
}
|
||||
|
||||
|
||||
|
@ -58,3 +58,39 @@ private:
|
||||
void worker();
|
||||
};
|
||||
|
||||
class ExceptionHandler
|
||||
{
|
||||
public:
|
||||
void setException(std::exception_ptr && exception)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (!first_exception)
|
||||
first_exception = std::move(exception);
|
||||
}
|
||||
|
||||
void throwIfException()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
if (first_exception)
|
||||
std::rethrow_exception(first_exception);
|
||||
}
|
||||
|
||||
private:
|
||||
std::exception_ptr first_exception;
|
||||
std::mutex mutex;
|
||||
};
|
||||
|
||||
ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler)
|
||||
{
|
||||
return [job{std::move(job)}, &handler] ()
|
||||
{
|
||||
try
|
||||
{
|
||||
job();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
handler.setException(std::current_exception());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user