Merge pull request #11573 from nikitamikhaylov/copier-test-fix

Fix copier test
This commit is contained in:
Nikita Mikhaylov 2020-06-11 00:38:27 +04:00 committed by GitHub
commit 4bf46ed288
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,29 +1,27 @@
import os import os
import os.path as p
import sys import sys
import time import time
import datetime import kazoo
import pytest import pytest
from contextlib import contextmanager
import docker import docker
from kazoo.client import KazooClient import random
from contextlib import contextmanager
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__)) CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR)) sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
COPYING_FAIL_PROBABILITY = 0.33 COPYING_FAIL_PROBABILITY = 0.2
MOVING_FAIL_PROBABILITY = 0.33 MOVING_FAIL_PROBABILITY = 0.2
cluster = None
cluster = ClickHouseCluster(__file__)
def check_all_hosts_sucesfully_executed(tsv_content, num_hosts): def check_all_hosts_sucesfully_executed(tsv_content, num_hosts):
M = TSV.toMat(tsv_content) M = TSV.toMat(tsv_content)
hosts = [(l[0], l[1]) for l in M] # (host, port) hosts = [(l[0], l[1]) for l in M] # (host, port)
codes = [l[2] for l in M] codes = [l[2] for l in M]
messages = [l[3] for l in M]
assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content
assert len(set(codes)) == 1, "\n" + tsv_content assert len(set(codes)) == 1, "\n" + tsv_content
@ -36,7 +34,7 @@ def ddl_check_query(instance, query, num_hosts=3):
return contents return contents
@pytest.fixture(scope="function") @pytest.fixture(scope="module")
def started_cluster(): def started_cluster():
global cluster global cluster
try: try:
@ -51,8 +49,6 @@ def started_cluster():
} }
} }
cluster = ClickHouseCluster(__file__)
for cluster_name, shards in clusters_schema.iteritems(): for cluster_name, shards in clusters_schema.iteritems():
for shard_name, replicas in shards.iteritems(): for shard_name, replicas in shards.iteritems():
for replica_name in replicas: for replica_name in replicas:
@ -66,7 +62,6 @@ def started_cluster():
yield cluster yield cluster
finally: finally:
pass
cluster.shutdown() cluster.shutdown()
@ -222,6 +217,11 @@ def execute_task(task, cmd_options):
zk = cluster.get_kazoo_client('zoo1') zk = cluster.get_kazoo_client('zoo1')
print "Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]) print "Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1])
try:
zk.delete("/clickhouse-copier", recursive=True)
except kazoo.exceptions.NoNodeError:
print "No node /clickhouse-copier. It is Ok in first test."
zk_task_path = task.zk_task_path zk_task_path = task.zk_task_path
zk.ensure_path(zk_task_path) zk.ensure_path(zk_task_path)
zk.create(zk_task_path + "/description", task.copier_task_config) zk.create(zk_task_path + "/description", task.copier_task_config)
@ -236,7 +236,10 @@ def execute_task(task, cmd_options):
'--base-dir', '/var/log/clickhouse-server/copier'] '--base-dir', '/var/log/clickhouse-server/copier']
cmd += cmd_options cmd += cmd_options
for instance_name, instance in cluster.instances.iteritems(): copiers = random.sample(cluster.instances.keys(), 3)
for instance_name in copiers:
instance = cluster.instances[instance_name]
container = instance.get_docker_handle() container = instance.get_docker_handle()
exec_id = docker_api.exec_create(container.id, cmd, stderr=True) exec_id = docker_api.exec_create(container.id, cmd, stderr=True)
docker_api.exec_start(exec_id, detach=True) docker_api.exec_start(exec_id, detach=True)
@ -245,12 +248,13 @@ def execute_task(task, cmd_options):
print "Copier for {} ({}) has started".format(instance.name, instance.ip_address) print "Copier for {} ({}) has started".format(instance.name, instance.ip_address)
# Wait for copiers stopping and check their return codes # Wait for copiers stopping and check their return codes
for exec_id, instance in zip(copiers_exec_ids, cluster.instances.itervalues()): for exec_id, instance_name in zip(copiers_exec_ids, copiers):
instance = cluster.instances[instance_name]
while True: while True:
res = docker_api.exec_inspect(exec_id) res = docker_api.exec_inspect(exec_id)
if not res['Running']: if not res['Running']:
break break
time.sleep(1) time.sleep(0.5)
assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(instance.name, instance.ip_address, repr(res)) assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(instance.name, instance.ip_address, repr(res))
@ -307,12 +311,15 @@ def test_copy_with_recovering_after_move_faults(started_cluster, use_sample_offs
else: else:
execute_task(Task1(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)]) execute_task(Task1(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
@pytest.mark.timeout(600)
def test_copy_month_to_week_partition(started_cluster): def test_copy_month_to_week_partition(started_cluster):
execute_task(Task2(started_cluster), []) execute_task(Task2(started_cluster), [])
@pytest.mark.timeout(600)
def test_copy_month_to_week_partition_with_recovering(started_cluster): def test_copy_month_to_week_partition_with_recovering(started_cluster):
execute_task(Task2(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)]) execute_task(Task2(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
@pytest.mark.timeout(600)
def test_copy_month_to_week_partition_with_recovering_after_move_faults(started_cluster): def test_copy_month_to_week_partition_with_recovering_after_move_faults(started_cluster):
execute_task(Task2(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)]) execute_task(Task2(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])