Mutualization of MySQL connection + integration tests

This commit is contained in:
Clement Rodriguez 2019-10-10 18:10:46 +02:00
parent 2c7c4700e4
commit 72427b0683
9 changed files with 296 additions and 17 deletions

View File

@ -27,7 +27,7 @@ ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
return DictionaryFactory::instance().create(name, config, key_in_config, context);
}
void ExternalDictionaries::reload(const String & name, bool load_never_loading)
void ExternalDictionariesLoader::reload(const String & name, bool load_never_loading)
{
#if USE_MYSQL
mysqlxx::PoolFactory::instance().reset();
@ -35,7 +35,7 @@ void ExternalDictionaries::reload(const String & name, bool load_never_loading)
ExternalLoader::reload(name, load_never_loading);
}
void ExternalDictionaries::reload(bool load_never_loading)
void ExternalDictionariesLoader::reload(bool load_never_loading)
{
#if USE_MYSQL
mysqlxx::PoolFactory::instance().reset();

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<dictionaries_config>/etc/clickhouse-server/config.d/*.xml</dictionaries_config>
</yandex>

View File

@ -0,0 +1,39 @@
<?xml version="1.0"?>
<yandex>
<dictionary>
<name>dict1</name>
<source>
<mysql >
<db>test</db>
<host>mysql1</host>
<port>3306</port>
<user>root</user>
<password>clickhouse</password>
<table>test1</table>
<close_connection>true</close_connection>
<share_connection>true</share_connection>
</mysql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
<expression>CAST(id AS UNSIGNED)</expression>
</id>
<attribute>
<name>id</name>
<type>Int32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>String</type>
<null_value>(UNDEFINED)</null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
</yandex>

View File

@ -0,0 +1,113 @@
<?xml version="1.0"?>
<yandex>
<dictionary>
<name>dict2</name>
<source>
<mysql >
<db>test</db>
<host>mysql1</host>
<port>3306</port>
<user>root</user>
<password>clickhouse</password>
<table>test2</table>
<close_connection>true</close_connection>
<share_connection>true</share_connection>
</mysql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
<expression>CAST(id AS UNSIGNED)</expression>
</id>
<attribute>
<name>id</name>
<type>Int32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>String</type>
<null_value>(UNDEFINED)</null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
<dictionary>
<name>dict3</name>
<source>
<mysql >
<db>test</db>
<host>mysql1</host>
<port>3306</port>
<user>root</user>
<password>clickhouse</password>
<table>test2</table>
<close_connection>true</close_connection>
<share_connection>true</share_connection>
</mysql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
<expression>CAST(id AS UNSIGNED)</expression>
</id>
<attribute>
<name>id</name>
<type>Int32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>String</type>
<null_value>(UNDEFINED)</null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
<dictionary>
<name>dict4</name>
<source>
<mysql >
<db>test</db>
<host>mysql1</host>
<port>3306</port>
<user>root</user>
<password>clickhouse</password>
<table>test2</table>
<close_connection>true</close_connection>
<share_connection>true</share_connection>
</mysql>
</source>
<layout>
<hashed/>
</layout>
<structure>
<id>
<name>id</name>
<type>UInt32</type>
<expression>CAST(id AS UNSIGNED)</expression>
</id>
<attribute>
<name>id</name>
<type>Int32</type>
<null_value></null_value>
</attribute>
<attribute>
<name>value</name>
<type>String</type>
<null_value>(UNDEFINED)</null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
</yandex>

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,89 @@
import pytest
import os
import time
## sudo -H pip install PyMySQL
import pymysql.cursors
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
DICTIONARY_FILES = ['configs/dictionaries/mysql_dict1.xml', 'configs/dictionaries/mysql_dict2.xml']
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
instance = cluster.add_instance('instance', main_configs=DICTIONARY_FILES)
create_table_mysql_template = """
CREATE TABLE `test`.`{}` (
`id` int(11) NOT NULL,
`value` varchar(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
"""
create_clickhouse_dictionary_table_template = """
CREATE TABLE `test`.`{}` (`id` Int32, `value` String) ENGINE = Dictionary({})
ORDER BY `id` DESC SETTINGS index_granularity = 8192
"""
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
instance.query("CREATE DATABASE IF NOT EXISTS test")
# Create a MySQL database
create_mysql_db(get_mysql_conn(), 'test')
instance.query("CREATE DATABASE clickhouse_mysql ENGINE = MySQL('mysql1:3306', 'test', 'root', 'clickhouse')")
yield cluster
finally:
cluster.shutdown()
def test_load_mysql_dictionaries(started_cluster):
# Load dictionaries
query = instance.query
query("SYSTEM RELOAD DICTIONARIES")
for n in range(0, 5):
# Create MySQL tables and fills them
prepare_mysql_table('test' + n)
#Create Dictionary tables based on MySQL tables
query(create_clickhouse_dictionary_table_template.format('test' + n), 'dict' + n)
# Check dictionaries are loaded and have correct number of elements
for n in range(0, 100):
if (n % 10) == 0:
# Force reload of dictionaries
query("SYSTEM RELOAD DICTIONARIES")
assert query("SELECT count() FROM `test`.{}".format('test' + (n % 5))).rstrip() == '10000'
def create_mysql_db(mysql_connection, name):
with mysql_connection.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
def prepare_mysql_table(table_name):
mysql_connection = get_mysql_conn()
# Create table
create_mysql_table(mysql_connection, table_name)
# Insert rows using CH
query = instance.query
query("INSERT INTO `clickhouse_mysql`.{}(id, value) select number, concat('{} value ', toString(number)) from numbers(10000) ".format(table_name, table_name))
assert query("SELECT count() FROM `clickhouse_mysql`.{}".format(table_name)).rstrip() == '10000'
mysql_connection.close()
def get_mysql_conn():
conn = pymysql.connect(user='root', password='clickhouse', host='mysql1', port=3308)
return conn
def create_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_mysql_template.format(table_name))

View File

@ -104,8 +104,6 @@ namespace mysqlxx
PoolWithFailover(const PoolWithFailover & other);
// PoolWithFailover & operator=(const PoolWithFailover &) = delete;
/** Allocates a connection to use. */
Entry Get();
};

