diff --git a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp index 6a9f1e37f8e..8edeabee004 100644 --- a/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp @@ -82,18 +82,11 @@ template void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) { Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach); - try - { - materialize_thread.startSynchronization(); - started_up = true; - } - catch (...) - { - tryLogCurrentException(Base::log, "Cannot load MySQL nested database stored objects."); + if (!force_attach) + materialize_thread.assertMySQLAvailable(); - if (!force_attach) - throw; - } + materialize_thread.startSynchronization(); + started_up = true; } template diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index 08d170768f4..d651741d14f 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -188,8 +188,7 @@ void MaterializeMySQLSyncThread::synchronization() { client.disconnect(); tryLogCurrentException(log); - auto db = DatabaseCatalog::instance().getDatabase(database_name); - setSynchronizationThreadException(db, std::current_exception()); + setSynchronizationThreadException(std::current_exception()); } } @@ -204,31 +203,28 @@ void MaterializeMySQLSyncThread::stopSynchronization() } void MaterializeMySQLSyncThread::startSynchronization() +{ + background_thread_pool = std::make_unique([this]() { synchronization(); }); +} + +void MaterializeMySQLSyncThread::assertMySQLAvailable() { try { checkMySQLVariables(pool.get()); - background_thread_pool = std::make_unique([this]() { synchronization(); }); } - catch (...) + catch (const mysqlxx::ConnectionFailed & e) { - try - { + if (e.errnum() == ER_ACCESS_DENIED_ERROR + || e.errnum() == ER_DBACCESS_DENIED_ERROR) + throw Exception("MySQL SYNC USER ACCESS ERR: mysql sync user needs " + "at least GLOBAL PRIVILEGES:'RELOAD, REPLICATION SLAVE, REPLICATION CLIENT' " + "and SELECT PRIVILEGE on Database " + mysql_database_name + , ErrorCodes::SYNC_MYSQL_USER_ACCESS_ERROR); + else if (e.errnum() == ER_BAD_DB_ERROR) + throw Exception("Unknown database '" + mysql_database_name + "' on MySQL", ErrorCodes::UNKNOWN_DATABASE); + else throw; - } - catch (mysqlxx::ConnectionFailed & e) - { - if (e.errnum() == ER_ACCESS_DENIED_ERROR - || e.errnum() == ER_DBACCESS_DENIED_ERROR) - throw Exception("MySQL SYNC USER ACCESS ERR: mysql sync user needs " - "at least GLOBAL PRIVILEGES:'RELOAD, REPLICATION SLAVE, REPLICATION CLIENT' " - "and SELECT PRIVILEGE on Database " + mysql_database_name - , ErrorCodes::SYNC_MYSQL_USER_ACCESS_ERROR); - else if (e.errnum() == ER_BAD_DB_ERROR) - throw Exception("Unknown database '" + mysql_database_name + "' on MySQL", ErrorCodes::UNKNOWN_DATABASE); - else - throw; - } } } @@ -341,6 +337,7 @@ std::optional MaterializeMySQLSyncThread::prepareSynchroniz connection = pool.get(); opened_transaction = false; + checkMySQLVariables(connection); MaterializeMetadata metadata( connection, DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction); @@ -369,6 +366,8 @@ std::optional MaterializeMySQLSyncThread::prepareSynchroniz client.connect(); client.startBinlogDumpGTID(randomNumber(), mysql_database_name, metadata.executed_gtid_set, metadata.binlog_checksum); + + setSynchronizationThreadException(nullptr); return metadata; } catch (...) @@ -384,6 +383,7 @@ std::optional MaterializeMySQLSyncThread::prepareSynchroniz } catch (const mysqlxx::ConnectionFailed &) { + setSynchronizationThreadException(std::current_exception()); /// Avoid busy loop when MySQL is not available. sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable); } @@ -705,6 +705,12 @@ bool MaterializeMySQLSyncThread::isMySQLSyncThread() return getThreadName() == MYSQL_BACKGROUND_THREAD_NAME; } +void MaterializeMySQLSyncThread::setSynchronizationThreadException(const std::exception_ptr & exception) +{ + auto db = DatabaseCatalog::instance().getDatabase(database_name); + DB::setSynchronizationThreadException(db, exception); +} + void MaterializeMySQLSyncThread::Buffers::add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes) { total_blocks_rows += written_rows; diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.h b/src/Databases/MySQL/MaterializeMySQLSyncThread.h index 26934b87511..a4b659954e7 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.h +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.h @@ -49,6 +49,8 @@ public: void startSynchronization(); + void assertMySQLAvailable(); + static bool isMySQLSyncThread(); private: @@ -107,6 +109,8 @@ private: std::atomic sync_quit{false}; std::unique_ptr background_thread_pool; void executeDDLAtomic(const QueryEvent & query_event); + + void setSynchronizationThreadException(const std::exception_ptr & exception); }; }