more fixes

This commit is contained in:
Yatsishin Ilya 2021-02-16 10:10:01 +03:00
parent 06e1188d9e
commit 0b62ef4aa9
4 changed files with 176 additions and 138 deletions

View File

@ -134,6 +134,8 @@ class ClickHouseCluster:
self.project_name = re.sub(r'[^a-z0-9]', '', project_name.lower())
self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))
self.docker_logs_path = p.join(self.instances_dir, 'docker.log')
self.env_file = p.join(self.instances_dir, DEFAULT_ENV_NAME)
self.env_variables = {}
custom_dockerd_host = custom_dockerd_host or os.environ.get('CLICKHOUSE_TESTS_DOCKERD_HOST')
self.docker_api_version = os.environ.get("DOCKER_API_VERSION")
@ -142,6 +144,7 @@ class ClickHouseCluster:
self.base_cmd = ['docker-compose']
if custom_dockerd_host:
self.base_cmd += ['--host', custom_dockerd_host]
self.base_cmd += ['--env-file', self.env_file]
self.base_cmd += ['--project-name', self.project_name]
self.base_zookeeper_cmd = None
@ -154,6 +157,7 @@ class ClickHouseCluster:
self.instances = {}
self.with_zookeeper = False
self.with_mysql = False
self.with_mysql8 = False
self.with_postgres = False
self.with_kafka = False
self.with_kerberized_kafka = False
@ -193,6 +197,10 @@ class ClickHouseCluster:
self.mysql_host = "mysql57"
self.mysql_port = get_open_port()
# available when with_mysql8 == True
self.mysql8_host = "mysql80"
self.mysql8_port = get_open_port()
self.zookeeper_use_tmpfs = True
self.docker_client = None
@ -216,9 +224,20 @@ class ClickHouseCluster:
return self.base_mysql_cmd
def setup_mysql8_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_mysql8 = True
env_variables['MYSQL8_HOST'] = self.mysql8_host
env_variables['MYSQL8_EXTERNAL_PORT'] = str(self.mysql8_port)
env_variables['MYSQL8_INTERNAL_PORT'] = "3306"
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_mysql_8_0.yml')])
self.base_mysql8_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
'--file', p.join(docker_compose_yml_dir, 'docker_compose_mysql_8_0.yml')]
return self.base_mysql8_cmd
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
macros=None,
with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
with_zookeeper=False, with_mysql=False, with_mysql8=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_kerberized_hdfs=False, with_mongo=False,
with_redis=False, with_minio=False, with_cassandra=False,
@ -261,6 +280,7 @@ class ClickHouseCluster:
with_zookeeper=with_zookeeper,
zookeeper_config_path=self.zookeeper_config_path,
with_mysql=with_mysql,
with_mysql8=with_mysql8,
with_kafka=with_kafka,
with_kerberized_kafka=with_kerberized_kafka,
with_rabbitmq=with_rabbitmq,
@ -285,8 +305,6 @@ class ClickHouseCluster:
docker_compose_yml_dir = get_docker_compose_path()
assert instance.env_file is not None
self.instances[name] = instance
if ipv4_address is not None or ipv6_address is not None:
self.with_net_trics = True
@ -309,6 +327,9 @@ class ClickHouseCluster:
if with_mysql and not self.with_mysql:
cmds.append(self.setup_mysql_cmd(instance, env_variables, docker_compose_yml_dir))
if with_mysql8 and not self.with_mysql8:
cmds.append(self.setup_mysql8_cmd(instance, env_variables, docker_compose_yml_dir))
if with_postgres and not self.with_postgres:
self.with_postgres = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_postgres.yml')])
@ -445,6 +466,9 @@ class ClickHouseCluster:
node.wait_for_start(start_deadline)
return node
def restart_service(self, service_name):
run_and_check(self.base_cmd + ["restart", service_name])
def get_instance_ip(self, instance_name):
print("get_instance_ip instance_name={}".format(instance_name))
docker_id = self.get_instance_docker_id(instance_name)
@ -498,6 +522,7 @@ class ClickHouseCluster:
def wait_mysql_to_start(self, timeout=60):
start = time.time()
errors = []
while time.time() - start < timeout:
try:
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=self.mysql_port)
@ -505,12 +530,28 @@ class ClickHouseCluster:
print("Mysql Started")
return
except Exception as ex:
print("Can't connect to MySQL " + str(ex))
errors += [str(ex)]
time.sleep(0.5)
subprocess_call(['docker-compose', 'ps', '--services', '--all'])
logging.error("Can't connect to MySQL:{}".format(errors))
raise Exception("Cannot wait MySQL container")
def wait_mysql8_to_start(self, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=self.mysql8_port)
conn.close()
print("Mysql 8 Started")
return
except Exception as ex:
print("Can't connect to MySQL 8 " + str(ex))
time.sleep(0.5)
subprocess_call(['docker-compose', 'ps', '--services', '--all'])
raise Exception("Cannot wait MySQL 8 container")
def wait_postgres_to_start(self, timeout=60):
start = time.time()
while time.time() - start < timeout:
@ -671,7 +712,7 @@ class ClickHouseCluster:
self.docker_client = docker.from_env(version=self.docker_api_version)
common_opts = ['up', '-d', '--force-recreate']
common_opts = ['up', '-d', '--force-recreate', '--remove-orphans']
if self.with_zookeeper and self.base_zookeeper_cmd:
print('Setup ZooKeeper')
@ -697,6 +738,11 @@ class ClickHouseCluster:
subprocess_check_call(self.base_mysql_cmd + common_opts)
self.wait_mysql_to_start(120)
if self.with_mysql8 and self.base_mysql8_cmd:
print('Setup MySQL 8')
subprocess_check_call(self.base_mysql8_cmd + common_opts)
self.wait_mysql8_to_start(120)
if self.with_postgres and self.base_postgres_cmd:
print('Setup Postgres')
subprocess_check_call(self.base_postgres_cmd + common_opts)
@ -750,6 +796,8 @@ class ClickHouseCluster:
subprocess_check_call(self.base_cassandra_cmd + ['up', '-d', '--force-recreate'])
self.wait_cassandra_to_start()
_create_env_file(os.path.join(self.env_file), self.env_variables)
clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate']
print(("Trying to create ClickHouse instance by command %s", ' '.join(map(str, clickhouse_start_cmd))))
subprocess.check_output(clickhouse_start_cmd)
@ -796,7 +844,7 @@ class ClickHouseCluster:
subprocess_check_call(self.base_cmd + ['kill'])
try:
subprocess_check_call(self.base_cmd + ['down', '--volumes', '--remove-orphans'])
subprocess_check_call(self.base_cmd + ['down', '--volumes'])
except Exception as e:
print("Down + remove orphans failed durung shutdown. {}".format(repr(e)))
@ -917,7 +965,7 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
custom_dictionaries,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_kerberized_kafka, with_rabbitmq, with_kerberized_hdfs,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_mysql8, with_kafka, with_kerberized_kafka, with_rabbitmq, with_kerberized_hdfs,
with_mongo, with_redis, with_minio,
with_cassandra, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers,
hostname=None, env_variables=None,
@ -945,6 +993,7 @@ class ClickHouseInstance:
self.odbc_bridge_bin_path = odbc_bridge_bin_path
self.with_mysql = with_mysql
self.with_mysql8 = with_mysql8
self.with_kafka = with_kafka
self.with_kerberized_kafka = with_kerberized_kafka
self.with_rabbitmq = with_rabbitmq
@ -1339,6 +1388,9 @@ class ClickHouseInstance:
if self.with_mysql:
depends_on.append("mysql57")
if self.with_mysql8:
depends_on.append("mysql80")
if self.with_kafka:
depends_on.append("kafka1")
depends_on.append("schema-registry")
@ -1360,6 +1412,7 @@ class ClickHouseInstance:
if self.with_minio:
depends_on.append("minio1")
self.cluster.env_variables.update(self.env_variables)
_create_env_file(os.path.join(self.env_file), self.env_variables)
print("Env {} stored in {}".format(self.env_variables, self.env_file))