View File

@ -77,43 +77,30 @@ PoolWithFailover PoolFactory::Get(const Poco::Util::AbstractConfiguration & conf
{
std::lock_guard<std::mutex> lock(impl->mutex);
Poco::Util::Application & app = Poco::Util::Application::instance();
app.logger().warning("Config name=" + config_name);
if (auto entry = impl->pools.find(config_name); entry != impl->pools.end())
{
app.logger().warning("Entry found=" + config_name);
return *(entry->second.get());
}
else
{
app.logger().warning("Searching confg=" + config_name);
std::string entry_name = getPoolEntryName(config, config_name);
app.logger().warning("Entry name created=" + entry_name);
if (auto id = impl->pools_by_ids.find(entry_name); id != impl->pools_by_ids.end())
{
app.logger().warning("found");
entry = impl->pools.find(id->second);
std::shared_ptr<PoolWithFailover> pool = entry->second;
impl->pools.insert_or_assign(config_name, pool);
app.logger().warning("found OK");
return *pool;
}
app.logger().warning("make pool");
auto pool = std::make_shared<PoolWithFailover>(config, config_name, default_connections, max_connections, max_tries);
app.logger().warning("make pool OK");
// Check the pool will be shared
if (!entry_name.empty())
{
// Store shared pool
app.logger().warning("store");
impl->pools.insert_or_assign(config_name, pool);
impl->pools_by_ids.insert_or_assign(entry_name, config_name);
app.logger().warning("store OK");
}
app.logger().warning("a2");
auto a2 = *(pool.get());
app.logger().warning("a2 OK");
return *(pool.get());
}
}