Fix a race condition in parallel table loading.

ThreadSanitizer reported that a data race occurs when we attach to a
database and load its tables concurrently. Remove the custom code that
waits for all tables to load, and replace it with a local thread pool
and its wait() method.

This changes observable behavior: previously, we would load the tables
sequentially when loading the system database or when performing an
ATTACH DATABASE query. Now we always load tables of ordinary databases
in parallel.
This commit is contained in:
Alexander Kuzmenkov 2019-07-19 17:22:57 +03:00
parent 07fcbbcdc8
commit de72d117ce
14 changed files with 39 additions and 85 deletions

View File

@ -139,3 +139,7 @@
/// This number is only used for distributed version compatibility.
/// It could be any magic number.
#define DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER 0xCAFECABE
/// A macro for suppressing warnings about unused variables. Useful for
/// structured bindings which have no standard way to declare this.
/// Usage: `UNUSED_VARIABLE(x);` -- evaluates X once and discards the result.
/// The whole expansion is parenthesized so that it always behaves as a single
/// void expression, regardless of the operators surrounding the call site.
#define UNUSED_VARIABLE(X) ((void) (X))

View File

@ -27,7 +27,7 @@ DatabaseDictionary::DatabaseDictionary(const String & name_)
{
}
void DatabaseDictionary::loadTables(Context &, ThreadPool *, bool)
void DatabaseDictionary::loadTables(Context &, bool)
{
}

View File

