ClickHouse/tests/integration/test_dictionaries_mysql/test.py
2021-09-03 11:30:28 +03:00

260 lines
9.4 KiB
Python

## sudo -H pip install PyMySQL
import warnings
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
import time
import logging
# Dictionary definition files handed to the ClickHouse instance.
DICTS = [
    'configs/dictionaries/mysql_dict1.xml',
    'configs/dictionaries/mysql_dict2.xml',
]

# Main server configuration files handed to the ClickHouse instance.
CONFIG_FILES = [
    'configs/remote_servers.xml',
    'configs/named_collections.xml',
]

# One ClickHouse instance with a MySQL server attached to the cluster.
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    'instance',
    main_configs=CONFIG_FILES,
    with_mysql=True,
    dictionaries=DICTS,
)

# MySQL DDL template; the single `{}` placeholder is the table name
# inside the MySQL `test` database.
create_table_mysql_template = """
CREATE TABLE IF NOT EXISTS `test`.`{}` (
`id` int(11) NOT NULL,
`value` varchar(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
"""

# ClickHouse DDL template; the first `{}` is the suffix of the
# `dict_table_` table name, the second `{}` is the dictionary name passed
# to the Dictionary engine.
create_clickhouse_dictionary_table_template = """
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (`id` UInt64, `value` String) ENGINE = Dictionary({})
"""
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module and prepare the databases the tests use.

    Setup recreates the `test` database in MySQL, creates the `test`
    database in ClickHouse, and creates `clickhouse_mysql`, a ClickHouse
    database with the MySQL engine pointing at the MySQL `test` database.

    Yields:
        The module-level ``cluster`` after it has been started. The cluster
        is shut down when the fixture is torn down, and also when setup
        itself raises.
    """
    try:
        cluster.start()

        # Create a MySQL database
        mysql_connection = get_mysql_conn(cluster)
        try:
            create_mysql_db(mysql_connection, 'test')
        finally:
            # Release the connection even if database creation fails.
            mysql_connection.close()

        # Create database in ClickHouse
        instance.query("CREATE DATABASE IF NOT EXISTS test")

        # Create database in ClickHouse using MySQL protocol (will be used for data insertion)
        instance.query("CREATE DATABASE clickhouse_mysql ENGINE = MySQL('mysql57:3306', 'test', 'root', 'clickhouse')")

        yield cluster
    finally:
        cluster.shutdown()
def test_mysql_dictionaries_custom_query_full_load(started_cluster):
    """Check a FLAT dictionary whose MySQL source is a custom QUERY.

    Two MySQL tables are created and joined by the dictionary's QUERY; the
    test then selects from the dictionary and expects the single joined row.

    Cleanup runs in ``finally`` so that a failed assertion does not leave the
    dictionary or the MySQL tables behind for the following tests, which
    reuse the same object names.
    """
    mysql_connection = get_mysql_conn(started_cluster)
    query = instance.query

    try:
        execute_mysql_query(mysql_connection, "CREATE TABLE IF NOT EXISTS test.test_table_1 (id Integer, value_1 Text);")
        execute_mysql_query(mysql_connection, "CREATE TABLE IF NOT EXISTS test.test_table_2 (id Integer, value_2 Text);")
        execute_mysql_query(mysql_connection, "INSERT INTO test.test_table_1 VALUES (1, 'Value_1');")
        execute_mysql_query(mysql_connection, "INSERT INTO test.test_table_2 VALUES (1, 'Value_2');")

        query("""
CREATE DICTIONARY test_dictionary_custom_query
(
id UInt64,
value_1 String,
value_2 String
)
PRIMARY KEY id
LAYOUT(FLAT())
SOURCE(MYSQL(
HOST 'mysql57'
PORT 3306
USER 'root'
PASSWORD 'clickhouse'
QUERY $doc$SELECT id, value_1, value_2 FROM test.test_table_1 INNER JOIN test.test_table_2 USING (id);$doc$))
LIFETIME(0)
""")

        result = query("SELECT id, value_1, value_2 FROM test_dictionary_custom_query")
        assert result == '1\tValue_1\tValue_2\n'
    finally:
        # IF EXISTS keeps cleanup from raising (and hiding the real failure)
        # when setup stopped before an object was created.
        query("DROP DICTIONARY IF EXISTS test_dictionary_custom_query;")
        execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.test_table_1;")
        execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.test_table_2;")
        mysql_connection.close()
def test_mysql_dictionaries_custom_query_partial_load_simple_key(started_cluster):
    """Check a DIRECT dictionary (simple key) whose MySQL source is a custom QUERY.

    Two MySQL tables are created and joined by the dictionary's QUERY; the
    test then fetches both attributes for key 1 with ``dictGet`` and expects
    the joined values.

    Cleanup runs in ``finally`` so that a failed assertion does not leave the
    dictionary or the MySQL tables behind for the following tests, which
    reuse the same object names.
    """
    mysql_connection = get_mysql_conn(started_cluster)
    query = instance.query

    try:
        execute_mysql_query(mysql_connection, "CREATE TABLE IF NOT EXISTS test.test_table_1 (id Integer, value_1 Text);")
        execute_mysql_query(mysql_connection, "CREATE TABLE IF NOT EXISTS test.test_table_2 (id Integer, value_2 Text);")
        execute_mysql_query(mysql_connection, "INSERT INTO test.test_table_1 VALUES (1, 'Value_1');")
        execute_mysql_query(mysql_connection, "INSERT INTO test.test_table_2 VALUES (1, 'Value_2');")

        # NOTE: `{condition}` below is sent to ClickHouse verbatim; this is a
        # plain string, not a Python format string. Presumably ClickHouse
        # substitutes the key filter there - see the dictionary source docs.
        query("""
