MySQL: mutualize (share) connections + integration test

This commit is contained in:
Clément Rodriguez 2020-02-27 10:34:06 +01:00
parent 07a970bfd0
commit 491f454b54
14 changed files with 299 additions and 42 deletions

View File

@ -8,6 +8,7 @@ add_library (mysqlxx
src/Row.cpp
src/Value.cpp
src/Pool.cpp
src/PoolFactory.cpp
src/PoolWithFailover.cpp
include/mysqlxx/Connection.h
@ -15,6 +16,7 @@ add_library (mysqlxx
include/mysqlxx/mysqlxx.h
include/mysqlxx/Null.h
include/mysqlxx/Pool.h
include/mysqlxx/PoolFactory.h
include/mysqlxx/PoolWithFailover.h
include/mysqlxx/Query.h
include/mysqlxx/ResultBase.h

View File

@ -198,6 +198,8 @@ public:
return description;
}
/// Removes `data` from the pool's list of connections, disconnecting it first if it is still referenced.
void removeConnection(Connection* data);
protected:
/// Number of MySQL connections which are created at launch.
unsigned default_connections;

View File

@ -0,0 +1,51 @@
#pragma once
#include <mutex>
#include <memory>
#include <boost/noncopyable.hpp>
#include "PoolWithFailover.h"
/// Default values of the `default_connections`, `max_connections` and `max_tries`
/// arguments of PoolFactory::Get().
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
namespace mysqlxx
{
/** Singleton helper that hands out PoolWithFailover objects for MySQL,
  * so that connections can be shared ("mutualized") between several users.
  * The singleton can be neither copied nor assigned.
  */
class PoolFactory final : private boost::noncopyable
{
public:
    /// Access to the single factory object.
    static PoolFactory & instance();

    PoolFactory(const PoolFactory &) = delete;
    PoolFactory& operator=(const PoolFactory &) = delete;
    ~PoolFactory() = default;

    /** Returns a PoolWithFailover for the section `config_name`
      * of the running Poco application's configuration.
      */
    PoolWithFailover Get(const std::string & config_name,
        unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
        unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
        size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);

    /** Returns a PoolWithFailover for the section `config_name`
      * of the explicitly given configuration `config`.
      */
    PoolWithFailover Get(const Poco::Util::AbstractConfiguration & config,
        const std::string & config_name,
        unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
        unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
        size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);

    /// Drops everything the factory has cached so far.
    void reset();

private:
    /// Only instance() creates the factory.
    PoolFactory();

    /// Internal state, defined in the implementation file.
    struct Impl;
    std::unique_ptr<Impl> impl;
};
}

View File

@ -239,7 +239,8 @@ template <> inline bool Value::get<bool >() cons
template <> inline char Value::get<char >() const { return getInt(); }
template <> inline signed char Value::get<signed char >() const { return getInt(); }
template <> inline unsigned char Value::get<unsigned char >() const { return getUInt(); }
template <> inline char8_t Value::get<char8_t >() const { return getUInt(); }
// crodriguez uncomment
//template <> inline char8_t Value::get<char8_t >() const { return getUInt(); }
template <> inline short Value::get<short >() const { return getInt(); }
template <> inline unsigned short Value::get<unsigned short >() const { return getUInt(); }
template <> inline int Value::get<int >() const { return getInt(); }

View File

@ -21,16 +21,20 @@ void Pool::Entry::incrementRefCount()
{
if (!data)
return;
++data->ref_count;
mysql_thread_init();
++(data->ref_count);
if(data->ref_count==1)
mysql_thread_init();
}
void Pool::Entry::decrementRefCount()
{
if (!data)
return;
--data->ref_count;
mysql_thread_end();
if (data->ref_count > 0) {
--(data->ref_count);
if (data->ref_count==0)
mysql_thread_end();
}
}
@ -169,14 +173,24 @@ Pool::Entry Pool::tryGet()
return Entry();
}
/// Removes `connection` from this pool's list of connections while holding the pool mutex.
/// A connection that is still referenced is disconnected and its reference count is reset to zero first.
/// A null pointer is ignored.
/// NOTE(review): the Connection object itself is not deleted here - confirm who releases it.
void Pool::removeConnection(Connection* connection)
{
    std::lock_guard<std::mutex> lock(mutex);

    if (!connection)
        return;

    const bool still_referenced = connection->ref_count > 0;
    if (still_referenced)
    {
        connection->conn.disconnect();
        connection->ref_count = 0;
    }

    connections.remove(connection);
}
void Pool::Entry::disconnect()
{
if (data)
{
decrementRefCount();
data->conn.disconnect();
}
pool->removeConnection(data);
}

