diff --git a/docs/en/engines/database-engines/mysql.md b/docs/en/engines/database-engines/mysql.md index 89a0786a9ec..45f2d9a29e1 100644 --- a/docs/en/engines/database-engines/mysql.md +++ b/docs/en/engines/database-engines/mysql.md @@ -3,7 +3,7 @@ sidebar_position: 50 sidebar_label: MySQL --- -# MySQL +# MySQL Allows to connect to databases on a remote MySQL server and perform `INSERT` and `SELECT` queries to exchange data between ClickHouse and MySQL. @@ -98,7 +98,7 @@ mysql> select * from mysql_table; Database in ClickHouse, exchanging data with the MySQL server: ``` sql -CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password') +CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password') SETTINGS read_write_timeout=10000, connect_timeout=100; ``` ``` sql diff --git a/src/Common/mysqlxx/Pool.cpp b/src/Common/mysqlxx/Pool.cpp index a74feb54cd3..bee62a0af2e 100644 --- a/src/Common/mysqlxx/Pool.cpp +++ b/src/Common/mysqlxx/Pool.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include @@ -260,7 +261,10 @@ void Pool::Entry::forceConnected() const else sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); - pool->logger.debug("Entry: Reconnecting to MySQL server %s", pool->description); + pool->logger.debug( + "Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u", + pool->description, pool->connect_timeout, pool->rw_timeout); + data->conn.connect( pool->db.c_str(), pool->server.c_str(), @@ -325,6 +329,10 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time) { logger.debug("Connecting to %s", description); + logger.debug( + "Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u", + description, connect_timeout, rw_timeout); + conn_ptr->conn.connect( db.c_str(), server.c_str(), diff --git a/src/Common/mysqlxx/PoolWithFailover.cpp b/src/Common/mysqlxx/PoolWithFailover.cpp index 36dd713d454..f3dee1a6776 100644 --- a/src/Common/mysqlxx/PoolWithFailover.cpp +++ b/src/Common/mysqlxx/PoolWithFailover.cpp @@ -168,6 +168,7 @@ PoolWithFailover::Entry PoolWithFailover::get() } app.logger().warning("Connection to " + pool->getDescription() + " failed: " + e.displayText()); + replica_name_to_error_detail.insert_or_assign(pool->getDescription(), ErrorDetail{e.code(), e.displayText()}); continue; @@ -177,7 +178,10 @@ PoolWithFailover::Entry PoolWithFailover::get() } } - app.logger().error("Connection to all replicas failed " + std::to_string(try_no + 1) + " times"); + if (replicas_by_priority.size() > 1) + app.logger().error("Connection to all mysql replicas failed " + std::to_string(try_no + 1) + " times"); + else + app.logger().error("Connection to mysql failed " + std::to_string(try_no + 1) + " times"); } if (full_pool) @@ -187,7 +191,11 @@ PoolWithFailover::Entry PoolWithFailover::get() } DB::WriteBufferFromOwnString message; - message << "Connections to all replicas failed: "; + if (replicas_by_priority.size() > 1) + message << "Connections to all mysql replicas failed: "; + else + message << "Connections to mysql failed: "; + for (auto it = replicas_by_priority.begin(); it != replicas_by_priority.end(); ++it) { for (auto jt = it->second.begin(); jt != it->second.end(); ++jt) diff --git a/src/Common/mysqlxx/mysqlxx/Pool.h b/src/Common/mysqlxx/mysqlxx/Pool.h index 5a436146f02..1fa8eaeb997 100644 --- a/src/Common/mysqlxx/mysqlxx/Pool.h +++ b/src/Common/mysqlxx/mysqlxx/Pool.h @@ -169,10 +169,24 @@ public: unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS, unsigned enable_local_infile_ = MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE, bool opt_reconnect_ = MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT) - : logger(Poco::Logger::get("mysqlxx::Pool")), default_connections(default_connections_), - max_connections(max_connections_), db(db_), server(server_), user(user_), password(password_), port(port_), socket(socket_), - connect_timeout(connect_timeout_), rw_timeout(rw_timeout_), enable_local_infile(enable_local_infile_), - opt_reconnect(opt_reconnect_) {} + : logger(Poco::Logger::get("mysqlxx::Pool")) + , default_connections(default_connections_) + , max_connections(max_connections_) + , db(db_) + , server(server_) + , user(user_) + , password(password_) + , port(port_) + , socket(socket_) + , connect_timeout(connect_timeout_) + , rw_timeout(rw_timeout_) + , enable_local_infile(enable_local_infile_) + , opt_reconnect(opt_reconnect_) + { + logger.debug( + "Created MySQL Pool with settings: connect_timeout=%u, read_write_timeout=%u, default_connections_number=%u, max_connections_number=%u", + connect_timeout, rw_timeout, default_connections, max_connections); + } Pool(const Pool & other) : logger(other.logger), default_connections{other.default_connections}, diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index c16de2d33a5..96db2a17b72 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -183,15 +183,15 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String StorageMySQLConfiguration configuration; ASTs & arguments = engine->arguments->children; - MySQLSettings mysql_settings; + auto mysql_settings = std::make_unique(); - if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, mysql_settings)) + if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, *mysql_settings)) { auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value(); configuration.set(common_configuration); configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; - mysql_settings.applyChanges(settings_changes); + mysql_settings->applyChanges(settings_changes); if (!storage_specific_args.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, @@ -228,15 +228,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String { if (engine_name == "MySQL") { - auto mysql_database_settings = std::make_unique(); - auto mysql_pool = createMySQLPoolWithFailover(configuration, mysql_settings); + mysql_settings->loadFromQueryContext(context); + mysql_settings->loadFromQuery(*engine_define); /// higher priority - mysql_database_settings->loadFromQueryContext(context); - mysql_database_settings->loadFromQuery(*engine_define); /// higher priority + auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings); return std::make_shared( context, database_name, metadata_path, engine_define, configuration.database, - std::move(mysql_database_settings), std::move(mysql_pool), create.attach); + std::move(mysql_settings), std::move(mysql_pool), create.attach); } MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password); diff --git a/src/Storages/ExternalDataSourceConfiguration.cpp b/src/Storages/ExternalDataSourceConfiguration.cpp index 5710aa6cd6a..53a9655e0c7 100644 --- a/src/Storages/ExternalDataSourceConfiguration.cpp +++ b/src/Storages/ExternalDataSourceConfiguration.cpp @@ -17,6 +17,7 @@ #endif #if USE_MYSQL #include +#include #endif #if USE_NATSIO #include @@ -575,6 +576,10 @@ template std::optional getExternalDataSourceConfiguration( const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings & storage_settings); +template +std::optional getExternalDataSourceConfiguration( + const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings & storage_settings); + template std::optional getExternalDataSourceConfiguration( const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, @@ -583,5 +588,6 @@ std::optional getExternalDataSourceConfiguration( template SettingsChanges getSettingsChangesFromConfig( const BaseSettings & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix); + #endif } diff --git a/src/Storages/MySQL/MySQLHelpers.cpp b/src/Storages/MySQL/MySQLHelpers.cpp index edeb4ffca8a..94c07d2670f 100644 --- a/src/Storages/MySQL/MySQLHelpers.cpp +++ b/src/Storages/MySQL/MySQLHelpers.cpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB { @@ -13,8 +14,8 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } -mysqlxx::PoolWithFailover -createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings) +template mysqlxx::PoolWithFailover +createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const T & mysql_settings) { if (!mysql_settings.connection_pool_size) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size"); @@ -29,6 +30,11 @@ createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, con mysql_settings.read_write_timeout); } +template +mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings); +template +mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const ConnectionMySQLSettings & mysql_settings); + } #endif diff --git a/src/Storages/MySQL/MySQLHelpers.h b/src/Storages/MySQL/MySQLHelpers.h index 712c5a2c719..59052be5c2a 100644 --- a/src/Storages/MySQL/MySQLHelpers.h +++ b/src/Storages/MySQL/MySQLHelpers.h @@ -9,10 +9,9 @@ namespace mysqlxx { class PoolWithFailover; } namespace DB { struct StorageMySQLConfiguration; -struct MySQLSettings; -mysqlxx::PoolWithFailover -createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings); +template mysqlxx::PoolWithFailover +createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const T & mysql_settings); } diff --git a/src/Storages/MySQL/MySQLSettings.cpp b/src/Storages/MySQL/MySQLSettings.cpp index 1a8f0804777..5c1a2246ae9 100644 --- a/src/Storages/MySQL/MySQLSettings.cpp +++ b/src/Storages/MySQL/MySQLSettings.cpp @@ -15,13 +15,18 @@ namespace ErrorCodes IMPLEMENT_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS) +void MySQLSettings::loadFromQuery(const ASTSetQuery & settings_def) +{ + applyChanges(settings_def.changes); +} + void MySQLSettings::loadFromQuery(ASTStorage & storage_def) { if (storage_def.settings) { try { - applyChanges(storage_def.settings->changes); + loadFromQuery(*storage_def.settings); } catch (Exception & e) { @@ -39,4 +44,3 @@ void MySQLSettings::loadFromQuery(ASTStorage & storage_def) } } - diff --git a/src/Storages/MySQL/MySQLSettings.h b/src/Storages/MySQL/MySQLSettings.h index be1e09c12e6..9fa9b846cd3 100644 --- a/src/Storages/MySQL/MySQLSettings.h +++ b/src/Storages/MySQL/MySQLSettings.h @@ -13,6 +13,7 @@ namespace Poco::Util namespace DB { class ASTStorage; +class ASTSetQuery; #define LIST_OF_MYSQL_SETTINGS(M) \ M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \ @@ -32,6 +33,7 @@ using MySQLBaseSettings = BaseSettings; struct MySQLSettings : public MySQLBaseSettings { void loadFromQuery(ASTStorage & storage_def); + void loadFromQuery(const ASTSetQuery & settings_def); }; diff --git a/src/TableFunctions/TableFunctionMySQL.cpp b/src/TableFunctions/TableFunctionMySQL.cpp index bd554b6163e..c67d6b3b652 100644 --- a/src/TableFunctions/TableFunctionMySQL.cpp +++ b/src/TableFunctions/TableFunctionMySQL.cpp @@ -37,11 +37,26 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr if (!args_func.arguments) throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR); + auto & args = args_func.arguments->children; + MySQLSettings mysql_settings; - configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context, mysql_settings); + const auto & settings = context->getSettingsRef(); mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec; mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec; + + for (auto it = args.begin(); it != args.end(); ++it) + { + const ASTSetQuery * settings_ast = (*it)->as(); + if (settings_ast) + { + mysql_settings.loadFromQuery(*settings_ast); + args.erase(it); + break; + } + } + + configuration = StorageMySQL::getConfiguration(args, context, mysql_settings); pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings)); } diff --git a/tests/integration/test_storage_mysql/configs/named_collections.xml b/tests/integration/test_storage_mysql/configs/named_collections.xml index b4a79880d2a..4d3fbf6085c 100644 --- a/tests/integration/test_storage_mysql/configs/named_collections.xml +++ b/tests/integration/test_storage_mysql/configs/named_collections.xml @@ -30,5 +30,16 @@ test_table
0 + + root + clickhouse + mysql57 + 3306 + clickhouse + test_settings
+ 1 + 20123001 + 20123002 +
diff --git a/tests/integration/test_storage_mysql/test.py b/tests/integration/test_storage_mysql/test.py index 34ef17327f9..50f0c5519b5 100644 --- a/tests/integration/test_storage_mysql/test.py +++ b/tests/integration/test_storage_mysql/test.py @@ -732,6 +732,93 @@ def test_mysql_null(started_cluster): conn.close() +def test_settings(started_cluster): + table_name = "test_settings" + node1.query(f"DROP TABLE IF EXISTS {table_name}") + wait_timeout = 123 + rw_timeout = 10123001 + connect_timeout = 10123002 + connection_pool_size = 1 + + conn = get_mysql_conn(started_cluster, cluster.mysql_ip) + drop_mysql_table(conn, table_name) + create_mysql_table(conn, table_name) + + node1.query( + f""" + CREATE TABLE {table_name} + ( + id UInt32, + name String, + age UInt32, + money UInt32 + ) + ENGINE = MySQL('mysql57:3306', 'clickhouse', '{table_name}', 'root', 'clickhouse') + SETTINGS connection_wait_timeout={wait_timeout}, connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}, connection_pool_size={connection_pool_size} + """ + ) + + node1.query(f"SELECT * FROM {table_name}") + assert node1.contains_in_log( + f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" + ) + + rw_timeout = 20123001 + connect_timeout = 20123002 + node1.query(f"SELECT * FROM mysql(mysql_with_settings)") + assert node1.contains_in_log( + f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" + ) + + rw_timeout = 30123001 + connect_timeout = 30123002 + node1.query( + f""" + SELECT * + FROM mysql('mysql57:3306', 'clickhouse', '{table_name}', 'root', 'clickhouse', + SETTINGS + connection_wait_timeout={wait_timeout}, + connect_timeout={connect_timeout}, + read_write_timeout={rw_timeout}, + connection_pool_size={connection_pool_size}) + """ + ) + assert node1.contains_in_log( + f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" + ) + + rw_timeout = 40123001 + connect_timeout = 40123002 + node1.query( + f""" + CREATE DATABASE m + ENGINE = MySQL(mysql_with_settings, connection_wait_timeout={wait_timeout}, connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}, connection_pool_size={connection_pool_size}) + """ + ) + assert node1.contains_in_log( + f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" + ) + + rw_timeout = 50123001 + connect_timeout = 50123002 + node1.query( + f""" + CREATE DATABASE mm ENGINE = MySQL('mysql57:3306', 'clickhouse', 'root', 'clickhouse') + SETTINGS + connection_wait_timeout={wait_timeout}, + connect_timeout={connect_timeout}, + read_write_timeout={rw_timeout}, + connection_pool_size={connection_pool_size} + """ + ) + assert node1.contains_in_log( + f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}" + ) + + drop_mysql_table(conn, table_name) + conn.close() + + if __name__ == "__main__": with contextmanager(started_cluster)() as cluster: for name, instance in list(cluster.instances.items()):