Implemented MySQL connection mutualization for external MySQL dictionaries

This commit is contained in:
Clement Rodriguez 2019-10-08 19:27:00 +02:00
parent 269dcad274
commit 3141446302
8 changed files with 237 additions and 10 deletions

View File

@ -6,7 +6,6 @@
#include "DictionaryStructure.h"
namespace DB
{
namespace ErrorCodes
@ -47,6 +46,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
# include <common/logger_useful.h>
# include <Formats/MySQLBlockInputStream.h>
# include "readInvalidateQuery.h"
# include <mysqlxx/PoolFactory.h>
namespace DB
{
@ -67,7 +67,7 @@ MySQLDictionarySource::MySQLDictionarySource(
, update_field{config.getString(config_prefix + ".update_field", "")}
, dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)}
, sample_block{sample_block_}
, pool{config, config_prefix}
, pool{mysqlxx::PoolFactory::instance().Get(config, config_prefix)}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}

View File

@ -2,6 +2,10 @@
#include <Interpreters/Context.h>
#include <Dictionaries/DictionaryFactory.h>
#if USE_MYSQL
# include <mysqlxx/PoolFactory.h>
#endif
namespace DB
{
@ -27,4 +31,20 @@ ExternalLoader::LoadablePtr ExternalDictionaries::create(
return DictionaryFactory::instance().create(name, config, key_in_config, context);
}
void ExternalDictionaries::reload(const String & name, bool load_never_loading)
{
#if USE_MYSQL
mysqlxx::PoolFactory::instance().reset();
#endif
ExternalLoader::reload(name, load_never_loading);
}
void ExternalDictionaries::reload(bool load_never_loading)
{
#if USE_MYSQL
mysqlxx::PoolFactory::instance().reset();
#endif
ExternalLoader::reload(load_never_loading);
}
}

View File

@ -5,7 +5,6 @@
#include <common/logger_useful.h>
#include <memory>
namespace DB
{
@ -33,6 +32,14 @@ public:
return std::static_pointer_cast<const IDictionaryBase>(tryGetLoadable(name));
}
/// Override ExternalLoader::reload to reset mysqlxx::PoolFactory.h
/// since connection parameters might have changed. Inherited method is called afterward
void reload(const String & name, bool load_never_loading = false);
/// Override ExternalLoader::reload to reset mysqlxx::PoolFactory.h
/// since connection parameters might have changed. Inherited method is called afterward
void reload(bool load_never_loading = false);
protected:
LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config) const override;

View File

