mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-11 17:02:25 +00:00
437 lines
13 KiB
Python
437 lines
13 KiB
Python
## sudo -H pip install PyMySQL
|
|
import warnings
|
|
import pymysql.cursors
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
import time
|
|
import logging
|
|
|
|
DICTS = ["configs/dictionaries/mysql_dict1.xml", "configs/dictionaries/mysql_dict2.xml"]
|
|
CONFIG_FILES = ["configs/remote_servers.xml", "configs/named_collections.xml"]
|
|
USER_CONFIGS = ["configs/users.xml"]
|
|
cluster = ClickHouseCluster(__file__)
|
|
instance = cluster.add_instance(
|
|
"instance",
|
|
main_configs=CONFIG_FILES,
|
|
user_configs=USER_CONFIGS,
|
|
with_mysql8=True,
|
|
dictionaries=DICTS,
|
|
)
|
|
|
|
create_table_mysql_template = """
|
|
CREATE TABLE IF NOT EXISTS `test`.`{}` (
|
|
`id` int(11) NOT NULL,
|
|
`value` varchar(50) NOT NULL,
|
|
PRIMARY KEY (`id`)
|
|
) ENGINE=InnoDB;
|
|
"""
|
|
|
|
create_clickhouse_dictionary_table_template = """
|
|
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (`id` UInt64, `value` String) ENGINE = Dictionary({})
|
|
"""
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
# time.sleep(30)
|
|
cluster.start()
|
|
|
|
# Create a MySQL database
|
|
mysql_connection = get_mysql_conn(cluster)
|
|
create_mysql_db(mysql_connection, "test")
|
|
mysql_connection.close()
|
|
|
|
# Create database in ClickHouse
|
|
instance.query("CREATE DATABASE IF NOT EXISTS test")
|
|
|
|
# Create database in ClickChouse using MySQL protocol (will be used for data insertion)
|
|
instance.query(
|
|
"CREATE DATABASE clickhouse_mysql ENGINE = MySQL('mysql80:3306', 'test', 'root', 'clickhouse')"
|
|
)
|
|
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_mysql_dictionaries_custom_query_full_load(started_cluster):
|
|
mysql_connection = get_mysql_conn(started_cluster)
|
|
|
|
execute_mysql_query(
|
|
mysql_connection,
|
|
"CREATE TABLE IF NOT EXISTS test.test_table_1 (id Integer, value_1 Text);",
|
|
)
|
|
execute_mysql_query(
|
|
mysql_connection,
|
|
"CREATE TABLE IF NOT EXISTS test.test_table_2 (id Integer, value_2 Text);",
|
|
)
|
|
execute_mysql_query(
|
|
mysql_connection, "INSERT INTO test.test_table_1 VALUES (1, 'Value_1');"
|
|
)
|
|
execute_mysql_query(
|
|
mysql_connection, "INSERT INTO test.test_table_2 VALUES (1, 'Value_2');"
|
|
)
|
|
|
|
query = instance.query
|
|
query(
|
|
f"""
|
|
CREATE DICTIONARY test_dictionary_custom_query
|
|
(
|
|
id UInt64,
|
|
value_1 String,
|
|
value_2 String
|
|
)
|
|
PRIMARY KEY id
|
|
LAYOUT(FLAT())
|
|
SOURCE(MYSQL(
|
|
HOST 'mysql80'
|
|
PORT 3306
|
|
USER 'root'
|
|
PASSWORD 'clickhouse'
|
|
QUERY $doc$SELECT id, value_1, value_2 FROM test.test_table_1 INNER JOIN test.test_table_2 USING (id);$doc$))
|
|
LIFETIME(0)
|
|
"""
|
|
)
|
|
|
|
result = query(
|
|
"SELECT dictGetString('test_dictionary_custom_query', 'value_1', toUInt64(1))"
|
|
)
|
|
assert result == "Value_1\n"
|
|
|
|
result = query("SELECT id, value_1, value_2 FROM test_dictionary_custom_query")
|
|
assert result == "1\tValue_1\tValue_2\n"
|
|
|
|
query("DROP DICTIONARY test_dictionary_custom_query;")
|
|
|
|
query(
|
|
f"""
|
|
CREATE DICTIONARY test_cache_dictionary_custom_query
|
|
(
|
|
id1 UInt64,
|
|
id2 UInt64,
|
|
value_concat String
|
|
)
|
|
PRIMARY KEY id1, id2
|
|
LAYOUT(COMPLEX_KEY_CACHE(SIZE_IN_CELLS 10))
|
|
SOURCE(MYSQL(
|
|
HOST 'mysql80'
|
|
PORT 3306
|
|
USER 'root'
|
|
PASSWORD 'clickhouse'
|
|
QUERY 'SELECT id AS id1, id + 1 AS id2, CONCAT_WS(" ", "The", value_1) AS value_concat FROM test.test_table_1'))
|
|
LIFETIME(0)
|
|
"""
|
|
)
|
|
|
|
result = query(
|
|
"SELECT dictGetString('test_cache_dictionary_custom_query', 'value_concat', (1, 2))"
|
|
)
|
|
assert result == "The Value_1\n"
|
|
|
|
result = query("SELECT id1, value_concat FROM test_cache_dictionary_custom_query")
|
|
assert result == "1\tThe Value_1\n"
|
|
|
|
query("DROP DICTIONARY test_cache_dictionary_custom_query;")
|
|
|
|
execute_mysql_query(mysql_connection, "DROP TABLE test.test_table_1;")
|
|
execute_mysql_query(mysql_connection, "DROP TABLE test.test_table_2;")
|
|
|
|
|
|
def test_mysql_dictionaries_custom_query_partial_load_simple_key(started_cluster):
|
|
mysql_connection = get_mysql_conn(started_cluster)
|
|
|
|
execute_mysql_query(
|
|
mysql_connection,
|
|
"CREATE TABLE IF NOT EXISTS test.test_table_1 (id Integer, value_1 Text);",
|
|
)
|
|
execute_mysql_query(
|
|
mysql_connection,
|
|
"CREATE TABLE IF NOT EXISTS test.test_table_2 (id Integer, value_2 Text);",
|
|
)
|
|
execute_mysql_query(
|
|
mysql_connection, "INSERT INTO test.test_table_1 VALUES (1, 'Value_1');"
|
|
)
|
|
execute_mysql_query(
|
|
mysql_connection, "INSERT INTO test.test_table_2 VALUES (1, 'Value_2');"
|
|
)
|
|
|
|
query = instance.query
|
|
query(
|
|
"""
|
|
CREATE DICTIONARY test_dictionary_custom_query
|
|
(
|
|
id UInt64,
|
|
value_1 String,
|
|
value_2 String
|
|
)
|
|
PRIMARY KEY id
|
|
LAYOUT(DIRECT())
|
|
SOURCE(MYSQL(
|
|
HOST 'mysql80'
|
|
PORT 3306
|
|
USER 'root'
|
|
PASSWORD 'clickhouse'
|
|
QUERY $doc$SELECT id, value_1, value_2 FROM test.test_table_1 INNER JOIN test.test_table_2 USING (id) WHERE {condition};$doc$))
|
|
"""
|
|
)
|
|
|
|
result = query(
|
|
"SELECT dictGet('test_dictionary_custom_query', ('value_1', 'value_2'), toUInt64(1))"
|
|
)
|
|
|
|
assert result == "('Value_1','Value_2')\n"
|
|
|
|
query("DROP DICTIONARY test_dictionary_custom_query;")
|
|
|
|
execute_mysql_query(mysql_connection, "DROP TABLE test.test_table_1;")
|
|
execute_mysql_query(mysql_connection, "DROP TABLE test.test_table_2;")
|
|
|
|
|
|
def test_mysql_dictionaries_custom_query_partial_load_complex_key(started_cluster):
|
|
mysql_connection = get_mysql_conn(started_cluster)
|
|
|
|
execute_mysql_query(
|
|
mysql_connection,
|
|
"CREATE TABLE IF NOT EXISTS test.test_table_1 (id Integer, id_key Text, value_1 Text);",
|
|
)
|
|
execute_mysql_query(
|
|
mysql_connection,
|
|
"CREATE TABLE IF NOT EXISTS test.test_table_2 (id Integer, id_key Text, value_2 Text);",
|
|
)
|
|
execute_mysql_query(
|
|
mysql_connection, "INSERT INTO test.test_table_1 VALUES (1, 'Key', 'Value_1');"
|
|
)
|
|
execute_mysql_query(
|
|
mysql_connection, "INSERT INTO test.test_table_2 VALUES (1, 'Key', 'Value_2');"
|
|
)
|
|
|
|
query = instance.query
|
|
query(
|
|
"""
|
|
CREATE DICTIONARY test_dictionary_custom_query
|
|
(
|
|
id UInt64,
|
|
id_key String,
|
|
value_1 String,
|
|
value_2 String
|
|
)
|
|
PRIMARY KEY id, id_key
|
|
LAYOUT(COMPLEX_KEY_DIRECT())
|
|
SOURCE(MYSQL(
|
|
HOST 'mysql80'
|
|
PORT 3306
|
|
USER 'root'
|
|
PASSWORD 'clickhouse'
|
|
QUERY $doc$SELECT id, id_key, value_1, value_2 FROM test.test_table_1 INNER JOIN test.test_table_2 USING (id, id_key) WHERE {condition};$doc$))
|
|
"""
|
|
)
|
|
|
|
result = query(
|
|
"SELECT dictGet('test_dictionary_custom_query', ('value_1', 'value_2'), (toUInt64(1), 'Key'))"
|
|
)
|
|
|
|
assert result == "('Value_1','Value_2')\n"
|
|
|
|
query("DROP DICTIONARY test_dictionary_custom_query;")
|
|
|
|
execute_mysql_query(mysql_connection, "DROP TABLE test.test_table_1;")
|
|
execute_mysql_query(mysql_connection, "DROP TABLE test.test_table_2;")
|
|
|
|
|
|
def test_predefined_connection_configuration(started_cluster):
|
|
mysql_connection = get_mysql_conn(started_cluster)
|
|
|
|
execute_mysql_query(mysql_connection, "DROP TABLE IF EXISTS test.test_table")
|
|
execute_mysql_query(
|
|
mysql_connection,
|
|
"CREATE TABLE IF NOT EXISTS test.test_table (id Integer, value Integer);",
|
|
)
|
|
execute_mysql_query(
|
|
mysql_connection, "INSERT INTO test.test_table VALUES (100, 200);"
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP DICTIONARY IF EXISTS dict;
|
|
CREATE DICTIONARY dict (id UInt32, value UInt32)
|
|
PRIMARY KEY id
|
|
SOURCE(MYSQL(NAME mysql1))
|
|
LIFETIME(MIN 1 MAX 2)
|
|
LAYOUT(HASHED());
|
|
"""
|
|
)
|
|
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
|
|
assert int(result) == 200
|
|
|
|
instance.query(
|
|
"""
|
|
DROP DICTIONARY dict;
|
|
CREATE DICTIONARY dict (id UInt32, value UInt32)
|
|
PRIMARY KEY id
|
|
SOURCE(MYSQL(NAME mysql2))
|
|
LIFETIME(MIN 1 MAX 2)
|
|
LAYOUT(HASHED());
|
|
"""
|
|
)
|
|
result = instance.query_and_get_error(
|
|
"SELECT dictGetUInt32(dict, 'value', toUInt64(100))"
|
|
)
|
|
instance.query(
|
|
"""
|
|
DROP DICTIONARY dict;
|
|
CREATE DICTIONARY dict (id UInt32, value UInt32)
|
|
PRIMARY KEY id
|
|
SOURCE(MYSQL(NAME unknown_collection))
|
|
LIFETIME(MIN 1 MAX 2)
|
|
LAYOUT(HASHED());
|
|
"""
|
|
)
|
|
result = instance.query_and_get_error(
|
|
"SELECT dictGetUInt32(dict, 'value', toUInt64(100))"
|
|
)
|
|
|
|
instance.query(
|
|
"""
|
|
DROP DICTIONARY dict;
|
|
CREATE DICTIONARY dict (id UInt32, value UInt32)
|
|
PRIMARY KEY id
|
|
SOURCE(MYSQL(NAME mysql3 PORT 3306))
|
|
LIFETIME(MIN 1 MAX 2)
|
|
LAYOUT(HASHED());
|
|
"""
|
|
)
|
|
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
|
|
assert int(result) == 200
|
|
|
|
instance.query(
|
|
"""
|
|
DROP DICTIONARY IF EXISTS dict;
|
|
CREATE DICTIONARY dict (id UInt32, value UInt32)
|
|
PRIMARY KEY id
|
|
SOURCE(MYSQL(NAME mysql1 connection_pool_size 0))
|
|
LIFETIME(MIN 1 MAX 2)
|
|
LAYOUT(HASHED());
|
|
"""
|
|
)
|
|
result = instance.query_and_get_error(
|
|
"SELECT dictGetUInt32(dict, 'value', toUInt64(100))"
|
|
)
|
|
assert "Connection pool cannot have zero size" in result
|
|
|
|
instance.query(
|
|
"""
|
|
DROP DICTIONARY IF EXISTS dict;
|
|
CREATE DICTIONARY dict (id UInt32, value UInt32)
|
|
PRIMARY KEY id
|
|
SOURCE(MYSQL(NAME mysql4))
|
|
LIFETIME(MIN 1 MAX 2)
|
|
LAYOUT(HASHED());
|
|
"""
|
|
)
|
|
result = instance.query_and_get_error(
|
|
"SELECT dictGetUInt32(dict, 'value', toUInt64(100))"
|
|
)
|
|
assert "Connection pool cannot have zero size" in result
|
|
|
|
instance.query(
|
|
"""
|
|
DROP DICTIONARY IF EXISTS dict;
|
|
CREATE DICTIONARY dict (id UInt32, value UInt32)
|
|
PRIMARY KEY id
|
|
SOURCE(MYSQL(NAME mysql4 connection_pool_size 1))
|
|
LIFETIME(MIN 1 MAX 2)
|
|
LAYOUT(HASHED());
|
|
"""
|
|
)
|
|
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
|
|
assert int(result) == 200
|
|
|
|
instance.query(
|
|
"""
|
|
DROP DICTIONARY IF EXISTS dict;
|
|
CREATE DICTIONARY dict (id UInt32, value UInt32)
|
|
PRIMARY KEY id
|
|
SOURCE(MYSQL(NAME mysql4 connection_pool_size 1 close_connection 1 share_connection 1))
|
|
LIFETIME(MIN 1 MAX 2)
|
|
LAYOUT(HASHED());
|
|
"""
|
|
)
|
|
result = instance.query("SELECT dictGetUInt32(dict, 'value', toUInt64(100))")
|
|
assert int(result) == 200
|
|
|
|
|
|
def create_mysql_db(mysql_connection, name):
|
|
with mysql_connection.cursor() as cursor:
|
|
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
|
|
cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
|
|
|
|
|
|
def prepare_mysql_table(started_cluster, table_name, index):
|
|
mysql_connection = get_mysql_conn(started_cluster)
|
|
|
|
# Create table
|
|
create_mysql_table(mysql_connection, table_name + str(index))
|
|
|
|
# Insert rows using CH
|
|
query = instance.query
|
|
query(
|
|
"INSERT INTO `clickhouse_mysql`.{}(id, value) select number, concat('{} value ', toString(number)) from numbers(10000) ".format(
|
|
table_name + str(index), table_name + str(index)
|
|
)
|
|
)
|
|
assert (
|
|
query(
|
|
"SELECT count() FROM `clickhouse_mysql`.{}".format(table_name + str(index))
|
|
).rstrip()
|
|
== "10000"
|
|
)
|
|
mysql_connection.close()
|
|
|
|
# Create CH Dictionary tables based on MySQL tables
|
|
query(
|
|
create_clickhouse_dictionary_table_template.format(
|
|
table_name + str(index), "dict" + str(index)
|
|
)
|
|
)
|
|
|
|
|
|
def get_mysql_conn(started_cluster):
|
|
errors = []
|
|
conn = None
|
|
for _ in range(5):
|
|
try:
|
|
if conn is None:
|
|
conn = pymysql.connect(
|
|
user="root",
|
|
password="clickhouse",
|
|
host=started_cluster.mysql8_ip,
|
|
port=started_cluster.mysql8_port,
|
|
)
|
|
else:
|
|
conn.ping(reconnect=True)
|
|
logging.debug(
|
|
f"MySQL Connection establised: {started_cluster.mysql8_ip}:{started_cluster.mysql8_port}"
|
|
)
|
|
return conn
|
|
except Exception as e:
|
|
errors += [str(e)]
|
|
time.sleep(1)
|
|
|
|
raise Exception("Connection not establised, {}".format(errors))
|
|
|
|
|
|
def execute_mysql_query(connection, query):
|
|
logging.debug("Execute MySQL query:{}".format(query))
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore")
|
|
with connection.cursor() as cursor:
|
|
cursor.execute(query)
|
|
connection.commit()
|
|
|
|
|
|
def create_mysql_table(conn, table_name):
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(create_table_mysql_template.format(table_name))
|