ClickHouse/libs/libmysqlxx/include/mysqlxx/Pool.h

428 lines
11 KiB
C
Raw Normal View History

2014-04-08 06:51:53 +00:00
#pragma once
2011-03-04 20:58:19 +00:00
#include <list>
#include <mysql/mysqld_error.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Poco/NumberFormatter.h>
#include <Poco/Mutex.h>
#include <Poco/Exception.h>
#include <Poco/SharedPtr.h>
#include <Yandex/logger_useful.h>
2013-04-28 02:56:13 +00:00
#include <statdaemons/daemon.h>
2011-03-04 20:58:19 +00:00
#include <mysqlxx/Connection.h>
2011-03-17 20:00:04 +00:00
#define MYSQLXX_POOL_DEFAULT_START_CONNECTIONS 1
#define MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS 16
#define MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL 10
2011-03-04 20:58:19 +00:00
namespace mysqlxx
{
/** Пул соединений с MySQL.
* Этот класс имеет мало отношения к mysqlxx и сделан не в стиле библиотеки. (взят из старого кода)
2011-03-18 20:26:54 +00:00
* Использование:
* mysqlxx::Pool pool("mysql_params");
*
* void thread()
* {
* mysqlxx::Pool::Entry connection = pool.Get();
* std::string s = connection->query("SELECT 'Hello, world!' AS world").use().fetch()["world"].getString();
* }
*
* TODO: Упростить, используя PoolBase.
2011-03-18 20:26:54 +00:00
*/
2011-03-04 20:58:19 +00:00
class Pool
{
protected:
/** Информация о соединении. */
2011-03-04 20:58:19 +00:00
struct Connection
{
Connection() : ref_count(0) {}
2011-03-04 20:58:19 +00:00
mysqlxx::Connection conn;
int ref_count;
2011-03-04 20:58:19 +00:00
};
public:
/** Соединение с базой данных. */
2011-03-04 20:58:19 +00:00
class Entry
{
public:
2014-04-08 07:31:51 +00:00
Entry() {}
2011-03-04 20:58:19 +00:00
Entry(const Entry & src)
: data(src.data), pool(src.pool)
2011-03-04 20:58:19 +00:00
{
incrementRefCount();
2011-03-04 20:58:19 +00:00
}
~Entry()
2011-03-04 20:58:19 +00:00
{
decrementRefCount();
2011-03-04 20:58:19 +00:00
}
Entry & operator= (const Entry & src)
{
pool = src.pool;
if (data)
decrementRefCount();
data = src.data;
if (data)
incrementRefCount();
2011-03-04 20:58:19 +00:00
return * this;
}
bool isNull() const
{
2014-04-08 07:31:51 +00:00
return data == nullptr;
}
2011-03-04 20:58:19 +00:00
operator mysqlxx::Connection & ()
{
2014-04-08 07:31:51 +00:00
if (data == nullptr)
2011-03-04 20:58:19 +00:00
throw Poco::RuntimeException("Tried to access NULL database connection.");
forceConnected();
return data->conn;
2011-03-04 20:58:19 +00:00
}
operator const mysqlxx::Connection & () const
{
2014-04-08 07:31:51 +00:00
if (data == nullptr)
2011-03-04 20:58:19 +00:00
throw Poco::RuntimeException("Tried to access NULL database connection.");
forceConnected();
return data->conn;
2011-03-04 20:58:19 +00:00
}
const mysqlxx::Connection * operator->() const
{
2014-04-08 07:31:51 +00:00
if (data == nullptr)
2011-03-04 20:58:19 +00:00
throw Poco::RuntimeException("Tried to access NULL database connection.");
forceConnected();
return &data->conn;
2011-03-04 20:58:19 +00:00
}
mysqlxx::Connection * operator->()
{
2014-04-08 07:31:51 +00:00
if (data == nullptr)
2011-03-04 20:58:19 +00:00
throw Poco::RuntimeException("Tried to access NULL database connection.");
forceConnected();
return &data->conn;
2011-03-04 20:58:19 +00:00
}
Entry(Pool::Connection * conn, Pool * p)
: data(conn), pool(p)
2011-03-04 20:58:19 +00:00
{
incrementRefCount();
2011-03-04 20:58:19 +00:00
}
std::string getDescription() const
{
if (pool)
return pool->getDescription();
else
return "pool is null";
}
2011-03-04 20:58:19 +00:00
friend class Pool;
private:
/** Указатель на соединение. */
2014-04-08 07:31:51 +00:00
Connection * data = nullptr;
/** Указатель на пул, которому мы принадлежим. */
2014-04-08 07:31:51 +00:00
Pool * pool = nullptr;
2011-03-04 20:58:19 +00:00
/** Переподключается к базе данных в случае необходимости. Если не удалось - подождать и попробовать снова. */
void forceConnected() const
2011-03-04 20:58:19 +00:00
{
Poco::Util::Application & app = Poco::Util::Application::instance();
if (data->conn.ping())
2011-03-04 20:58:19 +00:00
return;
bool first = true;
do
{
if (first)
first = false;
else
2011-10-06 18:23:00 +00:00
Daemon::instance().sleep(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
2011-03-04 20:58:19 +00:00
app.logger().information("MYSQL: Reconnecting to " + pool->description);
data->conn.connect(
pool->db.c_str(),
pool->server.c_str(),
pool->user.c_str(),
pool->password.c_str(),
pool->port,
pool->connect_timeout,
pool->rw_timeout);
2011-03-04 20:58:19 +00:00
}
while (!data->conn.ping());
2011-03-04 20:58:19 +00:00
}
/** Переподключается к базе данных в случае необходимости. Если не удалось - вернуть false. */
bool tryForceConnected() const
{
return data->conn.ping();
}
void incrementRefCount()
{
if (!data)
return;
++data->ref_count;
my_thread_init();
}
void decrementRefCount()
{
if (!data)
return;
--data->ref_count;
my_thread_end();
}
2011-03-04 20:58:19 +00:00
};
2011-03-04 20:58:19 +00:00
/**
* @param config_name Имя параметра в конфигурационном файле
* @param default_connections_ Количество подключений по-умолчанию
* @param max_connections_ Максимальное количество подключений
2011-03-04 20:58:19 +00:00
*/
Pool(const std::string & config_name,
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
2014-04-08 07:31:51 +00:00
const char * parent_config_name_ = nullptr)
: default_connections(default_connections_), max_connections(max_connections_),
initialized(false), was_successful(false)
2011-03-04 20:58:19 +00:00
{
Poco::Util::LayeredConfiguration & cfg = Poco::Util::Application::instance().config();
server = cfg.getString(config_name + ".host");
if (parent_config_name_)
{
const std::string parent_config_name(parent_config_name_);
db = cfg.getString(config_name + ".db", cfg.getString(parent_config_name + ".db", ""));
user = cfg.has(config_name + ".user") ?
cfg.getString(config_name + ".user") : cfg.getString(parent_config_name + ".user");
password = cfg.has(config_name + ".password") ?
cfg.getString(config_name + ".password") : cfg.getString(parent_config_name + ".password");
port = cfg.has(config_name + ".port") ? cfg.getInt(config_name + ".port") :
cfg.getInt(parent_config_name + ".port");
}
else
{
db = cfg.getString(config_name + ".db", "");
user = cfg.getString(config_name + ".user");
password = cfg.getString(config_name + ".password");
port = cfg.getInt(config_name + ".port");
}
connect_timeout = cfg.getInt(config_name + ".connect_timeout",
cfg.getInt("mysql_connect_timeout",
MYSQLXX_DEFAULT_TIMEOUT));
rw_timeout =
cfg.getInt(config_name + ".rw_timeout",
cfg.getInt("mysql_rw_timeout",
MYSQLXX_DEFAULT_RW_TIMEOUT));
2011-03-04 20:58:19 +00:00
}
/**
* @param db_ Имя БД
* @param server_ Хост для подключения
* @param user_ Имя пользователя
* @param password_ Пароль
* @param port_ Порт для подключения
* @param default_connections_ Количество подключений по-умолчанию
* @param max_connections_ Максимальное количество подключений
*/
Pool(const std::string & db_,
const std::string & server_,
const std::string & user_ = "",
const std::string & password_ = "",
unsigned port_ = 0,
unsigned connect_timeout_ = MYSQLXX_DEFAULT_TIMEOUT,
unsigned rw_timeout_ = MYSQLXX_DEFAULT_RW_TIMEOUT,
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS)
: default_connections(default_connections_), max_connections(max_connections_),
initialized(false), db(db_), server(server_), user(user_), password(password_), port(port_),
connect_timeout(connect_timeout_), rw_timeout(rw_timeout_), was_successful(false) {}
2011-03-04 20:58:19 +00:00
~Pool()
{
Poco::ScopedLock<Poco::FastMutex> locker(lock);
2011-03-04 20:58:19 +00:00
for (Connections::iterator it = connections.begin(); it != connections.end(); it++)
2011-03-04 20:58:19 +00:00
delete static_cast<Connection *>(*it);
}
/** Выделяет соединение для работы. */
2011-03-04 20:58:19 +00:00
Entry Get()
{
Poco::ScopedLock<Poco::FastMutex> locker(lock);
2011-03-04 20:58:19 +00:00
initialize();
2011-03-04 20:58:19 +00:00
for (;;)
{
for (Connections::iterator it = connections.begin(); it != connections.end(); it++)
2011-03-04 20:58:19 +00:00
{
if ((*it)->ref_count == 0)
2011-03-04 20:58:19 +00:00
return Entry(*it, this);
}
if (connections.size() < (size_t)max_connections)
2011-03-04 20:58:19 +00:00
{
Connection * conn = allocConnection();
if (conn)
return Entry(conn, this);
2011-03-04 20:58:19 +00:00
}
lock.unlock();
2011-10-06 18:23:00 +00:00
Daemon::instance().sleep(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
lock.lock();
2011-03-04 20:58:19 +00:00
}
}
/** Выделяет соединение для работы.
* Если база недоступна - возвращает пустой объект Entry.
* Если пул переполнен - кидает исключение.
*/
Entry tryGet()
2011-03-04 20:58:19 +00:00
{
Poco::ScopedLock<Poco::FastMutex> locker(lock);
2011-03-04 20:58:19 +00:00
initialize();
/// Поиск уже установленного, но не использующегося сейчас соединения.
for (Connections::iterator it = connections.begin(); it != connections.end(); ++it)
{
if ((*it)->ref_count == 0)
{
Entry res(*it, this);
return res.tryForceConnected() ? res : Entry();
}
}
/// Если пул переполнен.
if (connections.size() >= max_connections)
throw Poco::Exception("mysqlxx::Pool is full");
/// Выделение нового соединения.
Connection * conn = allocConnection(true);
if (conn)
return Entry(conn, this);
return Entry();
}
/// Получить описание БД
std::string getDescription() const
{
return description;
2011-03-04 20:58:19 +00:00
}
protected:
/** Количество соединений с MySQL, создаваемых при запуске. */
unsigned default_connections;
/** Максимально возможное количество соедиений. */
unsigned max_connections;
2011-03-04 20:58:19 +00:00
private:
/** Признак того, что мы инициализированы. */
bool initialized;
/** Список соединений. */
typedef std::list<Connection *> Connections;
/** Список соединений. */
Connections connections;
/** Замок для доступа к списку соединений. */
Poco::FastMutex lock;
/** Описание соединения. */
std::string description;
/** Параметры подключения. **/
std::string db;
std::string server;
std::string user;
std::string password;
unsigned port;
unsigned connect_timeout;
unsigned rw_timeout;
2011-03-04 20:58:19 +00:00
/** Хотя бы один раз было успешное соединение. */
bool was_successful;
/** Выполняет инициализацию класса, если мы еще не инициализированы. */
inline void initialize()
2011-03-04 20:58:19 +00:00
{
if (!initialized)
2011-03-04 20:58:19 +00:00
{
description = db + "@" + server + ":" + Poco::NumberFormatter::format(port) + " as user " + user;
2011-03-04 20:58:19 +00:00
for (unsigned i = 0; i < default_connections; i++)
allocConnection();
2011-03-04 20:58:19 +00:00
initialized = true;
2011-03-04 20:58:19 +00:00
}
}
/** Создает новое соединение. */
Connection * allocConnection(bool dont_throw_if_failed_first_time = false)
2011-03-04 20:58:19 +00:00
{
Poco::Util::Application & app = Poco::Util::Application::instance();
Connection * conn;
2011-03-04 20:58:19 +00:00
conn = new Connection();
2011-03-04 20:58:19 +00:00
try
{
app.logger().information("MYSQL: Connecting to " + description);
conn->conn.connect(
db.c_str(),
server.c_str(),
user.c_str(),
password.c_str(),
port,
connect_timeout,
rw_timeout);
2011-03-04 20:58:19 +00:00
}
catch (mysqlxx::ConnectionFailed & e)
{
2011-10-02 03:37:24 +00:00
if ((!was_successful && !dont_throw_if_failed_first_time)
|| e.errnum() == ER_ACCESS_DENIED_ERROR
2011-03-04 20:58:19 +00:00
|| e.errnum() == ER_DBACCESS_DENIED_ERROR
|| e.errnum() == ER_BAD_DB_ERROR)
{
app.logger().error(e.what());
throw;
}
else
{
app.logger().error(e.what());
delete conn;
2011-03-04 20:58:19 +00:00
if (Daemon::instance().isCancelled())
throw Poco::Exception("Daemon is cancelled while trying to connect to MySQL server.");
2014-04-08 07:58:53 +00:00
return nullptr;
2011-03-04 20:58:19 +00:00
}
}
was_successful = true;
connections.push_back(conn);
return conn;
2011-03-04 20:58:19 +00:00
}
};
}