MySQL dictionary source: A mechanism for retrying unexpected connection loss during communication with mysql server (#21237)

* Added a new type of Exception

for better recognition of connection failures

* Added more logging for mysql communication

* Retries on connection loss during query.

Make MySQL dictionary source resilient to unexpected loss of connection
during querying MySQL. This behavior is controlled with
".fail_on_connection_loss" config parameter, which defaults to false.

* Stripped some debugging leftover garbage

* Minor followup corrections

* Corrections after PR comments

* Yet more fixes
This commit is contained in:
Alexander Kazakov 2021-02-27 11:18:28 +03:00 committed by GitHub
parent 184e9b9424
commit 63b95c7451
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 113 additions and 30 deletions

View File

@ -26,6 +26,15 @@ struct ConnectionFailed : public Exception
};
/// Connection to MySQL server was lost
struct ConnectionLost : public Exception
{
ConnectionLost(const std::string & msg, int code = 0) : Exception(msg, code) {}
const char * name() const throw() override { return "mysqlxx::ConnectionLost"; }
const char * className() const throw() override { return "mysqlxx::ConnectionLost"; }
};
/// Erroneous query.
struct BadQuery : public Exception
{

View File

@ -10,7 +10,6 @@
#include <common/sleep.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
@ -41,7 +40,9 @@ void Pool::Entry::decrementRefCount()
Pool::Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & config_name,
unsigned default_connections_, unsigned max_connections_,
const char * parent_config_name_)
: default_connections(default_connections_), max_connections(max_connections_)
: logger(Poco::Logger::get("mysqlxx::Pool"))
, default_connections(default_connections_)
, max_connections(max_connections_)
{
server = cfg.getString(config_name + ".host");
@ -130,20 +131,30 @@ Pool::Entry Pool::get()
initialize();
for (;;)
{
logger.trace("(%s): Iterating through existing MySQL connections", getDescription());
for (auto & connection : connections)
{
if (connection->ref_count == 0)
return Entry(connection, this);
}
logger.trace("(%s): Trying to allocate a new connection.", getDescription());
if (connections.size() < static_cast<size_t>(max_connections))
{
Connection * conn = allocConnection();
if (conn)
return Entry(conn, this);
logger.trace("(%s): Unable to create a new connection: Allocation failed.", getDescription());
}
else
{
logger.trace("(%s): Unable to create a new connection: Max number of connections has been reached.", getDescription());
}
lock.unlock();
logger.trace("(%s): Sleeping for %d seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
lock.lock();
}
@ -167,8 +178,7 @@ Pool::Entry Pool::tryGet()
if (res.tryForceConnected()) /// Tries to reestablish connection as well
return res;
auto & logger = Poco::Util::Application::instance().logger();
logger.information("Idle connection to mysql server cannot be recovered, dropping it.");
logger.debug("(%s): Idle connection to MySQL server cannot be recovered, dropping it.", getDescription());
/// This one is disconnected, cannot be reestablished and so needs to be disposed of.
connection_it = connections.erase(connection_it);
@ -191,6 +201,8 @@ Pool::Entry Pool::tryGet()
void Pool::removeConnection(Connection* connection)
{
logger.trace("(%s): Removing connection.", getDescription());
std::lock_guard<std::mutex> lock(mutex);
if (connection)
{
@ -215,8 +227,6 @@ void Pool::Entry::forceConnected() const
if (data == nullptr)
throw Poco::RuntimeException("Tried to access NULL database connection.");
Poco::Util::Application & app = Poco::Util::Application::instance();
bool first = true;
while (!tryForceConnected())
{
@ -225,7 +235,7 @@ void Pool::Entry::forceConnected() const
else
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
app.logger().information("MYSQL: Reconnecting to " + pool->description);
pool->logger.debug("Entry: Reconnecting to MySQL server %s", pool->description);
data->conn.connect(
pool->db.c_str(),
pool->server.c_str(),
@ -248,18 +258,22 @@ bool Pool::Entry::tryForceConnected() const
{
auto * const mysql_driver = data->conn.getDriver();
const auto prev_connection_id = mysql_thread_id(mysql_driver);
pool->logger.trace("Entry(connection %lu): sending PING to check if it is alive.", prev_connection_id);
if (data->conn.ping()) /// Attempts to reestablish lost connection
{
const auto current_connection_id = mysql_thread_id(mysql_driver);
if (prev_connection_id != current_connection_id)
{
auto & logger = Poco::Util::Application::instance().logger();
logger.information("Reconnected to mysql server. Connection id changed: %lu -> %lu",
prev_connection_id, current_connection_id);
pool->logger.debug("Entry(connection %lu): Reconnected to MySQL server. Connection id changed: %lu -> %lu",
current_connection_id, prev_connection_id, current_connection_id);
}
pool->logger.trace("Entry(connection %lu): PING ok.", current_connection_id);
return true;
}
pool->logger.trace("Entry(connection %lu): PING failed.", prev_connection_id);
return false;
}
@ -280,15 +294,13 @@ void Pool::initialize()
Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
{
Poco::Util::Application & app = Poco::Util::Application::instance();
std::unique_ptr<Connection> conn(new Connection);
std::unique_ptr<Connection> conn_ptr{new Connection};
try
{
app.logger().information("MYSQL: Connecting to " + description);
logger.debug("Connecting to %s", description);
conn->conn.connect(
conn_ptr->conn.connect(
db.c_str(),
server.c_str(),
user.c_str(),
@ -305,25 +317,24 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
}
catch (mysqlxx::ConnectionFailed & e)
{
logger.error(e.what());
if ((!was_successful && !dont_throw_if_failed_first_time)
|| e.errnum() == ER_ACCESS_DENIED_ERROR
|| e.errnum() == ER_DBACCESS_DENIED_ERROR
|| e.errnum() == ER_BAD_DB_ERROR)
{
app.logger().error(e.what());
throw;
}
else
{
app.logger().error(e.what());
return nullptr;
}
}
connections.push_back(conn_ptr.get());
was_successful = true;
auto * connection = conn.release();
connections.push_back(connection);
return connection;
return conn_ptr.release();
}
}

View File

@ -6,6 +6,8 @@
#include <atomic>
#include <Poco/Exception.h>
#include <Poco/Logger.h>
#include <mysqlxx/Connection.h>
@ -167,13 +169,13 @@ public:
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
unsigned enable_local_infile_ = MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE,
bool opt_reconnect_ = MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT)
: default_connections(default_connections_), max_connections(max_connections_),
db(db_), server(server_), user(user_), password(password_), port(port_), socket(socket_),
: logger(Poco::Logger::get("mysqlxx::Pool")), default_connections(default_connections_),
max_connections(max_connections_), db(db_), server(server_), user(user_), password(password_), port(port_), socket(socket_),
connect_timeout(connect_timeout_), rw_timeout(rw_timeout_), enable_local_infile(enable_local_infile_),
opt_reconnect(opt_reconnect_) {}
Pool(const Pool & other)
: default_connections{other.default_connections},
: logger(other.logger), default_connections{other.default_connections},
max_connections{other.max_connections},
db{other.db}, server{other.server},
user{other.user}, password{other.password},
@ -203,6 +205,8 @@ public:
void removeConnection(Connection * connection);
protected:
Poco::Logger & logger;
/// Number of MySQL connections which are created at launch.
unsigned default_connections;
/// Maximum possible number of connections

View File

@ -1,11 +1,16 @@
#if __has_include(<mysql.h>)
#include <errmsg.h>
#include <mysql.h>
#else
#include <mysql/errmsg.h>
#include <mysql/mysql.h>
#endif
#include <Poco/Logger.h>
#include <mysqlxx/Connection.h>
#include <mysqlxx/Query.h>
#include <mysqlxx/Types.h>
namespace mysqlxx
@ -57,8 +62,24 @@ void Query::reset()
void Query::executeImpl()
{
std::string query_string = query_buf.str();
if (mysql_real_query(conn->getDriver(), query_string.data(), query_string.size()))
throw BadQuery(errorMessage(conn->getDriver()), mysql_errno(conn->getDriver()));
MYSQL* mysql_driver = conn->getDriver();
auto & logger = Poco::Logger::get("mysqlxx::Query");
logger.trace("Running MySQL query using connection %lu", mysql_thread_id(mysql_driver));
if (mysql_real_query(mysql_driver, query_string.data(), query_string.size()))
{
const auto err_no = mysql_errno(mysql_driver);
switch (err_no)
{
case CR_SERVER_GONE_ERROR:
[[fallthrough]];
case CR_SERVER_LOST:
throw ConnectionLost(errorMessage(mysql_driver), err_no);
default:
throw BadQuery(errorMessage(mysql_driver), err_no);
}
}
}
UseQueryResult Query::use()

View File

@ -47,11 +47,13 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
# include <common/logger_useful.h>
# include <Formats/MySQLBlockInputStream.h>
# include "readInvalidateQuery.h"
# include <mysqlxx/Exception.h>
# include <mysqlxx/PoolFactory.h>
namespace DB
{
static const UInt64 max_block_size = 8192;
static const size_t default_num_tries_on_connection_loss = 3;
MySQLDictionarySource::MySQLDictionarySource(
@ -72,7 +74,10 @@ MySQLDictionarySource::MySQLDictionarySource(
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, close_connection{config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false)}
, close_connection(
config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false))
, max_tries_for_mysql_block_input_stream(
config.getBool(config_prefix + ".fail_on_connection_loss", false) ? 1 : default_num_tries_on_connection_loss)
{
}
@ -94,6 +99,7 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
, invalidate_query{other.invalidate_query}
, invalidate_query_response{other.invalidate_query_response}
, close_connection{other.close_connection}
, max_tries_for_mysql_block_input_stream{other.max_tries_for_mysql_block_input_stream}
{
}
@ -114,13 +120,41 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate()
}
}
BlockInputStreamPtr MySQLDictionarySource::retriedCreateMySqlBIStream(const std::string & data_fetch_query_str, const size_t max_tries)
{
size_t count_connection_lost = 0;
while (true)
{
auto connection = pool.get();
try
{
return std::make_shared<MySQLBlockInputStream>(
connection, data_fetch_query_str, sample_block, max_block_size, close_connection);
}
catch (const mysqlxx::ConnectionLost & ecl) /// There are two retriable failures: CR_SERVER_GONE_ERROR, CR_SERVER_LOST
{
if (++count_connection_lost < max_tries)
{
LOG_WARNING(log, ecl.displayText());
LOG_WARNING(log, "Lost connection ({}/{}). Trying to reconnect...", count_connection_lost, max_tries);
continue;
}
LOG_ERROR(log, "Failed ({}/{}) to create BlockInputStream for MySQL dictionary source.", count_connection_lost, max_tries);
throw;
}
}
}
BlockInputStreamPtr MySQLDictionarySource::loadAll()
{
auto connection = pool.get();
last_modification = getLastModification(connection, false);
LOG_TRACE(log, load_all_query);
return std::make_shared<MySQLBlockInputStream>(connection, load_all_query, sample_block, max_block_size, close_connection);
return retriedCreateMySqlBIStream(load_all_query, max_tries_for_mysql_block_input_stream);
}
BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
@ -130,7 +164,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
std::string load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query);
return std::make_shared<MySQLBlockInputStream>(connection, load_update_query, sample_block, max_block_size, close_connection);
return retriedCreateMySqlBIStream(load_update_query, max_tries_for_mysql_block_input_stream);
}
BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
@ -138,7 +172,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & i
/// We do not log in here and do not update the modification time, as the request can be large, and often called.
const auto query = query_builder.composeLoadIdsQuery(ids);
return std::make_shared<MySQLBlockInputStream>(pool.get(), query, sample_block, max_block_size, close_connection);
return retriedCreateMySqlBIStream(query, max_tries_for_mysql_block_input_stream);
}
BlockInputStreamPtr MySQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
@ -146,7 +180,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadKeys(const Columns & key_columns,
/// We do not log in here and do not update the modification time, as the request can be large, and often called.
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return std::make_shared<MySQLBlockInputStream>(pool.get(), query, sample_block, max_block_size, close_connection);
return retriedCreateMySqlBIStream(query, max_tries_for_mysql_block_input_stream);
}
bool MySQLDictionarySource::isModified() const

View File

@ -69,6 +69,9 @@ private:
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;
/// A helper method for recovering from "Lost connection to MySQL server during query" errors
BlockInputStreamPtr retriedCreateMySqlBIStream(const std::string & query_str, const size_t max_tries);
Poco::Logger * log;
std::chrono::time_point<std::chrono::system_clock> update_time;
@ -86,6 +89,7 @@ private:
std::string invalidate_query;
mutable std::string invalidate_query_response;
const bool close_connection;
const size_t max_tries_for_mysql_block_input_stream;
};
}