diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index 461278fad3b..87ae5f183fe 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -139,3 +139,7 @@ /// This number is only used for distributed version compatible. /// It could be any magic number. #define DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER 0xCAFECABE + +/// A macro for suppressing warnings about unused variables. Useful for +/// structured bindings which have no standard way to declare this. +#define UNUSED_VARIABLE(X) (void) (X) diff --git a/dbms/src/Databases/DatabaseDictionary.cpp b/dbms/src/Databases/DatabaseDictionary.cpp index 4451b320e44..5e69b447a1d 100644 --- a/dbms/src/Databases/DatabaseDictionary.cpp +++ b/dbms/src/Databases/DatabaseDictionary.cpp @@ -27,7 +27,7 @@ DatabaseDictionary::DatabaseDictionary(const String & name_) { } -void DatabaseDictionary::loadTables(Context &, ThreadPool *, bool) +void DatabaseDictionary::loadTables(Context &, bool) { } diff --git a/dbms/src/Databases/DatabaseDictionary.h b/dbms/src/Databases/DatabaseDictionary.h index 92b52ffd4a0..1a3b815ae07 100644 --- a/dbms/src/Databases/DatabaseDictionary.h +++ b/dbms/src/Databases/DatabaseDictionary.h @@ -33,7 +33,6 @@ public: void loadTables( Context & context, - ThreadPool * thread_pool, bool has_force_restore_data_flag) override; bool isTableExist( diff --git a/dbms/src/Databases/DatabaseMemory.cpp b/dbms/src/Databases/DatabaseMemory.cpp index 3eea0bc666a..e84d5a95078 100644 --- a/dbms/src/Databases/DatabaseMemory.cpp +++ b/dbms/src/Databases/DatabaseMemory.cpp @@ -18,7 +18,6 @@ DatabaseMemory::DatabaseMemory(String name_) void DatabaseMemory::loadTables( Context & /*context*/, - ThreadPool * /*thread_pool*/, bool /*has_force_restore_data_flag*/) { /// Nothing to load. diff --git a/dbms/src/Databases/DatabaseMemory.h b/dbms/src/Databases/DatabaseMemory.h index fe7cc783ba3..325fde0437f 100644 --- a/dbms/src/Databases/DatabaseMemory.h +++ b/dbms/src/Databases/DatabaseMemory.h @@ -25,7 +25,6 @@ public: void loadTables( Context & context, - ThreadPool * thread_pool, bool has_force_restore_data_flag) override; void createTable( diff --git a/dbms/src/Databases/DatabaseMySQL.h b/dbms/src/Databases/DatabaseMySQL.h index 7ce836d6a64..f0fad5b8e5c 100644 --- a/dbms/src/Databases/DatabaseMySQL.h +++ b/dbms/src/Databases/DatabaseMySQL.h @@ -46,7 +46,7 @@ public: throw Exception("MySQL database engine does not support detach table.", ErrorCodes::NOT_IMPLEMENTED); } - void loadTables(Context &, ThreadPool *, bool) override + void loadTables(Context &, bool) override { /// do nothing } diff --git a/dbms/src/Databases/DatabaseOrdinary.cpp b/dbms/src/Databases/DatabaseOrdinary.cpp index 9fa7d1b1196..25b3eb652b5 100644 --- a/dbms/src/Databases/DatabaseOrdinary.cpp +++ b/dbms/src/Databases/DatabaseOrdinary.cpp @@ -119,7 +119,6 @@ DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_, void DatabaseOrdinary::loadTables( Context & context, - ThreadPool * thread_pool, bool has_force_restore_data_flag) { using FileNames = std::vector; @@ -161,96 +160,68 @@ void DatabaseOrdinary::loadTables( */ std::sort(file_names.begin(), file_names.end()); - size_t total_tables = file_names.size(); + const size_t total_tables = file_names.size(); LOG_INFO(log, "Total " << total_tables << " tables."); AtomicStopwatch watch; std::atomic tables_processed {0}; - Poco::Event all_tables_processed; - ExceptionHandler exception_handler; - auto task_function = [&](const String & table) + auto loadOneTable = [&](const String & table) { - SCOPE_EXIT( - if (++tables_processed == total_tables) - all_tables_processed.set() - ); + loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag); /// Messages, so that it's not boring to wait for the server to load for a long time. - if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0 + if (++tables_processed % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) { LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%"); watch.restart(); } - - loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag); }; - for (const auto & filename : file_names) - { - auto task = createExceptionHandledJob(std::bind(task_function, filename), exception_handler); + ThreadPool pool(SettingMaxThreads().getAutoValue()); - if (thread_pool) - thread_pool->schedule(task); - else - task(); + for (const auto & file_name : file_names) + { + pool.schedule([&]() { loadOneTable(file_name); }); } - if (thread_pool) - all_tables_processed.wait(); - - exception_handler.throwIfException(); + pool.wait(); /// After all tables was basically initialized, startup them. - startupTables(thread_pool); + startupTables(pool); } -void DatabaseOrdinary::startupTables(ThreadPool * thread_pool) +void DatabaseOrdinary::startupTables(ThreadPool & thread_pool) { LOG_INFO(log, "Starting up tables."); - AtomicStopwatch watch; - std::atomic tables_processed {0}; - size_t total_tables = tables.size(); - Poco::Event all_tables_processed; - ExceptionHandler exception_handler; - + const size_t total_tables = tables.size(); if (!total_tables) return; - auto task_function = [&](const StoragePtr & table) - { - SCOPE_EXIT( - if (++tables_processed == total_tables) - all_tables_processed.set() - ); + AtomicStopwatch watch; + std::atomic tables_processed {0}; - if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0 + auto startupOneTable = [&](const StoragePtr & table) + { + table->startup(); + + if (++tables_processed % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) { LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%"); watch.restart(); } - - table->startup(); }; - for (const auto & name_storage : tables) + for (const auto & table : tables) { - auto task = createExceptionHandledJob(std::bind(task_function, name_storage.second), exception_handler); - - if (thread_pool) - thread_pool->schedule(task); - else - task(); + thread_pool.schedule([&]() { startupOneTable(table.second); }); } - if (thread_pool) - all_tables_processed.wait(); - - exception_handler.throwIfException(); + thread_pool.wait(); } diff --git a/dbms/src/Databases/DatabaseOrdinary.h b/dbms/src/Databases/DatabaseOrdinary.h index 887bf101d62..369f78d36ba 100644 --- a/dbms/src/Databases/DatabaseOrdinary.h +++ b/dbms/src/Databases/DatabaseOrdinary.h @@ -19,7 +19,6 @@ public: void loadTables( Context & context, - ThreadPool * thread_pool, bool has_force_restore_data_flag) override; void createTable( @@ -73,7 +72,7 @@ private: const String data_path; Poco::Logger * log; - void startupTables(ThreadPool * thread_pool); + void startupTables(ThreadPool & thread_pool); ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const; }; diff --git a/dbms/src/Databases/IDatabase.h b/dbms/src/Databases/IDatabase.h index ea344c712d3..6136be6c98b 100644 --- a/dbms/src/Databases/IDatabase.h +++ b/dbms/src/Databases/IDatabase.h @@ -56,11 +56,10 @@ public: /// Get name of database engine. virtual String getEngineName() const = 0; - /// Load a set of existing tables. If thread_pool is specified, use it. + /// Load a set of existing tables. /// You can call only once, right after the object is created. virtual void loadTables( Context & context, - ThreadPool * thread_pool, bool has_force_restore_data_flag) = 0; /// Check the existence of the table. diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 7e1f46e674e..19d27a3851a 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -159,7 +159,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) if (need_write_metadata) Poco::File(metadata_file_tmp_path).renameTo(metadata_file_path); - database->loadTables(context, thread_pool, has_force_restore_data_flag); + database->loadTables(context, has_force_restore_data_flag); } catch (...) { diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.h b/dbms/src/Interpreters/InterpreterCreateQuery.h index 82ba09fb474..68e24c50ccc 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.h +++ b/dbms/src/Interpreters/InterpreterCreateQuery.h @@ -31,11 +31,6 @@ public: static ASTPtr formatIndices(const IndicesDescription & indices); - void setDatabaseLoadingThreadpool(ThreadPool & thread_pool_) - { - thread_pool = &thread_pool_; - } - void setForceRestoreData(bool has_force_restore_data_flag_) { has_force_restore_data_flag = has_force_restore_data_flag_; @@ -61,9 +56,6 @@ private: ASTPtr query_ptr; Context & context; - /// Using while loading database. - ThreadPool * thread_pool = nullptr; - /// Skip safety threshold when loading tables. bool has_force_restore_data_flag = false; /// Is this an internal query - not from the user. diff --git a/dbms/src/Interpreters/loadMetadata.cpp b/dbms/src/Interpreters/loadMetadata.cpp index e0caa8f433d..84a3adffe07 100644 --- a/dbms/src/Interpreters/loadMetadata.cpp +++ b/dbms/src/Interpreters/loadMetadata.cpp @@ -33,7 +33,6 @@ static void executeCreateQuery( Context & context, const String & database, const String & file_name, - ThreadPool * pool, bool has_force_restore_data_flag) { ParserCreateQuery parser; @@ -45,8 +44,6 @@ static void executeCreateQuery( InterpreterCreateQuery interpreter(ast, context); interpreter.setInternal(true); - if (pool) - interpreter.setDatabaseLoadingThreadpool(*pool); interpreter.setForceRestoreData(has_force_restore_data_flag); interpreter.execute(); } @@ -56,7 +53,6 @@ static void loadDatabase( Context & context, const String & database, const String & database_path, - ThreadPool * thread_pool, bool force_restore_data) { /// There may exist .sql file with database creation statement. @@ -73,7 +69,8 @@ static void loadDatabase( else database_attach_query = "ATTACH DATABASE " + backQuoteIfNeed(database); - executeCreateQuery(database_attach_query, context, database, database_metadata_file, thread_pool, force_restore_data); + executeCreateQuery(database_attach_query, context, database, + database_metadata_file, force_restore_data); } @@ -92,9 +89,6 @@ void loadMetadata(Context & context) Poco::File force_restore_data_flag_file(context.getFlagsPath() + "force_restore_data"); bool has_force_restore_data_flag = force_restore_data_flag_file.exists(); - /// For parallel tables loading. - ThreadPool thread_pool(SettingMaxThreads().getAutoValue()); - /// Loop over databases. std::map databases; Poco::DirectoryIterator dir_end; @@ -113,10 +107,8 @@ void loadMetadata(Context & context) databases.emplace(unescapeForFileName(it.name()), it.path().toString()); } - for (const auto & elem : databases) - loadDatabase(context, elem.first, elem.second, &thread_pool, has_force_restore_data_flag); - - thread_pool.wait(); + for (const auto & [name, path] : databases) + loadDatabase(context, name, path, has_force_restore_data_flag); if (has_force_restore_data_flag) { @@ -138,7 +130,7 @@ void loadMetadataSystem(Context & context) if (Poco::File(path).exists()) { /// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted. - loadDatabase(context, SYSTEM_DATABASE, path, nullptr, true); + loadDatabase(context, SYSTEM_DATABASE, path, true); } else { diff --git a/dbms/src/Interpreters/tests/create_query.cpp b/dbms/src/Interpreters/tests/create_query.cpp index 11f01ef3a6a..47e1f202db7 100644 --- a/dbms/src/Interpreters/tests/create_query.cpp +++ b/dbms/src/Interpreters/tests/create_query.cpp @@ -84,7 +84,7 @@ try context.setPath("./"); auto database = std::make_shared("test", "./metadata/test/", context); context.addDatabase("test", database); - database->loadTables(context, nullptr, false); + database->loadTables(context, false); context.setCurrentDatabase("test"); InterpreterCreateQuery interpreter(ast, context); diff --git a/dbms/src/Interpreters/tests/select_query.cpp b/dbms/src/Interpreters/tests/select_query.cpp index 7267c408999..1283ae6e659 100644 --- a/dbms/src/Interpreters/tests/select_query.cpp +++ b/dbms/src/Interpreters/tests/select_query.cpp @@ -39,7 +39,7 @@ try DatabasePtr system = std::make_shared("system", "./metadata/system/", context); context.addDatabase("system", system); - system->loadTables(context, nullptr, false); + system->loadTables(context, false); attachSystemTablesLocal(*context.getDatabase("system")); context.setCurrentDatabase("default");