This commit is contained in:
kssenii 2022-08-29 15:41:32 +02:00
parent a0bc5b6ea4
commit 0a6c4b9265
13 changed files with 184 additions and 25 deletions

View File

@ -3,7 +3,7 @@ sidebar_position: 50
sidebar_label: MySQL
---
# MySQL
# MySQL
Allows to connect to databases on a remote MySQL server and perform `INSERT` and `SELECT` queries to exchange data between ClickHouse and MySQL.
@ -98,7 +98,7 @@ mysql> select * from mysql_table;
Database in ClickHouse, exchanging data with the MySQL server:
``` sql
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password')
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password') SETTINGS read_write_timeout=10000, connect_timeout=100;
```
``` sql

View File

@ -9,6 +9,7 @@
#include <mysqlxx/Pool.h>
#include <base/sleep.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Common/logger_useful.h>
#include <ctime>
@ -260,7 +261,10 @@ void Pool::Entry::forceConnected() const
else
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
pool->logger.debug("Entry: Reconnecting to MySQL server %s", pool->description);
pool->logger.debug(
"Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u",
pool->description, pool->connect_timeout, pool->rw_timeout);
data->conn.connect(
pool->db.c_str(),
pool->server.c_str(),
@ -325,6 +329,10 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
{
logger.debug("Connecting to %s", description);
logger.debug(
"Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u",
description, connect_timeout, rw_timeout);
conn_ptr->conn.connect(
db.c_str(),
server.c_str(),

View File

@ -168,6 +168,7 @@ PoolWithFailover::Entry PoolWithFailover::get()
}
app.logger().warning("Connection to " + pool->getDescription() + " failed: " + e.displayText());
replica_name_to_error_detail.insert_or_assign(pool->getDescription(), ErrorDetail{e.code(), e.displayText()});
continue;
@ -177,7 +178,10 @@ PoolWithFailover::Entry PoolWithFailover::get()
}
}
app.logger().error("Connection to all replicas failed " + std::to_string(try_no + 1) + " times");
if (replicas_by_priority.size() > 1)
app.logger().error("Connection to all mysql replicas failed " + std::to_string(try_no + 1) + " times");
else
app.logger().error("Connection to mysql failed " + std::to_string(try_no + 1) + " times");
}
if (full_pool)
@ -187,7 +191,11 @@ PoolWithFailover::Entry PoolWithFailover::get()
}
DB::WriteBufferFromOwnString message;
message << "Connections to all replicas failed: ";
if (replicas_by_priority.size() > 1)
message << "Connections to all mysql replicas failed: ";
else
message << "Connections to mysql failed: ";
for (auto it = replicas_by_priority.begin(); it != replicas_by_priority.end(); ++it)
{
for (auto jt = it->second.begin(); jt != it->second.end(); ++jt)

View File

@ -169,10 +169,24 @@ public:
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
unsigned enable_local_infile_ = MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE,
bool opt_reconnect_ = MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT)
: logger(Poco::Logger::get("mysqlxx::Pool")), default_connections(default_connections_),
max_connections(max_connections_), db(db_), server(server_), user(user_), password(password_), port(port_), socket(socket_),
connect_timeout(connect_timeout_), rw_timeout(rw_timeout_), enable_local_infile(enable_local_infile_),
opt_reconnect(opt_reconnect_) {}
: logger(Poco::Logger::get("mysqlxx::Pool"))
, default_connections(default_connections_)
, max_connections(max_connections_)
, db(db_)
, server(server_)
, user(user_)
, password(password_)
, port(port_)
, socket(socket_)
, connect_timeout(connect_timeout_)
, rw_timeout(rw_timeout_)
, enable_local_infile(enable_local_infile_)
, opt_reconnect(opt_reconnect_)
{
logger.debug(
"Created MySQL Pool with settings: connect_timeout=%u, read_write_timeout=%u, default_connections_number=%u, max_connections_number=%u",
connect_timeout, rw_timeout, default_connections, max_connections);
}
Pool(const Pool & other)
: logger(other.logger), default_connections{other.default_connections},

View File

@ -183,15 +183,15 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
StorageMySQLConfiguration configuration;
ASTs & arguments = engine->arguments->children;
MySQLSettings mysql_settings;
auto mysql_settings = std::make_unique<ConnectionMySQLSettings>();
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, mysql_settings))
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, *mysql_settings))
{
auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
mysql_settings.applyChanges(settings_changes);
mysql_settings->applyChanges(settings_changes);
if (!storage_specific_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -228,15 +228,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
{
if (engine_name == "MySQL")
{
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
auto mysql_pool = createMySQLPoolWithFailover(configuration, mysql_settings);
mysql_settings->loadFromQueryContext(context);
mysql_settings->loadFromQuery(*engine_define); /// higher priority
mysql_database_settings->loadFromQueryContext(context);
mysql_database_settings->loadFromQuery(*engine_define); /// higher priority
auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings);
return std::make_shared<DatabaseMySQL>(
context, database_name, metadata_path, engine_define, configuration.database,
std::move(mysql_database_settings), std::move(mysql_pool), create.attach);
std::move(mysql_settings), std::move(mysql_pool), create.attach);
}
MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password);

View File

@ -17,6 +17,7 @@
#endif
#if USE_MYSQL
#include <Storages/MySQL/MySQLSettings.h>
#include <Databases/MySQL/ConnectionMySQLSettings.h>
#endif
#if USE_NATSIO
#include <Storages/NATS/NATSSettings.h>
@ -575,6 +576,10 @@ template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<MySQLSettingsTraits> & storage_settings);
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<ConnectionMySQLSettingsTraits> & storage_settings);
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
@ -583,5 +588,6 @@ std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
template
SettingsChanges getSettingsChangesFromConfig(
const BaseSettings<MySQLSettingsTraits> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
#endif
}

View File

@ -4,6 +4,7 @@
#include <mysqlxx/PoolWithFailover.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/MySQL/MySQLSettings.h>
#include <Databases/MySQL/ConnectionMySQLSettings.h>
namespace DB
{
@ -13,8 +14,8 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings)
template <typename T> mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const T & mysql_settings)
{
if (!mysql_settings.connection_pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size");
@ -29,6 +30,11 @@ createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, con
mysql_settings.read_write_timeout);
}
template
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings);
template
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const ConnectionMySQLSettings & mysql_settings);
}
#endif

View File

@ -9,10 +9,9 @@ namespace mysqlxx { class PoolWithFailover; }
namespace DB
{
struct StorageMySQLConfiguration;
struct MySQLSettings;
mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings);
template <typename T> mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const T & mysql_settings);
}

View File

@ -15,13 +15,18 @@ namespace ErrorCodes
IMPLEMENT_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)
void MySQLSettings::loadFromQuery(const ASTSetQuery & settings_def)
{
applyChanges(settings_def.changes);
}
void MySQLSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
applyChanges(storage_def.settings->changes);
loadFromQuery(*storage_def.settings);
}
catch (Exception & e)
{
@ -39,4 +44,3 @@ void MySQLSettings::loadFromQuery(ASTStorage & storage_def)
}
}

View File

@ -13,6 +13,7 @@ namespace Poco::Util
namespace DB
{
class ASTStorage;
class ASTSetQuery;
#define LIST_OF_MYSQL_SETTINGS(M) \
M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \
@ -32,6 +33,7 @@ using MySQLBaseSettings = BaseSettings<MySQLSettingsTraits>;
struct MySQLSettings : public MySQLBaseSettings
{
void loadFromQuery(ASTStorage & storage_def);
void loadFromQuery(const ASTSetQuery & settings_def);
};

View File

@ -37,11 +37,26 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
if (!args_func.arguments)
throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR);
auto & args = args_func.arguments->children;
MySQLSettings mysql_settings;
configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context, mysql_settings);
const auto & settings = context->getSettingsRef();
mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec;
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;
for (auto it = args.begin(); it != args.end(); ++it)
{
const ASTSetQuery * settings_ast = (*it)->as<ASTSetQuery>();
if (settings_ast)
{
mysql_settings.loadFromQuery(*settings_ast);
args.erase(it);
break;
}
}
configuration = StorageMySQL::getConfiguration(args, context, mysql_settings);
pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings));
}

View File

@ -30,5 +30,16 @@
<table>test_table</table>
<connection_pool_size>0</connection_pool_size>
</mysql4>
<mysql_with_settings>
<user>root</user>
<password>clickhouse</password>
<host>mysql57</host>
<port>3306</port>
<database>clickhouse</database>
<table>test_settings</table>
<connection_pool_size>1</connection_pool_size>
<read_write_timeout>20123001</read_write_timeout>
<connect_timeout>20123002</connect_timeout>
</mysql_with_settings>
</named_collections>
</clickhouse>

View File

@ -732,6 +732,93 @@ def test_mysql_null(started_cluster):
conn.close()
def test_settings(started_cluster):
table_name = "test_settings"
node1.query(f"DROP TABLE IF EXISTS {table_name}")
wait_timeout = 123
rw_timeout = 10123001
connect_timeout = 10123002
connection_pool_size = 1
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, table_name)
create_mysql_table(conn, table_name)
node1.query(
f"""
CREATE TABLE {table_name}
(
id UInt32,
name String,
age UInt32,
money UInt32
)
ENGINE = MySQL('mysql57:3306', 'clickhouse', '{table_name}', 'root', 'clickhouse')
SETTINGS connection_wait_timeout={wait_timeout}, connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}, connection_pool_size={connection_pool_size}
"""
)
node1.query(f"SELECT * FROM {table_name}")
assert node1.contains_in_log(
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
)
rw_timeout = 20123001
connect_timeout = 20123002
node1.query(f"SELECT * FROM mysql(mysql_with_settings)")
assert node1.contains_in_log(
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
)
rw_timeout = 30123001
connect_timeout = 30123002
node1.query(
f"""
SELECT *
FROM mysql('mysql57:3306', 'clickhouse', '{table_name}', 'root', 'clickhouse',
SETTINGS
connection_wait_timeout={wait_timeout},
connect_timeout={connect_timeout},
read_write_timeout={rw_timeout},
connection_pool_size={connection_pool_size})
"""
)
assert node1.contains_in_log(
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
)
rw_timeout = 40123001
connect_timeout = 40123002
node1.query(
f"""
CREATE DATABASE m
ENGINE = MySQL(mysql_with_settings, connection_wait_timeout={wait_timeout}, connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}, connection_pool_size={connection_pool_size})
"""
)
assert node1.contains_in_log(
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
)
rw_timeout = 50123001
connect_timeout = 50123002
node1.query(
f"""
CREATE DATABASE mm ENGINE = MySQL('mysql57:3306', 'clickhouse', 'root', 'clickhouse')
SETTINGS
connection_wait_timeout={wait_timeout},
connect_timeout={connect_timeout},
read_write_timeout={rw_timeout},
connection_pool_size={connection_pool_size}
"""
)
assert node1.contains_in_log(
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
)
drop_mysql_table(conn, table_name)
conn.close()
if __name__ == "__main__":
with contextmanager(started_cluster)() as cluster:
for name, instance in list(cluster.instances.items()):