From 314144630231b7ad65344d66618646015a47550a Mon Sep 17 00:00:00 2001 From: Clement Rodriguez Date: Tue, 8 Oct 2019 19:27:00 +0200 Subject: [PATCH] Implemented MySQL connection mutualization for external MySQL dictionaries --- .../Dictionaries/MySQLDictionarySource.cpp | 4 +- .../src/Interpreters/ExternalDictionaries.cpp | 20 +++ dbms/src/Interpreters/ExternalDictionaries.h | 9 +- libs/libmysqlxx/CMakeLists.txt | 2 + libs/libmysqlxx/include/mysqlxx/PoolFactory.h | 51 +++++++ .../include/mysqlxx/PoolWithFailover.h | 4 + libs/libmysqlxx/src/PoolFactory.cpp | 136 ++++++++++++++++++ libs/libmysqlxx/src/PoolWithFailover.cpp | 21 ++- 8 files changed, 237 insertions(+), 10 deletions(-) create mode 100644 libs/libmysqlxx/include/mysqlxx/PoolFactory.h create mode 100644 libs/libmysqlxx/src/PoolFactory.cpp diff --git a/dbms/src/Dictionaries/MySQLDictionarySource.cpp b/dbms/src/Dictionaries/MySQLDictionarySource.cpp index 497448bf64c..38ddd6df921 100644 --- a/dbms/src/Dictionaries/MySQLDictionarySource.cpp +++ b/dbms/src/Dictionaries/MySQLDictionarySource.cpp @@ -6,7 +6,6 @@ #include "DictionaryStructure.h" - namespace DB { namespace ErrorCodes @@ -47,6 +46,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory) # include # include # include "readInvalidateQuery.h" +# include namespace DB { @@ -67,7 +67,7 @@ MySQLDictionarySource::MySQLDictionarySource( , update_field{config.getString(config_prefix + ".update_field", "")} , dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)} , sample_block{sample_block_} - , pool{config, config_prefix} + , pool{mysqlxx::PoolFactory::instance().Get(config, config_prefix)} , query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks} , load_all_query{query_builder.composeLoadAllQuery()} , invalidate_query{config.getString(config_prefix + ".invalidate_query", "")} diff --git a/dbms/src/Interpreters/ExternalDictionaries.cpp b/dbms/src/Interpreters/ExternalDictionaries.cpp index e1cbd377978..1281ebd49af 100644 --- a/dbms/src/Interpreters/ExternalDictionaries.cpp +++ b/dbms/src/Interpreters/ExternalDictionaries.cpp @@ -2,6 +2,10 @@ #include #include +#if USE_MYSQL +# include +#endif + namespace DB { @@ -27,4 +31,20 @@ ExternalLoader::LoadablePtr ExternalDictionaries::create( return DictionaryFactory::instance().create(name, config, key_in_config, context); } +void ExternalDictionaries::reload(const String & name, bool load_never_loading) +{ + #if USE_MYSQL + mysqlxx::PoolFactory::instance().reset(); + #endif + ExternalLoader::reload(name, load_never_loading); +} + +void ExternalDictionaries::reload(bool load_never_loading) +{ + #if USE_MYSQL + mysqlxx::PoolFactory::instance().reset(); + #endif + ExternalLoader::reload(load_never_loading); +} + } diff --git a/dbms/src/Interpreters/ExternalDictionaries.h b/dbms/src/Interpreters/ExternalDictionaries.h index c071349cc97..5265c83379b 100644 --- a/dbms/src/Interpreters/ExternalDictionaries.h +++ b/dbms/src/Interpreters/ExternalDictionaries.h @@ -5,7 +5,6 @@ #include #include - namespace DB { @@ -33,6 +32,14 @@ public: return std::static_pointer_cast(tryGetLoadable(name)); } + /// Override ExternalLoader::reload to reset mysqlxx::PoolFactory.h + /// since connection parameters might have changed. Inherited method is called afterward + void reload(const String & name, bool load_never_loading = false); + + /// Override ExternalLoader::reload to reset mysqlxx::PoolFactory.h + /// since connection parameters might have changed. Inherited method is called afterward + void reload(bool load_never_loading = false); + protected: LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config, const std::string & key_in_config) const override; diff --git a/libs/libmysqlxx/CMakeLists.txt b/libs/libmysqlxx/CMakeLists.txt index 263a031d7b0..4315c71fab3 100644 --- a/libs/libmysqlxx/CMakeLists.txt +++ b/libs/libmysqlxx/CMakeLists.txt @@ -8,6 +8,7 @@ add_library (mysqlxx src/Row.cpp src/Value.cpp src/Pool.cpp + src/PoolFactory.cpp src/PoolWithFailover.cpp include/mysqlxx/Connection.h @@ -15,6 +16,7 @@ add_library (mysqlxx include/mysqlxx/mysqlxx.h include/mysqlxx/Null.h include/mysqlxx/Pool.h + include/mysqlxx/PoolFactory.h include/mysqlxx/PoolWithFailover.h include/mysqlxx/Query.h include/mysqlxx/ResultBase.h diff --git a/libs/libmysqlxx/include/mysqlxx/PoolFactory.h b/libs/libmysqlxx/include/mysqlxx/PoolFactory.h new file mode 100644 index 00000000000..3c553b8b6da --- /dev/null +++ b/libs/libmysqlxx/include/mysqlxx/PoolFactory.h @@ -0,0 +1,51 @@ +#pragma once + +#include +#include +#include +#include "PoolWithFailover.h" + +#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1 +#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16 +#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3 + +namespace mysqlxx +{ +/* + * PoolFactory.h + * This class is a helper singleton to mutualize connections to MySQL. + */ +class PoolFactory final : private boost::noncopyable +{ +public: + static PoolFactory & instance(); + + PoolFactory(const PoolFactory &) = delete; + + /** Allocates a PoolWithFailover to connect to MySQL. */ + PoolWithFailover Get(const std::string & config_name, + unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, + unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS, + size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); + + /** Allocates a PoolWithFailover to connect to MySQL. */ + PoolWithFailover Get(const Poco::Util::AbstractConfiguration & config, + const std::string & config_name, + unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, + unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS, + size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); + + void reset(); + + + ~PoolFactory() = default; + PoolFactory& operator=(const PoolFactory &) = delete; + +private: + PoolFactory(); + + struct Impl; + std::unique_ptr impl; +}; + +} diff --git a/libs/libmysqlxx/include/mysqlxx/PoolWithFailover.h b/libs/libmysqlxx/include/mysqlxx/PoolWithFailover.h index 21b27ebd4fe..af59b705a61 100644 --- a/libs/libmysqlxx/include/mysqlxx/PoolWithFailover.h +++ b/libs/libmysqlxx/include/mysqlxx/PoolWithFailover.h @@ -77,6 +77,10 @@ namespace mysqlxx size_t max_tries; /// Mutex for set of replicas. std::mutex mutex; + std::string config_name; + + /// Can the Pool be shared + bool shareable; public: using Entry = Pool::Entry; diff --git a/libs/libmysqlxx/src/PoolFactory.cpp b/libs/libmysqlxx/src/PoolFactory.cpp new file mode 100644 index 00000000000..0e6244c333e --- /dev/null +++ b/libs/libmysqlxx/src/PoolFactory.cpp @@ -0,0 +1,136 @@ +#include +#include +#include + +namespace mysqlxx +{ + +struct PoolFactory::Impl +{ + // Cache of already affected pools identified by their config name + std::map> pools; + + // Cache of Pool ID (host + port + user +...) cibling already established shareable pool + std::map pools_by_ids; + + /// Protect pools and pools_by_ids caches + std::mutex mutex; +}; + +PoolWithFailover PoolFactory::Get(const std::string & config_name, unsigned default_connections, + unsigned max_connections, size_t max_tries) +{ + return Get(Poco::Util::Application::instance().config(), config_name, default_connections, max_connections, max_tries); +} + +/// Duplicate of code from StringUtils.h. Copied here for less dependencies. +static bool startsWith(const std::string & s, const char * prefix) +{ + return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix)); +} + +static std::string getPoolEntryName(const Poco::Util::AbstractConfiguration & config, + const std::string & config_name) +{ + bool shared = config.getBool(config_name + ".share_connection", false); + + // Not shared no need to generate a name the pool won't be stored + if (!shared) + return ""; + + std::string entry_name = ""; + std::string host = config.getString(config_name + ".host", ""); + std::string port = config.getString(config_name + ".port", ""); + std::string user = config.getString(config_name + ".user", ""); + std::string db = config.getString(config_name + ".db", ""); + std::string table = config.getString(config_name + ".table", ""); + + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_name, keys); + + if (config.has(config_name + ".replica")) + { + Poco::Util::AbstractConfiguration::Keys replica_keys; + config.keys(config_name, replica_keys); + for (const auto & replica_config_key : replica_keys) + { + /// There could be another elements in the same level in configuration file, like "user", "port"... + if (startsWith(replica_config_key, "replica")) + { + std::string replica_name = config_name + "." + replica_config_key; + std::string tmp_host = config.getString(replica_name + ".host", host); + std::string tmp_port = config.getString(replica_name + ".port", port); + std::string tmp_user = config.getString(replica_name + ".user", user); + entry_name += (entry_name.empty() ? "" : "|") + tmp_user + "@" + tmp_host + ":" + tmp_port + "/" + db; + } + } + } + else + { + entry_name = user + "@" + host + ":" + port + "/" + db; + } + return entry_name; +} + +PoolWithFailover PoolFactory::Get(const Poco::Util::AbstractConfiguration & config, + const std::string & config_name, unsigned default_connections, unsigned max_connections, size_t max_tries) +{ + + std::lock_guard lock(impl->mutex); + Poco::Util::Application & app = Poco::Util::Application::instance(); + app.logger().warning("Config name=" + config_name); + if (auto entry = impl->pools.find(config_name); entry != impl->pools.end()) + { + app.logger().warning("Entry found=" + config_name); + return *(entry->second.get()); + } + else + { + app.logger().warning("Searching confg=" + config_name); + std::string entry_name = getPoolEntryName(config, config_name); + app.logger().warning("Entry name created=" + entry_name); + if (auto id = impl->pools_by_ids.find(entry_name); id != impl->pools_by_ids.end()) + { + app.logger().warning("found"); + entry = impl->pools.find(id->second); + std::shared_ptr pool = entry->second; + impl->pools.insert_or_assign(config_name, pool); + app.logger().warning("found OK"); + return *pool; + } + + app.logger().warning("make pool"); + auto pool = std::make_shared(config, config_name, default_connections, max_connections, max_tries); + app.logger().warning("make pool OK"); + // Check the pool will be shared + if (!entry_name.empty()) + { + // Store shared pool + app.logger().warning("store"); + impl->pools.insert_or_assign(config_name, pool); + impl->pools_by_ids.insert_or_assign(entry_name, config_name); + app.logger().warning("store OK"); + } + app.logger().warning("a2"); + auto a2 = *(pool.get()); + app.logger().warning("a2 OK"); + return *(pool.get()); + } +} + +void PoolFactory::reset() +{ + std::lock_guard lock(impl->mutex); + impl->pools.clear(); + impl->pools_by_ids.clear(); +} + +PoolFactory::PoolFactory() : impl(std::make_unique()) {} + +PoolFactory & PoolFactory::instance() +{ + static PoolFactory ret; + return ret; +} + +} diff --git a/libs/libmysqlxx/src/PoolWithFailover.cpp b/libs/libmysqlxx/src/PoolWithFailover.cpp index dd89f1596d3..bcdbcb3df72 100644 --- a/libs/libmysqlxx/src/PoolWithFailover.cpp +++ b/libs/libmysqlxx/src/PoolWithFailover.cpp @@ -48,15 +48,22 @@ PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsign {} PoolWithFailover::PoolWithFailover(const PoolWithFailover & other) - : max_tries{other.max_tries} + : max_tries{other.max_tries}, config_name{other.config_name} { - for (const auto & priority_replicas : other.replicas_by_priority) + if (shareable) { - Replicas replicas; - replicas.reserve(priority_replicas.second.size()); - for (const auto & pool : priority_replicas.second) - replicas.emplace_back(std::make_shared(*pool)); - replicas_by_priority.emplace(priority_replicas.first, std::move(replicas)); + replicas_by_priority = other.replicas_by_priority; + } + else + { + for (const auto & priority_replicas : other.replicas_by_priority) + { + Replicas replicas; + replicas.reserve(priority_replicas.second.size()); + for (const auto & pool : priority_replicas.second) + replicas.emplace_back(std::make_shared(*pool)); + replicas_by_priority.emplace(priority_replicas.first, std::move(replicas)); + } } }