CREATE DICTIONARY test_dictionary_custom_query
(
id UInt64,
value_1 String,
value_2 String
)
PRIMARY KEY id
LAYOUT(DIRECT())
SOURCE(MYSQL(
HOST 'mysql57'
PORT 3306
USER 'root'
PASSWORD 'clickhouse'
QUERY $doc$SELECT id, value_1, value_2 FROM test.test_table_1 INNER JOIN test.test_table_2 USING (id) WHERE {condition};$doc$))
""")

        result = query("SELECT dictGet('test_dictionary_custom_query', ('value_1', 'value_2'), toUInt64(1))")
        assert result == "('Value_1','Value_2')\n"
    finally:
        # IF EXISTS keeps cleanup from raising (and hiding the real failure)
        # when setup stopped before an object was created.
        query("DROP DICTIONARY IF EXISTS test_dictionary_custom_query;")
        execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.test_table_1;")
        execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.test_table_2;")
        mysql_connection.close()
def test_mysql_dictionaries_custom_query_partial_load_complex_key(started_cluster):
    """Check a COMPLEX_KEY_DIRECT dictionary whose MySQL source is a custom QUERY.

    Two MySQL tables keyed by ``(id, id_key)`` are created and joined by the
    dictionary's QUERY; the test then fetches both attributes for the key
    ``(1, 'Key')`` with ``dictGet`` and expects the joined values.

    Cleanup runs in ``finally`` so that a failed assertion does not leave the
    dictionary or the MySQL tables behind for other tests, which reuse the
    same object names (with a different table schema).
    """
    mysql_connection = get_mysql_conn(started_cluster)
    query = instance.query

    try:
        execute_mysql_query(mysql_connection, "CREATE TABLE IF NOT EXISTS test.test_table_1 (id Integer, id_key Text, value_1 Text);")
        execute_mysql_query(mysql_connection, "CREATE TABLE IF NOT EXISTS test.test_table_2 (id Integer, id_key Text, value_2 Text);")
        execute_mysql_query(mysql_connection, "INSERT INTO test.test_table_1 VALUES (1, 'Key', 'Value_1');")
        execute_mysql_query(mysql_connection, "INSERT INTO test.test_table_2 VALUES (1, 'Key', 'Value_2');")

        # NOTE: `{condition}` below is sent to ClickHouse verbatim; this is a
        # plain string, not a Python format string. Presumably ClickHouse
        # substitutes the key filter there - see the dictionary source docs.
        query("""