View File

@ -0,0 +1,122 @@
#include <mysqlxx/PoolFactory.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
namespace mysqlxx
{
/// Internal state of PoolFactory, kept out of the header.
struct PoolFactory::Impl
{
/// Cache of the pools that were already handed out and stored, keyed by the config name they were requested with.
std::map<std::string, std::shared_ptr<PoolWithFailover>> pools;
/// Maps a pool ID (user, host, port and db, as built by getPoolEntryName) to the config name
/// under which the shareable pool for that ID is stored in `pools`.
std::map<std::string, std::string> pools_by_ids;
/// Protects the `pools` and `pools_by_ids` caches.
std::mutex mutex;
};
/// Convenience overload: resolves `config_name` against the configuration of the running
/// Poco application and forwards to the overload that takes an explicit configuration.
PoolWithFailover PoolFactory::Get(const std::string & config_name, unsigned default_connections,
    unsigned max_connections, size_t max_tries)
{
    const Poco::Util::AbstractConfiguration & app_config = Poco::Util::Application::instance().config();
    return Get(app_config, config_name, default_connections, max_connections, max_tries);
}
/// Duplicate of the helper from StringUtils.h, copied here to keep dependencies low.
/// Returns true when `s` begins with the NUL-terminated string `prefix`;
/// an empty prefix matches every string.
static bool startsWith(const std::string & s, const char * prefix)
{
    /// A reverse search anchored at position 0 can only match at the very beginning of `s`,
    /// so this is a pure prefix test. It needs neither strlen/memcmp (for which this file
    /// includes no header) nor a separate length check.
    return s.rfind(prefix, 0) == 0;
}
/** Builds the ID under which a shareable pool is cached.
  *
  * Returns an empty string when `<config_name>.share_connection` is false or absent:
  * such a pool is not stored, so it needs no name.
  * Without a `replica` key the ID is "user@host:port/db".
  * With replicas it is one "user@host:port/db" item per key starting with "replica", joined by '|';
  * host, port and user of a replica fall back to the values of the parent section.
  */
static std::string getPoolEntryName(const Poco::Util::AbstractConfiguration & config,
    const std::string & config_name)
{
    if (!config.getBool(config_name + ".share_connection", false))
        return "";

    const std::string host = config.getString(config_name + ".host", "");
    const std::string port = config.getString(config_name + ".port", "");
    const std::string user = config.getString(config_name + ".user", "");
    const std::string db = config.getString(config_name + ".db", "");

    if (!config.has(config_name + ".replica"))
        return user + "@" + host + ":" + port + "/" + db;

    Poco::Util::AbstractConfiguration::Keys replica_keys;
    config.keys(config_name, replica_keys);

    std::string entry_name;
    for (const auto & replica_config_key : replica_keys)
    {
        /// There could be other elements on the same level of the configuration, like "user", "port"...
        if (!startsWith(replica_config_key, "replica"))
            continue;

        const std::string replica_name = config_name + "." + replica_config_key;
        const std::string replica_host = config.getString(replica_name + ".host", host);
        const std::string replica_port = config.getString(replica_name + ".port", port);
        const std::string replica_user = config.getString(replica_name + ".user", user);

        if (!entry_name.empty())
            entry_name += "|";
        entry_name += replica_user + "@" + replica_host + ":" + replica_port + "/" + db;
    }
    return entry_name;
}
/** Returns the pool for `config_name`, creating it when necessary. Runs under the factory mutex.
  *
  * Lookup order:
  *  1. a pool already stored under this config name;
  *  2. a pool stored under another config name that has the same pool ID
  *     (it is then also registered under `config_name`);
  *  3. a newly created pool, which is stored only when its ID is not empty, i.e. when it is shareable.
  */
PoolWithFailover PoolFactory::Get(const Poco::Util::AbstractConfiguration & config,
    const std::string & config_name, unsigned default_connections, unsigned max_connections, size_t max_tries)
{
    std::lock_guard<std::mutex> lock(impl->mutex);

    const auto cached = impl->pools.find(config_name);
    if (cached != impl->pools.end())
        return *cached->second;

    const std::string pool_id = getPoolEntryName(config, config_name);

    const auto known_id = impl->pools_by_ids.find(pool_id);
    if (known_id != impl->pools_by_ids.end())
    {
        /// The ID maps to the config name under which the shareable pool was first stored.
        std::shared_ptr<PoolWithFailover> shared_pool = impl->pools.find(known_id->second)->second;
        impl->pools.insert_or_assign(config_name, shared_pool);
        return *shared_pool;
    }

    auto new_pool = std::make_shared<PoolWithFailover>(config, config_name, default_connections, max_connections, max_tries);
    const bool will_be_shared = !pool_id.empty();
    if (will_be_shared)
    {
        impl->pools.insert_or_assign(config_name, new_pool);
        impl->pools_by_ids.insert_or_assign(pool_id, config_name);
    }
    return *new_pool;
}
void PoolFactory::reset()
{
std::lock_guard<std::mutex> lock(impl->mutex);
impl->pools.clear();
impl->pools_by_ids.clear();
}
PoolFactory::PoolFactory() : impl(std::make_unique<PoolFactory::Impl>()) {}
/// Returns the single factory object, held as a function-local static.
PoolFactory & PoolFactory::instance()
{
    static PoolFactory factory;
    return factory;
}
}

