ClickHouse/base/mysqlxx/PoolWithFailover.cpp
Azat Khuzhin ed70ed6f71 Introduce connection_no_wait setting for MySQL engine.
This allows avoiding a superfluous sleep during query execution. The sleep
is not only undesired behaviour, it may also hang the server: if you
execute enough queries that use the MySQL database but do not allow
enough connections (or your MySQL server is too slow), you may run out
of threads in the global thread pool.

Also note that right now it is possible to get deadlock when the mysql
pool is full, consider the following scenario:

- you have m1 and m2 mysql tables
- you have q1 and q2 queries, both queries join m1 and m2
- q1 allocated connection for m1 but cannot allocate connection for m2
- q2 allocated connection for m2 but cannot allocate connection for m1
- to resolve the deadlock, one of the queries would have to give up its lock, which is not possible right now...

And then you got no free threads and this:

    # grep -h ^202 /proc/$(pgrep clickhouse-serv)/task/*/syscall  | cut -d' ' -f2 | sort | uniq -c | sort -nr | head
       1554 0x7ffb60b92fe8 # mutex in mysqlxx::PoolWithFailover::get
       1375 0x7ffb9f1c4748 # mutex in ::PoolEntryHelper::~PoolEntryHelper from DB::MultiplexedConnections::invalidateReplica
       1160 0x7ffb612918b8 # mutex in mysqlxx::PoolWithFailover::get
         42 0x7ffb9f057984 # mutex in ThreadPoolImpl<std::__1::thread>::worker

    *NOTE: 202 is a `futex` with WAIT*

(Went with `syscall` because debugging 10k+ threads is not easy, and
eventually it may TRAP)
2021-09-02 22:23:37 +03:00

190 lines
6.7 KiB
C++

