ClickHouse/tests/integration/test_mysql_database_engine/test.py
2021-08-23 22:18:04 +00:00

430 lines
25 KiB
Python
Executable File

import contextlib
import time
from string import Template
import pymysql.cursors
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
# Cluster object shared by every test in this module.
cluster = ClickHouseCluster(__file__)

# The single ClickHouse instance all tests talk to. NOTE(review): `with_mysql=True`
# presumably also brings up the MySQL server addressed as 'mysql57:3306' below -- confirm
# against helpers.cluster.
clickhouse_node = cluster.add_instance(
    'node1',
    main_configs=['configs/remote_servers.xml'],
    user_configs=['configs/users.xml'],
    with_mysql=True,
)
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the shared cluster, hand it to the tests, always shut it down."""
    with contextlib.ExitStack() as teardown:
        # Register the shutdown before starting so it also runs when start() itself raises.
        teardown.callback(cluster.shutdown)
        cluster.start()
        yield cluster
class MySQLNodeInstance:
    """Small pymysql wrapper whose ``query`` output mimics ``ClickHouseInstance.query``.

    The connection is opened lazily on the first ``query`` call. ``close`` makes the object
    usable with ``contextlib.closing``.
    """

    def __init__(self, user, password, hostname, port):
        self.user = user
        self.port = port
        self.hostname = hostname
        self.password = password
        self.mysql_connection = None  # lazy init

    def query(self, execution_query):
        """Execute one statement, or an iterable of statements, on a single cursor.

        :param execution_query: a ``str``/``bytes`` statement, or an iterable of such statements.
        :return: for one statement, its result; for an iterable, a list of results in order.
            The result of a SELECT/SHOW statement is its rows as tab-separated values joined
            with newlines; for any other statement it is whatever ``cursor.execute`` returned.
        """
        if self.mysql_connection is None:
            self.mysql_connection = pymysql.connect(user=self.user, password=self.password, host=self.hostname,
                                                    port=self.port)

        with self.mysql_connection.cursor() as cursor:
            def execute(query):
                res = cursor.execute(query)
                head = query.lstrip().lower()
                # bytes.startswith() rejects str prefixes, so pick prefixes of the matching type.
                prefixes = (b'select', b'show') if isinstance(head, bytes) else ('select', 'show')
                if head.startswith(prefixes):
                    # Mimic output of the ClickHouseInstance, which is:
                    # tab-separated values and newline (\n)-separated rows.
                    res = "\n".join("\t".join(str(item) for item in row) for row in cursor.fetchall())
                return res

            if isinstance(execution_query, (str, bytes)):
                return execute(execution_query)
            else:
                return [execute(q) for q in execution_query]

    def close(self):
        """Close the connection if one was opened; safe to call more than once."""
        if self.mysql_connection is not None:
            # Forget the handle before closing it: a repeated close() becomes a no-op and a
            # later query() reconnects instead of reusing a closed connection.
            connection, self.mysql_connection = self.mysql_connection, None
            connection.close()
def test_mysql_ddl_for_mysql_database(started_cluster):
    """DDL executed directly in MySQL must be reflected by the ClickHouse MySQL database engine."""

    def ch_databases():
        return clickhouse_node.query('SHOW DATABASES')

    def ch_tables():
        return clickhouse_node.query('SHOW TABLES FROM test_database')

    def ch_column_names():
        return clickhouse_node.query(
            "SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")

    mysql_instance = MySQLNodeInstance('root', 'clickhouse', started_cluster.mysql_ip, started_cluster.mysql_port)
    with contextlib.closing(mysql_instance) as mysql_node:
        mysql_node.query("DROP DATABASE IF EXISTS test_database")
        mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")

        clickhouse_node.query(
            "CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', 'test_database', 'root', 'clickhouse')")
        assert 'test_database' in ch_databases()

        # A table created on the MySQL side shows up in ClickHouse.
        mysql_node.query(
            'CREATE TABLE `test_database`.`test_table` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;')
        assert 'test_table' in ch_tables()

        # MySQL modification time has one-second granularity, so a change made within the
        # same second as the previous one could not be picked up; wait before altering.
        time.sleep(3)
        mysql_node.query('ALTER TABLE `test_database`.`test_table` ADD COLUMN `add_column` int(11)')
        assert 'add_column' in ch_column_names()

        # Same one-second granularity caveat as above.
        time.sleep(3)
        mysql_node.query('ALTER TABLE `test_database`.`test_table` DROP COLUMN `add_column`')
        assert 'add_column' not in ch_column_names()

        mysql_node.query('DROP TABLE `test_database`.`test_table`;')
        assert 'test_table' not in ch_tables()

        clickhouse_node.query("DROP DATABASE test_database")
        assert 'test_database' not in ch_databases()

        mysql_node.query("DROP DATABASE test_database")
def test_clickhouse_ddl_for_mysql_database(started_cluster):
    """Check DROP / ATTACH / DETACH TABLE issued in ClickHouse against a MySQL-engine database.

    After every statement, ``SHOW TABLES`` is used to verify that the table disappeared from,
    or reappeared in, ClickHouse's view of the database.
    """
    with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', started_cluster.mysql_ip, started_cluster.mysql_port)) as mysql_node:
        # Start from a clean slate: a `test_database` left behind by a previously failed test
        # would otherwise make the CREATE DATABASE statements below fail.
        clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
        mysql_node.query("DROP DATABASE IF EXISTS test_database")

        mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
        mysql_node.query(
            'CREATE TABLE `test_database`.`test_table` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;')

        clickhouse_node.query(
            "CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', 'test_database', 'root', 'clickhouse')")

        assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
        clickhouse_node.query("DROP TABLE test_database.test_table")
        assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
        # The ATTACH below succeeds after the ClickHouse-side DROP, and the final
        # MySQL-side DROP DATABASE still finds the database.
        clickhouse_node.query("ATTACH TABLE test_database.test_table")
        assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
        clickhouse_node.query("DETACH TABLE test_database.test_table")
        assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
        clickhouse_node.query("ATTACH TABLE test_database.test_table")
        assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')

        clickhouse_node.query("DROP DATABASE test_database")
        assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')

        mysql_node.query("DROP DATABASE test_database")
def test_clickhouse_dml_for_mysql_database(started_cluster):
    """Insert rows from ClickHouse into a MySQL table exposed through the MySQL database engine.

    The column is deliberately named ``i`d`` (written ``i``d`` inside backquotes) so the test
    also exercises a backtick inside an identifier.
    """
    with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', started_cluster.mysql_ip, started_cluster.mysql_port)) as mysql_node:
        # Start from a clean slate: a `test_database` left behind by a previously failed test
        # would otherwise make the CREATE DATABASE statements below fail.
        clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
        mysql_node.query("DROP DATABASE IF EXISTS test_database")

        mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
        mysql_node.query(
            'CREATE TABLE `test_database`.`test_table` ( `i``d` int(11) NOT NULL, PRIMARY KEY (`i``d`)) ENGINE=InnoDB;')
        # The MySQL database name is passed here as a bare identifier rather than a string literal.
        clickhouse_node.query(
            "CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', test_database, 'root', 'clickhouse')")

        assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '0'
        clickhouse_node.query("INSERT INTO `test_database`.`test_table`(`i``d`) select number from numbers(10000)")
        assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '10000'

        clickhouse_node.query("DROP DATABASE test_database")
        assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')

        mysql_node.query("DROP DATABASE test_database")
def test_clickhouse_join_for_mysql_database(started_cluster):
    """Run a LEFT JOIN in ClickHouse over tables backed by the ``mysql`` table function.

    Two MySQL tables are created and mirrored into ClickHouse with
    ``CREATE TABLE ... AS mysql(...)``; rows are inserted through ClickHouse, and a self-join
    of the first table must return only the row whose ``opco`` is non-empty.
    """
    with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', started_cluster.mysql_ip, started_cluster.mysql_port)) as mysql_node:
        mysql_node.query("CREATE DATABASE IF NOT EXISTS test DEFAULT CHARACTER SET 'utf8'")
        # The database may already exist (IF NOT EXISTS above), so the tables may as well;
        # remove leftovers so the CREATE TABLE statements cannot fail.
        mysql_node.query("DROP TABLE IF EXISTS test.t1_mysql_local")
        mysql_node.query("DROP TABLE IF EXISTS test.t2_mysql_local")
        mysql_node.query("CREATE TABLE test.t1_mysql_local ("
                         "pays    VARCHAR(55) DEFAULT 'FRA' NOT NULL,"
                         "service VARCHAR(5)  DEFAULT ''    NOT NULL,"
                         "opco    CHAR(3)     DEFAULT ''    NOT NULL"
                         ")")
        mysql_node.query("CREATE TABLE test.t2_mysql_local ("
                         "service VARCHAR(5) DEFAULT '' NOT NULL,"
                         "opco    VARCHAR(5) DEFAULT ''"
                         ")")

        # Same reasoning on the ClickHouse side: the tables live in `default`, which outlives this test.
        clickhouse_node.query("DROP TABLE IF EXISTS default.t1_remote_mysql")
        clickhouse_node.query("DROP TABLE IF EXISTS default.t2_remote_mysql")
        clickhouse_node.query(
            "CREATE TABLE default.t1_remote_mysql AS mysql('mysql57:3306','test','t1_mysql_local','root','clickhouse')")
        clickhouse_node.query(
            "CREATE TABLE default.t2_remote_mysql AS mysql('mysql57:3306','test','t2_mysql_local','root','clickhouse')")
        clickhouse_node.query("INSERT INTO `default`.`t1_remote_mysql` VALUES ('EN','A',''),('RU','B','AAA')")
        clickhouse_node.query("INSERT INTO `default`.`t2_remote_mysql` VALUES ('A','AAA'),('Z','')")

        # NOTE(review): both sides of the join read t1_remote_mysql; t2_remote_mysql is only
        # created and filled. Kept as is -- confirm whether that is intentional.
        assert clickhouse_node.query("SELECT s.pays "
                                     "FROM default.t1_remote_mysql AS s "
                                     "LEFT JOIN default.t1_remote_mysql AS s_ref "
                                     "ON (s_ref.opco = s.opco AND s_ref.service = s.service) "
                                     "WHERE s_ref.opco != '' AND s.opco != '' ").rstrip() == 'RU'

        # Do not leak the ClickHouse-side tables into later tests or reruns.
        clickhouse_node.query("DROP TABLE default.t1_remote_mysql")
        clickhouse_node.query("DROP TABLE default.t2_remote_mysql")
        mysql_node.query("DROP DATABASE test")
def test_bad_arguments_for_mysql_database_engine(started_cluster):
    """CREATE DATABASE ... ENGINE = MySQL must be rejected when it is given non-literal arguments."""
    mysql_instance = MySQLNodeInstance('root', 'clickhouse', started_cluster.mysql_ip,
                                       port=started_cluster.mysql_port)
    with contextlib.closing(mysql_instance) as mysql_node:
        # MySQL-side setup; only the ClickHouse statement below is expected to raise.
        mysql_node.query("CREATE DATABASE IF NOT EXISTS test_bad_arguments DEFAULT CHARACTER SET 'utf8'")

        with pytest.raises(QueryRuntimeException) as exception:
            # Database name and user are passed unquoted here.
            clickhouse_node.query(
                "CREATE DATABASE test_database_bad_arguments ENGINE = MySQL('mysql57:3306', test_bad_arguments, root, 'clickhouse')")

        error_text = str(exception.value)
        assert 'Database engine MySQL requested literal argument.' in error_text

        mysql_node.query("DROP DATABASE test_bad_arguments")
def test_column_comments_for_mysql_database_engine(started_cluster):
    """Column comments defined in MySQL must be visible in ClickHouse via the MySQL database engine."""
    mysql_instance = MySQLNodeInstance('root', 'clickhouse', started_cluster.mysql_ip, started_cluster.mysql_port)
    with contextlib.closing(mysql_instance) as mysql_node:
        for setup_statement in ("DROP DATABASE IF EXISTS test_database",
                                "CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'"):
            mysql_node.query(setup_statement)

        clickhouse_node.query(
            "CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', 'test_database', 'root', 'clickhouse')")
        known_databases = clickhouse_node.query('SHOW DATABASES')
        assert 'test_database' in known_databases

        # A comment given at table creation time appears in DESCRIBE TABLE.
        mysql_node.query(
            "CREATE TABLE `test_database`.`test_table` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`), `test` int COMMENT 'test comment') ENGINE=InnoDB;")
        described = clickhouse_node.query('DESCRIBE TABLE `test_database`.`test_table`')
        assert 'test comment' in described

        # MySQL modification time has one-second granularity, so a change made within the
        # same second as the previous one could not be picked up; wait before altering.
        time.sleep(3)
        mysql_node.query("ALTER TABLE `test_database`.`test_table` ADD COLUMN `add_column` int(11) COMMENT 'add_column comment'")
        column_comments = clickhouse_node.query(
            "SELECT comment FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
        assert 'add_column comment' in column_comments

        clickhouse_node.query("DROP DATABASE test_database")
        mysql_node.query("DROP DATABASE test_database")
def test_data_types_support_level_for_mysql_database_engine(started_cluster):
    """`mysql_datatypes_support_level` must be stored in the database definition.

    The setting is supplied once through query-level settings and once through an explicit
    SETTINGS clause (with a different query-level value). In both cases SHOW CREATE DATABASE
    must report 'decimal,datetime64', before and after a DETACH/ATTACH cycle.
    """
    expected_clause = "SETTINGS mysql_datatypes_support_level = \\'decimal,datetime64\\'"

    def shown_create_statement(database):
        return clickhouse_node.query("SHOW CREATE DATABASE " + database + " FORMAT TSV")

    def check_setting_survives_reattach(database):
        assert expected_clause in shown_create_statement(database)
        clickhouse_node.query("DETACH DATABASE " + database)
        # The ATTACH runs without the query-level settings used at creation time.
        clickhouse_node.query("ATTACH DATABASE " + database)
        assert expected_clause in shown_create_statement(database)

    mysql_instance = MySQLNodeInstance('root', 'clickhouse', started_cluster.mysql_ip, started_cluster.mysql_port)
    with contextlib.closing(mysql_instance) as mysql_node:
        mysql_node.query("CREATE DATABASE IF NOT EXISTS test DEFAULT CHARACTER SET 'utf8'")

        # Case 1: the value comes only from the settings passed along with the query.
        clickhouse_node.query(
            "CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', test, 'root', 'clickhouse')",
            settings={"mysql_datatypes_support_level": "decimal,datetime64"})
        check_setting_survives_reattach("test_database")

        # Case 2: an explicit SETTINGS clause, while the query-level setting says "decimal".
        clickhouse_node.query(
            "CREATE DATABASE test_database_1 ENGINE = MySQL('mysql57:3306', test, 'root', 'clickhouse') SETTINGS mysql_datatypes_support_level = 'decimal,datetime64'",
            settings={"mysql_datatypes_support_level": "decimal"})
        check_setting_survives_reattach("test_database_1")

        for database in ("test_database", "test_database_1"):
            clickhouse_node.query("DROP DATABASE " + database)
        assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')

        mysql_node.query("DROP DATABASE test")
# Value fixtures for test_mysql_types: each list holds the literals inserted into a MySQL
# column of the matching type.
#
# Float values are not covered for now:
#   * NULL (float_values = ['NULL']): the TSV format returns \N for NULL, so the MySQL and
#     ClickHouse outputs cannot be compared with == directly.
#   * 0 (float_values = [0]): MySQL returns 0 while ClickHouse returns 0.0, so the outputs
#     cannot be compared with == directly either.
int32_values = [0, 1, -1, 2 ** 31 - 1, -2 ** 31]
# [FIXME] the client seems to have an issue with the value 4294967295: it returns -1 for it.
uint32_values = [0, 1]
mint_values = [0, 1, -1, 2 ** 23 - 1, -2 ** 23]
umint_values = [0, 1, 2 ** 24 - 1]
int16_values = [0, 1, -1, 2 ** 15 - 1, -2 ** 15]
uint16_values = [0, 1, 2 ** 16 - 1]
int8_values = [0, 1, -1, 2 ** 7 - 1, -2 ** 7]
uint8_values = [0, 1, 2 ** 8 - 1]

# 'NULL' is left out of the string values as well: string_values = ["'ClickHouse'", 'NULL']
string_values = ["'ClickHouse'"]

# Zero, then every magnitude once positive and once negative.
_decimal_magnitudes = [0.123, 0.4, 5.67, 8.91011, 123456789.123]
decimal_values = [0] + _decimal_magnitudes + [-magnitude for magnitude in _decimal_magnitudes]

timestamp_values = [
    "'2015-05-18 07:40:01.123'",
    "'2019-09-16 19:20:11.123'",
]
timestamp_values_no_subsecond = [
    "'2015-05-18 07:40:01'",
    "'2019-09-16 19:20:11'",
]
@pytest.mark.parametrize("case_name, mysql_type, expected_ch_type, mysql_values, setting_mysql_datatypes_support_level",
                         [
                             # test common type mapping
                             # ("common_types", "FLOAT", "Nullable(Float32)", float_values, ""),
                             # ("common_types", "FLOAT UNSIGNED", "Nullable(Float32)", float_values, ""),
                             pytest.param("common_types", "INT", "Nullable(Int32)", int32_values, "", id="common_types_1"),
                             pytest.param("common_types", "INT NOT NULL", "Int32", int32_values, "", id="common_types_2"),
                             pytest.param("common_types", "INT UNSIGNED NOT NULL", "UInt32", uint32_values, "", id="common_types_3"),
                             pytest.param("common_types", "INT UNSIGNED", "Nullable(UInt32)", uint32_values, "", id="common_types_4"),
                             pytest.param("common_types", "INT UNSIGNED DEFAULT NULL", "Nullable(UInt32)", uint32_values, "", id="common_types_5"),
                             pytest.param("common_types", "INT UNSIGNED DEFAULT '1'", "Nullable(UInt32)", uint32_values, "", id="common_types_6"),
                             pytest.param("common_types", "INT(10)", "Nullable(Int32)", int32_values, "", id="common_types_7"),
                             pytest.param("common_types", "INT(10) NOT NULL", "Int32", int32_values, "", id="common_types_8"),
                             # This case used to reuse the id "common_types_8"; ids must be unique,
                             # otherwise pytest rewrites both duplicates with generated suffixes.
                             pytest.param("common_types", "INT(10) UNSIGNED NOT NULL", "UInt32", uint32_values, "", id="common_types_8_1"),
                             pytest.param("common_types", "INT(10) UNSIGNED", "Nullable(UInt32)", uint32_values, "", id="common_types_9"),
                             pytest.param("common_types", "INT(10) UNSIGNED DEFAULT NULL", "Nullable(UInt32)", uint32_values, "", id="common_types_10"),
                             pytest.param("common_types", "INT(10) UNSIGNED DEFAULT '1'", "Nullable(UInt32)", uint32_values, "", id="common_types_11"),
                             pytest.param("common_types", "INTEGER", "Nullable(Int32)", int32_values, "", id="common_types_12"),
                             pytest.param("common_types", "INTEGER UNSIGNED", "Nullable(UInt32)", uint32_values, "", id="common_types_13"),

                             pytest.param("common_types", "MEDIUMINT", "Nullable(Int32)", mint_values, "", id="common_types_14"),
                             pytest.param("common_types", "MEDIUMINT UNSIGNED", "Nullable(UInt32)", umint_values, "", id="common_types_15"),

                             pytest.param("common_types", "SMALLINT", "Nullable(Int16)", int16_values, "", id="common_types_16"),
                             pytest.param("common_types", "SMALLINT UNSIGNED", "Nullable(UInt16)", uint16_values, "", id="common_types_17"),

                             pytest.param("common_types", "TINYINT", "Nullable(Int8)", int8_values, "", id="common_types_18"),
                             pytest.param("common_types", "TINYINT UNSIGNED", "Nullable(UInt8)", uint8_values, "", id="common_types_19"),

                             pytest.param("common_types", "VARCHAR(10)", "Nullable(String)", string_values, "", id="common_types_20"),

                             pytest.param("decimal_default", "decimal NOT NULL", "Decimal(10, 0)", decimal_values,
                                          "decimal,datetime64", id="decimal_1"),
                             pytest.param("decimal_default_nullable", "decimal", "Nullable(Decimal(10, 0))", decimal_values,
                                          "decimal,datetime64", id="decimal_2"),
                             pytest.param("decimal_18_6", "decimal(18, 6) NOT NULL", "Decimal(18, 6)", decimal_values,
                                          "decimal,datetime64", id="decimal_3"),
                             pytest.param("decimal_38_6", "decimal(38, 6) NOT NULL", "Decimal(38, 6)", decimal_values,
                                          "decimal,datetime64", id="decimal_4"),

                             # Due to python DB driver roundtrip MySQL timestamp and datetime values
                             # are printed with 6 digits after decimal point, so to simplify tests a bit,
                             # we only validate precision of 0 and 6.
                             pytest.param("timestamp_default", "timestamp", "DateTime", timestamp_values, "decimal,datetime64", id="timestamp_default"),
                             pytest.param("timestamp_6", "timestamp(6)", "DateTime64(6)", timestamp_values, "decimal,datetime64", id="timestamp_6"),
                             pytest.param("datetime_default", "DATETIME NOT NULL", "DateTime64(0)", timestamp_values,
                                          "decimal,datetime64", id="datetime_default"),
                             pytest.param("datetime_6", "DATETIME(6) NOT NULL", "DateTime64(6)", timestamp_values,
                                          "decimal,datetime64", id="datetime_6_1"),

                             # right now precision bigger than 39 is not supported by ClickHouse's Decimal, hence fall back to String
                             pytest.param("decimal_40_6", "decimal(40, 6) NOT NULL", "String", decimal_values,
                                          "decimal,datetime64", id="decimal_40_6"),
                             # Without the matching entry in mysql_datatypes_support_level the types
                             # fall back to String / DateTime.
                             pytest.param("decimal_18_6", "decimal(18, 6) NOT NULL", "String", decimal_values, "datetime64", id="decimal_18_6_1"),
                             pytest.param("decimal_18_6", "decimal(18, 6) NOT NULL", "String", decimal_values, "", id="decimal_18_6_2"),
                             pytest.param("datetime_6", "DATETIME(6) NOT NULL", "DateTime", timestamp_values_no_subsecond,
                                          "decimal", id="datetime_6_2"),
                             pytest.param("datetime_6", "DATETIME(6) NOT NULL", "DateTime", timestamp_values_no_subsecond, "", id="datetime_6_3"),
                         ])
def test_mysql_types(started_cluster, case_name, mysql_type, expected_ch_type, mysql_values,
                     setting_mysql_datatypes_support_level):
    """ Verify that values written to MySQL can be read on ClickHouse side via DB engine MySQL,
    or Table engine MySQL, or mysql() table function.
    Make sure that type is converted properly and values match exactly.

    :param case_name: used as the MySQL table name and as a suffix of the ClickHouse table name.
    :param mysql_type: column type used in the MySQL CREATE TABLE statement.
    :param expected_ch_type: type ClickHouse is expected to report for the column.
    :param mysql_values: literals inserted into the MySQL table, one row each.
    :param setting_mysql_datatypes_support_level: value of the ClickHouse setting
        `mysql_datatypes_support_level` passed with every ClickHouse query.
    """

    # Placeholders available as ${name} in every query passed to execute_query().
    substitutes = dict(
        mysql_db='decimal_support',
        table_name=case_name,
        mysql_type=mysql_type,
        mysql_values=', '.join('({})'.format(x) for x in mysql_values),
        ch_mysql_db='mysql_db',
        ch_mysql_table='mysql_table_engine_' + case_name,
        expected_ch_type=expected_ch_type,
    )

    clickhouse_query_settings = dict(
        mysql_datatypes_support_level=setting_mysql_datatypes_support_level,
        output_format_decimal_trailing_zeros=1
    )

    def execute_query(node, query, **kwargs):
        """Substitute placeholders and run one query (or a list of queries) on `node`.

        Trailing newlines are stripped from textual results so that MySQL and ClickHouse
        outputs can be compared directly; integer results are returned unchanged.
        """
        def do_execute(query):
            query = Template(query).safe_substitute(substitutes)
            res = node.query(query, **kwargs)
            return res if isinstance(res, int) else res.rstrip('\n\r')

        if isinstance(query, (str, bytes)):
            return do_execute(query)
        else:
            return [do_execute(q) for q in query]

    with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', started_cluster.mysql_ip, port=started_cluster.mysql_port)) as mysql_node:
        execute_query(mysql_node, [
            "DROP DATABASE IF EXISTS ${mysql_db}",
            "CREATE DATABASE ${mysql_db}  DEFAULT CHARACTER SET 'utf8'",
            "CREATE TABLE `${mysql_db}`.`${table_name}` (value ${mysql_type})",
            "INSERT INTO `${mysql_db}`.`${table_name}` (value) VALUES ${mysql_values}",
            "SELECT * FROM `${mysql_db}`.`${table_name}`",
            "FLUSH TABLES"
        ])

        assert execute_query(mysql_node, "SELECT COUNT(*) FROM ${mysql_db}.${table_name}") \
               == \
               "{}".format(len(mysql_values))

        # MySQL TABLE ENGINE
        execute_query(clickhouse_node, [
            "DROP TABLE IF EXISTS ${ch_mysql_table};",
            "CREATE TABLE ${ch_mysql_table} (value ${expected_ch_type}) ENGINE = MySQL('mysql57:3306', '${mysql_db}', '${table_name}', 'root', 'clickhouse')",
        ], settings=clickhouse_query_settings)

        # Validate type
        assert \
            execute_query(clickhouse_node, "SELECT toTypeName(value) FROM ${ch_mysql_table} LIMIT 1",
                          settings=clickhouse_query_settings) \
            == \
            expected_ch_type

        # Validate values
        assert \
            execute_query(clickhouse_node, "SELECT value FROM ${ch_mysql_table}",
                          settings=clickhouse_query_settings) \
            == \
            execute_query(mysql_node, "SELECT value FROM ${mysql_db}.${table_name}")

        # MySQL DATABASE ENGINE
        execute_query(clickhouse_node, [
            "DROP DATABASE IF EXISTS ${ch_mysql_db}",
            "CREATE DATABASE ${ch_mysql_db} ENGINE = MySQL('mysql57:3306', '${mysql_db}', 'root', 'clickhouse')"
        ], settings=clickhouse_query_settings)

        # Validate type
        assert \
            execute_query(clickhouse_node, "SELECT toTypeName(value) FROM ${ch_mysql_db}.${table_name} LIMIT 1",
                          settings=clickhouse_query_settings) \
            == \
            expected_ch_type

        # Validate values
        assert \
            execute_query(clickhouse_node, "SELECT value FROM ${ch_mysql_db}.${table_name}",
                          settings=clickhouse_query_settings) \
            == \
            execute_query(mysql_node, "SELECT value FROM ${mysql_db}.${table_name}")

        # MySQL TABLE FUNCTION
        # Validate type
        assert \
            execute_query(clickhouse_node,
                          "SELECT toTypeName(value) FROM mysql('mysql57:3306', '${mysql_db}', '${table_name}', 'root', 'clickhouse') LIMIT 1",
                          settings=clickhouse_query_settings) \
            == \
            expected_ch_type

        # Validate values
        assert \
            execute_query(mysql_node, "SELECT value FROM ${mysql_db}.${table_name}") \
            == \
            execute_query(clickhouse_node,
                          "SELECT value FROM mysql('mysql57:3306', '${mysql_db}', '${table_name}', 'root', 'clickhouse')",
                          settings=clickhouse_query_settings)
def test_clickhouse_mysql_no_connection(started_cluster):
    """Check how a MySQL-engine database behaves while the MySQL container is paused.

    A SELECT must fail with a connection error while MySQL is paused and succeed again once
    it is resumed; DROP DATABASE on the ClickHouse side must work even while MySQL is paused.
    """
    with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', started_cluster.mysql_ip, started_cluster.mysql_port)) as mysql_node:
        # Start from a clean slate: a `test_database` left behind by a previously failed test
        # would otherwise make the CREATE DATABASE statements below fail.
        clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
        mysql_node.query("DROP DATABASE IF EXISTS test_database")

        mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
        mysql_node.query('CREATE TABLE `test_database`.`test_table` ( `i``d` int(11) NOT NULL, PRIMARY KEY (`i``d`)) ENGINE=InnoDB;')

        clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', test_database, 'root', 'clickhouse')")
        clickhouse_node.query("INSERT INTO `test_database`.`test_table`(`i``d`) select number from numbers(10000)")
        assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '10000'

        # Always resume the container, even when an assertion fails, so that a failure here
        # does not leave MySQL paused for whatever runs next.
        started_cluster.pause_container('mysql57')
        try:
            result = clickhouse_node.query_and_get_error("SELECT count() FROM `test_database`.`test_table`")
            assert 'Exception: Connections to all replicas failed' in result
        finally:
            started_cluster.unpause_container('mysql57')

        result = clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`")
        assert result.strip() == '10000'

        # Dropping the ClickHouse database must not require a live MySQL connection.
        started_cluster.pause_container('mysql57')
        try:
            clickhouse_node.query("DROP DATABASE test_database")
            assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')
        finally:
            started_cluster.unpause_container('mysql57')

        # Remove the MySQL-side database too, as the sibling tests do.
        mysql_node.query("DROP DATABASE test_database")