View File

@ -15,6 +15,7 @@ PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg
const unsigned max_connections, const size_t max_tries)
: max_tries(max_tries)
{
shareable = cfg.getBool(config_name + ".share_connection", false);
if (cfg.has(config_name + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys replica_keys;
@ -48,7 +49,7 @@ PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsign
{}
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
: max_tries{other.max_tries}, config_name{other.config_name}
: max_tries{other.max_tries}, config_name{other.config_name}, shareable{other.shareable}
{
if (shareable)
{
@ -88,7 +89,7 @@ PoolWithFailover::Entry PoolWithFailover::Get()
try
{
Entry entry = pool->tryGet();
Entry entry = shareable ? pool->Get() : pool->tryGet();
if (!entry.isNull())
{

View File

@ -46,6 +46,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
# include <common/logger_useful.h>
# include <Formats/MySQLBlockInputStream.h>
# include "readInvalidateQuery.h"
# include <mysqlxx/PoolFactory.h>
namespace DB
{
@ -66,11 +67,11 @@ MySQLDictionarySource::MySQLDictionarySource(
, update_field{config.getString(config_prefix + ".update_field", "")}
, dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)}
, sample_block{sample_block_}
, pool{config, config_prefix}
, pool{mysqlxx::PoolFactory::instance().Get(config, config_prefix)}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, close_connection{config.getBool(config_prefix + ".close_connection", false)}
, close_connection{config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false)}
{
}
@ -114,19 +115,21 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate()
BlockInputStreamPtr MySQLDictionarySource::loadAll()
{
last_modification = getLastModification();
auto connection = pool.Get();
last_modification = getLastModification(connection, false);
LOG_TRACE(log, load_all_query);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size, close_connection);
return std::make_shared<MySQLBlockInputStream>(connection, load_all_query, sample_block, max_block_size, close_connection);
}
BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
{
last_modification = getLastModification();
auto connection = pool.Get();
last_modification = getLastModification(connection, false);
std::string load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_update_query, sample_block, max_block_size, close_connection);
return std::make_shared<MySQLBlockInputStream>(connection, load_update_query, sample_block, max_block_size, close_connection);
}
BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
@ -158,8 +161,8 @@ bool MySQLDictionarySource::isModified() const
if (dont_check_update_time)
return true;
return getLastModification() > last_modification;
auto connection = pool.Get();
return getLastModification(connection, true) > last_modification;
}
bool MySQLDictionarySource::supportsSelectiveLoad() const
@ -199,7 +202,7 @@ std::string MySQLDictionarySource::quoteForLike(const std::string s)
return out.str();
}
LocalDateTime MySQLDictionarySource::getLastModification() const
LocalDateTime MySQLDictionarySource::getLastModification(mysqlxx::Pool::Entry & connection, bool allow_connection_closure) const
{
LocalDateTime modification_time{std::time(nullptr)};
@ -208,7 +211,6 @@ LocalDateTime MySQLDictionarySource::getLastModification() const
try
{
auto connection = pool.Get();
auto query = connection->query("SHOW TABLE STATUS LIKE " + quoteForLike(table));
LOG_TRACE(log, query.str());
@ -233,6 +235,11 @@ LocalDateTime MySQLDictionarySource::getLastModification() const
++fetched_rows;
}
if (close_connection && allow_connection_closure)
{
connection.disconnect();
}
if (0 == fetched_rows)
LOG_ERROR(log, "Cannot find table in SHOW TABLE STATUS result.");
@ -243,7 +250,6 @@ LocalDateTime MySQLDictionarySource::getLastModification() const
{
tryLogCurrentException("MySQLDictionarySource");
}
/// we suppose failure to get modification time is not an error, therefore return current time
return modification_time;
}

View File

@ -62,7 +62,7 @@ private:
static std::string quoteForLike(const std::string s);
LocalDateTime getLastModification() const;
LocalDateTime getLastModification(mysqlxx::Pool::Entry & connection, bool allow_connection_closure) const;
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;

View File

@ -28,4 +28,12 @@ ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
bool dictionary_from_database = !repository_name.empty();
return DictionaryFactory::instance().create(name, config, key_in_config, context, dictionary_from_database);
}
/// Resets the mysqlxx::PoolFactory singleton; the body is compiled out unless USE_MYSQL is set.
void ExternalDictionariesLoader::resetAll()
{
#if USE_MYSQL
    auto & pool_factory = mysqlxx::PoolFactory::instance();
    pool_factory.reset();
#endif
}
}

