mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #40751 from kssenii/fix-mysql-timeouts
Fix issue with mysql db / table function timeouts
This commit is contained in:
commit
c88db2ef97
@ -99,7 +99,7 @@ mysql> select * from mysql_table;
|
||||
Database in ClickHouse, exchanging data with the MySQL server:
|
||||
|
||||
``` sql
|
||||
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password')
|
||||
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password') SETTINGS read_write_timeout=10000, connect_timeout=100;
|
||||
```
|
||||
|
||||
``` sql
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <mysqlxx/Pool.h>
|
||||
#include <base/sleep.h>
|
||||
#include <Poco/Util/LayeredConfiguration.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <ctime>
|
||||
|
||||
|
||||
@ -260,7 +261,10 @@ void Pool::Entry::forceConnected() const
|
||||
else
|
||||
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
||||
|
||||
pool->logger.debug("Entry: Reconnecting to MySQL server %s", pool->description);
|
||||
pool->logger.debug(
|
||||
"Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u",
|
||||
pool->description, pool->connect_timeout, pool->rw_timeout);
|
||||
|
||||
data->conn.connect(
|
||||
pool->db.c_str(),
|
||||
pool->server.c_str(),
|
||||
@ -325,6 +329,10 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
|
||||
{
|
||||
logger.debug("Connecting to %s", description);
|
||||
|
||||
logger.debug(
|
||||
"Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u",
|
||||
description, connect_timeout, rw_timeout);
|
||||
|
||||
conn_ptr->conn.connect(
|
||||
db.c_str(),
|
||||
server.c_str(),
|
||||
|
@ -168,6 +168,7 @@ PoolWithFailover::Entry PoolWithFailover::get()
|
||||
}
|
||||
|
||||
app.logger().warning("Connection to " + pool->getDescription() + " failed: " + e.displayText());
|
||||
|
||||
replica_name_to_error_detail.insert_or_assign(pool->getDescription(), ErrorDetail{e.code(), e.displayText()});
|
||||
|
||||
continue;
|
||||
@ -177,7 +178,10 @@ PoolWithFailover::Entry PoolWithFailover::get()
|
||||
}
|
||||
}
|
||||
|
||||
app.logger().error("Connection to all replicas failed " + std::to_string(try_no + 1) + " times");
|
||||
if (replicas_by_priority.size() > 1)
|
||||
app.logger().error("Connection to all mysql replicas failed " + std::to_string(try_no + 1) + " times");
|
||||
else
|
||||
app.logger().error("Connection to mysql failed " + std::to_string(try_no + 1) + " times");
|
||||
}
|
||||
|
||||
if (full_pool)
|
||||
@ -187,7 +191,11 @@ PoolWithFailover::Entry PoolWithFailover::get()
|
||||
}
|
||||
|
||||
DB::WriteBufferFromOwnString message;
|
||||
message << "Connections to all replicas failed: ";
|
||||
if (replicas_by_priority.size() > 1)
|
||||
message << "Connections to all mysql replicas failed: ";
|
||||
else
|
||||
message << "Connections to mysql failed: ";
|
||||
|
||||
for (auto it = replicas_by_priority.begin(); it != replicas_by_priority.end(); ++it)
|
||||
{
|
||||
for (auto jt = it->second.begin(); jt != it->second.end(); ++jt)
|
||||
|
@ -169,10 +169,24 @@ public:
|
||||
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
|
||||
unsigned enable_local_infile_ = MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE,
|
||||
bool opt_reconnect_ = MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT)
|
||||
: logger(Poco::Logger::get("mysqlxx::Pool")), default_connections(default_connections_),
|
||||
max_connections(max_connections_), db(db_), server(server_), user(user_), password(password_), port(port_), socket(socket_),
|
||||
connect_timeout(connect_timeout_), rw_timeout(rw_timeout_), enable_local_infile(enable_local_infile_),
|
||||
opt_reconnect(opt_reconnect_) {}
|
||||
: logger(Poco::Logger::get("mysqlxx::Pool"))
|
||||
, default_connections(default_connections_)
|
||||
, max_connections(max_connections_)
|
||||
, db(db_)
|
||||
, server(server_)
|
||||
, user(user_)
|
||||
, password(password_)
|
||||
, port(port_)
|
||||
, socket(socket_)
|
||||
, connect_timeout(connect_timeout_)
|
||||
, rw_timeout(rw_timeout_)
|
||||
, enable_local_infile(enable_local_infile_)
|
||||
, opt_reconnect(opt_reconnect_)
|
||||
{
|
||||
logger.debug(
|
||||
"Created MySQL Pool with settings: connect_timeout=%u, read_write_timeout=%u, default_connections_number=%u, max_connections_number=%u",
|
||||
connect_timeout, rw_timeout, default_connections, max_connections);
|
||||
}
|
||||
|
||||
Pool(const Pool & other)
|
||||
: logger(other.logger), default_connections{other.default_connections},
|
||||
|
@ -183,15 +183,15 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
|
||||
StorageMySQLConfiguration configuration;
|
||||
ASTs & arguments = engine->arguments->children;
|
||||
MySQLSettings mysql_settings;
|
||||
auto mysql_settings = std::make_unique<ConnectionMySQLSettings>();
|
||||
|
||||
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, mysql_settings))
|
||||
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, *mysql_settings))
|
||||
{
|
||||
auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value();
|
||||
|
||||
configuration.set(common_configuration);
|
||||
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
|
||||
mysql_settings.applyChanges(settings_changes);
|
||||
mysql_settings->applyChanges(settings_changes);
|
||||
|
||||
if (!storage_specific_args.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
@ -228,15 +228,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
|
||||
{
|
||||
if (engine_name == "MySQL")
|
||||
{
|
||||
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
|
||||
auto mysql_pool = createMySQLPoolWithFailover(configuration, mysql_settings);
|
||||
mysql_settings->loadFromQueryContext(context);
|
||||
mysql_settings->loadFromQuery(*engine_define); /// higher priority
|
||||
|
||||
mysql_database_settings->loadFromQueryContext(context);
|
||||
mysql_database_settings->loadFromQuery(*engine_define); /// higher priority
|
||||
auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings);
|
||||
|
||||
return std::make_shared<DatabaseMySQL>(
|
||||
context, database_name, metadata_path, engine_define, configuration.database,
|
||||
std::move(mysql_database_settings), std::move(mysql_pool), create.attach);
|
||||
std::move(mysql_settings), std::move(mysql_pool), create.attach);
|
||||
}
|
||||
|
||||
MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password);
|
||||
|
@ -17,6 +17,7 @@
|
||||
#endif
|
||||
#if USE_MYSQL
|
||||
#include <Storages/MySQL/MySQLSettings.h>
|
||||
#include <Databases/MySQL/ConnectionMySQLSettings.h>
|
||||
#endif
|
||||
#if USE_NATSIO
|
||||
#include <Storages/NATS/NATSSettings.h>
|
||||
@ -575,6 +576,10 @@ template
|
||||
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
|
||||
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<MySQLSettingsTraits> & storage_settings);
|
||||
|
||||
template
|
||||
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
|
||||
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<ConnectionMySQLSettingsTraits> & storage_settings);
|
||||
|
||||
template
|
||||
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
|
||||
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
|
||||
@ -583,5 +588,6 @@ std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
|
||||
template
|
||||
SettingsChanges getSettingsChangesFromConfig(
|
||||
const BaseSettings<MySQLSettingsTraits> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
|
||||
|
||||
#endif
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <mysqlxx/PoolWithFailover.h>
|
||||
#include <Storages/ExternalDataSourceConfiguration.h>
|
||||
#include <Storages/MySQL/MySQLSettings.h>
|
||||
#include <Databases/MySQL/ConnectionMySQLSettings.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -13,8 +14,8 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
mysqlxx::PoolWithFailover
|
||||
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings)
|
||||
template <typename T> mysqlxx::PoolWithFailover
|
||||
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const T & mysql_settings)
|
||||
{
|
||||
if (!mysql_settings.connection_pool_size)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size");
|
||||
@ -29,6 +30,11 @@ createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, con
|
||||
mysql_settings.read_write_timeout);
|
||||
}
|
||||
|
||||
template
|
||||
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings);
|
||||
template
|
||||
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const ConnectionMySQLSettings & mysql_settings);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -9,10 +9,9 @@ namespace mysqlxx { class PoolWithFailover; }
|
||||
namespace DB
|
||||
{
|
||||
struct StorageMySQLConfiguration;
|
||||
struct MySQLSettings;
|
||||
|
||||
mysqlxx::PoolWithFailover
|
||||
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings);
|
||||
template <typename T> mysqlxx::PoolWithFailover
|
||||
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const T & mysql_settings);
|
||||
|
||||
}
|
||||
|
||||
|
@ -15,13 +15,18 @@ namespace ErrorCodes
|
||||
|
||||
IMPLEMENT_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)
|
||||
|
||||
void MySQLSettings::loadFromQuery(const ASTSetQuery & settings_def)
|
||||
{
|
||||
applyChanges(settings_def.changes);
|
||||
}
|
||||
|
||||
void MySQLSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
{
|
||||
if (storage_def.settings)
|
||||
{
|
||||
try
|
||||
{
|
||||
applyChanges(storage_def.settings->changes);
|
||||
loadFromQuery(*storage_def.settings);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
@ -39,4 +44,3 @@ void MySQLSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -13,6 +13,7 @@ namespace Poco::Util
|
||||
namespace DB
|
||||
{
|
||||
class ASTStorage;
|
||||
class ASTSetQuery;
|
||||
|
||||
#define LIST_OF_MYSQL_SETTINGS(M) \
|
||||
M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \
|
||||
@ -32,6 +33,7 @@ using MySQLBaseSettings = BaseSettings<MySQLSettingsTraits>;
|
||||
struct MySQLSettings : public MySQLBaseSettings
|
||||
{
|
||||
void loadFromQuery(ASTStorage & storage_def);
|
||||
void loadFromQuery(const ASTSetQuery & settings_def);
|
||||
};
|
||||
|
||||
|
||||
|
@ -37,11 +37,26 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
|
||||
if (!args_func.arguments)
|
||||
throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
auto & args = args_func.arguments->children;
|
||||
|
||||
MySQLSettings mysql_settings;
|
||||
configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context, mysql_settings);
|
||||
|
||||
const auto & settings = context->getSettingsRef();
|
||||
mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec;
|
||||
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;
|
||||
|
||||
for (auto it = args.begin(); it != args.end(); ++it)
|
||||
{
|
||||
const ASTSetQuery * settings_ast = (*it)->as<ASTSetQuery>();
|
||||
if (settings_ast)
|
||||
{
|
||||
mysql_settings.loadFromQuery(*settings_ast);
|
||||
args.erase(it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
configuration = StorageMySQL::getConfiguration(args, context, mysql_settings);
|
||||
pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings));
|
||||
}
|
||||
|
||||
|
@ -250,7 +250,7 @@ def test_mysql_client_exception(started_cluster):
|
||||
expected_msg = "\n".join(
|
||||
[
|
||||
"mysql: [Warning] Using a password on the command line interface can be insecure.",
|
||||
"ERROR 1000 (00000) at line 1: Poco::Exception. Code: 1000, e.code() = 0, Exception: Connections to all replicas failed: default@127.0.0.1:10086 as user default",
|
||||
"ERROR 1000 (00000) at line 1: Poco::Exception. Code: 1000, e.code() = 0, Exception: Connections to mysql failed: default@127.0.0.1:10086 as user default",
|
||||
]
|
||||
)
|
||||
assert stderr[: len(expected_msg)].decode() == expected_msg
|
||||
|
@ -30,5 +30,16 @@
|
||||
<table>test_table</table>
|
||||
<connection_pool_size>0</connection_pool_size>
|
||||
</mysql4>
|
||||
<mysql_with_settings>
|
||||
<user>root</user>
|
||||
<password>clickhouse</password>
|
||||
<host>mysql57</host>
|
||||
<port>3306</port>
|
||||
<database>clickhouse</database>
|
||||
<table>test_settings</table>
|
||||
<connection_pool_size>1</connection_pool_size>
|
||||
<read_write_timeout>20123001</read_write_timeout>
|
||||
<connect_timeout>20123002</connect_timeout>
|
||||
</mysql_with_settings>
|
||||
</named_collections>
|
||||
</clickhouse>
|
||||
|
@ -732,6 +732,93 @@ def test_mysql_null(started_cluster):
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_settings(started_cluster):
|
||||
table_name = "test_settings"
|
||||
node1.query(f"DROP TABLE IF EXISTS {table_name}")
|
||||
wait_timeout = 123
|
||||
rw_timeout = 10123001
|
||||
connect_timeout = 10123002
|
||||
connection_pool_size = 1
|
||||
|
||||
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
|
||||
drop_mysql_table(conn, table_name)
|
||||
create_mysql_table(conn, table_name)
|
||||
|
||||
node1.query(
|
||||
f"""
|
||||
CREATE TABLE {table_name}
|
||||
(
|
||||
id UInt32,
|
||||
name String,
|
||||
age UInt32,
|
||||
money UInt32
|
||||
)
|
||||
ENGINE = MySQL('mysql57:3306', 'clickhouse', '{table_name}', 'root', 'clickhouse')
|
||||
SETTINGS connection_wait_timeout={wait_timeout}, connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}, connection_pool_size={connection_pool_size}
|
||||
"""
|
||||
)
|
||||
|
||||
node1.query(f"SELECT * FROM {table_name}")
|
||||
assert node1.contains_in_log(
|
||||
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
|
||||
)
|
||||
|
||||
rw_timeout = 20123001
|
||||
connect_timeout = 20123002
|
||||
node1.query(f"SELECT * FROM mysql(mysql_with_settings)")
|
||||
assert node1.contains_in_log(
|
||||
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
|
||||
)
|
||||
|
||||
rw_timeout = 30123001
|
||||
connect_timeout = 30123002
|
||||
node1.query(
|
||||
f"""
|
||||
SELECT *
|
||||
FROM mysql('mysql57:3306', 'clickhouse', '{table_name}', 'root', 'clickhouse',
|
||||
SETTINGS
|
||||
connection_wait_timeout={wait_timeout},
|
||||
connect_timeout={connect_timeout},
|
||||
read_write_timeout={rw_timeout},
|
||||
connection_pool_size={connection_pool_size})
|
||||
"""
|
||||
)
|
||||
assert node1.contains_in_log(
|
||||
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
|
||||
)
|
||||
|
||||
rw_timeout = 40123001
|
||||
connect_timeout = 40123002
|
||||
node1.query(
|
||||
f"""
|
||||
CREATE DATABASE m
|
||||
ENGINE = MySQL(mysql_with_settings, connection_wait_timeout={wait_timeout}, connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}, connection_pool_size={connection_pool_size})
|
||||
"""
|
||||
)
|
||||
assert node1.contains_in_log(
|
||||
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
|
||||
)
|
||||
|
||||
rw_timeout = 50123001
|
||||
connect_timeout = 50123002
|
||||
node1.query(
|
||||
f"""
|
||||
CREATE DATABASE mm ENGINE = MySQL('mysql57:3306', 'clickhouse', 'root', 'clickhouse')
|
||||
SETTINGS
|
||||
connection_wait_timeout={wait_timeout},
|
||||
connect_timeout={connect_timeout},
|
||||
read_write_timeout={rw_timeout},
|
||||
connection_pool_size={connection_pool_size}
|
||||
"""
|
||||
)
|
||||
assert node1.contains_in_log(
|
||||
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
|
||||
)
|
||||
|
||||
drop_mysql_table(conn, table_name)
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
with contextmanager(started_cluster)() as cluster:
|
||||
for name, instance in list(cluster.instances.items()):
|
||||
|
Loading…
Reference in New Issue
Block a user