diff --git a/base/mysqlxx/Connection.cpp b/base/mysqlxx/Connection.cpp index 8c7e11eb4a1..55757008562 100644 --- a/base/mysqlxx/Connection.cpp +++ b/base/mysqlxx/Connection.cpp @@ -104,6 +104,11 @@ void Connection::connect(const char* db, if (mysql_options(driver.get(), MYSQL_OPT_LOCAL_INFILE, &enable_local_infile_arg)) throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get())); + /// Enables auto-reconnect. + bool reconnect = true; + if (mysql_options(driver.get(), MYSQL_OPT_RECONNECT, reinterpret_cast(&reconnect))) + throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get())); + /// Specifies particular ssl key and certificate if it needs if (mysql_ssl_set(driver.get(), ifNotEmpty(ssl_key), ifNotEmpty(ssl_cert), ifNotEmpty(ssl_ca), nullptr, nullptr)) throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get())); @@ -115,11 +120,6 @@ void Connection::connect(const char* db, if (mysql_set_character_set(driver.get(), "UTF8")) throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get())); - /// Enables auto-reconnect. - bool reconnect = true; - if (mysql_options(driver.get(), MYSQL_OPT_RECONNECT, reinterpret_cast(&reconnect))) - throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get())); - is_connected = true; } diff --git a/base/mysqlxx/Pool.cpp b/base/mysqlxx/Pool.cpp index d845570f1f2..2058429d3da 100644 --- a/base/mysqlxx/Pool.cpp +++ b/base/mysqlxx/Pool.cpp @@ -26,6 +26,7 @@ void Pool::Entry::incrementRefCount() mysql_thread_init(); } + void Pool::Entry::decrementRefCount() { if (!data) @@ -150,28 +151,39 @@ Pool::Entry Pool::tryGet() initialize(); - /// Searching for connection which was established but wasn't used. - for (auto & connection : connections) + /// Try to pick an idle connection from already allocated + for (auto connection_it = connections.cbegin(); connection_it != connections.cend();) { - if (connection->ref_count == 0) + Connection * connection_ptr = *connection_it; + /// Fixme: There is a race condition here b/c we do not synchronize with Pool::Entry's copy-assignment operator + if (connection_ptr->ref_count == 0) { - Entry res(connection, this); - return res.tryForceConnected() ? res : Entry(); + Entry res(connection_ptr, this); + if (res.tryForceConnected()) /// Tries to reestablish connection as well + return res; + + auto & logger = Poco::Util::Application::instance().logger(); + logger.information("Idle connection to mysql server cannot be recovered, dropping it."); + + /// This one is disconnected, cannot be reestablished and so needs to be disposed of. + connection_it = connections.erase(connection_it); + ::delete connection_ptr; /// TODO: Manual memory management is awkward (matches allocConnection() method) } + else + ++connection_it; } - /// Throws if pool is overflowed. if (connections.size() >= max_connections) throw Poco::Exception("mysqlxx::Pool is full"); - /// Allocates new connection. - Connection * conn = allocConnection(true); - if (conn) - return Entry(conn, this); + Connection * connection_ptr = allocConnection(true); + if (connection_ptr) + return {connection_ptr, this}; - return Entry(); + return {}; } + void Pool::removeConnection(Connection* connection) { std::lock_guard lock(mutex); @@ -199,11 +211,9 @@ void Pool::Entry::forceConnected() const throw Poco::RuntimeException("Tried to access NULL database connection."); Poco::Util::Application & app = Poco::Util::Application::instance(); - if (data->conn.ping()) - return; bool first = true; - do + while (!tryForceConnected()) { if (first) first = false; @@ -225,7 +235,26 @@ void Pool::Entry::forceConnected() const pool->rw_timeout, pool->enable_local_infile); } - while (!data->conn.ping()); +} + + +bool Pool::Entry::tryForceConnected() const +{ + auto * const mysql_driver = data->conn.getDriver(); + const auto prev_connection_id = mysql_thread_id(mysql_driver); + if (data->conn.ping()) /// Attempts to reestablish lost connection + { + const auto current_connection_id = mysql_thread_id(mysql_driver); + if (prev_connection_id != current_connection_id) + { + auto & logger = Poco::Util::Application::instance().logger(); + logger.information("Connection to mysql server has been reestablished. Connection id changed: %d -> %d", + prev_connection_id, current_connection_id); + } + return true; + } + + return false; } diff --git a/base/mysqlxx/Pool.h b/base/mysqlxx/Pool.h index 59d15e8c9a0..83b00e0081a 100644 --- a/base/mysqlxx/Pool.h +++ b/base/mysqlxx/Pool.h @@ -127,10 +127,7 @@ public: void forceConnected() const; /// Connects to database. If connection is failed then returns false. - bool tryForceConnected() const - { - return data->conn.ping(); - } + bool tryForceConnected() const; void incrementRefCount(); void decrementRefCount(); diff --git a/base/mysqlxx/tests/CMakeLists.txt b/base/mysqlxx/tests/CMakeLists.txt index ec3fdfaa913..2cf19d78418 100644 --- a/base/mysqlxx/tests/CMakeLists.txt +++ b/base/mysqlxx/tests/CMakeLists.txt @@ -1,2 +1,5 @@ add_executable (mysqlxx_test mysqlxx_test.cpp) target_link_libraries (mysqlxx_test PRIVATE mysqlxx) + +add_executable (mysqlxx_pool_test mysqlxx_pool_test.cpp) +target_link_libraries (mysqlxx_pool_test PRIVATE mysqlxx) diff --git a/base/mysqlxx/tests/mysqlxx_pool_test.cpp b/base/mysqlxx/tests/mysqlxx_pool_test.cpp new file mode 100644 index 00000000000..3dc23e4da85 --- /dev/null +++ b/base/mysqlxx/tests/mysqlxx_pool_test.cpp @@ -0,0 +1,98 @@ +#include + +#include +#include +#include +#include + + +namespace +{ +mysqlxx::Pool::Entry getWithFailover(mysqlxx::Pool & connections_pool) +{ + using namespace std::chrono; + + constexpr size_t max_tries = 3; + + mysqlxx::Pool::Entry worker_connection; + + for (size_t try_no = 1; try_no <= max_tries; ++try_no) + { + try + { + worker_connection = connections_pool.tryGet(); + + if (!worker_connection.isNull()) + { + return worker_connection; + } + } + catch (const Poco::Exception & e) + { + if (e.displayText().find("mysqlxx::Pool is full") != std::string::npos) + { + std::cerr << e.displayText() << std::endl; + } + + std::cerr << "Connection to " << connections_pool.getDescription() << " failed: " << e.displayText() << std::endl; + } + + std::clog << "Connection to all replicas failed " << try_no << " times" << std::endl; + std::this_thread::sleep_for(1s); + } + + std::stringstream message; + message << "Connections to all replicas failed: " << connections_pool.getDescription(); + + throw Poco::Exception(message.str()); +} +} + +int main(int, char **) +{ + using namespace std::chrono; + + const char * remote_mysql = "localhost"; + const std::string test_query = "SHOW DATABASES"; + + mysqlxx::Pool mysql_conn_pool("", remote_mysql, "default", "10203040", 3306); + + size_t iteration = 0; + while (++iteration) + { + std::clog << "Iteration: " << iteration << std::endl; + try + { + std::clog << "Acquiring DB connection ..."; + mysqlxx::Pool::Entry worker = getWithFailover(mysql_conn_pool); + std::clog << "ok" << std::endl; + + std::clog << "Preparing query (5s sleep) ..."; + std::this_thread::sleep_for(5s); + mysqlxx::Query query = worker->query(); + query << test_query; + std::clog << "ok" << std::endl; + + std::clog << "Querying result (5s sleep) ..."; + std::this_thread::sleep_for(5s); + mysqlxx::UseQueryResult result = query.use(); + std::clog << "ok" << std::endl; + + std::clog << "Fetching result data (5s sleep) ..."; + std::this_thread::sleep_for(5s); + size_t rows_count = 0; + while (result.fetch()) + ++rows_count; + std::clog << "ok" << std::endl; + + std::clog << "Read " << rows_count << " rows." << std::endl; + } + catch (const Poco::Exception & e) + { + std::cerr << "Iteration FAILED:\n" << e.displayText() << std::endl; + } + + std::clog << "====================" << std::endl; + std::this_thread::sleep_for(3s); + } +}