CREATE DICTIONARY test_dictionary_custom_query
(
id UInt64,
id_key String,
value_1 String,
value_2 String
)
PRIMARY KEY id, id_key
LAYOUT(COMPLEX_KEY_DIRECT())
SOURCE(MYSQL(
HOST 'mysql57'
PORT 3306
USER 'root'
PASSWORD 'clickhouse'
QUERY $doc$SELECT id, id_key, value_1, value_2 FROM test.test_table_1 INNER JOIN test.test_table_2 USING (id, id_key) WHERE {condition};$doc$))
""")

        result = query("SELECT dictGet('test_dictionary_custom_query', ('value_1', 'value_2'), (toUInt64(1), 'Key'))")
        assert result == "('Value_1','Value_2')\n"
    finally:
        # IF EXISTS keeps cleanup from raising (and hiding the real failure)
        # when setup stopped before an object was created.
        query("DROP DICTIONARY IF EXISTS test_dictionary_custom_query;")
        execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.test_table_1;")
        execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.test_table_2;")
        mysql_connection.close()
def test_predefined_connection_configuration(started_cluster):
    """Check MYSQL dictionary sources that are configured via ``NAME <collection>``.

    A MySQL table with the single row ``(100, 200)`` is created, then the
    dictionary ``dict`` is recreated four times with different source names:

    * ``mysql1`` - lookup is expected to return 200;
    * ``mysql2`` - lookup is expected to fail;
    * ``unknown_collection`` - lookup is expected to fail;
    * ``mysql3`` with ``PORT 3306`` given in the DDL - lookup is expected to
      return 200.

    NOTE(review): the named collections are presumably defined in
    ``configs/named_collections.xml``; why ``mysql2`` fails cannot be seen
    from this file - confirm against that config.
    """
    mysql_connection = get_mysql_conn(started_cluster)
    try:
        execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.test_table")
        execute_mysql_query(mysql_connection, "CREATE TABLE IF NOT EXISTS test.test_table (id Integer, value Integer);")
        execute_mysql_query(mysql_connection, "INSERT INTO test.test_table VALUES (100, 200);")
    finally:
        # The connection is only needed for the MySQL-side setup above.
        mysql_connection.close()

    instance.query('''
DROP DICTIONARY IF EXISTS dict;
CREATE DICTIONARY dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(MYSQL(NAME mysql1))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
    result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
    assert int(result) == 200

    instance.query('''
DROP DICTIONARY dict;
CREATE DICTIONARY dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(MYSQL(NAME mysql2))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
    # The returned error text is not inspected; presumably the helper itself
    # raises if the query unexpectedly succeeds - TODO confirm in helpers.cluster.
    instance.query_and_get_error("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")

    instance.query('''
DROP DICTIONARY dict;
CREATE DICTIONARY dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(MYSQL(NAME unknown_collection))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
    instance.query_and_get_error("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")

    instance.query('''
DROP DICTIONARY dict;
CREATE DICTIONARY dict (id UInt32, value UInt32)
PRIMARY KEY id
SOURCE(MYSQL(NAME mysql3 PORT 3306))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
''')
    result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
    assert int(result) == 200
def create_mysql_db(mysql_connection, name):
    """Recreate the MySQL database *name*: drop it if present, then create it with utf8.

    Args:
        mysql_connection: open MySQL connection used to obtain a cursor.
        name: database name, interpolated into the SQL text as-is.
    """
    statements = (
        f"DROP DATABASE IF EXISTS {name}",
        f"CREATE DATABASE {name} DEFAULT CHARACTER SET 'utf8'",
    )
    with mysql_connection.cursor() as cur:
        for statement in statements:
            cur.execute(statement)
def prepare_mysql_table(started_cluster, table_name, index):
    """Create and fill a MySQL table, then expose a dictionary as a ClickHouse table.

    The MySQL table is named ``table_name + str(index)``. It is created over
    a direct MySQL connection, filled with 10000 rows through the
    ``clickhouse_mysql`` database, and the row count is verified. Finally a
    ClickHouse table with the Dictionary engine is created for the
    dictionary named ``'dict' + str(index)``.

    Args:
        started_cluster: cluster object passed to ``get_mysql_conn``.
        table_name: prefix of the MySQL table name.
        index: suffix appended to the table and dictionary names.
    """
    full_table_name = table_name + str(index)
    query = instance.query

    mysql_connection = get_mysql_conn(started_cluster)
    try:
        # Create table
        create_mysql_table(mysql_connection, full_table_name)

        # Insert rows using CH
        query(
            "INSERT INTO `clickhouse_mysql`.{}(id, value) select number, concat('{} value ', toString(number)) from numbers(10000) ".format(
                full_table_name, full_table_name))
        assert query("SELECT count() FROM `clickhouse_mysql`.{}".format(full_table_name)).rstrip() == '10000'
    finally:
        # Close the connection even when the insert or the count check fails.
        mysql_connection.close()

    # Create CH Dictionary tables based on MySQL tables
    query(create_clickhouse_dictionary_table_template.format(full_table_name, 'dict' + str(index)))
def get_mysql_conn(started_cluster):
    """Open a MySQL connection to the cluster's MySQL server, retrying on failure.

    Up to five connection attempts are made, sleeping one second after each
    failed attempt.

    Args:
        started_cluster: object providing ``mysql_ip`` and ``mysql_port``.

    Returns:
        The connection returned by ``pymysql.connect``.

    Raises:
        Exception: if all five attempts fail; the message lists the text of
            every error encountered.
    """
    errors = []
    for _ in range(5):
        try:
            conn = pymysql.connect(
                user='root',
                password='clickhouse',
                host=started_cluster.mysql_ip,
                port=started_cluster.mysql_port,
            )
        except Exception as e:
            # Broad on purpose: any failure to connect is retried and its
            # message is reported if every attempt fails.
            errors.append(str(e))
            time.sleep(1)
        else:
            logging.debug(f"MySQL Connection establised: {started_cluster.mysql_ip}:{started_cluster.mysql_port}")
            return conn
    raise Exception("Connection not establised, {}".format(errors))
def execute_mysql_query(connection, query):
    """Run a single statement on *connection* and commit, with warnings silenced.

    Args:
        connection: open MySQL connection providing ``cursor()`` and ``commit()``.
        query: SQL text to execute.
    """
    logging.debug(f"Execute MySQL query:{query}")
    with warnings.catch_warnings():
        # Ignore every warning raised while the statement runs and commits.
        warnings.simplefilter("ignore")
        with connection.cursor() as cur:
            cur.execute(query)
        connection.commit()
def create_mysql_table(conn, table_name):
    """Create table *table_name* in the MySQL `test` database if it does not exist.

    The DDL comes from ``create_table_mysql_template``.
    """
    ddl = create_table_mysql_template.format(table_name)
    with conn.cursor() as cur:
        cur.execute(ddl)