diff --git a/libs/libmysqlxx/include/mysqlxx/Pool.h b/libs/libmysqlxx/include/mysqlxx/Pool.h index 016b4d5b70e..9ce923c5ee9 100644 --- a/libs/libmysqlxx/include/mysqlxx/Pool.h +++ b/libs/libmysqlxx/include/mysqlxx/Pool.h @@ -148,7 +148,7 @@ public: Daemon::instance().sleep(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); app.logger().information("MYSQL: Reconnecting to " + pool->description); - data->conn.connect(pool->config_name); + data->conn.connect(pool->db.c_str(), pool->server.c_str(), pool->user.c_str(), pool->password.c_str(), pool->port); } while (!data->conn.ping()); @@ -180,19 +180,47 @@ public: }; /** - * @param ConfigName Имя параметра в конфигурационном файле. - * @param DefConn Количество подключений по-умолчанию - * @param MaxConn Максимальное количество подключений - * @param AllowMultiQueries Не используется. + * @param config_name Имя параметра в конфигурационном файле + * @param default_connections_ Количество подключений по-умолчанию + * @param max_connections_ Максимальное количество подключений + * @param init_connect_ Запрос, выполняющийся сразу после соединения с БД. Пример: "SET NAMES cp1251" */ - Pool(const std::string & config_name_, + Pool(const std::string & config_name, unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS, unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS, - const std::string & init_connect_ = "") + const std::string & init_connect_ = "") : default_connections(default_connections_), max_connections(max_connections_), init_connect(init_connect_), - initialized(false), config_name(config_name_), was_successful(false) + initialized(false), was_successful(false) { + Poco::Util::LayeredConfiguration & cfg = Poco::Util::Application::instance().config(); + + db = cfg.getString(config_name + ".db", ""); + server = cfg.getString(config_name + ".host"); + user = cfg.getString(config_name + ".user"); + password = cfg.getString(config_name + ".password"); + port = cfg.getInt (config_name + ".port"); } + + /** + * @param db_ Имя БД + * @param server_ Хост для подключения + * @param user_ Имя пользователя + * @param password_ Пароль + * @param port_ Порт для подключения + * @param default_connections_ Количество подключений по-умолчанию + * @param max_connections_ Максимальное количество подключений + * @param init_connect_ Запрос, выполняющийся сразу после соединения с БД. Пример: "SET NAMES cp1251" + */ + Pool(const std::string & db_, + const std::string & server_, + const std::string & user_ = "", + const std::string & password_ = "", + unsigned port_ = 0, + unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS, + unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS, + const std::string & init_connect_ = "") + : default_connections(default_connections_), max_connections(max_connections_), init_connect(init_connect_), + initialized(false), db(db_), server(server_), user(user_), password(password_), port(port_), was_successful(false) {} ~Pool() { @@ -285,10 +313,15 @@ private: Connections connections; /** Замок для доступа к списку соединений. */ Poco::FastMutex lock; - /** Имя раздела в конфигурационном файле. */ - std::string config_name; /** Описание соединения. */ std::string description; + + /** Параметры подключения. **/ + std::string db; + std::string server; + std::string user; + std::string password; + unsigned port; /** Хотя бы один раз было успешное соединение. */ bool was_successful; @@ -298,13 +331,7 @@ private: { if (!initialized) { - Poco::Util::Application & app = Poco::Util::Application::instance(); - Poco::Util::LayeredConfiguration & cfg = app.config(); - - description = cfg.getString(config_name + ".db", "") - + "@" + cfg.getString(config_name + ".host") - + ":" + cfg.getString(config_name + ".port") - + " as user " + cfg.getString(config_name + ".user"); + description = db + "@" + server + ":" + Poco::NumberFormatter::format(port) + " as user " + user; for (unsigned i = 0; i < default_connections; i++) allocConnection(); @@ -323,7 +350,7 @@ private: try { app.logger().information("MYSQL: Connecting to " + description); - conn->conn.connect(config_name); + conn->conn.connect(db.c_str(), server.c_str(), user.c_str(), password.c_str(), port); } catch (mysqlxx::ConnectionFailed & e) { diff --git a/libs/libmysqlxx/include/mysqlxx/PoolWithFailover.h b/libs/libmysqlxx/include/mysqlxx/PoolWithFailover.h new file mode 100644 index 00000000000..586b332c300 --- /dev/null +++ b/libs/libmysqlxx/include/mysqlxx/PoolWithFailover.h @@ -0,0 +1,154 @@ +#pragma once + +#include "Pool.h" + + +namespace mysqlxx +{ + + /** Пул соединений с MySQL. + * Знает о наборе реплик с приоритетами. + * Пробует соединяться с репликами в порядке приоритета. При равном приоритете предпочитается реплика, к которой дольше всего не было попытки подключения. + * + * Использование аналогично mysqlxx::Pool. В конфиге задание сервера может выглядеть так же, как для Pool: + * + * mtstat01c* + * 3306 + * metrica + * + * Metrica + * + * + * или так: + * + * + * + * mtstat01c + * 3306 + * metrica + * + * Metrica + * 0 + * + * + * mtstat01d + * 3306 + * metrica + * + * Metrica + * 1 + * + * + */ + class PoolWithFailover + { + private: + typedef Poco::SharedPtr PoolPtr; + + struct Replica + { + PoolPtr pool; + int priority; + int error_count; + + Replica() : priority(0), error_count(0) {} + Replica(PoolPtr pool_, int priority_) + : pool(pool_), priority(priority_), error_count(0) {} + }; + + typedef std::vector Replicas; + /// [приоритет][номер] -> реплика. + typedef std::map ReplicasByPriority; + + ReplicasByPriority replicas_by_priority; + + /// Максимально возможное количество соедиений с каждой репликой. + unsigned max_connections; + /// Mutex для доступа к списку реплик. + Poco::FastMutex mutex; + + public: + typedef Pool::Entry Entry; + + /** + * @param config_name Имя параметра в конфигурационном файле. + * @param max_connections_ Максимальное количество подключений к какждой реплике + */ + PoolWithFailover(const std::string & config_name, + unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS) + : max_connections(max_connections_) + { + Poco::Util::Application & app = Poco::Util::Application::instance(); + Poco::Util::AbstractConfiguration & cfg = app.config(); + + if (cfg.has(config_name + ".replica")) + { + Poco::Util::AbstractConfiguration::Keys replica_keys; + cfg.keys(config_name, replica_keys); + std::map replicas_by_priority; + for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = replica_keys.begin(); it != replica_keys.end(); ++it) + { + if (it->size() < std::string("replica").size() || it->substr(0, std::string("replica").size()) != "replica") + throw Poco::Exception("Unknown element in config: " + *it + ", expected replica"); + std::string replica_name = config_name + "." + *it; + Replica replica(new Pool(replica_name, 0, max_connections), cfg.getInt(replica_name + ".priority", 0)); + replicas_by_priority[replica.priority].push_back(replica); + } + } + else + { + replicas_by_priority[0].push_back(Replica(new Pool(config_name, 0, max_connections), 0)); + } + } + + /** Выделяет соединение для работы. */ + Entry Get() + { + Poco::ScopedLock locker(mutex); + Poco::Util::Application & app = Poco::Util::Application::instance(); + + /// Если к какой-то реплике не подключились, потому что исчерпан лимит соединений, можно подождать и подключиться к ней. + Replica * full_pool; + + for (ReplicasByPriority::reverse_iterator it = replicas_by_priority.rbegin(); it != replicas_by_priority.rend(); ++it) + { + Replicas & replicas = it->second; + for (size_t i = 0; i < replicas.size(); ++i) + { + Replica & replica = replicas[i]; + + try + { + Entry entry = replica.pool->tryGet(); + + if (!entry.isNull()) + { + /// Переместим все пройденные реплики в конец очереди. + /// Пройденные реплики с другим приоритетом перемещать незачем. + std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end()); + + return entry; + } + } + catch (Poco::Exception & e) + { + if (e.displayText() == "mysqlxx::Pool is full") + { + full_pool = &replica; + } + + app.logger().error("Connection to " + replica.pool->getDescription() + " failed: " + e.displayText()); + } + } + } + + if (full_pool) + { + app.logger().error("All connections failed, trying to wait on a full pool " + full_pool->pool->getDescription()); + return full_pool->pool->Get(); + } + + throw Poco::Exception("Connections to all replicas failed"); + } + }; +}