Add kazoo lib into integration test, enhanced some tests. [#CLICKHOUSE-3346]
commit d0f42c8f2f (parent d6db480be1)
@@ -636,8 +636,8 @@ public:
             << ", shard " << task_shard->info.shard_num << ")");
 
         LOG_DEBUG(log, "There are "
-            << task_table.all_shards.size() << " shards, and "
-            << task_table.local_shards.size() << " remote ones");
+            << task_table.all_shards.size() << " shards, "
+            << task_table.local_shards.size() << " of them are remote ones");
 
         auto connection_entry = task_shard->info.pool->get(&task_cluster->settings_pull);
         LOG_DEBUG(log, "Will get meta information for shard " << task_shard->numberInCluster()
@@ -14,7 +14,7 @@ Don't use Docker from your system repository.
 
 * [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
 * [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
-* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml`
+* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo`
 
 If you want to run the tests under a non-privileged user, you must add this user to `docker` group: `sudo usermod -aG docker $USER` and re-login.
 (You must close all your sessions (for example, restart your computer))
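
A quick way to verify the newly required kazoo package against a running ZooKeeper is a minimal sketch like the one below; the 127.0.0.1:2181 endpoint is an assumption, not part of the patch.

# sanity_check_kazoo.py: connect, list the root znode, disconnect.
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')  # placeholder endpoint
zk.start()                                # blocks until connected, raises on timeout
print(zk.get_children('/'))               # e.g. ['zookeeper']
zk.stop()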
@@ -10,6 +10,8 @@ import time
 import errno
 from dicttoxml import dicttoxml
 import xml.dom.minidom
+from kazoo.client import KazooClient
+from kazoo.exceptions import KazooException
 
 import docker
 from docker.errors import ContainerError
@@ -91,6 +93,12 @@ class ClickHouseCluster:
         return self.project_name + '_' + instance_name + '_1'
 
 
+    def get_instance_ip(self, instance_name):
+        docker_id = self.get_instance_docker_id(instance_name)
+        handle = self.docker_client.containers.get(docker_id)
+        return handle.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']
+
+
     def start(self, destroy_dirs=True):
         if self.is_up:
             return
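
The new helper reads the address of the first network attached to the container; the `.values()[0]` subscript relies on Python 2 dict semantics. A standalone sketch of the same docker SDK lookup, with a placeholder container name:

# Sketch only: resolve a container's IP the same way get_instance_ip() does.
import docker

client = docker.from_env()
container = client.containers.get('some_container_name')   # placeholder name
networks = container.attrs['NetworkSettings']['Networks']
ip = list(networks.values())[0]['IPAddress']                # list() also keeps this valid on Python 3
print(ip)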
@@ -114,7 +122,7 @@ class ClickHouseCluster:
         if self.with_zookeeper and self.base_zookeeper_cmd:
             subprocess.check_call(self.base_zookeeper_cmd + ['up', '-d', '--no-recreate'])
             for command in self.pre_zookeeper_commands:
-                self.run_zookeeper_client_command(command, repeats=5)
+                self.run_kazoo_commands_with_retries(command, repeats=5)
 
         # Uncomment for debugging
         # print ' '.join(self.base_cmd + ['up', '--no-recreate'])
@@ -124,9 +132,7 @@ class ClickHouseCluster:
         start_deadline = time.time() + 20.0 # seconds
         for instance in self.instances.itervalues():
             instance.docker_client = self.docker_client
-
-            container = self.docker_client.containers.get(instance.docker_id)
-            instance.ip_address = container.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']
+            instance.ip_address = self.get_instance_ip(instance.name)
 
             instance.wait_for_start(start_deadline)
 
@@ -149,17 +155,23 @@ class ClickHouseCluster:
             instance.client = None
 
 
-    def run_zookeeper_client_command(self, command, zoo_node = 'zoo1', repeats=1, sleep_for=1):
-        cli_cmd = 'zkCli.sh ' + command
-        zoo_name = self.get_instance_docker_id(zoo_node)
-        network_mode = 'container:' + zoo_name
-        for i in range(0, repeats - 1):
+    def get_kazoo_client(self, zoo_instance_name):
+        zk = KazooClient(hosts=self.get_instance_ip(zoo_instance_name))
+        zk.start()
+        return zk
+
+
+    def run_kazoo_commands_with_retries(self, kazoo_callback, zoo_instance_name = 'zoo1', repeats=1, sleep_for=1):
+        for i in range(repeats - 1):
             try:
-                return self.docker_client.containers.run('zookeeper', cli_cmd, remove=True, network_mode=network_mode)
-            except ContainerError:
+                kazoo_callback(self.get_kazoo_client(zoo_instance_name))
+                return
+            except KazooException as e:
+                print repr(e)
                 time.sleep(sleep_for)
 
-        return self.docker_client.containers.run('zookeeper', cli_cmd, remove=True, network_mode=network_mode)
+        kazoo_callback(self.get_kazoo_client(zoo_instance_name))
 
 
     def add_zookeeper_startup_command(self, command):
         self.pre_zookeeper_commands.append(command)
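
With this change a ZooKeeper startup command is no longer a zkCli.sh string but any callable that accepts a KazooClient. A minimal usage sketch under that assumption; the callback body is illustrative:

# Sketch only: a test registers a kazoo callback, and ClickHouseCluster.start()
# later invokes it through run_kazoo_commands_with_retries.
def init_zk(zk):
    zk.ensure_path('/my_test_root')   # idempotent, so retries are safe

cluster.add_zookeeper_startup_command(init_zk)
cluster.start()

# The retry helper can also be called directly:
cluster.run_kazoo_commands_with_retries(init_zk, zoo_instance_name='zoo1', repeats=5, sleep_for=1)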
@@ -17,6 +17,24 @@ from helpers.test_tools import TSV
 COPYING_FAIL_PROBABILITY = 0.33
 cluster = None
 
+
+def check_all_hosts_sucesfully_executed(tsv_content, num_hosts):
+    M = TSV.toMat(tsv_content)
+    hosts = [(l[0], l[1]) for l in M] # (host, port)
+    codes = [l[2] for l in M]
+    messages = [l[3] for l in M]
+
+    assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content
+    assert len(set(codes)) == 1, "\n" + tsv_content
+    assert codes[0] == "0", "\n" + tsv_content
+
+
+def ddl_check_query(instance, query, num_hosts=3):
+    contents = instance.query(query)
+    check_all_hosts_sucesfully_executed(contents, num_hosts)
+    return contents
+
+
 @pytest.fixture(scope="module")
 def started_cluster():
     global cluster
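
An ON CLUSTER query replies with one TSV row per host, roughly (host, port, status code, message, ...), which is what the two helpers above assert on: every expected host answered and every status code is "0". A hedged usage sketch, with an illustrative table and engine:

# Sketch only: fail the test unless all three hosts report code "0".
contents = ddl_check_query(
    instance,
    "CREATE TABLE IF NOT EXISTS t ON CLUSTER cluster0 (d UInt64) ENGINE = Memory",
    num_hosts=3)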
@@ -54,23 +72,24 @@ def started_cluster():
 def _test_copying(cmd_options):
     instance = cluster.instances['s0_0_0']
 
-    instance.query("CREATE TABLE hits ON CLUSTER cluster0 (d UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16")
-    instance.query("CREATE TABLE hits_all ON CLUSTER cluster0 (d UInt64) ENGINE=Distributed(cluster0, default, hits, d)")
-    instance.query("CREATE TABLE hits_all ON CLUSTER cluster1 (d UInt64) ENGINE=Distributed(cluster1, default, hits, d + 1)")
+    ddl_check_query(instance, "DROP TABLE IF EXISTS hits ON CLUSTER cluster0")
+    ddl_check_query(instance, "DROP TABLE IF EXISTS hits ON CLUSTER cluster1")
+
+    ddl_check_query(instance, "CREATE TABLE hits ON CLUSTER cluster0 (d UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16")
+    ddl_check_query(instance, "CREATE TABLE hits_all ON CLUSTER cluster0 (d UInt64) ENGINE=Distributed(cluster0, default, hits, d)")
+    ddl_check_query(instance, "CREATE TABLE hits_all ON CLUSTER cluster1 (d UInt64) ENGINE=Distributed(cluster1, default, hits, d + 1)")
     instance.query("INSERT INTO hits_all SELECT * FROM system.numbers LIMIT 1002")
 
-    zoo_id = cluster.get_instance_docker_id('zoo1')
-    zoo_handle = cluster.docker_client.containers.get(zoo_id)
-    zoo_ip = zoo_handle.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']
-    print "Use ZooKeeper server: {} ({})".format(zoo_id, zoo_ip)
+    zk = cluster.get_kazoo_client('zoo1')
+    print "Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1])
 
-    zk = KazooClient(hosts=zoo_ip)
-    zk.start()
     zk_task_path = "/clickhouse-copier/task_simple"
     zk.ensure_path(zk_task_path)
 
     copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task0_description.xml'), 'r').read()
     zk.create(zk_task_path + "/description", copier_task_config)
 
     # Run cluster-copier processes on each node
     docker_api = docker.from_env().api
     copiers_exec_ids = []
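
The copier task description is published as a znode under /clickhouse-copier/task_simple. A small sketch, not part of the patch, of how the same kazoo client could read it back to confirm the upload:

# Sketch only: zk.get() returns a (data, stat) tuple for the znode.
data, stat = zk.get(zk_task_path + "/description")
assert data == copier_task_config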
@@ -88,7 +107,7 @@ def _test_copying(cmd_options):
         copiers_exec_ids.append(exec_id)
         print "Copier for {} ({}) has started".format(instance.name, instance.ip_address)
 
-    # Wait for copiers finalizing and check their return codes
+    # Wait for copiers stopping and check their return codes
     for exec_id, instance in zip(copiers_exec_ids, cluster.instances.itervalues()):
         while True:
             res = docker_api.exec_inspect(exec_id)
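
The loop above polls the low-level docker API until each copier exec finishes; assuming the inspect dict exposes 'Running' and 'ExitCode' fields, the idiom looks roughly like this sketch:

# Sketch only: wait for one exec to finish and require a zero exit code.
while True:
    res = docker_api.exec_inspect(exec_id)
    if not res.get('Running'):
        assert res.get('ExitCode') == 0, "Copier failed: " + str(res)
        break
    time.sleep(1)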
@@ -105,10 +124,10 @@ def _test_copying(cmd_options):
     assert TSV(cluster.instances['s1_1_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("0\n")
 
     zk.delete(zk_task_path, recursive=True)
-    instance.query("DROP TABLE hits_all ON CLUSTER cluster1")
-    instance.query("DROP TABLE hits_all ON CLUSTER cluster1")
-    instance.query("DROP TABLE hits ON CLUSTER cluster0")
-    instance.query("DROP TABLE hits ON CLUSTER cluster1")
+    ddl_check_query(instance, "DROP TABLE hits_all ON CLUSTER cluster0")
+    ddl_check_query(instance, "DROP TABLE hits_all ON CLUSTER cluster1")
+    ddl_check_query(instance, "DROP TABLE hits ON CLUSTER cluster0")
+    ddl_check_query(instance, "DROP TABLE hits ON CLUSTER cluster1")
 
 
 def test_copy_simple(started_cluster):
@@ -89,7 +89,6 @@ def init_cluster(cluster):
         # Initialize databases and service tables
         instance = cluster.instances['ch1']
 
-        instance.query("SELECT 1")
         ddl_check_query(instance, """
 CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
     (database String, name String, engine String, metadata_modification_time DateTime)
@@ -119,6 +118,8 @@ def started_cluster():
         for instance in cluster.instances.values():
             ddl_check_there_are_no_dublicates(instance)
 
+        cluster.pm_random_drops.heal_all()
+
     finally:
         cluster.shutdown()
@@ -1,3 +1,4 @@
+from __future__ import print_function
 from helpers.cluster import ClickHouseCluster
 import pytest
 
@@ -11,8 +12,10 @@ def test_chroot_with_same_root():
     node2 = cluster_2.add_instance('node2', config_dir='configs', with_zookeeper=True)
     nodes = [node1, node2]
 
-    cluster_1.add_zookeeper_startup_command('create /root_a ""')
-    cluster_1.add_zookeeper_startup_command('ls / ')
+    def create_zk_root(zk):
+        zk.ensure_path('/root_a')
+        print(zk.get_children('/'))
+    cluster_1.add_zookeeper_startup_command(create_zk_root)
 
     try:
         cluster_1.start()
@@ -21,7 +24,7 @@ def test_chroot_with_same_root():
         cluster_2.start(destroy_dirs=False)
         for i, node in enumerate(nodes):
             node.query('''
-            CREATE TABLE simple (date Date, id UInt32)
+            CREATE TABLE simple (date Date, id UInt32)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
             '''.format(replica=node.name))
             node.query("INSERT INTO simple VALUES ({0}, {0})".format(i))
@@ -45,9 +48,11 @@ def test_chroot_with_different_root():
     node2 = cluster_2.add_instance('node2', config_dir='configs', with_zookeeper=True)
     nodes = [node1, node2]
 
-    cluster_1.add_zookeeper_startup_command('create /root_a ""')
-    cluster_1.add_zookeeper_startup_command('create /root_b ""')
-    cluster_1.add_zookeeper_startup_command('ls / ')
+    def create_zk_roots(zk):
+        zk.ensure_path('/root_a')
+        zk.ensure_path('/root_b')
+        print(zk.get_children('/'))
+    cluster_1.add_zookeeper_startup_command(create_zk_roots)
 
     try:
         cluster_1.start()