From 641b1b249e6eaead8eb8468acefad1d26418b17c Mon Sep 17 00:00:00 2001 From: Haavard Kvaalen Date: Fri, 19 Feb 2021 09:54:59 +0100 Subject: [PATCH] Attempt to reconnect to MySQL For MaterializeMySQL databases, attempt to reconnect if the connection to MySQL is lost. The existing setting `max_wait_time_when_mysql_unavailable` is used to control how often we attempt to reconnect. This setting can now be set to a negative value to disable reconnects. --- src/Core/MySQL/MySQLReplication.cpp | 3 +- .../MySQL/MaterializeMySQLSettings.h | 2 +- .../MySQL/MaterializeMySQLSyncThread.cpp | 88 +++++++++++++------ .../MySQL/MaterializeMySQLSyncThread.h | 3 + .../materialize_with_ddl.py | 26 ++++-- 5 files changed, 87 insertions(+), 35 deletions(-) diff --git a/src/Core/MySQL/MySQLReplication.cpp b/src/Core/MySQL/MySQLReplication.cpp index 1b202c4edb4..16e31a46aa2 100644 --- a/src/Core/MySQL/MySQLReplication.cpp +++ b/src/Core/MySQL/MySQLReplication.cpp @@ -17,6 +17,7 @@ namespace ErrorCodes extern const int UNKNOWN_EXCEPTION; extern const int LOGICAL_ERROR; extern const int ATTEMPT_TO_READ_AFTER_EOF; + extern const int CANNOT_READ_ALL_DATA; } namespace MySQLReplication @@ -740,7 +741,7 @@ namespace MySQLReplication switch (header) { case PACKET_EOF: - throw ReplicationError("Master maybe lost", ErrorCodes::UNKNOWN_EXCEPTION); + throw ReplicationError("Master maybe lost", ErrorCodes::CANNOT_READ_ALL_DATA); case PACKET_ERR: ERRPacket err; err.readPayloadWithUnpacked(payload); diff --git a/src/Databases/MySQL/MaterializeMySQLSettings.h b/src/Databases/MySQL/MaterializeMySQLSettings.h index 07de219c72f..9bd05b5382b 100644 --- a/src/Databases/MySQL/MaterializeMySQLSettings.h +++ b/src/Databases/MySQL/MaterializeMySQLSettings.h @@ -14,7 +14,7 @@ class ASTStorage; M(UInt64, max_rows_in_buffers, DEFAULT_BLOCK_SIZE, "Max rows that data is allowed to cache in memory(for database and the cache data unable to query). when rows is exceeded, the data will be materialized", 0) \ M(UInt64, max_bytes_in_buffers, DBMS_DEFAULT_BUFFER_SIZE, "Max bytes that data is allowed to cache in memory(for database and the cache data unable to query). when rows is exceeded, the data will be materialized", 0) \ M(UInt64, max_flush_data_time, 1000, "Max milliseconds that data is allowed to cache in memory(for database and the cache data unable to query). when this time is exceeded, the data will be materialized", 0) \ - M(UInt64, max_wait_time_when_mysql_unavailable, 1000, "Dump full data retry interval when MySQL is not available(milliseconds).", 0) \ + M(Int64, max_wait_time_when_mysql_unavailable, 1000, "Retry interval when MySQL is not available (milliseconds). Negative value disable retry.", 0) \ M(Bool, allows_query_when_mysql_lost, false, "Allow query materialized table when mysql is lost.", 0) \ DECLARE_SETTINGS_TRAITS(MaterializeMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS) diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index fe9620da7a0..8d1c1e109ac 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -35,6 +35,8 @@ namespace ErrorCodes extern const int ILLEGAL_MYSQL_VARIABLE; extern const int SYNC_MYSQL_USER_ACCESS_ERROR; extern const int UNKNOWN_DATABASE; + extern const int UNKNOWN_EXCEPTION; + extern const int CANNOT_READ_ALL_DATA; } static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync"; @@ -157,32 +159,48 @@ void MaterializeMySQLSyncThread::synchronization() { MaterializeMetadata metadata( DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata"); - if (prepareSynchronized(metadata)) + bool need_reconnect = true; + + Stopwatch watch; + Buffers buffers(database_name); + + while (!isCancelled()) { - Stopwatch watch; - Buffers buffers(database_name); - - while (!isCancelled()) + if (need_reconnect) + { + if (!prepareSynchronized(metadata)) + break; + need_reconnect = false; + } + + /// TODO: add gc task for `sign = -1`(use alter table delete, execute by interval. need final state) + UInt64 max_flush_time = settings->max_flush_data_time; + + try { - /// TODO: add gc task for `sign = -1`(use alter table delete, execute by interval. need final state) - UInt64 max_flush_time = settings->max_flush_data_time; BinlogEventPtr binlog_event = client.readOneBinlogEvent(std::max(UInt64(1), max_flush_time - watch.elapsedMilliseconds())); + if (binlog_event) + onEvent(buffers, binlog_event, metadata); + } + catch (const Exception & e) + { + if (e.code() != ErrorCodes::CANNOT_READ_ALL_DATA || settings->max_wait_time_when_mysql_unavailable < 0) + throw; - { - if (binlog_event) - onEvent(buffers, binlog_event, metadata); - - if (watch.elapsedMilliseconds() > max_flush_time || buffers.checkThresholds( - settings->max_rows_in_buffer, settings->max_bytes_in_buffer, - settings->max_rows_in_buffers, settings->max_bytes_in_buffers) - ) - { - watch.restart(); - - if (!buffers.data.empty()) - flushBuffersData(buffers, metadata); - } - } + flushBuffersData(buffers, metadata); + LOG_INFO(log, "Lost connection to MySQL"); + need_reconnect = true; + setSynchronizationThreadException(std::current_exception()); + sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable); + continue; + } + if (watch.elapsedMilliseconds() > max_flush_time || buffers.checkThresholds( + settings->max_rows_in_buffer, settings->max_bytes_in_buffer, + settings->max_rows_in_buffers, settings->max_bytes_in_buffers) + ) + { + watch.restart(); + flushBuffersData(buffers, metadata); } } } @@ -336,7 +354,15 @@ bool MaterializeMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metad { try { - connection = pool.get(); + connection = pool.tryGet(); + if (connection.isNull()) + { + if (settings->max_wait_time_when_mysql_unavailable < 0) + throw Exception("Unable to connect to MySQL", ErrorCodes::UNKNOWN_EXCEPTION); + sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable); + continue; + } + opened_transaction = false; checkMySQLVariables(connection); @@ -383,12 +409,17 @@ bool MaterializeMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metad { throw; } - catch (const mysqlxx::ConnectionFailed &) + catch (const mysqlxx::ConnectionFailed &) {} + catch (const mysqlxx::BadQuery & e) { - setSynchronizationThreadException(std::current_exception()); - /// Avoid busy loop when MySQL is not available. - sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable); + // Lost connection to MySQL server during query + if (e.code() != CR_SERVER_LOST || settings->max_wait_time_when_mysql_unavailable < 0) + throw; } + + setSynchronizationThreadException(std::current_exception()); + /// Avoid busy loop when MySQL is not available. + sleepForMilliseconds(settings->max_wait_time_when_mysql_unavailable); } } @@ -397,6 +428,9 @@ bool MaterializeMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metad void MaterializeMySQLSyncThread::flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata) { + if (buffers.data.empty()) + return; + metadata.transaction(client.getPosition(), [&]() { buffers.commit(global_context); }); const auto & position_message = [&]() diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.h b/src/Databases/MySQL/MaterializeMySQLSyncThread.h index 961d8c17cba..54a6cbbdda2 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.h +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.h @@ -71,6 +71,9 @@ private: const int ER_DBACCESS_DENIED_ERROR = 1044; const int ER_BAD_DB_ERROR = 1049; + // https://dev.mysql.com/doc/mysql-errors/8.0/en/client-error-reference.html + const int CR_SERVER_LOST = 2013; + struct Buffers { String database; diff --git a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py index c9be2387fc7..1f00598b332 100644 --- a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py +++ b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py @@ -616,8 +616,6 @@ def network_partition_test(clickhouse_node, mysql_node, service_name): restore_instance_mysql_connections(clickhouse_node, pm) - clickhouse_node.query("DETACH DATABASE test_database") - clickhouse_node.query("ATTACH DATABASE test_database") check_query(clickhouse_node, "SELECT * FROM test_database.test_table FORMAT TSV", '1\n') clickhouse_node.query( @@ -635,17 +633,29 @@ def network_partition_test(clickhouse_node, mysql_node, service_name): def mysql_kill_sync_thread_restore_test(clickhouse_node, mysql_node, service_name): clickhouse_node.query("DROP DATABASE IF EXISTS test_database;") + clickhouse_node.query("DROP DATABASE IF EXISTS test_database_auto;") + mysql_node.query("DROP DATABASE IF EXISTS test_database;") mysql_node.query("CREATE DATABASE test_database;") mysql_node.query("CREATE TABLE test_database.test_table ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;") mysql_node.query("INSERT INTO test_database.test_table VALUES (1)") - clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(service_name)) + mysql_node.query("DROP DATABASE IF EXISTS test_database_auto;") + mysql_node.query("CREATE DATABASE test_database_auto;") + mysql_node.query("CREATE TABLE test_database_auto.test_table ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;") + mysql_node.query("INSERT INTO test_database_auto.test_table VALUES (11)") + + clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse') SETTINGS max_wait_time_when_mysql_unavailable=-1".format(service_name)) + clickhouse_node.query("CREATE DATABASE test_database_auto ENGINE = MaterializeMySQL('{}:3306', 'test_database_auto', 'root', 'clickhouse')".format(service_name)) + check_query(clickhouse_node, "SELECT * FROM test_database.test_table FORMAT TSV", '1\n') + check_query(clickhouse_node, "SELECT * FROM test_database_auto.test_table FORMAT TSV", '11\n') get_sync_id_query = "select id from information_schema.processlist where STATE='Master has sent all binlog to slave; waiting for more updates'" result = mysql_node.query_and_get_data(get_sync_id_query) + assert len(result) == 2 + for row in result: row_result = {} query = "kill " + str(row[0]) + ";" @@ -656,7 +666,7 @@ def mysql_kill_sync_thread_restore_test(clickhouse_node, mysql_node, service_nam # When you use KILL, a thread-specific kill flag is set for the thread. In most cases, it might take some time for the thread to die because the kill flag is checked only at specific intervals: time.sleep(3) clickhouse_node.query("SELECT * FROM test_database.test_table") - assert "Cannot read all data" in str(exception.value) + assert "Cannot read all data" in str(exception.value) clickhouse_node.query("DETACH DATABASE test_database") clickhouse_node.query("ATTACH DATABASE test_database") @@ -665,8 +675,13 @@ def mysql_kill_sync_thread_restore_test(clickhouse_node, mysql_node, service_nam mysql_node.query("INSERT INTO test_database.test_table VALUES (2)") check_query(clickhouse_node, "SELECT * FROM test_database.test_table ORDER BY id FORMAT TSV", '1\n2\n') + mysql_node.query("INSERT INTO test_database_auto.test_table VALUES (12)") + check_query(clickhouse_node, "SELECT * FROM test_database_auto.test_table ORDER BY id FORMAT TSV", '11\n12\n') + clickhouse_node.query("DROP DATABASE test_database") + clickhouse_node.query("DROP DATABASE test_database_auto") mysql_node.query("DROP DATABASE test_database") + mysql_node.query("DROP DATABASE test_database_auto") def mysql_killed_while_insert(clickhouse_node, mysql_node, service_name): @@ -687,10 +702,9 @@ def mysql_killed_while_insert(clickhouse_node, mysql_node, service_name): run_and_check( ['docker-compose', '-p', mysql_node.project_name, '-f', mysql_node.docker_compose, 'stop']) finally: - with pytest.raises(QueryRuntimeException) as execption: + with pytest.raises(QueryRuntimeException) as exception: time.sleep(5) clickhouse_node.query("SELECT count() FROM kill_mysql_while_insert.test") - assert "Master maybe lost." in str(execption.value) run_and_check( ['docker-compose', '-p', mysql_node.project_name, '-f', mysql_node.docker_compose, 'start'])