View File

@ -28,6 +28,8 @@ public:
return std::static_pointer_cast<const IDictionaryBase>(tryLoad(name));
}
/// Resets the MySQL pool factory (mysqlxx::PoolFactory); has no effect unless USE_MYSQL is set.
static void resetAll();
protected:
LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config, const std::string & repository_name) const override;

View File

@ -1,5 +1,41 @@
<?xml version="1.0"?>
<yandex>
<!-- dict0: hashed dictionary backed by the MySQL table test.test0 on mysql1:3306. -->
<dictionary>
<name>dict0</name>
<source>
<mysql >
<db>test</db>
<host>mysql1</host>
<port>3306</port>
<user>root</user>
<password>clickhouse</password>
<table>test0</table>
<close_connection>true</close_connection>
<!-- share_connection: lets mysqlxx::PoolFactory store the pool and reuse it for sources with the same user, host, port and db. -->
<share_connection>true</share_connection>
</mysql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
<expression>CAST(id AS UNSIGNED)</expression>
</id>
<attribute>
<name>id</name>
<type>Int32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>String</type>
<null_value>(UNDEFINED)</null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
<dictionary>
<name>dict1</name>
<source>

View File

@ -0,0 +1,12 @@
<yandex>
<!-- Declares the cluster "test_cluster": a single shard whose only replica is host "instance", port 9000. -->
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>instance</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -9,13 +9,13 @@ from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
DICTIONARY_FILES = ['configs/dictionaries/mysql_dict1.xml', 'configs/dictionaries/mysql_dict2.xml']
CONFIG_FILES = ['configs/dictionaries/mysql_dict1.xml', 'configs/dictionaries/mysql_dict2.xml', 'configs/remote_servers.xml']
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
instance = cluster.add_instance('instance', main_configs=DICTIONARY_FILES)
instance = cluster.add_instance('instance', main_configs=CONFIG_FILES, with_mysql = True)
create_table_mysql_template = """
CREATE TABLE `test`.`{}` (
CREATE TABLE IF NOT EXISTS `test`.`{}` (
`id` int(11) NOT NULL,
`value` varchar(50) NOT NULL,
PRIMARY KEY (`id`)
@ -23,17 +23,19 @@ create_table_mysql_template = """
"""
create_clickhouse_dictionary_table_template = """
CREATE TABLE `test`.`dict_table_{}` (`id` Int32, `value` String) ENGINE = Dictionary({})
ORDER BY `id` DESC SETTINGS index_granularity = 8192
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (`id` Int32, `value` String) ENGINE = Dictionary({})
"""
@pytest.fixture(scope="module")
def started_cluster():
try:
#time.sleep(30)
cluster.start()
# Create a MySQL database
create_mysql_db(get_mysql_conn(), 'test')
mysql_connection = get_mysql_conn()
create_mysql_db(mysql_connection, 'test')
mysql_connection.close()
# Create database in ClickHouse
instance.query("CREATE DATABASE IF NOT EXISTS test")
@ -54,7 +56,7 @@ def test_load_mysql_dictionaries(started_cluster):
for n in range(0, 5):
# Create MySQL tables, fill them and create CH dict tables
prepare_tables('test' + n)
prepare_mysql_table('test', str(n))
# Check dictionaries are loaded and have correct number of elements
for n in range(0, 100):
@ -62,34 +64,32 @@ def test_load_mysql_dictionaries(started_cluster):
if (n % 10) == 0:
query("SYSTEM RELOAD DICTIONARIES")
# Check number of rows
assert query("SELECT count() FROM `dict_table_`.{}".format('test' + (n % 5))).rstrip() == '10000'
# Check number of row
assert query("SELECT count() FROM `test`.`dict_table_{}`".format('test' + str(n % 5))).rstrip() == '10000'
def create_mysql_db(mysql_connection, name):
with mysql_connection.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
cursor.execute("CREATE DATABASE IF NOT EXISTS {} DEFAULT CHARACTER SET 'utf8'".format(name))
def prepare_mysql_table(table_name):
def prepare_mysql_table(table_name, index):
mysql_connection = get_mysql_conn()
# Create table
create_mysql_table(mysql_connection, table_name)
create_mysql_table(mysql_connection, table_name + str(index))
# Insert rows using CH
query = instance.query
query("INSERT INTO `clickhouse_mysql`.{}(id, value) select number, concat('{} value ', toString(number)) from numbers(10000) ".format(table_name, table_name))
assert query("SELECT count() FROM `clickhouse_mysql`.{}".format(table_name)).rstrip() == '10000'
query("INSERT INTO `clickhouse_mysql`.{}(id, value) select number, concat('{} value ', toString(number)) from numbers(10000) ".format(table_name + str(index), table_name + str(index)))
assert query("SELECT count() FROM `clickhouse_mysql`.{}".format(table_name + str(index))).rstrip() == '10000'
mysql_connection.close()
#Create CH Dictionary tables based on MySQL tables
query(create_clickhouse_dictionary_table_template.format(table_name + n), 'dict' + n)
query(create_clickhouse_dictionary_table_template.format(table_name + str(index), 'dict' + str(index)))
def get_mysql_conn():
conn = pymysql.connect(user='root', password='clickhouse', host='mysql1', port=3308)
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.10', port=3308)
return conn
def create_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_mysql_template.format(table_name))