Merge pull request #3760 from yandex/fix-database-ordinary-load-table-deadlock

Fix database ordinary load table deadlock
This commit is contained in:
alexey-milovidov 2018-12-07 07:13:58 +03:00 committed by GitHub
commit acb4feef26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 79 additions and 15 deletions

View File

@ -376,6 +376,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
format_schema_path.createDirectories(); format_schema_path.createDirectories();
LOG_INFO(log, "Loading metadata."); LOG_INFO(log, "Loading metadata.");
try
{
loadMetadataSystem(*global_context); loadMetadataSystem(*global_context);
/// After attaching system databases we can initialize system log. /// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs(); global_context->initializeSystemLogs();
@ -383,6 +385,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
attachSystemTablesServer(*global_context->getDatabase("system"), has_zookeeper); attachSystemTablesServer(*global_context->getDatabase("system"), has_zookeeper);
/// Then, load remaining databases /// Then, load remaining databases
loadMetadata(*global_context); loadMetadata(*global_context);
}
catch (...)
{
tryLogCurrentException(log, "Caught exception while loading metadata");
throw;
}
LOG_DEBUG(log, "Loaded metadata."); LOG_DEBUG(log, "Loaded metadata.");
global_context->setCurrentDatabase(default_database); global_context->setCurrentDatabase(default_database);

View File

@ -21,6 +21,7 @@
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <ext/scope_guard.h>
namespace DB namespace DB
@ -164,9 +165,15 @@ void DatabaseOrdinary::loadTables(
AtomicStopwatch watch; AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0}; std::atomic<size_t> tables_processed {0};
Poco::Event all_tables_processed; Poco::Event all_tables_processed;
ExceptionHandler exception_handler;
auto task_function = [&](const String & table) auto task_function = [&](const String & table)
{ {
SCOPE_EXIT(
if (++tables_processed == total_tables)
all_tables_processed.set()
);
/// Messages, so that it's not boring to wait for the server to load for a long time. /// Messages, so that it's not boring to wait for the server to load for a long time.
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0 if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
@ -176,14 +183,11 @@ void DatabaseOrdinary::loadTables(
} }
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag); loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
if (++tables_processed == total_tables)
all_tables_processed.set();
}; };
for (const auto & filename : file_names) for (const auto & filename : file_names)
{ {
auto task = std::bind(task_function, filename); auto task = createExceptionHandledJob(std::bind(task_function, filename), exception_handler);
if (thread_pool) if (thread_pool)
thread_pool->schedule(task); thread_pool->schedule(task);
@ -194,6 +198,8 @@ void DatabaseOrdinary::loadTables(
if (thread_pool) if (thread_pool)
all_tables_processed.wait(); all_tables_processed.wait();
exception_handler.throwIfException();
/// After all tables was basically initialized, startup them. /// After all tables was basically initialized, startup them.
startupTables(thread_pool); startupTables(thread_pool);
} }
@ -207,12 +213,18 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
std::atomic<size_t> tables_processed {0}; std::atomic<size_t> tables_processed {0};
size_t total_tables = tables.size(); size_t total_tables = tables.size();
Poco::Event all_tables_processed; Poco::Event all_tables_processed;
ExceptionHandler exception_handler;
if (!total_tables) if (!total_tables)
return; return;
auto task_function = [&](const StoragePtr & table) auto task_function = [&](const StoragePtr & table)
{ {
SCOPE_EXIT(
if (++tables_processed == total_tables)
all_tables_processed.set()
);
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0 if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{ {
@ -221,14 +233,11 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
} }
table->startup(); table->startup();
if (++tables_processed == total_tables)
all_tables_processed.set();
}; };
for (const auto & name_storage : tables) for (const auto & name_storage : tables)
{ {
auto task = std::bind(task_function, name_storage.second); auto task = createExceptionHandledJob(std::bind(task_function, name_storage.second), exception_handler);
if (thread_pool) if (thread_pool)
thread_pool->schedule(task); thread_pool->schedule(task);
@ -238,6 +247,8 @@ void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
if (thread_pool) if (thread_pool)
all_tables_processed.wait(); all_tables_processed.wait();
exception_handler.throwIfException();
} }

View File

@ -58,3 +58,17 @@ private:
void worker(); void worker();
}; };
/// Allows to save first catched exception in jobs and postpone its rethrow.
class ExceptionHandler
{
public:
void setException(std::exception_ptr && exception);
void throwIfException();
private:
std::exception_ptr first_exception;
std::mutex mutex;
};
ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler);

View File

@ -112,3 +112,34 @@ void ThreadPool::worker()
} }
} }
void ExceptionHandler::setException(std::exception_ptr && exception)
{
std::unique_lock<std::mutex> lock(mutex);
if (!first_exception)
first_exception = std::move(exception);
}
void ExceptionHandler::throwIfException()
{
std::unique_lock<std::mutex> lock(mutex);
if (first_exception)
std::rethrow_exception(first_exception);
}
ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler)
{
return [job{std::move(job)}, &handler] ()
{
try
{
job();
}
catch (...)
{
handler.setException(std::current_exception());
}
};
}