@ -33,7 +33,6 @@ public:
void loadTables(
Context & context,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;
bool isTableExist(

View File

@ -18,7 +18,6 @@ DatabaseMemory::DatabaseMemory(String name_)
void DatabaseMemory::loadTables(
Context & /*context*/,
ThreadPool * /*thread_pool*/,
bool /*has_force_restore_data_flag*/)
{
/// Nothing to load.

View File

@ -25,7 +25,6 @@ public:
void loadTables(
Context & context,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;
void createTable(

View File

@ -46,7 +46,7 @@ public:
throw Exception("MySQL database engine does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
}
void loadTables(Context &, ThreadPool *, bool) override
void loadTables(Context &, bool) override
{
/// do nothing
}

View File

@ -119,7 +119,6 @@ DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_,
void DatabaseOrdinary::loadTables(
Context & context,
ThreadPool * thread_pool,
bool has_force_restore_data_flag)
{
using FileNames = std::vector<std::string>;
@ -161,96 +160,68 @@ void DatabaseOrdinary::loadTables(
*/
std::sort(file_names.begin(), file_names.end());
size_t total_tables = file_names.size();
const size_t total_tables = file_names.size();
LOG_INFO(log, "Total " << total_tables << " tables.");
AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0};
Poco::Event all_tables_processed;
ExceptionHandler exception_handler;
auto task_function = [&](const String & table)
auto loadOneTable = [&](const String & table)
{
SCOPE_EXIT(
if (++tables_processed == total_tables)
all_tables_processed.set()
);
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
/// Messages, so that it's not boring to wait for the server to load for a long time.
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
if (++tables_processed % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
watch.restart();
}
loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
};
for (const auto & filename : file_names)
{
auto task = createExceptionHandledJob(std::bind(task_function, filename), exception_handler);
ThreadPool pool(SettingMaxThreads().getAutoValue());
if (thread_pool)
thread_pool->schedule(task);
else
task();
for (const auto & file_name : file_names)
{
pool.schedule([&]() { loadOneTable(file_name); });
}
if (thread_pool)
all_tables_processed.wait();
exception_handler.throwIfException();
pool.wait();
/// After all tables have been basically initialized, start them up.
startupTables(thread_pool);
startupTables(pool);
}
void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
{
LOG_INFO(log, "Starting up tables.");
AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0};
size_t total_tables = tables.size();
Poco::Event all_tables_processed;
ExceptionHandler exception_handler;
const size_t total_tables = tables.size();
if (!total_tables)
return;
auto task_function = [&](const StoragePtr & table)
{
SCOPE_EXIT(
if (++tables_processed == total_tables)
all_tables_processed.set()
);
AtomicStopwatch watch;
std::atomic<size_t> tables_processed {0};
if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
auto startupOneTable = [&](const StoragePtr & table)
{
table->startup();
if (++tables_processed % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
watch.restart();
watch.restart();
}
table->startup();
};
for (const auto & name_storage : tables)
for (const auto & table : tables)
{
auto task = createExceptionHandledJob(std::bind(task_function, name_storage.second), exception_handler);
if (thread_pool)
thread_pool->schedule(task);
else
task();
thread_pool.schedule([&]() { startupOneTable(table.second); });
}
if (thread_pool)
all_tables_processed.wait();
exception_handler.throwIfException();
thread_pool.wait();
}

View File

@ -19,7 +19,6 @@ public:
void loadTables(
Context & context,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) override;
void createTable(
@ -73,7 +72,7 @@ private:
const String data_path;
Poco::Logger * log;
void startupTables(ThreadPool * thread_pool);
void startupTables(ThreadPool & thread_pool);
ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const;
};

View File

@ -56,11 +56,10 @@ public:
/// Get name of database engine.
virtual String getEngineName() const = 0;
/// Load a set of existing tables. If thread_pool is specified, use it.
/// Load a set of existing tables.
/// Can be called only once, right after the object is created.
virtual void loadTables(
Context & context,
ThreadPool * thread_pool,
bool has_force_restore_data_flag) = 0;
/// Check the existence of the table.

View File

@ -159,7 +159,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (need_write_metadata)
Poco::File(metadata_file_tmp_path).renameTo(metadata_file_path);
database->loadTables(context, thread_pool, has_force_restore_data_flag);
database->loadTables(context, has_force_restore_data_flag);
}
catch (...)
{

View File

@ -31,11 +31,6 @@ public:
static ASTPtr formatIndices(const IndicesDescription & indices);
void setDatabaseLoadingThreadpool(ThreadPool & thread_pool_)
{
thread_pool = &thread_pool_;
}
void setForceRestoreData(bool has_force_restore_data_flag_)
{
has_force_restore_data_flag = has_force_restore_data_flag_;
@ -61,9 +56,6 @@ private:
ASTPtr query_ptr;
Context & context;
/// Using while loading database.
ThreadPool * thread_pool = nullptr;
/// Skip safety threshold when loading tables.
bool has_force_restore_data_flag = false;
/// Is this an internal query - not from the user.

View File

@ -33,7 +33,6 @@ static void executeCreateQuery(
Context & context,
const String & database,
const String & file_name,
ThreadPool * pool,
bool has_force_restore_data_flag)
{
ParserCreateQuery parser;
@ -45,8 +44,6 @@ static void executeCreateQuery(
InterpreterCreateQuery interpreter(ast, context);
interpreter.setInternal(true);
if (pool)
interpreter.setDatabaseLoadingThreadpool(*pool);
interpreter.setForceRestoreData(has_force_restore_data_flag);
interpreter.execute();
}
@ -56,7 +53,6 @@ static void loadDatabase(
Context & context,
const String & database,
const String & database_path,
ThreadPool * thread_pool,
bool force_restore_data)
{
/// There may exist .sql file with database creation statement.
@ -73,7 +69,8 @@ static void loadDatabase(
else
database_attach_query = "ATTACH DATABASE " + backQuoteIfNeed(database);
executeCreateQuery(database_attach_query, context, database, database_metadata_file, thread_pool, force_restore_data);
executeCreateQuery(database_attach_query, context, database,
database_metadata_file, force_restore_data);
}
@ -92,9 +89,6 @@ void loadMetadata(Context & context)
Poco::File force_restore_data_flag_file(context.getFlagsPath() + "force_restore_data");
bool has_force_restore_data_flag = force_restore_data_flag_file.exists();
/// For parallel tables loading.
ThreadPool thread_pool(SettingMaxThreads().getAutoValue());
/// Loop over databases.
std::map<String, String> databases;
Poco::DirectoryIterator dir_end;
@ -113,10 +107,8 @@ void loadMetadata(Context & context)
databases.emplace(unescapeForFileName(it.name()), it.path().toString());
}
for (const auto & elem : databases)
loadDatabase(context, elem.first, elem.second, &thread_pool, has_force_restore_data_flag);
thread_pool.wait();
for (const auto & [name, path] : databases)
loadDatabase(context, name, path, has_force_restore_data_flag);
if (has_force_restore_data_flag)
{
@ -138,7 +130,7 @@ void loadMetadataSystem(Context & context)
if (Poco::File(path).exists())
{
/// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted.
loadDatabase(context, SYSTEM_DATABASE, path, nullptr, true);
loadDatabase(context, SYSTEM_DATABASE, path, true);
}
else
{

View File

@ -84,7 +84,7 @@ try
context.setPath("./");
auto database = std::make_shared<DatabaseOrdinary>("test", "./metadata/test/", context);
context.addDatabase("test", database);
database->loadTables(context, nullptr, false);
database->loadTables(context, false);
context.setCurrentDatabase("test");
InterpreterCreateQuery interpreter(ast, context);

View File

@ -39,7 +39,7 @@ try
DatabasePtr system = std::make_shared<DatabaseOrdinary>("system", "./metadata/system/", context);
context.addDatabase("system", system);
system->loadTables(context, nullptr, false);
system->loadTables(context, false);
attachSystemTablesLocal(*context.getDatabase("system"));
context.setCurrentDatabase("default");