mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 09:22:05 +00:00
In mysqlxx::Pool: fix for reconnection problem (#17681)
Remove stalled unrecoverable connections from myqsl connections pool
This commit is contained in:
parent
4ebc410b15
commit
81d9d3237f
@ -104,6 +104,11 @@ void Connection::connect(const char* db,
|
|||||||
if (mysql_options(driver.get(), MYSQL_OPT_LOCAL_INFILE, &enable_local_infile_arg))
|
if (mysql_options(driver.get(), MYSQL_OPT_LOCAL_INFILE, &enable_local_infile_arg))
|
||||||
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
|
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
|
||||||
|
|
||||||
|
/// Enables auto-reconnect.
|
||||||
|
bool reconnect = true;
|
||||||
|
if (mysql_options(driver.get(), MYSQL_OPT_RECONNECT, reinterpret_cast<const char *>(&reconnect)))
|
||||||
|
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
|
||||||
|
|
||||||
/// Specifies particular ssl key and certificate if it needs
|
/// Specifies particular ssl key and certificate if it needs
|
||||||
if (mysql_ssl_set(driver.get(), ifNotEmpty(ssl_key), ifNotEmpty(ssl_cert), ifNotEmpty(ssl_ca), nullptr, nullptr))
|
if (mysql_ssl_set(driver.get(), ifNotEmpty(ssl_key), ifNotEmpty(ssl_cert), ifNotEmpty(ssl_ca), nullptr, nullptr))
|
||||||
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
|
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
|
||||||
@ -115,11 +120,6 @@ void Connection::connect(const char* db,
|
|||||||
if (mysql_set_character_set(driver.get(), "UTF8"))
|
if (mysql_set_character_set(driver.get(), "UTF8"))
|
||||||
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
|
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
|
||||||
|
|
||||||
/// Enables auto-reconnect.
|
|
||||||
bool reconnect = true;
|
|
||||||
if (mysql_options(driver.get(), MYSQL_OPT_RECONNECT, reinterpret_cast<const char *>(&reconnect)))
|
|
||||||
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
|
|
||||||
|
|
||||||
is_connected = true;
|
is_connected = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ void Pool::Entry::incrementRefCount()
|
|||||||
mysql_thread_init();
|
mysql_thread_init();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void Pool::Entry::decrementRefCount()
|
void Pool::Entry::decrementRefCount()
|
||||||
{
|
{
|
||||||
if (!data)
|
if (!data)
|
||||||
@ -150,28 +151,39 @@ Pool::Entry Pool::tryGet()
|
|||||||
|
|
||||||
initialize();
|
initialize();
|
||||||
|
|
||||||
/// Searching for connection which was established but wasn't used.
|
/// Try to pick an idle connection from already allocated
|
||||||
for (auto & connection : connections)
|
for (auto connection_it = connections.cbegin(); connection_it != connections.cend();)
|
||||||
{
|
{
|
||||||
if (connection->ref_count == 0)
|
Connection * connection_ptr = *connection_it;
|
||||||
|
/// Fixme: There is a race condition here b/c we do not synchronize with Pool::Entry's copy-assignment operator
|
||||||
|
if (connection_ptr->ref_count == 0)
|
||||||
{
|
{
|
||||||
Entry res(connection, this);
|
Entry res(connection_ptr, this);
|
||||||
return res.tryForceConnected() ? res : Entry();
|
if (res.tryForceConnected()) /// Tries to reestablish connection as well
|
||||||
|
return res;
|
||||||
|
|
||||||
|
auto & logger = Poco::Util::Application::instance().logger();
|
||||||
|
logger.information("Idle connection to mysql server cannot be recovered, dropping it.");
|
||||||
|
|
||||||
|
/// This one is disconnected, cannot be reestablished and so needs to be disposed of.
|
||||||
|
connection_it = connections.erase(connection_it);
|
||||||
|
::delete connection_ptr; /// TODO: Manual memory management is awkward (matches allocConnection() method)
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
++connection_it;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Throws if pool is overflowed.
|
|
||||||
if (connections.size() >= max_connections)
|
if (connections.size() >= max_connections)
|
||||||
throw Poco::Exception("mysqlxx::Pool is full");
|
throw Poco::Exception("mysqlxx::Pool is full");
|
||||||
|
|
||||||
/// Allocates new connection.
|
Connection * connection_ptr = allocConnection(true);
|
||||||
Connection * conn = allocConnection(true);
|
if (connection_ptr)
|
||||||
if (conn)
|
return {connection_ptr, this};
|
||||||
return Entry(conn, this);
|
|
||||||
|
|
||||||
return Entry();
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void Pool::removeConnection(Connection* connection)
|
void Pool::removeConnection(Connection* connection)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(mutex);
|
std::lock_guard<std::mutex> lock(mutex);
|
||||||
@ -199,11 +211,9 @@ void Pool::Entry::forceConnected() const
|
|||||||
throw Poco::RuntimeException("Tried to access NULL database connection.");
|
throw Poco::RuntimeException("Tried to access NULL database connection.");
|
||||||
|
|
||||||
Poco::Util::Application & app = Poco::Util::Application::instance();
|
Poco::Util::Application & app = Poco::Util::Application::instance();
|
||||||
if (data->conn.ping())
|
|
||||||
return;
|
|
||||||
|
|
||||||
bool first = true;
|
bool first = true;
|
||||||
do
|
while (!tryForceConnected())
|
||||||
{
|
{
|
||||||
if (first)
|
if (first)
|
||||||
first = false;
|
first = false;
|
||||||
@ -225,7 +235,26 @@ void Pool::Entry::forceConnected() const
|
|||||||
pool->rw_timeout,
|
pool->rw_timeout,
|
||||||
pool->enable_local_infile);
|
pool->enable_local_infile);
|
||||||
}
|
}
|
||||||
while (!data->conn.ping());
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool Pool::Entry::tryForceConnected() const
|
||||||
|
{
|
||||||
|
auto * const mysql_driver = data->conn.getDriver();
|
||||||
|
const auto prev_connection_id = mysql_thread_id(mysql_driver);
|
||||||
|
if (data->conn.ping()) /// Attempts to reestablish lost connection
|
||||||
|
{
|
||||||
|
const auto current_connection_id = mysql_thread_id(mysql_driver);
|
||||||
|
if (prev_connection_id != current_connection_id)
|
||||||
|
{
|
||||||
|
auto & logger = Poco::Util::Application::instance().logger();
|
||||||
|
logger.information("Connection to mysql server has been reestablished. Connection id changed: %d -> %d",
|
||||||
|
prev_connection_id, current_connection_id);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -127,10 +127,7 @@ public:
|
|||||||
void forceConnected() const;
|
void forceConnected() const;
|
||||||
|
|
||||||
/// Connects to database. If connection is failed then returns false.
|
/// Connects to database. If connection is failed then returns false.
|
||||||
bool tryForceConnected() const
|
bool tryForceConnected() const;
|
||||||
{
|
|
||||||
return data->conn.ping();
|
|
||||||
}
|
|
||||||
|
|
||||||
void incrementRefCount();
|
void incrementRefCount();
|
||||||
void decrementRefCount();
|
void decrementRefCount();
|
||||||
|
@ -1,2 +1,5 @@
|
|||||||
add_executable (mysqlxx_test mysqlxx_test.cpp)
|
add_executable (mysqlxx_test mysqlxx_test.cpp)
|
||||||
target_link_libraries (mysqlxx_test PRIVATE mysqlxx)
|
target_link_libraries (mysqlxx_test PRIVATE mysqlxx)
|
||||||
|
|
||||||
|
add_executable (mysqlxx_pool_test mysqlxx_pool_test.cpp)
|
||||||
|
target_link_libraries (mysqlxx_pool_test PRIVATE mysqlxx)
|
||||||
|
98
base/mysqlxx/tests/mysqlxx_pool_test.cpp
Normal file
98
base/mysqlxx/tests/mysqlxx_pool_test.cpp
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
#include <mysqlxx/mysqlxx.h>
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
#include <iostream>
|
||||||
|
#include <sstream>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
mysqlxx::Pool::Entry getWithFailover(mysqlxx::Pool & connections_pool)
|
||||||
|
{
|
||||||
|
using namespace std::chrono;
|
||||||
|
|
||||||
|
constexpr size_t max_tries = 3;
|
||||||
|
|
||||||
|
mysqlxx::Pool::Entry worker_connection;
|
||||||
|
|
||||||
|
for (size_t try_no = 1; try_no <= max_tries; ++try_no)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
worker_connection = connections_pool.tryGet();
|
||||||
|
|
||||||
|
if (!worker_connection.isNull())
|
||||||
|
{
|
||||||
|
return worker_connection;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (const Poco::Exception & e)
|
||||||
|
{
|
||||||
|
if (e.displayText().find("mysqlxx::Pool is full") != std::string::npos)
|
||||||
|
{
|
||||||
|
std::cerr << e.displayText() << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cerr << "Connection to " << connections_pool.getDescription() << " failed: " << e.displayText() << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::clog << "Connection to all replicas failed " << try_no << " times" << std::endl;
|
||||||
|
std::this_thread::sleep_for(1s);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::stringstream message;
|
||||||
|
message << "Connections to all replicas failed: " << connections_pool.getDescription();
|
||||||
|
|
||||||
|
throw Poco::Exception(message.str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int, char **)
|
||||||
|
{
|
||||||
|
using namespace std::chrono;
|
||||||
|
|
||||||
|
const char * remote_mysql = "localhost";
|
||||||
|
const std::string test_query = "SHOW DATABASES";
|
||||||
|
|
||||||
|
mysqlxx::Pool mysql_conn_pool("", remote_mysql, "default", "10203040", 3306);
|
||||||
|
|
||||||
|
size_t iteration = 0;
|
||||||
|
while (++iteration)
|
||||||
|
{
|
||||||
|
std::clog << "Iteration: " << iteration << std::endl;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
std::clog << "Acquiring DB connection ...";
|
||||||
|
mysqlxx::Pool::Entry worker = getWithFailover(mysql_conn_pool);
|
||||||
|
std::clog << "ok" << std::endl;
|
||||||
|
|
||||||
|
std::clog << "Preparing query (5s sleep) ...";
|
||||||
|
std::this_thread::sleep_for(5s);
|
||||||
|
mysqlxx::Query query = worker->query();
|
||||||
|
query << test_query;
|
||||||
|
std::clog << "ok" << std::endl;
|
||||||
|
|
||||||
|
std::clog << "Querying result (5s sleep) ...";
|
||||||
|
std::this_thread::sleep_for(5s);
|
||||||
|
mysqlxx::UseQueryResult result = query.use();
|
||||||
|
std::clog << "ok" << std::endl;
|
||||||
|
|
||||||
|
std::clog << "Fetching result data (5s sleep) ...";
|
||||||
|
std::this_thread::sleep_for(5s);
|
||||||
|
size_t rows_count = 0;
|
||||||
|
while (result.fetch())
|
||||||
|
++rows_count;
|
||||||
|
std::clog << "ok" << std::endl;
|
||||||
|
|
||||||
|
std::clog << "Read " << rows_count << " rows." << std::endl;
|
||||||
|
}
|
||||||
|
catch (const Poco::Exception & e)
|
||||||
|
{
|
||||||
|
std::cerr << "Iteration FAILED:\n" << e.displayText() << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::clog << "====================" << std::endl;
|
||||||
|
std::this_thread::sleep_for(3s);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user