add integration test_storage_mysql

This commit is contained in:
sundy-li 2018-05-14 19:10:07 +08:00 committed by alexey-milovidov
parent 5d91b4f2fd
commit 8a5990fc74
8 changed files with 152 additions and 15 deletions

View File

@ -21,7 +21,7 @@ public:
String getName() const override { return "MySQL"; }
Block getHeader() const override { return description.sample_block; };
Block getHeader() const override { return description.sample_block; }
private:
Block readImpl() override;

View File

@ -14,7 +14,7 @@ Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL`
If you want to run the tests under a non-privileged user, you must add this user to `docker` group: `sudo usermod -aG docker $USER` and re-login.
(You must close all your sessions (for example, restart your computer))

View File

@ -48,15 +48,17 @@ class ClickHouseCluster:
self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name]
self.base_zookeeper_cmd = None
self.base_mysql_cmd = []
self.pre_zookeeper_commands = []
self.instances = {}
self.with_zookeeper = False
self.with_mysql = False
self.docker_client = None
self.is_up = False
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macroses={}, with_zookeeper=False,
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macroses={}, with_zookeeper=False, with_mysql=False,
clickhouse_path_dir=None, hostname=None):
"""Add an instance to the cluster.
@ -75,7 +77,7 @@ class ClickHouseCluster:
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs, user_configs, macroses, with_zookeeper,
self.zookeeper_config_path, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname)
self.zookeeper_config_path, with_mysql, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname)
self.instances[name] = instance
self.base_cmd.extend(['--file', instance.docker_compose_path])
@ -85,6 +87,12 @@ class ClickHouseCluster:
self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]
if with_mysql and not self.with_mysql:
self.with_mysql = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
return instance
@ -124,6 +132,9 @@ class ClickHouseCluster:
for command in self.pre_zookeeper_commands:
self.run_kazoo_commands_with_retries(command, repeats=5)
if self.with_mysql and self.base_mysql_cmd:
subprocess.check_call(self.base_mysql_cmd + ['up', '-d', '--no-recreate'])
# Uncomment for debugging
#print ' '.join(self.base_cmd + ['up', '--no-recreate'])
@ -138,6 +149,7 @@ class ClickHouseCluster:
instance.client = Client(instance.ip_address, command=self.client_bin_path)
self.is_up = True
@ -201,7 +213,7 @@ services:
class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macroses,
with_zookeeper, zookeeper_config_path, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None):
with_zookeeper, zookeeper_config_path, with_mysql, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None):
self.name = name
self.base_cmd = cluster.base_cmd[:]
@ -220,6 +232,8 @@ class ClickHouseInstance:
self.base_configs_dir = base_configs_dir
self.server_bin_path = server_bin_path
self.with_mysql = with_mysql
self.path = p.join(self.cluster.instances_dir, name)
self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
@ -269,7 +283,6 @@ class ClickHouseInstance:
while True:
status = self.get_docker_handle().status
if status == 'exited':
raise Exception("Instance `{}' failed to start. Container status: {}".format(self.name, status))
@ -356,9 +369,15 @@ class ClickHouseInstance:
logs_dir = p.abspath(p.join(self.path, 'logs'))
os.mkdir(logs_dir)
depends_on = '[]'
depends_on = []
if self.with_mysql:
depends_on.append("mysql1")
if self.with_zookeeper:
depends_on = '["zoo1", "zoo2", "zoo3"]'
depends_on.append("zoo1")
depends_on.append("zoo2")
depends_on.append("zoo3")
with open(self.docker_compose_path, 'w') as docker_compose:
docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
@ -370,7 +389,7 @@ class ClickHouseInstance:
config_d_dir=config_d_dir,
db_dir=db_dir,
logs_dir=logs_dir,
depends_on=depends_on))
depends_on=str(depends_on)))
def destroy_dir(self):

View File

@ -0,0 +1,9 @@
version: '2'
services:
mysql1:
image: mysql:5.7
restart: always
environment:
MYSQL_ROOT_PASSWORD: clickhouse
ports:
- 3308:3306

View File

@ -0,0 +1,12 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,98 @@
from contextlib import contextmanager
import pytest
## sudo -H pip install PyMySQL
import pymysql.cursors
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql = True)
create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` int(11) NOT NULL,
`name` varchar(50) NOT NULL,
`age` int NOT NULL default 0,
`money` int NOT NULL default 0,
PRIMARY KEY (`id`)) ENGINE=InnoDB;
"""
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
conn = get_mysql_conn()
## create mysql db and table
create_mysql_db(conn, 'clickhouse')
yield cluster
finally:
cluster.shutdown()
def test_insert_select(started_cluster):
table_name = 'test_insert_select'
conn = get_mysql_conn()
create_mysql_table(conn, table_name)
node1.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');
'''.format(table_name, table_name))
node1.query("INSERT INTO {}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000) ".format(table_name))
assert node1.query("SELECT count() FROM {}".format(table_name)).rstrip() == '10000'
assert node1.query("SELECT sum(money) FROM {}".format(table_name)).rstrip() == '30000'
conn.close()
def test_replace_select(started_cluster):
table_name = 'test_replace_select'
conn = get_mysql_conn()
create_mysql_table(conn, table_name)
node1.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse', 1);
'''.format(table_name, table_name))
node1.query("INSERT INTO {}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000) ".format(table_name))
node1.query("INSERT INTO {}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000) ".format(table_name))
assert node1.query("SELECT count() FROM {}".format(table_name)).rstrip() == '10000'
assert node1.query("SELECT sum(money) FROM {}".format(table_name)).rstrip() == '30000'
conn.close()
def test_insert_on_duplicate_select(started_cluster):
table_name = 'test_insert_on_duplicate_select'
conn = get_mysql_conn()
create_mysql_table(conn, table_name)
node1.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse', 0, 'update money = money + values(money)');
'''.format(table_name, table_name))
node1.query("INSERT INTO {}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000) ".format(table_name))
node1.query("INSERT INTO {}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000) ".format(table_name))
assert node1.query("SELECT count() FROM {}".format(table_name)).rstrip() == '10000'
assert node1.query("SELECT sum(money) FROM {}".format(table_name)).rstrip() == '60000'
conn.close()
def get_mysql_conn():
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
return conn
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
def create_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(tableName))
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():
print name, instance.ip_address
raw_input("Cluster created, press any key to destroy...")

View File

@ -4,7 +4,7 @@
The MySQL engine allows you to perform SELECT queries on data that is stored on a remote MySQL server.
The engine takes 5 - 7 parameters: the server address (host and port); the name of the database; the name of the table; the user's name; the user's password. Example:
The engine takes 5-7 parameters: the server address (host and port); the name of the database; the name of the table; the user's name; the user's password; wheter to use replace query; the on duplcate clause. Example:
```text
MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
@ -16,5 +16,4 @@ The rest of the conditions and the LIMIT sampling constraint are executed in Cli
If `replace_query` is specified to 1, then `INSERT INTO` query to this table would be transformed to `REPLACE INTO`.
If `on_duplicate_clause` is specified, eg `update impression = values(impression) + impression`, it would add `on_duplicate_clause` to the end of the MySQL insert sql.
If both `replace_query` and `on_duplicate_clause` are specified, only the `on_duplicate_clause` will work.
Notice that only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them.