mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
libmysqlxx: added PoolWithFailover [#CONV-7043].
This commit is contained in:
parent
1f375250d6
commit
36798abd60
@ -148,7 +148,7 @@ public:
|
||||
Daemon::instance().sleep(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
||||
|
||||
app.logger().information("MYSQL: Reconnecting to " + pool->description);
|
||||
data->conn.connect(pool->config_name);
|
||||
data->conn.connect(pool->db.c_str(), pool->server.c_str(), pool->user.c_str(), pool->password.c_str(), pool->port);
|
||||
}
|
||||
while (!data->conn.ping());
|
||||
|
||||
@ -180,20 +180,48 @@ public:
|
||||
};
|
||||
|
||||
/**
|
||||
* @param ConfigName Имя параметра в конфигурационном файле.
|
||||
* @param DefConn Количество подключений по-умолчанию
|
||||
* @param MaxConn Максимальное количество подключений
|
||||
* @param AllowMultiQueries Не используется.
|
||||
* @param config_name Имя параметра в конфигурационном файле
|
||||
* @param default_connections_ Количество подключений по-умолчанию
|
||||
* @param max_connections_ Максимальное количество подключений
|
||||
* @param init_connect_ Запрос, выполняющийся сразу после соединения с БД. Пример: "SET NAMES cp1251"
|
||||
*/
|
||||
Pool(const std::string & config_name_,
|
||||
Pool(const std::string & config_name,
|
||||
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
|
||||
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
|
||||
const std::string & init_connect_ = "")
|
||||
: default_connections(default_connections_), max_connections(max_connections_), init_connect(init_connect_),
|
||||
initialized(false), config_name(config_name_), was_successful(false)
|
||||
initialized(false), was_successful(false)
|
||||
{
|
||||
Poco::Util::LayeredConfiguration & cfg = Poco::Util::Application::instance().config();
|
||||
|
||||
db = cfg.getString(config_name + ".db", "");
|
||||
server = cfg.getString(config_name + ".host");
|
||||
user = cfg.getString(config_name + ".user");
|
||||
password = cfg.getString(config_name + ".password");
|
||||
port = cfg.getInt (config_name + ".port");
|
||||
}
|
||||
|
||||
/**
|
||||
* @param db_ Имя БД
|
||||
* @param server_ Хост для подключения
|
||||
* @param user_ Имя пользователя
|
||||
* @param password_ Пароль
|
||||
* @param port_ Порт для подключения
|
||||
* @param default_connections_ Количество подключений по-умолчанию
|
||||
* @param max_connections_ Максимальное количество подключений
|
||||
* @param init_connect_ Запрос, выполняющийся сразу после соединения с БД. Пример: "SET NAMES cp1251"
|
||||
*/
|
||||
Pool(const std::string & db_,
|
||||
const std::string & server_,
|
||||
const std::string & user_ = "",
|
||||
const std::string & password_ = "",
|
||||
unsigned port_ = 0,
|
||||
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
|
||||
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
|
||||
const std::string & init_connect_ = "")
|
||||
: default_connections(default_connections_), max_connections(max_connections_), init_connect(init_connect_),
|
||||
initialized(false), db(db_), server(server_), user(user_), password(password_), port(port_), was_successful(false) {}
|
||||
|
||||
~Pool()
|
||||
{
|
||||
Poco::ScopedLock<Poco::FastMutex> locker(lock);
|
||||
@ -285,11 +313,16 @@ private:
|
||||
Connections connections;
|
||||
/** Замок для доступа к списку соединений. */
|
||||
Poco::FastMutex lock;
|
||||
/** Имя раздела в конфигурационном файле. */
|
||||
std::string config_name;
|
||||
/** Описание соединения. */
|
||||
std::string description;
|
||||
|
||||
/** Параметры подключения. **/
|
||||
std::string db;
|
||||
std::string server;
|
||||
std::string user;
|
||||
std::string password;
|
||||
unsigned port;
|
||||
|
||||
/** Хотя бы один раз было успешное соединение. */
|
||||
bool was_successful;
|
||||
|
||||
@ -298,13 +331,7 @@ private:
|
||||
{
|
||||
if (!initialized)
|
||||
{
|
||||
Poco::Util::Application & app = Poco::Util::Application::instance();
|
||||
Poco::Util::LayeredConfiguration & cfg = app.config();
|
||||
|
||||
description = cfg.getString(config_name + ".db", "")
|
||||
+ "@" + cfg.getString(config_name + ".host")
|
||||
+ ":" + cfg.getString(config_name + ".port")
|
||||
+ " as user " + cfg.getString(config_name + ".user");
|
||||
description = db + "@" + server + ":" + Poco::NumberFormatter::format(port) + " as user " + user;
|
||||
|
||||
for (unsigned i = 0; i < default_connections; i++)
|
||||
allocConnection();
|
||||
@ -323,7 +350,7 @@ private:
|
||||
try
|
||||
{
|
||||
app.logger().information("MYSQL: Connecting to " + description);
|
||||
conn->conn.connect(config_name);
|
||||
conn->conn.connect(db.c_str(), server.c_str(), user.c_str(), password.c_str(), port);
|
||||
}
|
||||
catch (mysqlxx::ConnectionFailed & e)
|
||||
{
|
||||
|
154
libs/libmysqlxx/include/mysqlxx/PoolWithFailover.h
Normal file
154
libs/libmysqlxx/include/mysqlxx/PoolWithFailover.h
Normal file
@ -0,0 +1,154 @@
|
||||
#pragma once
|
||||
|
||||
#include "Pool.h"
|
||||
|
||||
|
||||
namespace mysqlxx
|
||||
{
|
||||
|
||||
/** Пул соединений с MySQL.
|
||||
* Знает о наборе реплик с приоритетами.
|
||||
* Пробует соединяться с репликами в порядке приоритета. При равном приоритете предпочитается реплика, к которой дольше всего не было попытки подключения.
|
||||
*
|
||||
* Использование аналогично mysqlxx::Pool. В конфиге задание сервера может выглядеть так же, как для Pool:
|
||||
* <mysql_metrica>
|
||||
* <host>mtstat01c*</host>
|
||||
* <port>3306</port>
|
||||
* <user>metrica</user>
|
||||
* <password></password>
|
||||
* <db>Metrica</db>
|
||||
* </mysql_metrica>
|
||||
*
|
||||
* или так:
|
||||
*
|
||||
* <mysql_metrica>
|
||||
* <replica>
|
||||
* <host>mtstat01c</host>
|
||||
* <port>3306</port>
|
||||
* <user>metrica</user>
|
||||
* <password></password>
|
||||
* <db>Metrica</db>
|
||||
* <priority>0</priority>
|
||||
* </replica>
|
||||
* <replica>
|
||||
* <host>mtstat01d</host>
|
||||
* <port>3306</port>
|
||||
* <user>metrica</user>
|
||||
* <password></password>
|
||||
* <db>Metrica</db>
|
||||
* <priority>1</priority>
|
||||
* </replica>
|
||||
* </mysql_metrica>
|
||||
*/
|
||||
class PoolWithFailover
|
||||
{
|
||||
private:
|
||||
typedef Poco::SharedPtr<Pool> PoolPtr;
|
||||
|
||||
struct Replica
|
||||
{
|
||||
PoolPtr pool;
|
||||
int priority;
|
||||
int error_count;
|
||||
|
||||
Replica() : priority(0), error_count(0) {}
|
||||
Replica(PoolPtr pool_, int priority_)
|
||||
: pool(pool_), priority(priority_), error_count(0) {}
|
||||
};
|
||||
|
||||
typedef std::vector<Replica> Replicas;
|
||||
/// [приоритет][номер] -> реплика.
|
||||
typedef std::map<int, Replicas> ReplicasByPriority;
|
||||
|
||||
ReplicasByPriority replicas_by_priority;
|
||||
|
||||
/// Максимально возможное количество соедиений с каждой репликой.
|
||||
unsigned max_connections;
|
||||
/// Mutex для доступа к списку реплик.
|
||||
Poco::FastMutex mutex;
|
||||
|
||||
public:
|
||||
typedef Pool::Entry Entry;
|
||||
|
||||
/**
|
||||
* @param config_name Имя параметра в конфигурационном файле.
|
||||
* @param max_connections_ Максимальное количество подключений к какждой реплике
|
||||
*/
|
||||
PoolWithFailover(const std::string & config_name,
|
||||
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS)
|
||||
: max_connections(max_connections_)
|
||||
{
|
||||
Poco::Util::Application & app = Poco::Util::Application::instance();
|
||||
Poco::Util::AbstractConfiguration & cfg = app.config();
|
||||
|
||||
if (cfg.has(config_name + ".replica"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys replica_keys;
|
||||
cfg.keys(config_name, replica_keys);
|
||||
std::map<int, Replicas> replicas_by_priority;
|
||||
for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = replica_keys.begin(); it != replica_keys.end(); ++it)
|
||||
{
|
||||
if (it->size() < std::string("replica").size() || it->substr(0, std::string("replica").size()) != "replica")
|
||||
throw Poco::Exception("Unknown element in config: " + *it + ", expected replica");
|
||||
std::string replica_name = config_name + "." + *it;
|
||||
Replica replica(new Pool(replica_name, 0, max_connections), cfg.getInt(replica_name + ".priority", 0));
|
||||
replicas_by_priority[replica.priority].push_back(replica);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
replicas_by_priority[0].push_back(Replica(new Pool(config_name, 0, max_connections), 0));
|
||||
}
|
||||
}
|
||||
|
||||
/** Выделяет соединение для работы. */
|
||||
Entry Get()
|
||||
{
|
||||
Poco::ScopedLock<Poco::FastMutex> locker(mutex);
|
||||
Poco::Util::Application & app = Poco::Util::Application::instance();
|
||||
|
||||
/// Если к какой-то реплике не подключились, потому что исчерпан лимит соединений, можно подождать и подключиться к ней.
|
||||
Replica * full_pool;
|
||||
|
||||
for (ReplicasByPriority::reverse_iterator it = replicas_by_priority.rbegin(); it != replicas_by_priority.rend(); ++it)
|
||||
{
|
||||
Replicas & replicas = it->second;
|
||||
for (size_t i = 0; i < replicas.size(); ++i)
|
||||
{
|
||||
Replica & replica = replicas[i];
|
||||
|
||||
try
|
||||
{
|
||||
Entry entry = replica.pool->tryGet();
|
||||
|
||||
if (!entry.isNull())
|
||||
{
|
||||
/// Переместим все пройденные реплики в конец очереди.
|
||||
/// Пройденные реплики с другим приоритетом перемещать незачем.
|
||||
std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end());
|
||||
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
catch (Poco::Exception & e)
|
||||
{
|
||||
if (e.displayText() == "mysqlxx::Pool is full")
|
||||
{
|
||||
full_pool = &replica;
|
||||
}
|
||||
|
||||
app.logger().error("Connection to " + replica.pool->getDescription() + " failed: " + e.displayText());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (full_pool)
|
||||
{
|
||||
app.logger().error("All connections failed, trying to wait on a full pool " + full_pool->pool->getDescription());
|
||||
return full_pool->pool->Get();
|
||||
}
|
||||
|
||||
throw Poco::Exception("Connections to all replicas failed");
|
||||
}
|
||||
};
|
||||
}
|
Loading…
Reference in New Issue
Block a user