ClickHouse/tests/integration/test_mysql_database_engine/test.py

133 lines
7.2 KiB
Python

import time
import contextlib
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
clickhouse_node = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
class MySQLNodeInstance:
def __init__(self, user='root', password='clickhouse', hostname='127.0.0.1', port=3308):
self.user = user
self.port = port
self.hostname = hostname
self.password = password
self.mysql_connection = None # lazy init
def query(self, execution_query):
if self.mysql_connection is None:
self.mysql_connection = pymysql.connect(user=self.user, password=self.password, host=self.hostname, port=self.port)
with self.mysql_connection.cursor() as cursor:
cursor.execute(execution_query)
def close(self):
if self.mysql_connection is not None:
self.mysql_connection.close()
def test_mysql_ddl_for_mysql_database(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', 'test_database', 'root', 'clickhouse')")
assert 'test_database' in clickhouse_node.query('SHOW DATABASES')
mysql_node.query('CREATE TABLE `test_database`.`test_table` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;')
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
time.sleep(3) # Because the unit of MySQL modification time is seconds, modifications made in the same second cannot be obtained
mysql_node.query('ALTER TABLE `test_database`.`test_table` ADD COLUMN `add_column` int(11)')
assert 'add_column' in clickhouse_node.query("SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
time.sleep(3) # Because the unit of MySQL modification time is seconds, modifications made in the same second cannot be obtained
mysql_node.query('ALTER TABLE `test_database`.`test_table` DROP COLUMN `add_column`')
assert 'add_column' not in clickhouse_node.query("SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
mysql_node.query('DROP TABLE `test_database`.`test_table`;')
assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("DROP DATABASE test_database")
assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')
mysql_node.query("DROP DATABASE test_database")
def test_clickhouse_ddl_for_mysql_database(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query('CREATE TABLE `test_database`.`test_table` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;')
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', 'test_database', 'root', 'clickhouse')")
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("DROP TABLE test_database.test_table")
assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("DETACH TABLE test_database.test_table")
assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("DROP DATABASE test_database")
assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')
mysql_node.query("DROP DATABASE test_database")
def test_clickhouse_dml_for_mysql_database(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query('CREATE TABLE `test_database`.`test_table` ( `i``d` int(11) NOT NULL, PRIMARY KEY (`i``d`)) ENGINE=InnoDB;')
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', test_database, 'root', 'clickhouse')")
assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '0'
clickhouse_node.query("INSERT INTO `test_database`.`test_table`(`i\`d`) select number from numbers(10000)")
assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '10000'
mysql_node.query("DROP DATABASE test_database")
def test_clickhouse_join_for_mysql_database(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE IF NOT EXISTS test DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("CREATE TABLE test.t1_mysql_local ("
"pays VARCHAR(55) DEFAULT 'FRA' NOT NULL,"
"service VARCHAR(5) DEFAULT '' NOT NULL,"
"opco CHAR(3) DEFAULT '' NOT NULL"
")")
mysql_node.query("CREATE TABLE test.t2_mysql_local ("
"service VARCHAR(5) DEFAULT '' NOT NULL,"
"opco VARCHAR(5) DEFAULT ''"
")")
clickhouse_node.query("CREATE TABLE default.t1_remote_mysql AS mysql('mysql1:3306','test','t1_mysql_local','root','clickhouse')")
clickhouse_node.query("CREATE TABLE default.t2_remote_mysql AS mysql('mysql1:3306','test','t2_mysql_local','root','clickhouse')")
assert clickhouse_node.query("SELECT s.pays "
"FROM default.t1_remote_mysql AS s "
"LEFT JOIN default.t1_remote_mysql AS s_ref "
"ON (s_ref.opco = s.opco AND s_ref.service = s.service)") == ''
mysql_node.query("DROP DATABASE test")
def test_bad_arguments_for_mysql_database_engine(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
with pytest.raises(QueryRuntimeException) as exception:
mysql_node.query("CREATE DATABASE IF NOT EXISTS test_bad_arguments DEFAULT CHARACTER SET 'utf8'")
clickhouse_node.query("CREATE DATABASE test_database_bad_arguments ENGINE = MySQL('mysql1:3306', test_bad_arguments, root, 'clickhouse')")
assert 'Database engine MySQL requested literal argument.' in str(exception.value)
mysql_node.query("DROP DATABASE test_bad_arguments")