diff --git a/base/mysqlxx/CMakeLists.txt b/base/mysqlxx/CMakeLists.txt index 2d2ad75628d..b85d3371336 100644 --- a/base/mysqlxx/CMakeLists.txt +++ b/base/mysqlxx/CMakeLists.txt @@ -8,6 +8,7 @@ add_library (mysqlxx src/Row.cpp src/Value.cpp src/Pool.cpp + src/PoolFactory.cpp src/PoolWithFailover.cpp include/mysqlxx/Connection.h @@ -15,6 +16,7 @@ add_library (mysqlxx include/mysqlxx/mysqlxx.h include/mysqlxx/Null.h include/mysqlxx/Pool.h + include/mysqlxx/PoolFactory.h include/mysqlxx/PoolWithFailover.h include/mysqlxx/Query.h include/mysqlxx/ResultBase.h diff --git a/base/mysqlxx/include/mysqlxx/Pool.h b/base/mysqlxx/include/mysqlxx/Pool.h index 5261ffab017..db41b059357 100644 --- a/base/mysqlxx/include/mysqlxx/Pool.h +++ b/base/mysqlxx/include/mysqlxx/Pool.h @@ -198,6 +198,8 @@ public: return description; } + void removeConnection(Connection* data); + protected: /// Number of MySQL connections which are created at launch. unsigned default_connections; diff --git a/base/mysqlxx/include/mysqlxx/PoolFactory.h b/base/mysqlxx/include/mysqlxx/PoolFactory.h new file mode 100644 index 00000000000..3c553b8b6da --- /dev/null +++ b/base/mysqlxx/include/mysqlxx/PoolFactory.h @@ -0,0 +1,51 @@ +#pragma once + +#include +#include +#include +#include "PoolWithFailover.h" + +#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1 +#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16 +#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3 + +namespace mysqlxx +{ +/* + * PoolFactory.h + * This class is a helper singleton to mutualize connections to MySQL. + */ +class PoolFactory final : private boost::noncopyable +{ +public: + static PoolFactory & instance(); + + PoolFactory(const PoolFactory &) = delete; + + /** Allocates a PoolWithFailover to connect to MySQL. */ + PoolWithFailover Get(const std::string & config_name, + unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, + unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS, + size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); + + /** Allocates a PoolWithFailover to connect to MySQL. */ + PoolWithFailover Get(const Poco::Util::AbstractConfiguration & config, + const std::string & config_name, + unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, + unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS, + size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); + + void reset(); + + + ~PoolFactory() = default; + PoolFactory& operator=(const PoolFactory &) = delete; + +private: + PoolFactory(); + + struct Impl; + std::unique_ptr impl; +}; + +} diff --git a/base/mysqlxx/include/mysqlxx/Value.h b/base/mysqlxx/include/mysqlxx/Value.h index 4b0c6c7cbfa..1ca83a8d3a6 100644 --- a/base/mysqlxx/include/mysqlxx/Value.h +++ b/base/mysqlxx/include/mysqlxx/Value.h @@ -239,7 +239,8 @@ template <> inline bool Value::get() cons template <> inline char Value::get() const { return getInt(); } template <> inline signed char Value::get() const { return getInt(); } template <> inline unsigned char Value::get() const { return getUInt(); } -template <> inline char8_t Value::get() const { return getUInt(); } +// crodriguez uncomment +//template <> inline char8_t Value::get() const { return getUInt(); } template <> inline short Value::get() const { return getInt(); } template <> inline unsigned short Value::get() const { return getUInt(); } template <> inline int Value::get() const { return getInt(); } diff --git a/base/mysqlxx/src/Pool.cpp b/base/mysqlxx/src/Pool.cpp index 410ac062039..10c599316b2 100644 --- a/base/mysqlxx/src/Pool.cpp +++ b/base/mysqlxx/src/Pool.cpp @@ -21,16 +21,20 @@ void Pool::Entry::incrementRefCount() { if (!data) return; - ++data->ref_count; - mysql_thread_init(); + ++(data->ref_count); + if(data->ref_count==1) + mysql_thread_init(); } void Pool::Entry::decrementRefCount() { if (!data) return; - --data->ref_count; - mysql_thread_end(); + if (data->ref_count > 0) { + --(data->ref_count); + if (data->ref_count==0) + mysql_thread_end(); + } } @@ -169,14 +173,24 @@ Pool::Entry Pool::tryGet() return Entry(); } +void Pool::removeConnection(Connection* connection) +{ + std::lock_guard lock(mutex); + if (connection) + { + if (connection->ref_count > 0) + { + connection->conn.disconnect(); + connection->ref_count = 0; + } + connections.remove(connection); + } +} + void Pool::Entry::disconnect() { - if (data) - { - decrementRefCount(); - data->conn.disconnect(); - } + pool->removeConnection(data); } diff --git a/base/mysqlxx/src/PoolFactory.cpp b/base/mysqlxx/src/PoolFactory.cpp new file mode 100644 index 00000000000..41e5944ebdc --- /dev/null +++ b/base/mysqlxx/src/PoolFactory.cpp @@ -0,0 +1,122 @@ +#include +#include +#include + +namespace mysqlxx +{ + +struct PoolFactory::Impl +{ + // Cache of already affected pools identified by their config name + std::map> pools; + + // Cache of Pool ID (host + port + user +...) cibling already established shareable pool + std::map pools_by_ids; + + /// Protect pools and pools_by_ids caches + std::mutex mutex; +}; + +PoolWithFailover PoolFactory::Get(const std::string & config_name, unsigned default_connections, + unsigned max_connections, size_t max_tries) +{ + return Get(Poco::Util::Application::instance().config(), config_name, default_connections, max_connections, max_tries); +} + +/// Duplicate of code from StringUtils.h. Copied here for less dependencies. +static bool startsWith(const std::string & s, const char * prefix) +{ + return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix)); +} + +static std::string getPoolEntryName(const Poco::Util::AbstractConfiguration & config, + const std::string & config_name) +{ + bool shared = config.getBool(config_name + ".share_connection", false); + + // Not shared no need to generate a name the pool won't be stored + if (!shared) + return ""; + + std::string entry_name = ""; + std::string host = config.getString(config_name + ".host", ""); + std::string port = config.getString(config_name + ".port", ""); + std::string user = config.getString(config_name + ".user", ""); + std::string db = config.getString(config_name + ".db", ""); + std::string table = config.getString(config_name + ".table", ""); + + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_name, keys); + + if (config.has(config_name + ".replica")) + { + Poco::Util::AbstractConfiguration::Keys replica_keys; + config.keys(config_name, replica_keys); + for (const auto & replica_config_key : replica_keys) + { + /// There could be another elements in the same level in configuration file, like "user", "port"... + if (startsWith(replica_config_key, "replica")) + { + std::string replica_name = config_name + "." + replica_config_key; + std::string tmp_host = config.getString(replica_name + ".host", host); + std::string tmp_port = config.getString(replica_name + ".port", port); + std::string tmp_user = config.getString(replica_name + ".user", user); + entry_name += (entry_name.empty() ? "" : "|") + tmp_user + "@" + tmp_host + ":" + tmp_port + "/" + db; + } + } + } + else + { + entry_name = user + "@" + host + ":" + port + "/" + db; + } + return entry_name; +} + +PoolWithFailover PoolFactory::Get(const Poco::Util::AbstractConfiguration & config, + const std::string & config_name, unsigned default_connections, unsigned max_connections, size_t max_tries) +{ + + std::lock_guard lock(impl->mutex); + if (auto entry = impl->pools.find(config_name); entry != impl->pools.end()) + { + return *(entry->second.get()); + } + else + { + std::string entry_name = getPoolEntryName(config, config_name); + if (auto id = impl->pools_by_ids.find(entry_name); id != impl->pools_by_ids.end()) + { + entry = impl->pools.find(id->second); + std::shared_ptr pool = entry->second; + impl->pools.insert_or_assign(config_name, pool); + return *pool; + } + + auto pool = std::make_shared(config, config_name, default_connections, max_connections, max_tries); + // Check the pool will be shared + if (!entry_name.empty()) + { + // Store shared pool + impl->pools.insert_or_assign(config_name, pool); + impl->pools_by_ids.insert_or_assign(entry_name, config_name); + } + return *(pool.get()); + } +} + +void PoolFactory::reset() +{ + std::lock_guard lock(impl->mutex); + impl->pools.clear(); + impl->pools_by_ids.clear(); +} + +PoolFactory::PoolFactory() : impl(std::make_unique()) {} + +PoolFactory & PoolFactory::instance() +{ + static PoolFactory ret; + return ret; +} + +} diff --git a/base/mysqlxx/src/PoolWithFailover.cpp b/base/mysqlxx/src/PoolWithFailover.cpp index bcdbcb3df72..8306922b0e5 100644 --- a/base/mysqlxx/src/PoolWithFailover.cpp +++ b/base/mysqlxx/src/PoolWithFailover.cpp @@ -15,6 +15,7 @@ PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg const unsigned max_connections, const size_t max_tries) : max_tries(max_tries) { + shareable = cfg.getBool(config_name + ".share_connection", false); if (cfg.has(config_name + ".replica")) { Poco::Util::AbstractConfiguration::Keys replica_keys; @@ -48,7 +49,7 @@ PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsign {} PoolWithFailover::PoolWithFailover(const PoolWithFailover & other) - : max_tries{other.max_tries}, config_name{other.config_name} + : max_tries{other.max_tries}, config_name{other.config_name}, shareable{other.shareable} { if (shareable) { @@ -88,7 +89,7 @@ PoolWithFailover::Entry PoolWithFailover::Get() try { - Entry entry = pool->tryGet(); + Entry entry = shareable ? pool->Get() : pool->tryGet(); if (!entry.isNull()) { diff --git a/dbms/src/Dictionaries/MySQLDictionarySource.cpp b/dbms/src/Dictionaries/MySQLDictionarySource.cpp index 8df029f6c27..95d41d8d35a 100644 --- a/dbms/src/Dictionaries/MySQLDictionarySource.cpp +++ b/dbms/src/Dictionaries/MySQLDictionarySource.cpp @@ -46,6 +46,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory) # include # include # include "readInvalidateQuery.h" +# include namespace DB { @@ -66,11 +67,11 @@ MySQLDictionarySource::MySQLDictionarySource( , update_field{config.getString(config_prefix + ".update_field", "")} , dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)} , sample_block{sample_block_} - , pool{config, config_prefix} + , pool{mysqlxx::PoolFactory::instance().Get(config, config_prefix)} , query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks} , load_all_query{query_builder.composeLoadAllQuery()} , invalidate_query{config.getString(config_prefix + ".invalidate_query", "")} - , close_connection{config.getBool(config_prefix + ".close_connection", false)} + , close_connection{config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false)} { } @@ -114,19 +115,21 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate() BlockInputStreamPtr MySQLDictionarySource::loadAll() { - last_modification = getLastModification(); + auto connection = pool.Get(); + last_modification = getLastModification(connection, false); LOG_TRACE(log, load_all_query); - return std::make_shared(pool.Get(), load_all_query, sample_block, max_block_size, close_connection); + return std::make_shared(connection, load_all_query, sample_block, max_block_size, close_connection); } BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll() { - last_modification = getLastModification(); + auto connection = pool.Get(); + last_modification = getLastModification(connection, false); std::string load_update_query = getUpdateFieldAndDate(); LOG_TRACE(log, load_update_query); - return std::make_shared(pool.Get(), load_update_query, sample_block, max_block_size, close_connection); + return std::make_shared(connection, load_update_query, sample_block, max_block_size, close_connection); } BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector & ids) @@ -158,8 +161,8 @@ bool MySQLDictionarySource::isModified() const if (dont_check_update_time) return true; - - return getLastModification() > last_modification; + auto connection = pool.Get(); + return getLastModification(connection, true) > last_modification; } bool MySQLDictionarySource::supportsSelectiveLoad() const @@ -199,7 +202,7 @@ std::string MySQLDictionarySource::quoteForLike(const std::string s) return out.str(); } -LocalDateTime MySQLDictionarySource::getLastModification() const +LocalDateTime MySQLDictionarySource::getLastModification(mysqlxx::Pool::Entry & connection, bool allow_connection_closure) const { LocalDateTime modification_time{std::time(nullptr)}; @@ -208,7 +211,6 @@ LocalDateTime MySQLDictionarySource::getLastModification() const try { - auto connection = pool.Get(); auto query = connection->query("SHOW TABLE STATUS LIKE " + quoteForLike(table)); LOG_TRACE(log, query.str()); @@ -233,6 +235,11 @@ LocalDateTime MySQLDictionarySource::getLastModification() const ++fetched_rows; } + if (close_connection && allow_connection_closure) + { + connection.disconnect(); + } + if (0 == fetched_rows) LOG_ERROR(log, "Cannot find table in SHOW TABLE STATUS result."); @@ -243,7 +250,6 @@ LocalDateTime MySQLDictionarySource::getLastModification() const { tryLogCurrentException("MySQLDictionarySource"); } - /// we suppose failure to get modification time is not an error, therefore return current time return modification_time; } diff --git a/dbms/src/Dictionaries/MySQLDictionarySource.h b/dbms/src/Dictionaries/MySQLDictionarySource.h index 047bd860ee1..95e660d220f 100644 --- a/dbms/src/Dictionaries/MySQLDictionarySource.h +++ b/dbms/src/Dictionaries/MySQLDictionarySource.h @@ -62,7 +62,7 @@ private: static std::string quoteForLike(const std::string s); - LocalDateTime getLastModification() const; + LocalDateTime getLastModification(mysqlxx::Pool::Entry & connection, bool allow_connection_closure) const; // execute invalidate_query. expects single cell in result std::string doInvalidateQuery(const std::string & request) const; diff --git a/dbms/src/Interpreters/ExternalDictionariesLoader.cpp b/dbms/src/Interpreters/ExternalDictionariesLoader.cpp index 8f4d79a5398..c53537b80cd 100644 --- a/dbms/src/Interpreters/ExternalDictionariesLoader.cpp +++ b/dbms/src/Interpreters/ExternalDictionariesLoader.cpp @@ -28,4 +28,12 @@ ExternalLoader::LoadablePtr ExternalDictionariesLoader::create( bool dictionary_from_database = !repository_name.empty(); return DictionaryFactory::instance().create(name, config, key_in_config, context, dictionary_from_database); } + +void ExternalDictionariesLoader::resetAll() +{ + #if USE_MYSQL + mysqlxx::PoolFactory::instance().reset(); + #endif +} + } diff --git a/dbms/src/Interpreters/ExternalDictionariesLoader.h b/dbms/src/Interpreters/ExternalDictionariesLoader.h index 6bfa3ad5e85..68913ffa166 100644 --- a/dbms/src/Interpreters/ExternalDictionariesLoader.h +++ b/dbms/src/Interpreters/ExternalDictionariesLoader.h @@ -28,6 +28,8 @@ public: return std::static_pointer_cast(tryLoad(name)); } + static void resetAll(); + protected: LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config, const std::string & key_in_config, const std::string & repository_name) const override; diff --git a/dbms/tests/integration/test_dictionaries_mysql/configs/dictionaries/mysql_dict1.xml b/dbms/tests/integration/test_dictionaries_mysql/configs/dictionaries/mysql_dict1.xml index 0a3a613dfdc..514c73f3be2 100644 --- a/dbms/tests/integration/test_dictionaries_mysql/configs/dictionaries/mysql_dict1.xml +++ b/dbms/tests/integration/test_dictionaries_mysql/configs/dictionaries/mysql_dict1.xml @@ -1,5 +1,41 @@ + + dict0 + + + test + mysql1 + 3306 + root + clickhouse + test0
+ true + true +
+ + + + + + + id + UInt32 + CAST(id AS UNSIGNED) + + + id + Int32 + + + + value + String + (UNDEFINED) + + + 0 +
dict1 diff --git a/dbms/tests/integration/test_dictionaries_mysql/configs/remote_servers.xml b/dbms/tests/integration/test_dictionaries_mysql/configs/remote_servers.xml new file mode 100644 index 00000000000..b2b88a6e3c8 --- /dev/null +++ b/dbms/tests/integration/test_dictionaries_mysql/configs/remote_servers.xml @@ -0,0 +1,12 @@ + + + + + + instance + 9000 + + + + + diff --git a/dbms/tests/integration/test_dictionaries_mysql/test.py b/dbms/tests/integration/test_dictionaries_mysql/test.py index 78aeb747dba..80424a3471a 100644 --- a/dbms/tests/integration/test_dictionaries_mysql/test.py +++ b/dbms/tests/integration/test_dictionaries_mysql/test.py @@ -9,13 +9,13 @@ from helpers.cluster import ClickHouseCluster from helpers.test_tools import assert_eq_with_retry SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -DICTIONARY_FILES = ['configs/dictionaries/mysql_dict1.xml', 'configs/dictionaries/mysql_dict2.xml'] +CONFIG_FILES = ['configs/dictionaries/mysql_dict1.xml', 'configs/dictionaries/mysql_dict2.xml', 'configs/remote_servers.xml'] cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs')) -instance = cluster.add_instance('instance', main_configs=DICTIONARY_FILES) +instance = cluster.add_instance('instance', main_configs=CONFIG_FILES, with_mysql = True) create_table_mysql_template = """ - CREATE TABLE `test`.`{}` ( + CREATE TABLE IF NOT EXISTS `test`.`{}` ( `id` int(11) NOT NULL, `value` varchar(50) NOT NULL, PRIMARY KEY (`id`) @@ -23,17 +23,19 @@ create_table_mysql_template = """ """ create_clickhouse_dictionary_table_template = """ - CREATE TABLE `test`.`dict_table_{}` (`id` Int32, `value` String) ENGINE = Dictionary({}) - ORDER BY `id` DESC SETTINGS index_granularity = 8192 + CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (`id` Int32, `value` String) ENGINE = Dictionary({}) """ @pytest.fixture(scope="module") def started_cluster(): try: + #time.sleep(30) cluster.start() # Create a MySQL database - create_mysql_db(get_mysql_conn(), 'test') + mysql_connection = get_mysql_conn() + create_mysql_db(mysql_connection, 'test') + mysql_connection.close() # Create database in ClickHouse instance.query("CREATE DATABASE IF NOT EXISTS test") @@ -54,7 +56,7 @@ def test_load_mysql_dictionaries(started_cluster): for n in range(0, 5): # Create MySQL tables, fill them and create CH dict tables - prepare_tables('test' + n) + prepare_mysql_table('test', str(n)) # Check dictionaries are loaded and have correct number of elements for n in range(0, 100): @@ -62,34 +64,32 @@ def test_load_mysql_dictionaries(started_cluster): if (n % 10) == 0: query("SYSTEM RELOAD DICTIONARIES") - # Check number of rows - assert query("SELECT count() FROM `dict_table_`.{}".format('test' + (n % 5))).rstrip() == '10000' + # Check number of row + assert query("SELECT count() FROM `test`.`dict_table_{}`".format('test' + str(n % 5))).rstrip() == '10000' def create_mysql_db(mysql_connection, name): with mysql_connection.cursor() as cursor: - cursor.execute( - "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name)) + cursor.execute("CREATE DATABASE IF NOT EXISTS {} DEFAULT CHARACTER SET 'utf8'".format(name)) -def prepare_mysql_table(table_name): +def prepare_mysql_table(table_name, index): mysql_connection = get_mysql_conn() # Create table - create_mysql_table(mysql_connection, table_name) + create_mysql_table(mysql_connection, table_name + str(index)) # Insert rows using CH query = instance.query - query("INSERT INTO `clickhouse_mysql`.{}(id, value) select number, concat('{} value ', toString(number)) from numbers(10000) ".format(table_name, table_name)) - assert query("SELECT count() FROM `clickhouse_mysql`.{}".format(table_name)).rstrip() == '10000' + query("INSERT INTO `clickhouse_mysql`.{}(id, value) select number, concat('{} value ', toString(number)) from numbers(10000) ".format(table_name + str(index), table_name + str(index))) + assert query("SELECT count() FROM `clickhouse_mysql`.{}".format(table_name + str(index))).rstrip() == '10000' mysql_connection.close() #Create CH Dictionary tables based on MySQL tables - query(create_clickhouse_dictionary_table_template.format(table_name + n), 'dict' + n) + query(create_clickhouse_dictionary_table_template.format(table_name + str(index), 'dict' + str(index))) def get_mysql_conn(): - conn = pymysql.connect(user='root', password='clickhouse', host='mysql1', port=3308) + conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.10', port=3308) return conn def create_mysql_table(conn, table_name): with conn.cursor() as cursor: cursor.execute(create_table_mysql_template.format(table_name)) -