#include <algorithm>
#include <ctime>
#include <random>
#include <thread>
#include <mysqlxx/PoolWithFailover.h>
/// Returns true when `s` begins with the NUL-terminated string `prefix`
/// (an empty prefix matches every string).
/// Duplicate of code from StringUtils.h. Copied here for less dependencies.
static bool startsWith(const std::string & s, const char * prefix)
{
    /// Compare the leading strlen(prefix) characters of `s` against `prefix`;
    /// when `s` is shorter, the compared lengths differ and the result is non-zero.
    return s.compare(0, strlen(prefix), prefix) == 0;
}
using namespace mysqlxx;
/// Builds the pools from the configuration subtree `config_name_`.
///
/// If the subtree has a "replica" key, every child key starting with "replica" becomes a separate
/// pool, grouped by its "priority" value (0 when absent); otherwise the subtree itself describes
/// the single pool, stored under priority 0.
/// `share_connection` (default false) is read from the same subtree; wait_timeout is set to UINT64_MAX.
PoolWithFailover::PoolWithFailover(
    const Poco::Util::AbstractConfiguration & config_,
    const std::string & config_name_,
    const unsigned default_connections_,
    const unsigned max_connections_,
    const size_t max_tries_)
    : max_tries(max_tries_)
    , shareable(config_.getBool(config_name_ + ".share_connection", false))
    , wait_timeout(UINT64_MAX)
{
    const bool has_replicas = config_.has(config_name_ + ".replica");
    if (!has_replicas)
    {
        /// No replicas listed: the configuration node itself describes the only server.
        replicas_by_priority[0].emplace_back(
            std::make_shared<Pool>(config_, config_name_, default_connections_, max_connections_));
        return;
    }

    Poco::Util::AbstractConfiguration::Keys child_keys;
    config_.keys(config_name_, child_keys);

    for (const auto & child_key : child_keys)
    {
        /// Other elements may live on the same level of the configuration file ("password", "port", ...);
        /// only the "replica*" ones describe servers.
        if (!startsWith(child_key, "replica"))
            continue;

        const std::string replica_path = config_name_ + "." + child_key;
        const int priority = config_.getInt(replica_path + ".priority", 0);

        replicas_by_priority[priority].emplace_back(
            std::make_shared<Pool>(config_, replica_path, default_connections_, max_connections_, config_name_.c_str()));
    }

    /// PoolWithFailover objects are stored in a cache inside PoolFactory.
    /// That cache is reset by ExternalDictionariesLoader after every SYSTEM RELOAD DICTIONAR{Y|IES},
    /// which triggers massive re-construction of connection pools.
    /// The state of PRNGs like std::mt19937 is considered quite heavy,
    /// so the generator is created once per thread and reused.
    static thread_local std::mt19937 rnd_generator(
        std::hash<std::thread::id>{}(std::this_thread::get_id()) + std::clock());

    /// Randomize the order of replicas that share a priority.
    for (auto & priority_replicas : replicas_by_priority)
    {
        Replicas & same_priority = priority_replicas.second;
        if (same_priority.size() > 1)
            std::shuffle(same_priority.begin(), same_priority.end(), rnd_generator);
    }
}
/// Convenience constructor: delegates to the configuration-based constructor,
/// using the configuration of the current Poco application instance
/// (Poco::Util::Application::instance().config()). All other arguments are forwarded unchanged.
PoolWithFailover::PoolWithFailover(
const std::string & config_name_,
const unsigned default_connections_,
const unsigned max_connections_,
const size_t max_tries_)
: PoolWithFailover{Poco::Util::Application::instance().config(),
config_name_, default_connections_, max_connections_, max_tries_}
{
}
/// Builds one pool per (host, port) entry of `addresses`, all connecting to `database`
/// with the same `user` / `password`, an empty socket path and the default mysqlxx timeouts.
/// Connections are never shared (shareable = false); `wait_timeout_` is stored as is.
PoolWithFailover::PoolWithFailover(
    const std::string & database,
    const RemoteDescription & addresses,
    const std::string & user,
    const std::string & password,
    unsigned default_connections_,
    unsigned max_connections_,
    size_t max_tries_,
    uint64_t wait_timeout_)
    : max_tries(max_tries_)
    , shareable(false)
    , wait_timeout(wait_timeout_)
{
    /// All replicas get the same priority (0); get() moves traversed replicas to the end of the queue.
    for (const auto & [replica_host, replica_port] : addresses)
    {
        auto replica_pool = std::make_shared<Pool>(
            database,
            replica_host,
            user,
            password,
            replica_port,
            /* socket_ = */ "",
            MYSQLXX_DEFAULT_TIMEOUT,
            MYSQLXX_DEFAULT_RW_TIMEOUT,
            default_connections_,
            max_connections_);

        replicas_by_priority[0].emplace_back(std::move(replica_pool));
    }
}
/// Copy constructor.
/// When connections are shareable, the new object refers to the very same Pool objects as `other`
/// (the shared pointers are copied). Otherwise every Pool is copy-constructed, so the new object
/// gets its own pools under the same priorities.
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
    : max_tries{other.max_tries}
    , shareable{other.shareable}
    , wait_timeout(other.wait_timeout)
{
    if (shareable)
    {
        replicas_by_priority = other.replicas_by_priority;
        return;
    }

    for (const auto & [priority, source_pools] : other.replicas_by_priority)
    {
        Replicas cloned_pools;
        cloned_pools.reserve(source_pools.size());

        for (const auto & source_pool : source_pools)
            cloned_pools.emplace_back(std::make_shared<Pool>(*source_pool));

        replicas_by_priority.emplace(priority, std::move(cloned_pools));
    }
}
/// Returns a connection from the first replica that yields a non-null entry.
///
/// Replicas are walked in map order of their priority and, within a priority, in queue order;
/// the whole walk is repeated up to `max_tries` times. Shareable pools are asked via
/// get(wait_timeout), the others via tryGet(). On success the replicas traversed so far are moved
/// to the end of their queue.
/// If every attempt fails but a pool reported that it is full during the last round, the call
/// falls back to get(wait_timeout) on that pool (presumably a blocking wait).
/// Otherwise throws Poco::Exception listing the descriptions of all replicas.
/// `mutex` is held for the whole duration of the call.
PoolWithFailover::Entry PoolWithFailover::get()
{
    Poco::Util::Application & app = Poco::Util::Application::instance();
    std::lock_guard<std::mutex> locker(mutex);

    /// If we cannot connect to some replica due to pool overflow, then we will wait and connect.
    PoolPtr * overflowed_pool = nullptr;

    for (size_t attempt = 0; attempt != max_tries; ++attempt)
    {
        /// Only an overflow seen during the current round counts.
        overflowed_pool = nullptr;

        for (auto & [_, replicas] : replicas_by_priority)
        {
            for (auto replica_it = replicas.begin(); replica_it != replicas.end(); ++replica_it)
            {
                PoolPtr & pool = *replica_it;

                try
                {
                    Entry entry = shareable ? pool->get(wait_timeout) : pool->tryGet();

                    if (!entry.isNull())
                    {
                        /// Move all traversed replicas (including this one) to the end of the queue.
                        /// (Replicas with another priority do not need to be moved.)
                        std::rotate(replicas.begin(), replica_it + 1, replicas.end());
                        return entry;
                    }
                }
                catch (const Poco::Exception & e)
                {
                    const std::string reason = e.displayText();

                    /// NOTE: String comparison is trashy code.
                    if (reason.find("mysqlxx::Pool is full") != std::string::npos)
                        overflowed_pool = &pool;

                    app.logger().warning("Connection to " + pool->getDescription() + " failed: " + reason);
                    continue;
                }

                /// No exception, but the pool returned a null entry.
                app.logger().warning("Connection to " + pool->getDescription() + " failed.");
            }
        }

        app.logger().error("Connection to all replicas failed " + std::to_string(attempt + 1) + " times");
    }

    if (overflowed_pool)
    {
        app.logger().error("All connections failed, trying to wait on a full pool " + (*overflowed_pool)->getDescription());
        return (*overflowed_pool)->get(wait_timeout);
    }

    std::stringstream message;
    message << "Connections to all replicas failed: ";

    /// The separator is omitted only for the first replica of the first priority group.
    bool in_first_group = true;
    for (const auto & [_, replicas] : replicas_by_priority)
    {
        bool at_group_start = true;
        for (const auto & pool : replicas)
        {
            const bool skip_separator = in_first_group && at_group_start;
            message << (skip_separator ? "" : ", ") << pool->getDescription();
            at_group_start = false;
        }
        in_first_group = false;
    }

    throw Poco::Exception(message.str());
}