diff --git a/tests/integration/test_cluster_copier/test.py b/tests/integration/test_cluster_copier/test.py index 440f0fc016b..9c2bcc22ef7 100644 --- a/tests/integration/test_cluster_copier/test.py +++ b/tests/integration/test_cluster_copier/test.py @@ -1,29 +1,27 @@ import os -import os.path as p import sys import time -import datetime +import kazoo import pytest -from contextlib import contextmanager import docker -from kazoo.client import KazooClient +import random +from contextlib import contextmanager +from helpers.cluster import ClickHouseCluster +from helpers.test_tools import TSV CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR)) -from helpers.cluster import ClickHouseCluster -from helpers.test_tools import TSV -COPYING_FAIL_PROBABILITY = 0.33 -MOVING_FAIL_PROBABILITY = 0.33 -cluster = None +COPYING_FAIL_PROBABILITY = 0.2 +MOVING_FAIL_PROBABILITY = 0.2 +cluster = ClickHouseCluster(__file__) def check_all_hosts_sucesfully_executed(tsv_content, num_hosts): M = TSV.toMat(tsv_content) hosts = [(l[0], l[1]) for l in M] # (host, port) codes = [l[2] for l in M] - messages = [l[3] for l in M] assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content assert len(set(codes)) == 1, "\n" + tsv_content @@ -36,7 +34,7 @@ def ddl_check_query(instance, query, num_hosts=3): return contents -@pytest.fixture(scope="function") +@pytest.fixture(scope="module") def started_cluster(): global cluster try: @@ -51,8 +49,6 @@ def started_cluster(): } } - cluster = ClickHouseCluster(__file__) - for cluster_name, shards in clusters_schema.iteritems(): for shard_name, replicas in shards.iteritems(): for replica_name in replicas: @@ -66,7 +62,6 @@ def started_cluster(): yield cluster finally: - pass cluster.shutdown() @@ -222,6 +217,11 @@ def execute_task(task, cmd_options): zk = cluster.get_kazoo_client('zoo1') print "Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]) + try: + zk.delete("/clickhouse-copier", recursive=True) + except kazoo.exceptions.NoNodeError: + print "No node /clickhouse-copier. It is Ok in first test." + zk_task_path = task.zk_task_path zk.ensure_path(zk_task_path) zk.create(zk_task_path + "/description", task.copier_task_config) @@ -236,7 +236,10 @@ def execute_task(task, cmd_options): '--base-dir', '/var/log/clickhouse-server/copier'] cmd += cmd_options - for instance_name, instance in cluster.instances.iteritems(): + copiers = random.sample(cluster.instances.keys(), 3) + + for instance_name in copiers: + instance = cluster.instances[instance_name] container = instance.get_docker_handle() exec_id = docker_api.exec_create(container.id, cmd, stderr=True) docker_api.exec_start(exec_id, detach=True) @@ -245,12 +248,13 @@ def execute_task(task, cmd_options): print "Copier for {} ({}) has started".format(instance.name, instance.ip_address) # Wait for copiers stopping and check their return codes - for exec_id, instance in zip(copiers_exec_ids, cluster.instances.itervalues()): + for exec_id, instance_name in zip(copiers_exec_ids, copiers): + instance = cluster.instances[instance_name] while True: res = docker_api.exec_inspect(exec_id) if not res['Running']: break - time.sleep(1) + time.sleep(0.5) assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(instance.name, instance.ip_address, repr(res)) @@ -307,12 +311,15 @@ def test_copy_with_recovering_after_move_faults(started_cluster, use_sample_offs else: execute_task(Task1(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)]) +@pytest.mark.timeout(600) def test_copy_month_to_week_partition(started_cluster): execute_task(Task2(started_cluster), []) +@pytest.mark.timeout(600) def test_copy_month_to_week_partition_with_recovering(started_cluster): execute_task(Task2(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)]) +@pytest.mark.timeout(600) def test_copy_month_to_week_partition_with_recovering_after_move_faults(started_cluster): execute_task(Task2(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])