mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #28474 from azat/mysql-connection_no_block
Introduce connection_wait_timeout for MySQL engine.
This commit is contained in:
commit
7a4a0b0ede
@ -7,10 +7,22 @@
|
||||
#endif
|
||||
|
||||
#include <mysqlxx/Pool.h>
|
||||
|
||||
#include <common/sleep.h>
|
||||
|
||||
#include <Poco/Util/LayeredConfiguration.h>
|
||||
#include <ctime>
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
inline uint64_t clock_gettime_ns(clockid_t clock_type = CLOCK_MONOTONIC)
|
||||
{
|
||||
struct timespec ts;
|
||||
clock_gettime(clock_type, &ts);
|
||||
return uint64_t(ts.tv_sec * 1000000000LL + ts.tv_nsec);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
namespace mysqlxx
|
||||
@ -124,10 +136,15 @@ Pool::~Pool()
|
||||
}
|
||||
|
||||
|
||||
Pool::Entry Pool::get()
|
||||
Pool::Entry Pool::get(uint64_t wait_timeout)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
|
||||
uint64_t deadline = 0;
|
||||
/// UINT64_MAX -- wait indefinitely
|
||||
if (wait_timeout && wait_timeout != UINT64_MAX)
|
||||
deadline = clock_gettime_ns() + wait_timeout * 1'000'000'000;
|
||||
|
||||
initialize();
|
||||
for (;;)
|
||||
{
|
||||
@ -153,6 +170,12 @@ Pool::Entry Pool::get()
|
||||
logger.trace("(%s): Unable to create a new connection: Max number of connections has been reached.", getDescription());
|
||||
}
|
||||
|
||||
if (!wait_timeout)
|
||||
throw Poco::Exception("mysqlxx::Pool is full (wait is disabled, see connection_wait_timeout setting)");
|
||||
|
||||
if (deadline && clock_gettime_ns() >= deadline)
|
||||
throw Poco::Exception("mysqlxx::Pool is full (connection_wait_timeout is exceeded)");
|
||||
|
||||
lock.unlock();
|
||||
logger.trace("(%s): Sleeping for %d seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
||||
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
||||
|
@ -189,7 +189,7 @@ public:
|
||||
~Pool();
|
||||
|
||||
/// Allocates connection.
|
||||
Entry get();
|
||||
Entry get(uint64_t wait_timeout);
|
||||
|
||||
/// Allocates connection.
|
||||
/// If database is not accessible, returns empty Entry object.
|
||||
|
@ -21,8 +21,9 @@ PoolWithFailover::PoolWithFailover(
|
||||
const unsigned max_connections_,
|
||||
const size_t max_tries_)
|
||||
: max_tries(max_tries_)
|
||||
, shareable(config_.getBool(config_name_ + ".share_connection", false))
|
||||
, wait_timeout(UINT64_MAX)
|
||||
{
|
||||
shareable = config_.getBool(config_name_ + ".share_connection", false);
|
||||
if (config_.has(config_name_ + ".replica"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys replica_keys;
|
||||
@ -80,9 +81,11 @@ PoolWithFailover::PoolWithFailover(
|
||||
const std::string & password,
|
||||
unsigned default_connections_,
|
||||
unsigned max_connections_,
|
||||
size_t max_tries_)
|
||||
size_t max_tries_,
|
||||
uint64_t wait_timeout_)
|
||||
: max_tries(max_tries_)
|
||||
, shareable(false)
|
||||
, wait_timeout(wait_timeout_)
|
||||
{
|
||||
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
|
||||
for (const auto & [host, port] : addresses)
|
||||
@ -101,6 +104,7 @@ PoolWithFailover::PoolWithFailover(
|
||||
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
|
||||
: max_tries{other.max_tries}
|
||||
, shareable{other.shareable}
|
||||
, wait_timeout(other.wait_timeout)
|
||||
{
|
||||
if (shareable)
|
||||
{
|
||||
@ -140,7 +144,7 @@ PoolWithFailover::Entry PoolWithFailover::get()
|
||||
|
||||
try
|
||||
{
|
||||
Entry entry = shareable ? pool->get() : pool->tryGet();
|
||||
Entry entry = shareable ? pool->get(wait_timeout) : pool->tryGet();
|
||||
|
||||
if (!entry.isNull())
|
||||
{
|
||||
@ -172,7 +176,7 @@ PoolWithFailover::Entry PoolWithFailover::get()
|
||||
if (full_pool)
|
||||
{
|
||||
app.logger().error("All connections failed, trying to wait on a full pool " + (*full_pool)->getDescription());
|
||||
return (*full_pool)->get();
|
||||
return (*full_pool)->get(wait_timeout);
|
||||
}
|
||||
|
||||
std::stringstream message;
|
||||
|
@ -80,6 +80,8 @@ namespace mysqlxx
|
||||
std::mutex mutex;
|
||||
/// Can the Pool be shared
|
||||
bool shareable;
|
||||
/// Timeout for waiting free connection.
|
||||
uint64_t wait_timeout = 0;
|
||||
|
||||
public:
|
||||
using Entry = Pool::Entry;
|
||||
@ -96,6 +98,7 @@ namespace mysqlxx
|
||||
* default_connections Number of connection in pool to each replica at start.
|
||||
* max_connections Maximum number of connections in pool to each replica.
|
||||
* max_tries_ Max number of connection tries.
|
||||
* wait_timeout_ Timeout for waiting free connection.
|
||||
*/
|
||||
PoolWithFailover(
|
||||
const std::string & config_name_,
|
||||
@ -117,7 +120,8 @@ namespace mysqlxx
|
||||
const std::string & password,
|
||||
unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
|
||||
unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
|
||||
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
|
||||
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
|
||||
uint64_t wait_timeout_ = UINT64_MAX);
|
||||
|
||||
PoolWithFailover(const PoolWithFailover & other);
|
||||
|
||||
|
@ -19,6 +19,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
|
||||
SETTINGS
|
||||
[connection_pool_size=16, ]
|
||||
[connection_max_tries=3, ]
|
||||
[connection_wait_timeout=5, ] /* 0 -- do not wait */
|
||||
[connection_auto_close=true ]
|
||||
;
|
||||
```
|
||||
|
@ -247,7 +247,7 @@ void MaterializedMySQLSyncThread::assertMySQLAvailable()
|
||||
{
|
||||
try
|
||||
{
|
||||
checkMySQLVariables(pool.get(), getContext()->getSettingsRef());
|
||||
checkMySQLVariables(pool.get(/* wait_timeout= */ UINT64_MAX), getContext()->getSettingsRef());
|
||||
}
|
||||
catch (const mysqlxx::ConnectionFailed & e)
|
||||
{
|
||||
@ -729,7 +729,7 @@ void MaterializedMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPt
|
||||
{
|
||||
/// Some behaviors(such as changing the value of "binlog_checksum") rotate the binlog file.
|
||||
/// To ensure that the synchronization continues, we need to handle these events
|
||||
metadata.fetchMasterVariablesValue(pool.get());
|
||||
metadata.fetchMasterVariablesValue(pool.get(/* wait_timeout= */ UINT64_MAX));
|
||||
client.setBinlogChecksum(metadata.binlog_checksum);
|
||||
}
|
||||
else if (receive_event->header.type != HEARTBEAT_EVENT)
|
||||
|
@ -17,6 +17,7 @@ class ASTStorage;
|
||||
#define LIST_OF_MYSQL_SETTINGS(M) \
|
||||
M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \
|
||||
M(UInt64, connection_max_tries, 3, "Number of retries for pool with failover", 0) \
|
||||
M(UInt64, connection_wait_timeout, 5, "Timeout (in seconds) for waiting for free connection (in case of there is already connection_pool_size active connections), 0 - do not wait.", 0) \
|
||||
M(Bool, connection_auto_close, true, "Auto-close connection after query execution, i.e. disable connection reuse.", 0) \
|
||||
|
||||
DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)
|
||||
|
@ -267,11 +267,15 @@ void registerStorageMySQL(StorageFactory & factory)
|
||||
throw Exception("connection_pool_size cannot be zero.", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
|
||||
mysqlxx::PoolWithFailover pool(remote_database, addresses,
|
||||
username, password,
|
||||
mysqlxx::PoolWithFailover pool(
|
||||
remote_database,
|
||||
addresses,
|
||||
username,
|
||||
password,
|
||||
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
|
||||
mysql_settings.connection_pool_size,
|
||||
mysql_settings.connection_max_tries);
|
||||
mysql_settings.connection_max_tries,
|
||||
mysql_settings.connection_wait_timeout);
|
||||
|
||||
bool replace_query = false;
|
||||
std::string on_duplicate_clause;
|
||||
|
@ -3,7 +3,10 @@ from contextlib import contextmanager
|
||||
## sudo -H pip install PyMySQL
|
||||
import pymysql.cursors
|
||||
import pytest
|
||||
import time
|
||||
import threading
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.client import QueryRuntimeException
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
@ -319,6 +322,51 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
|
||||
conn.close()
|
||||
|
||||
|
||||
# Check that limited connection_wait_timeout (via connection_pool_size=1) will throw.
|
||||
def test_settings_connection_wait_timeout(started_cluster):
|
||||
table_name = 'test_settings_connection_wait_timeout'
|
||||
node1.query(f'DROP TABLE IF EXISTS {table_name}')
|
||||
wait_timeout = 2
|
||||
|
||||
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
|
||||
drop_mysql_table(conn, table_name)
|
||||
create_mysql_table(conn, table_name)
|
||||
|
||||
node1.query('''
|
||||
CREATE TABLE {}
|
||||
(
|
||||
id UInt32,
|
||||
name String,
|
||||
age UInt32,
|
||||
money UInt32
|
||||
)
|
||||
ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse')
|
||||
SETTINGS connection_wait_timeout={}, connection_pool_size=1
|
||||
'''.format(table_name, table_name, wait_timeout)
|
||||
)
|
||||
|
||||
node1.query("INSERT INTO {} (id, name) SELECT number, concat('name_', toString(number)) from numbers(10) ".format(table_name))
|
||||
|
||||
def worker():
|
||||
node1.query("SELECT sleepEachRow(1) FROM {}".format(table_name))
|
||||
|
||||
worker_thread = threading.Thread(target=worker)
|
||||
worker_thread.start()
|
||||
|
||||
# ensure that first query started in worker_thread
|
||||
time.sleep(1)
|
||||
|
||||
started = time.time()
|
||||
with pytest.raises(QueryRuntimeException, match=r"Exception: mysqlxx::Pool is full \(connection_wait_timeout is exceeded\)"):
|
||||
node1.query("SELECT sleepEachRow(1) FROM {}".format(table_name))
|
||||
ended = time.time()
|
||||
assert (ended - started) >= wait_timeout
|
||||
|
||||
worker_thread.join()
|
||||
|
||||
drop_mysql_table(conn, table_name)
|
||||
conn.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
with contextmanager(started_cluster)() as cluster:
|
||||
for name, instance in list(cluster.instances.items()):
|
||||
|
Loading…
Reference in New Issue
Block a user