View File

@ -31,12 +31,12 @@ def check_query(clickhouse_node, query, result_set, retry_count=60, interval_sec
def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("DROP DATABASE IF EXISTS test_database_dml")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_dml")
mysql_node.query("CREATE DATABASE test_database_dml DEFAULT CHARACTER SET 'utf8'")
# existed before the mapping was created
mysql_node.query("CREATE TABLE test_database.test_table_1 ("
mysql_node.query("CREATE TABLE test_database_dml.test_table_1 ("
"`key` INT NOT NULL PRIMARY KEY, "
"unsigned_tiny_int TINYINT UNSIGNED, tiny_int TINYINT, "
"unsigned_small_int SMALLINT UNSIGNED, small_int SMALLINT, "
@ -53,68 +53,68 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam
# it already has some data
mysql_node.query("""
INSERT INTO test_database.test_table_1 VALUES(1, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', 'binary',
INSERT INTO test_database_dml.test_table_1 VALUES(1, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', 'binary',
'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', true);
""")
clickhouse_node.query(
"CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database_dml ENGINE = MaterializeMySQL('{}:3306', 'test_database_dml', 'root', 'clickhouse')".format(
service_name))
assert "test_database" in clickhouse_node.query("SHOW DATABASES")
assert "test_database_dml" in clickhouse_node.query("SHOW DATABASES")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
check_query(clickhouse_node, "SELECT * FROM test_database_dml.test_table_1 ORDER BY key FORMAT TSV",
"1\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\tbinary\\0\\0\t2020-01-01\t"
"2020-01-01 00:00:00\t2020-01-01 00:00:00\t1\n")
mysql_node.query("""
INSERT INTO test_database.test_table_1 VALUES(2, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', 'binary',
INSERT INTO test_database_dml.test_table_1 VALUES(2, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', 'binary',
'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', false);
""")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
check_query(clickhouse_node, "SELECT * FROM test_database_dml.test_table_1 ORDER BY key FORMAT TSV",
"1\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\tbinary\\0\\0\t2020-01-01\t"
"2020-01-01 00:00:00\t2020-01-01 00:00:00\t1\n2\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\t"
"varchar\tchar\tbinary\\0\\0\t2020-01-01\t2020-01-01 00:00:00\t2020-01-01 00:00:00\t0\n")
mysql_node.query("UPDATE test_database.test_table_1 SET unsigned_tiny_int = 2 WHERE `key` = 1")
mysql_node.query("UPDATE test_database_dml.test_table_1 SET unsigned_tiny_int = 2 WHERE `key` = 1")
check_query(clickhouse_node, """
SELECT key, unsigned_tiny_int, tiny_int, unsigned_small_int,
small_int, unsigned_medium_int, medium_int, unsigned_int, _int, unsigned_integer, _integer,
unsigned_bigint, _bigint, unsigned_float, _float, unsigned_double, _double, _varchar, _char, binary_col,
_date, _datetime, /* exclude it, because ON UPDATE CURRENT_TIMESTAMP _timestamp, */
_bool FROM test_database.test_table_1 ORDER BY key FORMAT TSV
_bool FROM test_database_dml.test_table_1 ORDER BY key FORMAT TSV
""",
"1\t2\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\tbinary\\0\\0\t2020-01-01\t"
"2020-01-01 00:00:00\t1\n2\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\t"
"varchar\tchar\tbinary\\0\\0\t2020-01-01\t2020-01-01 00:00:00\t0\n")
# update primary key
mysql_node.query("UPDATE test_database.test_table_1 SET `key` = 3 WHERE `unsigned_tiny_int` = 2")
mysql_node.query("UPDATE test_database_dml.test_table_1 SET `key` = 3 WHERE `unsigned_tiny_int` = 2")
check_query(clickhouse_node, "SELECT key, unsigned_tiny_int, tiny_int, unsigned_small_int,"
" small_int, unsigned_medium_int, medium_int, unsigned_int, _int, unsigned_integer, _integer, "
" unsigned_bigint, _bigint, unsigned_float, _float, unsigned_double, _double, _varchar, _char, binary_col, "
" _date, _datetime, /* exclude it, because ON UPDATE CURRENT_TIMESTAMP _timestamp, */ "
" _bool FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
" _bool FROM test_database_dml.test_table_1 ORDER BY key FORMAT TSV",
"2\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\t"
"varchar\tchar\tbinary\\0\\0\t2020-01-01\t2020-01-01 00:00:00\t0\n3\t2\t-1\t2\t-2\t3\t-3\t"
"4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\tbinary\\0\\0\t2020-01-01\t2020-01-01 00:00:00\t1\n")
mysql_node.query('DELETE FROM test_database.test_table_1 WHERE `key` = 2')
mysql_node.query('DELETE FROM test_database_dml.test_table_1 WHERE `key` = 2')
check_query(clickhouse_node, "SELECT key, unsigned_tiny_int, tiny_int, unsigned_small_int,"
" small_int, unsigned_medium_int, medium_int, unsigned_int, _int, unsigned_integer, _integer, "
" unsigned_bigint, _bigint, unsigned_float, _float, unsigned_double, _double, _varchar, _char, binary_col, "
" _date, _datetime, /* exclude it, because ON UPDATE CURRENT_TIMESTAMP _timestamp, */ "
" _bool FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
" _bool FROM test_database_dml.test_table_1 ORDER BY key FORMAT TSV",
"3\t2\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t3.2\t-3.2\t3.4\t-3.4\tvarchar\tchar\tbinary\\0\\0\t2020-01-01\t"
"2020-01-01 00:00:00\t1\n")
mysql_node.query('DELETE FROM test_database.test_table_1 WHERE `unsigned_tiny_int` = 2')
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV", "")
mysql_node.query('DELETE FROM test_database_dml.test_table_1 WHERE `unsigned_tiny_int` = 2')
check_query(clickhouse_node, "SELECT * FROM test_database_dml.test_table_1 ORDER BY key FORMAT TSV", "")
clickhouse_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database")
clickhouse_node.query("DROP DATABASE test_database_dml")
mysql_node.query("DROP DATABASE test_database_dml")
def materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, mysql_node, service_name):
@ -525,7 +525,7 @@ def insert_with_modify_binlog_checksum(clickhouse_node, mysql_node, service_name
def err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("DROP DATABASE IF EXISTS priv_err_db")
clickhouse_node.query("DROP DATABАASE IF EXISTS priv_err_db")
mysql_node.query("DROP DATABASE IF EXISTS priv_err_db")
mysql_node.query("CREATE DATABASE priv_err_db DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("CREATE TABLE priv_err_db.test_table_1 (id INT NOT NULL PRIMARY KEY) ENGINE = InnoDB;")
@ -670,6 +670,7 @@ def mysql_kill_sync_thread_restore_test(clickhouse_node, mysql_node, service_nam
def mysql_killed_while_insert(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS kill_mysql_while_insert")
mysql_node.query("CREATE DATABASE kill_mysql_while_insert")
mysql_node.query("CREATE TABLE kill_mysql_while_insert.test ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;")
clickhouse_node.query("CREATE DATABASE kill_mysql_while_insert ENGINE = MaterializeMySQL('{}:3306', 'kill_mysql_while_insert', 'root', 'clickhouse')".format(service_name))
@ -684,17 +685,14 @@ def mysql_killed_while_insert(clickhouse_node, mysql_node, service_name):
t = threading.Thread(target=insert, args=(10000,))
t.start()
run_and_check(
['docker-compose', '-p', mysql_node.project_name, '-f', mysql_node.docker_compose, 'stop'])
clickhouse_node.cluster.restart_service(service_name)
finally:
with pytest.raises(QueryRuntimeException) as execption:
time.sleep(5)
clickhouse_node.query("SELECT count() FROM kill_mysql_while_insert.test")
assert "Master maybe lost." in str(execption.value)
run_and_check(
['docker-compose', '-p', mysql_node.project_name, '-f', mysql_node.docker_compose, 'start'])
mysql_node.wait_mysql_to_start(120)
mysql_node.alloc_connection()
clickhouse_node.query("DETACH DATABASE kill_mysql_while_insert")
clickhouse_node.query("ATTACH DATABASE kill_mysql_while_insert")
@ -709,6 +707,7 @@ def mysql_killed_while_insert(clickhouse_node, mysql_node, service_name):
def clickhouse_killed_while_insert(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS kill_clickhouse_while_insert")
mysql_node.query("CREATE DATABASE kill_clickhouse_while_insert")
mysql_node.query("CREATE TABLE kill_clickhouse_while_insert.test ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;")
clickhouse_node.query("CREATE DATABASE kill_clickhouse_while_insert ENGINE = MaterializeMySQL('{}:3306', 'kill_clickhouse_while_insert', 'root', 'clickhouse')".format(service_name))

View File

@ -7,15 +7,18 @@ import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster, get_docker_compose_path, run_and_check
import docker
import logging
from . import materialize_with_ddl
DOCKER_COMPOSE_PATH = get_docker_compose_path()
cluster = ClickHouseCluster(__file__)
mysql_node = None
mysql8_node = None
node_db_ordinary = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=False, stay_alive=True)
node_db_atomic = cluster.add_instance('node2', user_configs=["configs/users_db_atomic.xml"], with_mysql=False, stay_alive=True)
node_db_ordinary = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=True, stay_alive=True)
node_db_atomic = cluster.add_instance('node2', user_configs=["configs/users_db_atomic.xml"], with_mysql8=True, stay_alive=True)
@pytest.fixture(scope="module")
@ -27,26 +30,29 @@ def started_cluster():
cluster.shutdown()
class MySQLNodeInstance:
def __init__(self, user='root', password='clickhouse', ip_address='127.0.0.1', port=cluster.mysql_port, docker_compose=None, project_name=cluster.project_name):
class MySQLConnection:
def __init__(self, port, user='root', password='clickhouse', ip_address='127.0.0.1', docker_compose=None, project_name=cluster.project_name):
self.user = user
self.port = port
self.ip_address = ip_address
self.password = password
self.mysql_connection = None # lazy init
self.docker_compose = docker_compose
self.project_name = project_name
def alloc_connection(self):
if self.mysql_connection is None:
self.mysql_connection = pymysql.connect(user=self.user, password=self.password, host=self.ip_address,
port=self.port, autocommit=True)
else:
if self.mysql_connection.ping():
self.mysql_connection = pymysql.connect(user=self.user, password=self.password, host=self.ip_address,
port=self.port, autocommit=True)
return self.mysql_connection
errors = []
for _ in range(5):
try:
if self.mysql_connection is None:
self.mysql_connection = pymysql.connect(user=self.user, password=self.password, host=self.ip_address,
port=self.port, autocommit=True)
else:
self.mysql_connection.ping(reconnect=True)
logging.debug("MySQL Connection establised: {}:{}".format(self.ip_address, self.port))
return self.mysql_connection
except Exception as e:
errors += [str(e)]
time.sleep(1)
raise Exception("Connection not establised, {}".format(errors))
def query(self, execution_query):
with self.alloc_connection().cursor() as cursor:
@ -75,174 +81,133 @@ class MySQLNodeInstance:
if self.mysql_connection is not None:
self.mysql_connection.close()
def wait_mysql_to_start(self, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
self.alloc_connection()
print("Mysql Started")
return
except Exception as ex:
print("Can't connect to MySQL " + str(ex))
time.sleep(0.5)
run_and_check(['docker-compose', 'ps', '--services', 'all'])
raise Exception("Cannot wait MySQL container")
@pytest.fixture(scope="module")
def started_mysql_5_7():
docker_compose = os.path.join(DOCKER_COMPOSE_PATH, 'docker_compose_mysql_5_7_for_materialize_mysql.yml')
mysql_node = MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', 3308, docker_compose)
try:
run_and_check(
['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'up', '--no-recreate', '-d'])
mysql_node.wait_mysql_to_start(120)
yield mysql_node
finally:
mysql_node.close()
run_and_check(['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'down', '--volumes',
'--remove-orphans'])
mysql_node = MySQLConnection(cluster.mysql_port, 'root', 'clickhouse', '127.0.0.1')
yield mysql_node
@pytest.fixture(scope="module")
def started_mysql_8_0():
docker_compose = os.path.join(DOCKER_COMPOSE_PATH, 'docker_compose_mysql_8_0_for_materialize_mysql.yml')
mysql_node = MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', 33308, docker_compose)
try:
run_and_check(
['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'up', '--no-recreate', '-d'])
mysql_node.wait_mysql_to_start(120)
yield mysql_node
finally:
mysql_node.close()
run_and_check(['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'down', '--volumes',
'--remove-orphans'])
mysql8_node = MySQLConnection(cluster.mysql8_port, 'root', 'clickhouse', '127.0.0.1')
yield mysql8_node
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_add_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_add_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
# mysql 5.7 cannot support alter rename column
# materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
# materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.alter_add_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0,
"mysql8_0")
materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0,
"mysql8_0")
materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0,
"mysql8_0")
materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0,
"mysql8_0")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0,
"mysql8_0")
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_add_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_empty_transaction_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_empty_transaction_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_select_without_columns_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_select_without_columns_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.select_without_columns(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_insert_with_modify_binlog_checksum_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_insert_with_modify_binlog_checksum_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.insert_with_modify_binlog_checksum(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_err_sync_user_privs_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_err_sync_user_privs_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_network_partition_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.network_partition_test(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.network_partition_test(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_network_partition_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.network_partition_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.network_partition_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_kill_sync_thread_restore_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_kill_sync_thread_restore_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_killed_while_insert_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_killed_while_insert_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_clickhouse_killed_while_insert_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_clickhouse_killed_while_insert_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_utf8mb4(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.utf8mb4_test(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.utf8mb4_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.utf8mb4_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.utf8mb4_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_system_parts_table(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.system_parts_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.system_parts_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_multi_table_update(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.multi_table_update_test(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.multi_table_update_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.multi_table_update_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.multi_table_update_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_system_tables_table(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.system_tables_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.system_tables_test(clickhouse_node, started_mysql_8_0, "mysql80")

View File

@ -3,6 +3,8 @@ import time
import psycopg2
import pymysql.cursors
import pytest
import logging
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
@ -15,6 +17,11 @@ node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True,
'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml',
'configs/dictionaries/postgres_odbc_hashed_dictionary.xml'], stay_alive=True)
drop_table_sql_template = """
DROP TABLE IF EXISTS `clickhouse`.`{}`
"""
create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` int(11) NOT NULL,
@ -27,9 +34,22 @@ create_table_sql_template = """
def get_mysql_conn():
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=cluster.mysql_port)
return conn
errors = []
conn = None
for _ in range(5):
try:
if conn is None:
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=cluster.mysql_port)
else:
conn.ping(reconnect=True)
logging.debug("MySQL Connection establised: 127.0.0.1:{}".format(cluster.mysql_port))
return conn
except Exception as e:
errors += [str(e)]
time.sleep(1)
raise Exception("Connection not establised, {}".format(errors))
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
@ -39,6 +59,7 @@ def create_mysql_db(conn, name):
def create_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(drop_table_sql_template.format(table_name))
cursor.execute(create_table_sql_template.format(table_name))