libmysqlxx: changed settings to mysql [#CONV-8598]

This commit is contained in:
Pavel Kartavyy 2013-09-04 15:29:02 +00:00
parent 638799eee3
commit afa70827c7
2 changed files with 80 additions and 33 deletions

View File

@ -189,17 +189,29 @@ public:
*/
Pool(const std::string & config_name,
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS)
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
const std::string & user_default = "", const std::string & password_default = "",
const std::string & db_default = "", int port_default = 0)
: default_connections(default_connections_), max_connections(max_connections_),
initialized(false), was_successful(false)
{
Poco::Util::LayeredConfiguration & cfg = Poco::Util::Application::instance().config();
db = cfg.getString(config_name + ".db", "");
server = cfg.getString(config_name + ".host");
user = cfg.getString(config_name + ".user");
password = cfg.getString(config_name + ".password");
port = cfg.getInt(config_name + ".port");
db = cfg.getString(config_name + ".db", db_default);
if (!user_default.empty())
user = cfg.getString(config_name + ".user", user_default);
else
user = cfg.getString(config_name + ".user");
if (!password_default.empty())
password = cfg.getString(config_name + ".password", password_default);
else
password = cfg.getString(config_name + ".password");
if (port_default)
port = cfg.getInt(config_name + ".port", port_default);
else
port = cfg.getInt(config_name + ".port");
connect_timeout = cfg.getInt(config_name + ".connect_timeout",
cfg.getInt("mysql_connect_timeout",

View File

@ -10,11 +10,10 @@
namespace mysqlxx
{
/** Пул соединений с MySQL.
* Знает о наборе реплик с приоритетами.
* Пробует соединяться с репликами в порядке приоритета. При равном приоритете предпочитается реплика, к которой дольше всего не было попытки подключения.
*
*
* Использование аналогично mysqlxx::Pool. В конфиге задание сервера может выглядеть так же, как для Pool:
* <mysql_metrica>
* <host>mtstat01c*</host>
@ -23,9 +22,9 @@ namespace mysqlxx
* <password></password>
* <db>Metrica</db>
* </mysql_metrica>
*
*
* или так:
*
*
* <mysql_metrica>
* <replica>
* <host>mtstat01c</host>
@ -44,37 +43,54 @@ namespace mysqlxx
* <priority>1</priority>
* </replica>
* </mysql_metrica>
*
* или так:
*
* <mysql_metrica>
* <port>3306</port>
* <user>metrica</user>
* <password></password>
* <db>Metrica</db>
* <replica>
* <host>mtstat01c</host>
* <priority>0</priority>
* </replica>
* <replica>
* <host>mtstat01d</host>
* <priority>1</priority>
* </replica>
* </mysql_metrica>
*/
class PoolWithFailover
{
private:
typedef Poco::SharedPtr<Pool> PoolPtr;
struct Replica
{
PoolPtr pool;
int priority;
int error_count;
Replica() : priority(0), error_count(0) {}
Replica(PoolPtr pool_, int priority_)
: pool(pool_), priority(priority_), error_count(0) {}
};
typedef std::vector<Replica> Replicas;
/// [приоритет][номер] -> реплика.
typedef std::map<int, Replicas> ReplicasByPriority;
ReplicasByPriority replicas_by_priority;
/// Количество попыток подключения.
size_t max_tries;
/// Mutex для доступа к списку реплик.
Poco::FastMutex mutex;
public:
typedef Pool::Entry Entry;
/**
* @param config_name Имя параметра в конфигурационном файле.
* @param default_connections Количество подключений по умолчанию к какждой реплике.
@ -89,18 +105,37 @@ namespace mysqlxx
{
Poco::Util::Application & app = Poco::Util::Application::instance();
Poco::Util::AbstractConfiguration & cfg = app.config();
if (cfg.has(config_name + ".replica"))
{
int port_g = 0;
std::string user_g;
std::string password_g;
std::string db_g;
if (cfg.has(config_name + ".user"))
user_g = cfg.getString(config_name + ".user");
if (cfg.has(config_name + ".password"))
password_g = cfg.getString(config_name + ".password");
if (cfg.has(config_name + ".db"))
db_g= cfg.getString(config_name + ".db");
if (cfg.has(config_name + ".port"))
port_g = cfg.getInt(config_name + ".port");
Poco::Util::AbstractConfiguration::Keys replica_keys;
cfg.keys(config_name, replica_keys);
for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = replica_keys.begin(); it != replica_keys.end(); ++it)
{
if (it->size() < std::string("replica").size() || it->substr(0, std::string("replica").size()) != "replica")
throw Poco::Exception("Unknown element in config: " + *it + ", expected replica");
std::string replica_name = config_name + "." + *it;
Replica replica(new Pool(replica_name, default_connections, max_connections), cfg.getInt(replica_name + ".priority", 0));
replicas_by_priority[replica.priority].push_back(replica);
if (!(*it == "port" || *it == "ser" || *it == "password" || *it == "db"))
{
if (it->size() < std::string("replica").size() || it->substr(0, std::string("replica").size()) != "replica")
throw Poco::Exception("Unknown element in config: " + *it + ", expected replica");
std::string replica_name = config_name + "." + *it;
Replica replica(new Pool(replica_name, default_connections, max_connections, user_g, password_g, db_g, port_g),
cfg.getInt(replica_name + ".priority", 0));
replicas_by_priority[replica.priority].push_back(replica);
}
}
}
else
@ -108,37 +143,37 @@ namespace mysqlxx
replicas_by_priority[0].push_back(Replica(new Pool(config_name, default_connections, max_connections), 0));
}
}
/** Выделяет соединение для работы. */
Entry Get()
{
Poco::ScopedLock<Poco::FastMutex> locker(mutex);
Poco::Util::Application & app = Poco::Util::Application::instance();
/// Если к какой-то реплике не подключились, потому что исчерпан лимит соединений, можно подождать и подключиться к ней.
Replica * full_pool = NULL;
for (size_t try_no = 0; try_no < max_tries; ++try_no)
{
full_pool = NULL;
for (ReplicasByPriority::iterator it = replicas_by_priority.begin(); it != replicas_by_priority.end(); ++it)
{
Replicas & replicas = it->second;
for (size_t i = 0; i < replicas.size(); ++i)
{
Replica & replica = replicas[i];
try
{
Entry entry = replica.pool->tryGet();
if (!entry.isNull())
{
/// Переместим все пройденные реплики в конец очереди.
/// Пройденные реплики с другим приоритетом перемещать незачем.
std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end());
return entry;
}
}
@ -148,7 +183,7 @@ namespace mysqlxx
{
full_pool = &replica;
}
app.logger().error("Connection to " + replica.pool->getDescription() + " failed: " + e.displayText());
continue;
}
@ -156,10 +191,10 @@ namespace mysqlxx
app.logger().error("Connection to " + replica.pool->getDescription() + " failed.");
}
}
app.logger().error("Connection to all replicas failed " + Poco::NumberFormatter::format(try_no + 1) + " times");
}
if (full_pool)
{
app.logger().error("All connections failed, trying to wait on a full pool " + full_pool->pool->getDescription());
@ -171,7 +206,7 @@ namespace mysqlxx
for (ReplicasByPriority::const_iterator it = replicas_by_priority.begin(); it != replicas_by_priority.end(); ++it)
for (Replicas::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt)
message << (it == replicas_by_priority.begin() && jt == it->second.begin() ? "" : ", ") << jt->pool->getDescription();
throw Poco::Exception(message.str());
}
};