@ -8,6 +8,7 @@ add_library (mysqlxx
src/Row.cpp
src/Value.cpp
src/Pool.cpp
src/PoolFactory.cpp
src/PoolWithFailover.cpp
include/mysqlxx/Connection.h
@ -15,6 +16,7 @@ add_library (mysqlxx
include/mysqlxx/mysqlxx.h
include/mysqlxx/Null.h
include/mysqlxx/Pool.h
include/mysqlxx/PoolFactory.h
include/mysqlxx/PoolWithFailover.h
include/mysqlxx/Query.h
include/mysqlxx/ResultBase.h

View File

@ -0,0 +1,51 @@
#pragma once
#include <mutex>
#include <memory>
#include <boost/noncopyable.hpp>
#include "PoolWithFailover.h"
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
namespace mysqlxx
{
/*
* PoolFactory.h
* This class is a helper singleton to mutualize connections to MySQL.
*/
class PoolFactory final : private boost::noncopyable
{
public:
static PoolFactory & instance();
PoolFactory(const PoolFactory &) = delete;
/** Allocates a PoolWithFailover to connect to MySQL. */
PoolWithFailover Get(const std::string & config_name,
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
/** Allocates a PoolWithFailover to connect to MySQL. */
PoolWithFailover Get(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name,
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
void reset();
~PoolFactory() = default;
PoolFactory& operator=(const PoolFactory &) = delete;
private:
PoolFactory();
struct Impl;
std::unique_ptr<Impl> impl;
};
}

View File

@ -77,6 +77,10 @@ namespace mysqlxx
size_t max_tries;
/// Mutex for set of replicas.
std::mutex mutex;
std::string config_name;
/// Can the Pool be shared
bool shareable;
public:
using Entry = Pool::Entry;

View File

@ -0,0 +1,136 @@
#include <mysqlxx/PoolFactory.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
namespace mysqlxx
{
struct PoolFactory::Impl
{
// Cache of already affected pools identified by their config name
std::map<std::string, std::shared_ptr<PoolWithFailover>> pools;
// Cache of Pool ID (host + port + user +...) cibling already established shareable pool
std::map<std::string, std::string> pools_by_ids;
/// Protect pools and pools_by_ids caches
std::mutex mutex;
};
PoolWithFailover PoolFactory::Get(const std::string & config_name, unsigned default_connections,
unsigned max_connections, size_t max_tries)
{
return Get(Poco::Util::Application::instance().config(), config_name, default_connections, max_connections, max_tries);
}
/// Duplicate of code from StringUtils.h. Copied here for less dependencies.
static bool startsWith(const std::string & s, const char * prefix)
{
return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix));
}
static std::string getPoolEntryName(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name)
{
bool shared = config.getBool(config_name + ".share_connection", false);
// Not shared no need to generate a name the pool won't be stored
if (!shared)
return "";
std::string entry_name = "";
std::string host = config.getString(config_name + ".host", "");
std::string port = config.getString(config_name + ".port", "");
std::string user = config.getString(config_name + ".user", "");
std::string db = config.getString(config_name + ".db", "");
std::string table = config.getString(config_name + ".table", "");
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
if (config.has(config_name + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys replica_keys;
config.keys(config_name, replica_keys);
for (const auto & replica_config_key : replica_keys)
{
/// There could be another elements in the same level in configuration file, like "user", "port"...
if (startsWith(replica_config_key, "replica"))
{
std::string replica_name = config_name + "." + replica_config_key;
std::string tmp_host = config.getString(replica_name + ".host", host);
std::string tmp_port = config.getString(replica_name + ".port", port);
std::string tmp_user = config.getString(replica_name + ".user", user);
entry_name += (entry_name.empty() ? "" : "|") + tmp_user + "@" + tmp_host + ":" + tmp_port + "/" + db;
}
}
}
else
{
entry_name = user + "@" + host + ":" + port + "/" + db;
}
return entry_name;
}
PoolWithFailover PoolFactory::Get(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name, unsigned default_connections, unsigned max_connections, size_t max_tries)
{
std::lock_guard<std::mutex> lock(impl->mutex);
Poco::Util::Application & app = Poco::Util::Application::instance();
app.logger().warning("Config name=" + config_name);
if (auto entry = impl->pools.find(config_name); entry != impl->pools.end())
{
app.logger().warning("Entry found=" + config_name);
return *(entry->second.get());
}
else
{
app.logger().warning("Searching confg=" + config_name);
std::string entry_name = getPoolEntryName(config, config_name);
app.logger().warning("Entry name created=" + entry_name);
if (auto id = impl->pools_by_ids.find(entry_name); id != impl->pools_by_ids.end())
{
app.logger().warning("found");
entry = impl->pools.find(id->second);
std::shared_ptr<PoolWithFailover> pool = entry->second;
impl->pools.insert_or_assign(config_name, pool);
app.logger().warning("found OK");
return *pool;
}
app.logger().warning("make pool");
auto pool = std::make_shared<PoolWithFailover>(config, config_name, default_connections, max_connections, max_tries);
app.logger().warning("make pool OK");
// Check the pool will be shared
if (!entry_name.empty())
{
// Store shared pool
app.logger().warning("store");
impl->pools.insert_or_assign(config_name, pool);
impl->pools_by_ids.insert_or_assign(entry_name, config_name);
app.logger().warning("store OK");
}
app.logger().warning("a2");
auto a2 = *(pool.get());
app.logger().warning("a2 OK");
return *(pool.get());
}
}
void PoolFactory::reset()
{
std::lock_guard<std::mutex> lock(impl->mutex);
impl->pools.clear();
impl->pools_by_ids.clear();
}
PoolFactory::PoolFactory() : impl(std::make_unique<PoolFactory::Impl>()) {}
PoolFactory & PoolFactory::instance()
{
static PoolFactory ret;
return ret;
}
}

View File

@ -48,15 +48,22 @@ PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsign
{}
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
: max_tries{other.max_tries}
: max_tries{other.max_tries}, config_name{other.config_name}
{
for (const auto & priority_replicas : other.replicas_by_priority)
if (shareable)
{
Replicas replicas;
replicas.reserve(priority_replicas.second.size());
for (const auto & pool : priority_replicas.second)
replicas.emplace_back(std::make_shared<Pool>(*pool));
replicas_by_priority.emplace(priority_replicas.first, std::move(replicas));
replicas_by_priority = other.replicas_by_priority;
}
else
{
for (const auto & priority_replicas : other.replicas_by_priority)
{
Replicas replicas;
replicas.reserve(priority_replicas.second.size());
for (const auto & pool : priority_replicas.second)
replicas.emplace_back(std::make_shared<Pool>(*pool));
replicas_by_priority.emplace(priority_replicas.first, std::move(replicas));
}
}
}