Attempt to reconnect to MySQL

For MaterializeMySQL databases, attempt to reconnect if the connection
to MySQL is lost.  The existing setting
`max_wait_time_when_mysql_unavailable` is used to control how often we
attempt to reconnect.  This setting can now be set to a negative value
to disable reconnects.
Haavard Kvaalen 2021-02-19 09:54:59 +01:00
parent 11c9c8cb10
commit 641b1b249e
5 changed files with 87 additions and 35 deletions
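
For context, the setting is applied per MaterializeMySQL database at creation time. A minimal usage sketch (host, database names, and credentials are placeholders; the `SETTINGS` clause mirrors the integration test added below):

```sql
-- Placeholder host/database/credentials; retry every 5 seconds while MySQL is unreachable.
CREATE DATABASE db ENGINE = MaterializeMySQL('mysql-host:3306', 'db', 'user', 'password')
SETTINGS max_wait_time_when_mysql_unavailable = 5000;

-- A negative value disables reconnects: the sync thread fails instead of retrying.
CREATE DATABASE db_no_retry ENGINE = MaterializeMySQL('mysql-host:3306', 'db', 'user', 'password')
SETTINGS max_wait_time_when_mysql_unavailable = -1;
```

With a positive value the sync thread sleeps for that interval (in milliseconds) between reconnect attempts; with a negative value it throws instead of retrying, as exercised in `mysql_kill_sync_thread_restore_test` below.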


@ -17,6 +17,7 @@ namespace ErrorCodes
extern const int UNKNOWN_EXCEPTION;
extern const int LOGICAL_ERROR;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
}
namespace MySQLReplication
@ -740,7 +741,7 @@ namespace MySQLReplication
switch (header)
{
case PACKET_EOF:
throw ReplicationError("Master maybe lost", ErrorCodes::UNKNOWN_EXCEPTION);
throw ReplicationError("Master maybe lost", ErrorCodes::CANNOT_READ_ALL_DATA);
case PACKET_ERR:
ERRPacket err;
err.readPayloadWithUnpacked(payload);


@ -14,7 +14,7 @@ class ASTStorage;
M(UInt64, max_rows_in_buffers, DEFAULT_BLOCK_SIZE, "Max rows that data is allowed to cache in memory(for database and the cache data unable to query). when rows is exceeded, the data will be materialized", 0) \
M(UInt64, max_bytes_in_buffers, DBMS_DEFAULT_BUFFER_SIZE, "Max bytes that data is allowed to cache in memory(for database and the cache data unable to query). when rows is exceeded, the data will be materialized", 0) \
M(UInt64, max_flush_data_time, 1000, "Max milliseconds that data is allowed to cache in memory(for database and the cache data unable to query). when this time is exceeded, the data will be materialized", 0) \
M(UInt64, max_wait_time_when_mysql_unavailable, 1000, "Dump full data retry interval when MySQL is not available(milliseconds).", 0) \
M(Int64, max_wait_time_when_mysql_unavailable, 1000, "Retry interval when MySQL is not available (milliseconds). A negative value disables retries.", 0) \
M(Bool, allows_query_when_mysql_lost, false, "Allow query materialized table when mysql is lost.", 0) \
DECLARE_SETTINGS_TRAITS(MaterializeMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS)


@ -35,6 +35,8 @@ namespace ErrorCodes
extern const int ILLEGAL_MYSQL_VARIABLE;
extern const int SYNC_MYSQL_USER_ACCESS_ERROR;
extern const int UNKNOWN_DATABASE;
extern const int UNKNOWN_EXCEPTION;
extern const int CANNOT_READ_ALL_DATA;
}
static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
@ -157,35 +159,51 @@ void MaterializeMySQLSyncThread::synchronization()
{
MaterializeMetadata metadata(
DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata");
if (prepareSynchronized(metadata))
{
bool need_reconnect = true;
Stopwatch watch;
Buffers buffers(database_name);
while (!isCancelled())
{
if (need_reconnect)
{
if (!prepareSynchronized(metadata))
break;
need_reconnect = false;
}
/// TODO: add gc task for `sign = -1`(use alter table delete, execute by interval. need final state)
UInt64 max_flush_time = settings->max_flush_data_time;
BinlogEventPtr binlog_event = client.readOneBinlogEvent(std::max(UInt64(1), max_flush_time - watch.elapsedMilliseconds()));
try
{
BinlogEventPtr binlog_event = client.readOneBinlogEvent(std::max(UInt64(1), max_flush_time - watch.elapsedMilliseconds()));
if (binlog_event)
onEvent(buffers, binlog_event, metadata);
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::CANNOT_READ_ALL_DATA || settings->max_wait_time_when_mysql_unavailable < 0)
throw;
flushBuffersData(buffers, metadata);
LOG_INFO(log, "Lost connection to MySQL");
need_reconnect = true;
setSynchronizationThreadException(std::current_exception());
sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable);
continue;
}
if (watch.elapsedMilliseconds() > max_flush_time || buffers.checkThresholds(
settings->max_rows_in_buffer, settings->max_bytes_in_buffer,
settings->max_rows_in_buffers, settings->max_bytes_in_buffers)
)
{
watch.restart();
if (!buffers.data.empty())
flushBuffersData(buffers, metadata);
}
}
}
}
}
catch (...)
{
client.disconnect();
@ -336,7 +354,15 @@ bool MaterializeMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metad
{
try
{
connection = pool.get();
connection = pool.tryGet();
if (connection.isNull())
{
if (settings->max_wait_time_when_mysql_unavailable < 0)
throw Exception("Unable to connect to MySQL", ErrorCodes::UNKNOWN_EXCEPTION);
sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable);
continue;
}
opened_transaction = false;
checkMySQLVariables(connection);
@ -383,20 +409,28 @@ bool MaterializeMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metad
{
throw;
}
catch (const mysqlxx::ConnectionFailed &)
catch (const mysqlxx::ConnectionFailed &) {}
catch (const mysqlxx::BadQuery & e)
{
// Lost connection to MySQL server during query
if (e.code() != CR_SERVER_LOST || settings->max_wait_time_when_mysql_unavailable < 0)
throw;
}
setSynchronizationThreadException(std::current_exception());
/// Avoid busy loop when MySQL is not available.
sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable);
}
}
}
return false;
}
void MaterializeMySQLSyncThread::flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata)
{
if (buffers.data.empty())
return;
metadata.transaction(client.getPosition(), [&]() { buffers.commit(global_context); });
const auto & position_message = [&]()


@ -71,6 +71,9 @@ private:
const int ER_DBACCESS_DENIED_ERROR = 1044;
const int ER_BAD_DB_ERROR = 1049;
// https://dev.mysql.com/doc/mysql-errors/8.0/en/client-error-reference.html
const int CR_SERVER_LOST = 2013;
struct Buffers
{
String database;


@ -616,8 +616,6 @@ def network_partition_test(clickhouse_node, mysql_node, service_name):
restore_instance_mysql_connections(clickhouse_node, pm)
clickhouse_node.query("DETACH DATABASE test_database")
clickhouse_node.query("ATTACH DATABASE test_database")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table FORMAT TSV", '1\n')
clickhouse_node.query(
@ -635,17 +633,29 @@ def network_partition_test(clickhouse_node, mysql_node, service_name):
def mysql_kill_sync_thread_restore_test(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("DROP DATABASE IF EXISTS test_database;")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_auto;")
mysql_node.query("DROP DATABASE IF EXISTS test_database;")
mysql_node.query("CREATE DATABASE test_database;")
mysql_node.query("CREATE TABLE test_database.test_table ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;")
mysql_node.query("INSERT INTO test_database.test_table VALUES (1)")
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(service_name))
mysql_node.query("DROP DATABASE IF EXISTS test_database_auto;")
mysql_node.query("CREATE DATABASE test_database_auto;")
mysql_node.query("CREATE TABLE test_database_auto.test_table ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;")
mysql_node.query("INSERT INTO test_database_auto.test_table VALUES (11)")
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse') SETTINGS max_wait_time_when_mysql_unavailable=-1".format(service_name))
clickhouse_node.query("CREATE DATABASE test_database_auto ENGINE = MaterializeMySQL('{}:3306', 'test_database_auto', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT * FROM test_database.test_table FORMAT TSV", '1\n')
check_query(clickhouse_node, "SELECT * FROM test_database_auto.test_table FORMAT TSV", '11\n')
get_sync_id_query = "select id from information_schema.processlist where STATE='Master has sent all binlog to slave; waiting for more updates'"
result = mysql_node.query_and_get_data(get_sync_id_query)
assert len(result) == 2
for row in result:
row_result = {}
query = "kill " + str(row[0]) + ";"
@ -665,8 +675,13 @@ def mysql_kill_sync_thread_restore_test(clickhouse_node, mysql_node, service_nam
mysql_node.query("INSERT INTO test_database.test_table VALUES (2)")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table ORDER BY id FORMAT TSV", '1\n2\n')
mysql_node.query("INSERT INTO test_database_auto.test_table VALUES (12)")
check_query(clickhouse_node, "SELECT * FROM test_database_auto.test_table ORDER BY id FORMAT TSV", '11\n12\n')
clickhouse_node.query("DROP DATABASE test_database")
clickhouse_node.query("DROP DATABASE test_database_auto")
mysql_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database_auto")
def mysql_killed_while_insert(clickhouse_node, mysql_node, service_name):
@ -687,10 +702,9 @@ def mysql_killed_while_insert(clickhouse_node, mysql_node, service_name):
run_and_check(
['docker-compose', '-p', mysql_node.project_name, '-f', mysql_node.docker_compose, 'stop'])
finally:
with pytest.raises(QueryRuntimeException) as execption:
with pytest.raises(QueryRuntimeException) as exception:
time.sleep(5)
clickhouse_node.query("SELECT count() FROM kill_mysql_while_insert.test")
assert "Master maybe lost." in str(execption.value)
run_and_check(
['docker-compose', '-p', mysql_node.project_name, '-f', mysql_node.docker_compose, 'start'])