Make handling of unavailable MySQL consistent

If MySQL was unavailable when loading an existing database on startup,
we would previously load the database, fail to start synchronization,
but allow queries to the database.  Change this to only allow queries
if the `allows_queries_when_mysql_lost` setting is on, so that the
behavior is consistent with what happens if the connection to MySQL is
lost while ClickHouse is running.

Also retry connection to MySQL if MySQL is unavailable when ClickHouse
is started (we would previously reconnect only if the connection was
lost during the initial dump of existing data).
This commit is contained in:
Haavard Kvaalen 2021-02-10 09:10:30 +01:00
parent e8df9971f1
commit e6711675a1
3 changed files with 34 additions and 31 deletions

View File

@ -82,18 +82,11 @@ template<typename Base>
void DatabaseMaterializeMySQL<Base>::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) void DatabaseMaterializeMySQL<Base>::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach)
{ {
Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach); Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach);
try if (!force_attach)
{ materialize_thread.assertMySQLAvailable();
materialize_thread.startSynchronization();
started_up = true;
}
catch (...)
{
tryLogCurrentException(Base::log, "Cannot load MySQL nested database stored objects.");
if (!force_attach) materialize_thread.startSynchronization();
throw; started_up = true;
}
} }
template<typename Base> template<typename Base>

View File

@ -188,8 +188,7 @@ void MaterializeMySQLSyncThread::synchronization()
{ {
client.disconnect(); client.disconnect();
tryLogCurrentException(log); tryLogCurrentException(log);
auto db = DatabaseCatalog::instance().getDatabase(database_name); setSynchronizationThreadException(std::current_exception());
setSynchronizationThreadException(db, std::current_exception());
} }
} }
@ -204,31 +203,28 @@ void MaterializeMySQLSyncThread::stopSynchronization()
} }
void MaterializeMySQLSyncThread::startSynchronization() void MaterializeMySQLSyncThread::startSynchronization()
{
background_thread_pool = std::make_unique<ThreadFromGlobalPool>([this]() { synchronization(); });
}
void MaterializeMySQLSyncThread::assertMySQLAvailable()
{ {
try try
{ {
checkMySQLVariables(pool.get()); checkMySQLVariables(pool.get());
background_thread_pool = std::make_unique<ThreadFromGlobalPool>([this]() { synchronization(); });
} }
catch (...) catch (const mysqlxx::ConnectionFailed & e)
{ {
try if (e.errnum() == ER_ACCESS_DENIED_ERROR
{ || e.errnum() == ER_DBACCESS_DENIED_ERROR)
throw Exception("MySQL SYNC USER ACCESS ERR: mysql sync user needs "
"at least GLOBAL PRIVILEGES:'RELOAD, REPLICATION SLAVE, REPLICATION CLIENT' "
"and SELECT PRIVILEGE on Database " + mysql_database_name
, ErrorCodes::SYNC_MYSQL_USER_ACCESS_ERROR);
else if (e.errnum() == ER_BAD_DB_ERROR)
throw Exception("Unknown database '" + mysql_database_name + "' on MySQL", ErrorCodes::UNKNOWN_DATABASE);
else
throw; throw;
}
catch (mysqlxx::ConnectionFailed & e)
{
if (e.errnum() == ER_ACCESS_DENIED_ERROR
|| e.errnum() == ER_DBACCESS_DENIED_ERROR)
throw Exception("MySQL SYNC USER ACCESS ERR: mysql sync user needs "
"at least GLOBAL PRIVILEGES:'RELOAD, REPLICATION SLAVE, REPLICATION CLIENT' "
"and SELECT PRIVILEGE on Database " + mysql_database_name
, ErrorCodes::SYNC_MYSQL_USER_ACCESS_ERROR);
else if (e.errnum() == ER_BAD_DB_ERROR)
throw Exception("Unknown database '" + mysql_database_name + "' on MySQL", ErrorCodes::UNKNOWN_DATABASE);
else
throw;
}
} }
} }
@ -341,6 +337,7 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
connection = pool.get(); connection = pool.get();
opened_transaction = false; opened_transaction = false;
checkMySQLVariables(connection);
MaterializeMetadata metadata( MaterializeMetadata metadata(
connection, DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction); connection, DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction);
@ -369,6 +366,8 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
client.connect(); client.connect();
client.startBinlogDumpGTID(randomNumber(), mysql_database_name, metadata.executed_gtid_set, metadata.binlog_checksum); client.startBinlogDumpGTID(randomNumber(), mysql_database_name, metadata.executed_gtid_set, metadata.binlog_checksum);
setSynchronizationThreadException(nullptr);
return metadata; return metadata;
} }
catch (...) catch (...)
@ -384,6 +383,7 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
} }
catch (const mysqlxx::ConnectionFailed &) catch (const mysqlxx::ConnectionFailed &)
{ {
setSynchronizationThreadException(std::current_exception());
/// Avoid busy loop when MySQL is not available. /// Avoid busy loop when MySQL is not available.
sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable); sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable);
} }
@ -705,6 +705,12 @@ bool MaterializeMySQLSyncThread::isMySQLSyncThread()
return getThreadName() == MYSQL_BACKGROUND_THREAD_NAME; return getThreadName() == MYSQL_BACKGROUND_THREAD_NAME;
} }
void MaterializeMySQLSyncThread::setSynchronizationThreadException(const std::exception_ptr & exception)
{
auto db = DatabaseCatalog::instance().getDatabase(database_name);
DB::setSynchronizationThreadException(db, exception);
}
void MaterializeMySQLSyncThread::Buffers::add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes) void MaterializeMySQLSyncThread::Buffers::add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes)
{ {
total_blocks_rows += written_rows; total_blocks_rows += written_rows;

View File

@ -49,6 +49,8 @@ public:
void startSynchronization(); void startSynchronization();
void assertMySQLAvailable();
static bool isMySQLSyncThread(); static bool isMySQLSyncThread();
private: private:
@ -107,6 +109,8 @@ private:
std::atomic<bool> sync_quit{false}; std::atomic<bool> sync_quit{false};
std::unique_ptr<ThreadFromGlobalPool> background_thread_pool; std::unique_ptr<ThreadFromGlobalPool> background_thread_pool;
void executeDDLAtomic(const QueryEvent & query_event); void executeDDLAtomic(const QueryEvent & query_event);
void setSynchronizationThreadException(const std::exception_ptr & exception);
}; };
} }