ClickHouse/tests/integration/test_dictionaries_mysql/test.py

112 lines
3.9 KiB
Python
Raw Normal View History

## sudo -H pip install PyMySQL
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
import time
import logging
# Configuration files handed to the ClickHouse instance as its main configs.
CONFIG_FILES = [
    'configs/dictionaries/mysql_dict1.xml',
    'configs/dictionaries/mysql_dict2.xml',
    'configs/remote_servers.xml',
    'configs/enable_dictionaries.xml',
    'configs/log_conf.xml',
]

# One cluster with a single instance shared by every test in this module.
# NOTE(review): with_mysql=True presumably makes the cluster helper start a
# MySQL server next to the instance -- confirm in helpers.cluster.
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    'instance',
    main_configs=CONFIG_FILES,
    with_mysql=True,
)
# DDL template for a MySQL table inside database `test`.
# The single `{}` placeholder receives the table name.
create_table_mysql_template = (
    "\n"
    "CREATE TABLE IF NOT EXISTS `test`.`{}` (\n"
    "`id` int(11) NOT NULL,\n"
    "`value` varchar(50) NOT NULL,\n"
    "PRIMARY KEY (`id`)\n"
    ") ENGINE=InnoDB;\n"
)
# DDL template for a ClickHouse table backed by the Dictionary engine.
# Placeholders, in order: the suffix appended to `dict_table_`, and the name of
# the dictionary passed to `Dictionary(...)`.
# The literal must contain only the SQL statement: any stray text inside the
# triple quotes would be sent to the server as part of the query.
create_clickhouse_dictionary_table_template = """
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (`id` UInt64, `value` String) ENGINE = Dictionary({})
"""
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module and prepare the databases the tests use.

    Setup steps, in order:
      1. start the cluster;
      2. recreate the MySQL database ``test`` (see ``create_mysql_db``);
      3. create the ClickHouse database ``test``;
      4. create the ClickHouse database ``clickhouse_mysql`` with the MySQL
         engine, which the tests use to insert rows into MySQL.

    Yields:
        The started ``cluster`` object.

    The cluster is shut down when the module's tests finish, and also when
    any setup step raises.
    """
    try:
        cluster.start()

        # Create a MySQL database. The connection is closed even when the
        # DDL fails, so a broken setup does not leave a dangling connection.
        mysql_connection = get_mysql_conn(cluster)
        try:
            create_mysql_db(mysql_connection, 'test')
        finally:
            mysql_connection.close()

        # Create database in ClickHouse
        instance.query("CREATE DATABASE IF NOT EXISTS test")

        # Create database in ClickHouse using MySQL protocol (will be used for data insertion)
        instance.query("CREATE DATABASE clickhouse_mysql ENGINE = MySQL('mysql57:3306', 'test', 'root', 'clickhouse')")

        yield cluster
    finally:
        cluster.shutdown()
def test_load_mysql_dictionaries(started_cluster):
    """Check that the dictionary tables report 10000 rows across repeated reloads.

    Five MySQL tables (``test0`` .. ``test4``) are prepared together with
    their ClickHouse dictionary tables. Then 100 count queries are issued,
    cycling over the five dictionary tables, with a forced
    ``SYSTEM RELOAD DICTIONARIES`` on every tenth iteration.
    """
    run_query = instance.query

    # Load dictionaries
    run_query("SYSTEM RELOAD DICTIONARIES")

    # Create MySQL tables, fill them and create CH dict tables
    for table_index in range(5):
        prepare_mysql_table(started_cluster, 'test', str(table_index))

    # Check dictionaries are loaded and have correct number of elements
    for iteration in range(100):
        # Force reload of dictionaries (every 10th iteration)
        if iteration % 10 == 0:
            run_query("SYSTEM RELOAD DICTIONARIES")

        # Check number of rows
        table_suffix = 'test' + str(iteration % 5)
        row_count = run_query("SELECT count() FROM `test`.`dict_table_{}`".format(table_suffix)).rstrip()
        assert row_count == '10000'
def create_mysql_db(mysql_connection, name):
    """Drop the MySQL database *name* if it exists, then create it again.

    The new database is created with ``utf8`` as its default character set.
    Both statements run on one cursor obtained from *mysql_connection*.
    """
    statements = (
        f"DROP DATABASE IF EXISTS {name}",
        f"CREATE DATABASE {name} DEFAULT CHARACTER SET 'utf8'",
    )
    with mysql_connection.cursor() as cursor:
        for statement in statements:
            cursor.execute(statement)
def prepare_mysql_table(started_cluster, table_name, index):
    """Create and fill one MySQL table, then expose it as a ClickHouse dictionary table.

    Args:
        started_cluster: cluster object passed on to ``get_mysql_conn``.
        table_name: prefix of the MySQL table name.
        index: suffix appended (via ``str``) to *table_name*; also selects the
            dictionary ``'dict' + str(index)`` the ClickHouse table refers to.

    Raises:
        AssertionError: if the MySQL table does not hold 10000 rows after
            the insert.
    """
    full_name = table_name + str(index)
    query = instance.query

    mysql_connection = get_mysql_conn(started_cluster)
    try:
        # Create table
        create_mysql_table(mysql_connection, full_name)

        # Insert rows using CH (through the `clickhouse_mysql` database)
        query(
            "INSERT INTO `clickhouse_mysql`.{}(id, value) select number, concat('{} value ', toString(number)) from numbers(10000) ".format(
                full_name, full_name))
        assert query("SELECT count() FROM `clickhouse_mysql`.{}".format(full_name)).rstrip() == '10000'
    finally:
        # Close the connection even when the insert or the row-count check fails.
        mysql_connection.close()

    # Create CH Dictionary tables based on MySQL tables
    query(create_clickhouse_dictionary_table_template.format(full_name, 'dict' + str(index)))
def get_mysql_conn(started_cluster):
    """Open a MySQL connection as ``root``, retrying while the server is unavailable.

    Up to five connection attempts are made, one second apart.

    Args:
        started_cluster: object providing ``mysql_ip`` and ``mysql_port``.

    Returns:
        The connection object returned by ``pymysql.connect``.

    Raises:
        Exception: if every attempt fails; the message lists the error text
            of each attempt.
    """
    attempts = 5
    errors = []
    for attempt in range(attempts):
        try:
            conn = pymysql.connect(user='root', password='clickhouse', host=started_cluster.mysql_ip, port=started_cluster.mysql_port)
        except Exception as e:
            # Deliberately broad: any failure here counts as "not reachable
            # yet" and triggers a retry.
            errors.append(str(e))
            # Pause only when another attempt follows.
            if attempt < attempts - 1:
                time.sleep(1)
        else:
            logging.debug("MySQL Connection establised: %s:%s", started_cluster.mysql_ip, started_cluster.mysql_port)
            return conn
    raise Exception("Connection not establised, {}".format(errors))
def create_mysql_table(conn, table_name):
    """Create table *table_name* in MySQL from ``create_table_mysql_template``.

    The template uses ``CREATE TABLE IF NOT EXISTS``, so an existing table
    is left as it is.
    """
    ddl = create_table_mysql_template.format(table_name)
    with conn.cursor() as cursor:
        cursor.execute(ddl)