mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 19:42:00 +00:00
713 lines
31 KiB
Python
Executable File
713 lines
31 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import logging
|
|
import subprocess
|
|
import os
|
|
import glob
|
|
import time
|
|
import shutil
|
|
from collections import defaultdict
|
|
import random
|
|
import json
|
|
import csv
|
|
# for crc32
|
|
import zlib
|
|
|
|
|
|
# Number of attempts each test group gets in a regular (non-flaky) run.
MAX_RETRY = 3
# Worker count passed to the runner's ``--parallel`` option in a regular run.
NUM_WORKERS = 5
# NOTE(review): not referenced anywhere in the visible part of this file.
SLEEP_BETWEEN_RETRIES = 5
# Maximum number of tests put into one "parallelN" group.
PARALLEL_GROUP_SIZE = 100

# Binary locations relative to the working directory (no leading slash):
# the packages are unpacked into "." with ``dpkg -x``.
CLICKHOUSE_BINARY_PATH = "usr/bin/clickhouse"
CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH = "usr/bin/clickhouse-odbc-bridge"
CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH = "usr/bin/clickhouse-library-bridge"

# Flaky check: maximum number of passes over the changed tests, and the
# wall-clock budget (seconds) after which no further pass is started.
TRIES_COUNT = 10
MAX_TIME_SECONDS = 60 * 60

# The soft deadline of a run is TASK_TIMEOUT - MAX_TIME_IN_SANDBOX seconds
# after the runner object is created.
MAX_TIME_IN_SANDBOX = 60 * 20  # 20 minutes
TASK_TIMEOUT = 60 * 60 * 8  # 8 hours
|
|
|
|
def stringhash(s):
    """Return the CRC32 checksum of the UTF-8 encoding of string *s*."""
    encoded = s.encode('utf-8')
    return zlib.crc32(encoded)
|
|
|
|
def get_tests_to_run(pr_info):
    """Return the integration test directories touched by a pull request.

    Args:
        pr_info: mapping with a ``'changed_files'`` entry that is either
            ``None`` or an iterable of changed file paths.

    Returns:
        A sorted list of unique path components that directly follow
        ``tests/integration/`` and start with ``test_``. Empty when
        ``changed_files`` is ``None`` or no path matches.
    """
    changed_files = pr_info['changed_files']
    if changed_files is None:
        return []

    marker = 'tests/integration/'
    result = set()
    for fpath in changed_files:
        pos = fpath.find(marker + 'test_')
        if pos == -1:
            continue
        logging.info('File %s changed and seems like integration test', fpath)
        # Take the component right after the marker instead of a fixed index,
        # so that paths carrying a leading prefix are handled as well.
        result.add(fpath[pos + len(marker):].split('/')[0])
    # Sorted so that the result does not depend on set iteration order.
    return sorted(result)
|
|
|
|
|
|
def filter_existing_tests(tests_to_run, repo_path):
    """Keep only the tests that still exist under ``<repo_path>/tests/integration``.

    Entries whose path does not exist are logged and dropped; the order of
    the remaining entries is preserved.
    """
    existing = []
    for candidate in tests_to_run:
        candidate_path = os.path.join(repo_path, 'tests/integration', candidate)
        if not os.path.exists(candidate_path):
            logging.info("Skipping test %s, seems like it was removed", candidate)
            continue
        existing.append(candidate)
    return existing
|
|
|
|
|
|
def _get_deselect_option(tests):
|
|
return ' '.join(['--deselect {}'.format(t) for t in tests])
|
|
|
|
# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
|
|
def chunks(lst, n):
    """Yield consecutive slices of *lst*, each at most *n* items long."""
    total = len(lst)
    for start in range(0, total, n):
        stop = start + n
        yield lst[start:stop]
|
|
|
|
def parse_test_results_output(fname):
    """Extract the lines that follow the "short test summary info" header.

    Reads the text file *fname* and returns, stripped, every non-blank line
    after the first line containing ``short test summary info``, except
    lines that start with ``==``.
    """
    summary_lines = []
    in_summary = False
    with open(fname, 'r') as report:
        for raw in report:
            if in_summary:
                stripped = raw.strip()
                if stripped and not raw.startswith('=='):
                    summary_lines.append(stripped)
            # Checked after the append so the header line itself is skipped.
            if 'short test summary info' in raw:
                in_summary = True
    return summary_lines
|
|
|
|
|
|
def get_counters(output):
    """Group test names from summary lines by their ERROR/PASSED/FAILED state.

    Each line is expected to look like ``<STATE> <test name>[ - <details>]``.
    Lines without ``.py`` in them, or with any other state, are only logged.

    Returns:
        A dict with keys ``"ERROR"``, ``"PASSED"`` and ``"FAILED"`` mapping
        to lists of unique test names.
    """
    by_state = {
        "ERROR": set(),
        "PASSED": set(),
        "FAILED": set(),
    }

    for line in output:
        if '.py' not in line:
            logging.info("Strange line %s", line)
            continue

        state, _, test_name = line.strip().partition(' ')
        # Drop the " - <details>" tail, if there is one.
        test_name = test_name.split(' - ', 1)[0]

        bucket = by_state.get(state)
        if bucket is None:
            logging.info("Strange line %s", line)
        else:
            bucket.add(test_name)

    result = {}
    for state, names in by_state.items():
        result[state] = list(names)
    return result
|
|
|
|
|
|
def parse_test_times(fname):
    """Extract the lines of the "slowest durations" section of *fname*.

    Collection starts after the first line containing ``slowest durations``
    and stops at the next line containing ``==``. Blank lines are skipped
    and the collected lines are stripped.
    """
    durations = []
    in_section = False
    with open(fname, 'r') as report:
        for raw in report:
            if in_section:
                if '==' in raw:
                    break
                stripped = raw.strip()
                if stripped:
                    durations.append(stripped)
            # Checked last so that the header line itself is never collected.
            if 'slowest durations' in raw:
                in_section = True
    return durations
|
|
|
|
|
|
def get_test_times(output):
    """Sum the reported durations per test.

    Each line containing ``.py`` is split on single spaces: the first field
    is the duration with a one-character suffix (dropped before conversion
    to float), the second field is ignored, and the remaining non-empty
    fields form the test name.

    Returns:
        A ``defaultdict(float)`` mapping test name to total duration.
    """
    totals = defaultdict(float)
    for line in output:
        if '.py' not in line:
            continue
        fields = line.strip().split(' ')
        duration = fields[0]
        name_parts = [part for part in fields[2:] if part]
        test_name = ' '.join(name_parts)
        # A missing entry starts at 0.0 thanks to defaultdict(float).
        totals[test_name] += float(duration[:-1])
    return totals
|
|
|
|
|
|
def _run_shell_logging_failure(cmd, failure_prefix):
    """Run *cmd* through the shell; log a non-zero exit instead of raising."""
    try:
        subprocess.check_output(cmd, shell=True)
    except subprocess.CalledProcessError as err:
        logging.info("%s%s", failure_prefix, err)


def clear_ip_tables_and_restart_daemons():
    """Reset the docker/iptables state between test runs.

    Steps, in order:
      1. log the current ``iptables -L`` output;
      2. kill and force-remove all docker containers (failures are logged);
      3. unless the environment variable
         ``CLICKHOUSE_TESTS_RUNNER_RESTART_DOCKER`` is set to something other
         than ``'1'``, stop the docker service and start it again, retrying
         up to 200 times until ``docker ps`` succeeds;
      4. delete rule #1 of the ``DOCKER-USER`` chain until deletion fails.

    Raises:
        subprocess.CalledProcessError: if the initial ``iptables -L`` fails.
        Exception: if docker does not come up within 200 start attempts.
    """
    iptables_dump = subprocess.check_output("sudo iptables -L", shell=True)
    # Decode so the log shows the table itself rather than a bytes repr.
    logging.info("Dump iptables after run %s", iptables_dump.decode('utf-8', errors='replace'))

    logging.info("Killing all alive docker containers")
    _run_shell_logging_failure("timeout -s 9 10m docker kill $(docker ps -q)", "docker kill excepted: ")

    logging.info("Removing all docker containers")
    _run_shell_logging_failure("timeout -s 9 10m docker rm $(docker ps -a -q) --force", "docker rm excepted: ")

    # don't restart docker if it's disabled
    if os.environ.get("CLICKHOUSE_TESTS_RUNNER_RESTART_DOCKER", '1') == '1':
        logging.info("Stopping docker daemon")
        _run_shell_logging_failure("service docker stop", "docker stop excepted: ")

        # Every CalledProcessError is handled inside the loop, so no outer
        # handler is needed; only the "never came up" Exception can escape.
        for attempt in range(200):
            try:
                logging.info("Restarting docker %s", attempt)
                subprocess.check_output("service docker start", shell=True)
                subprocess.check_output("docker ps", shell=True)
                break
            except subprocess.CalledProcessError as err:
                time.sleep(0.5)
                logging.info("Waiting docker to start, current %s", str(err))
        else:
            raise Exception("Docker daemon doesn't responding")

    iptables_iter = 0
    try:
        for iptables_iter in range(1000):
            # when rules will be empty, it will raise exception
            subprocess.check_output("sudo iptables -D DOCKER-USER 1", shell=True)
    except subprocess.CalledProcessError as err:
        logging.info("All iptables rules cleared, %s iterations, last error: %s", iptables_iter, err)
|
|
|
|
|
|
class ClickhouseIntegrationTestsRunner:
    """Install ClickHouse from .deb packages and run the integration tests.

    Tests are executed by invoking ``./runner`` inside
    ``<repo_path>/tests/integration`` through the shell. Results are returned
    by :meth:`run_impl` as ``(state, description, test_results, logs)``.

    Configuration is read from ``params`` (see :meth:`__init__`).
    """

    # Order in which the per-state results are reported.
    _REPORT_STATES = ("ERROR", "FAILED", "PASSED", "SKIPPED", "FLAKY")
    # Counter states whose name in the report differs from the counter key.
    _STATE_TEXT = {"PASSED": "OK", "FAILED": "FAIL"}

    def __init__(self, result_path, params):
        """Store the configuration.

        Args:
            result_path: directory where logs and archives are written.
            params: dict with the required keys
                ``docker_images_with_versions``, ``shuffle_test_groups`` and
                ``context_name``, and the optional keys ``use_tmpfs``
                (default true), ``disable_net_host`` (default false),
                ``run_by_hash_total`` / ``run_by_hash_num``. ``pr_info`` is
                additionally read by the flaky check.
        """
        self.result_path = result_path
        self.params = params

        self.image_versions = self.params['docker_images_with_versions']
        self.shuffle_groups = self.params['shuffle_test_groups']
        self.flaky_check = 'flaky check' in self.params['context_name']
        # if use_tmpfs is not set we assume it to be true, otherwise check
        self.use_tmpfs = self.params.get('use_tmpfs', True)
        self.disable_net_host = self.params.get('disable_net_host', False)
        self.start_time = time.time()
        # After this moment no new test group is started.
        self.soft_deadline_time = self.start_time + (TASK_TIMEOUT - MAX_TIME_IN_SANDBOX)

        if 'run_by_hash_total' in self.params:
            self.run_by_hash_total = self.params['run_by_hash_total']
            self.run_by_hash_num = self.params['run_by_hash_num']
        else:
            self.run_by_hash_total = 0
            self.run_by_hash_num = 0

    @staticmethod
    def _new_counters():
        """Return a fresh state -> list-of-test-names mapping."""
        return {
            "ERROR": [],
            "PASSED": [],
            "FAILED": [],
            "SKIPPED": [],
            "FLAKY": [],
        }

    def path(self):
        """Return the result directory passed to the constructor."""
        return self.result_path

    def base_path(self):
        """Return the result directory joined with ``'../'``."""
        return os.path.join(str(self.result_path), '../')

    def should_skip_tests(self):
        """Return the tests to deselect; currently always an empty list."""
        return []

    def get_image_with_version(self, name):
        """Return ``name:version`` for a docker image.

        The version comes from ``docker_images_with_versions``. When the
        image is not listed there, a warning is logged and ``:latest`` is
        appended unless *name* already contains a ``:``.
        """
        if name in self.image_versions:
            return name + ":" + self.image_versions[name]
        logging.warning("Cannot find image %s in params list %s", name, self.image_versions)
        if ':' not in name:
            return name + ":latest"
        return name

    def get_single_image_version(self):
        """Return the version of the first image of :meth:`get_images_names`.

        Falls back to ``'latest'`` (with a warning) when it is not configured.
        """
        name = self.get_images_names()[0]
        if name in self.image_versions:
            return self.image_versions[name]
        logging.warning("Cannot find image %s in params list %s", name, self.image_versions)
        return 'latest'

    def shuffle_test_groups(self):
        """Return True when the ``shuffle_test_groups`` parameter is not 0."""
        return self.shuffle_groups != 0

    @staticmethod
    def get_images_names():
        """Return the docker image names; the runner image comes first."""
        return [
            "clickhouse/integration-tests-runner",
            "clickhouse/mysql-golang-client",
            "clickhouse/mysql-java-client",
            "clickhouse/mysql-js-client",
            "clickhouse/mysql-php-client",
            "clickhouse/postgresql-java-client",
            "clickhouse/integration-test",
            "clickhouse/kerberos-kdc",
            "clickhouse/integration-helper",
        ]

    def _can_run_with(self, path, opt):
        """Return True when the text file *path* has a line containing *opt*."""
        with open(path, 'r') as script:
            return any(opt in line for line in script)

    def _install_clickhouse(self, debs_path):
        """Unpack the ClickHouse packages and copy the binaries into place.

        For each required package the first file in *debs_path* whose name
        contains the package prefix is extracted into the current directory
        with ``dpkg -x``; its output goes to ``install_<file>.log`` in the
        result directory. The extracted binaries are then made executable
        and copied to the paths given by the ``CLICKHOUSE_TESTS_*_BIN_PATH``
        environment variables.

        Raises:
            Exception: if a package is missing or its extraction fails.
        """
        # order matters
        packages = (
            'clickhouse-common-static_',
            'clickhouse-server_',
            'clickhouse-client',
            'clickhouse-common-static-dbg_',
        )
        for package in packages:
            logging.info("Installing package %s", package)
            for f in os.listdir(debs_path):
                if package not in f:
                    continue
                full_path = os.path.join(debs_path, f)
                logging.info("Package found in %s", full_path)
                log_path = os.path.join(str(self.path()), "install_" + f + ".log")
                with open(log_path, 'w') as log:
                    cmd = "dpkg -x {} .".format(full_path)
                    logging.info("Executing installation cmd %s", cmd)
                    retcode = subprocess.call(cmd, shell=True, stderr=log, stdout=log)
                if retcode != 0:
                    raise Exception("Installation of {} failed".format(full_path))
                logging.info("Installation of %s successful", full_path)
                break
            else:
                raise Exception("Package with {} not found".format(package))

        # NOTE: no unstripping is performed at the moment (the eu-unstrip
        # call is disabled); the message is kept for log continuity.
        logging.info("Unstripping binary")

        logging.info("All packages installed")
        binaries = (
            (CLICKHOUSE_BINARY_PATH, "CLICKHOUSE_TESTS_SERVER_BIN_PATH"),
            (CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH, "CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH"),
            (CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH, "CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH"),
        )
        for binary_path, _ in binaries:
            os.chmod(binary_path, 0o777)
        for binary_path, env_name in binaries:
            shutil.copy(binary_path, os.getenv(env_name))

    def _compress_logs(self, dir, relpaths, result_path):
        """Create the gzip'ed tar *result_path* from *relpaths* relative to *dir*.

        Raises:
            subprocess.CalledProcessError: if ``tar`` exits with an error.
        """
        # Argument list instead of a shell string: paths need no quoting.
        tar_cmd = ["tar", "czf", result_path, "-C", dir] + list(relpaths)
        subprocess.check_call(tar_cmd)  # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL

    def _get_runner_opts(self):
        """Return the extra ``./runner`` options derived from the parameters."""
        result = []
        if self.use_tmpfs:
            result.append("--tmpfs")
        if self.disable_net_host:
            result.append("--disable-net-host")
        return " ".join(result)

    def _get_all_tests(self, repo_path):
        """Return the sorted list of test ids printed by ``./runner --setup-plan``.

        The raw runner output is saved to ``all_tests_full.txt`` and the
        filtered list to ``all_tests.txt`` in ``tests/integration``.

        Raises:
            subprocess.CalledProcessError: if the shell pipeline fails.
            Exception: if the resulting list file is missing or empty (the
                raw runner output is logged first).
        """
        image_cmd = self._get_runner_image_cmd(repo_path)
        out_file = "all_tests.txt"
        out_file_full = "all_tests_full.txt"
        cmd = (
            "cd {repo_path}/tests/integration && "
            "timeout -s 9 1h ./runner {runner_opts} {image_cmd} ' --setup-plan' "
            "| tee {out_file_full} | grep '::' | sed 's/ (fixtures used:.*//g' | sed 's/^ *//g' | sed 's/ *$//g' "
            "| grep -v 'SKIPPED' | sort -u > {out_file}"
        ).format(
            repo_path=repo_path,
            runner_opts=self._get_runner_opts(),
            image_cmd=image_cmd,
            out_file=out_file,
            out_file_full=out_file_full,
        )

        logging.info("Getting all tests with cmd '%s'", cmd)
        subprocess.check_call(cmd, shell=True)  # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL

        all_tests_file_path = "{repo_path}/tests/integration/{out_file}".format(
            repo_path=repo_path, out_file=out_file)
        if not os.path.isfile(all_tests_file_path) or os.path.getsize(all_tests_file_path) == 0:
            all_tests_full_file_path = "{repo_path}/tests/integration/{out_file}".format(
                repo_path=repo_path, out_file=out_file_full)
            if os.path.isfile(all_tests_full_file_path):
                # log runner output
                logging.info("runner output:")
                with open(all_tests_full_file_path, 'r') as all_tests_full_file:
                    for line in all_tests_full_file:
                        line = line.rstrip()
                        if line:
                            logging.info("runner output: %s", line)
            else:
                logging.info("runner output '%s' is empty", all_tests_full_file_path)

            raise Exception(
                "There is something wrong with getting all tests list: "
                "file '{}' is empty or does not exist.".format(all_tests_file_path))

        with open(all_tests_file_path, "r") as all_tests_file:
            all_tests = [line.strip() for line in all_tests_file]
        return sorted(all_tests)

    def _get_parallel_tests_skip_list(self, repo_path):
        """Return the sorted content of ``tests/integration/parallel_skip.json``.

        Raises:
            Exception: if the file is missing or empty.
        """
        skip_list_file_path = "{}/tests/integration/parallel_skip.json".format(repo_path)
        if not os.path.isfile(skip_list_file_path) or os.path.getsize(skip_list_file_path) == 0:
            raise Exception(
                "There is something wrong with getting parallel tests skip list: "
                "file '{}' is empty or does not exist.".format(skip_list_file_path))

        with open(skip_list_file_path, "r") as skip_list_file:
            skip_list_tests = json.load(skip_list_file)
        return sorted(skip_list_tests)

    def group_test_by_file(self, tests):
        """Group test ids by the part before the first ``::``.

        Returns:
            A plain dict mapping file name to the list of its test ids, in
            the order they appear in *tests*.
        """
        result = {}
        for test in tests:
            test_file = test.split('::')[0]
            result.setdefault(test_file, []).append(test)
        return result

    def _update_counters(self, main_counters, current_counters):
        """Merge the results of one try into the accumulated counters, in place.

        A test that passes now but failed or errored before, or that fails
        or errors now but passed before, is moved to ``"FLAKY"``. Once a
        test is flaky it stays flaky.
        """
        for test in current_counters["PASSED"]:
            if test in main_counters["PASSED"] or test in main_counters["FLAKY"]:
                continue
            is_flaky = False
            for bad_state in ("FAILED", "ERROR"):
                if test in main_counters[bad_state]:
                    main_counters[bad_state].remove(test)
                    is_flaky = True
            main_counters["FLAKY" if is_flaky else "PASSED"].append(test)

        for state in ("ERROR", "FAILED"):
            for test in current_counters[state]:
                if test in main_counters["FLAKY"]:
                    continue
                if test in main_counters["PASSED"]:
                    main_counters["PASSED"].remove(test)
                    main_counters["FLAKY"].append(test)
                    continue
                if test not in main_counters[state]:
                    main_counters[state].append(test)

    def _get_runner_image_cmd(self, repo_path):
        """Build the docker image options for ``./runner``.

        Returns an empty string when the runner script does not mention
        ``--docker-image-version``. Otherwise returns the runner image
        version option followed, when the script mentions
        ``--docker-compose-images-tags``, by one tag option per other image.
        """
        runner_script = os.path.join(repo_path, "tests/integration", "runner")
        if not self._can_run_with(runner_script, '--docker-image-version'):
            logging.info("Cannot run with custom docker image version :(")
            return ''

        # The script does not change while we iterate, so probe it only once.
        supports_compose_tags = self._can_run_with(runner_script, '--docker-compose-images-tags')
        image_cmd = ''
        for img in self.get_images_names():
            if img == "clickhouse/integration-tests-runner":
                runner_version = self.get_single_image_version()
                logging.info("Can run with custom docker image version %s", runner_version)
                image_cmd += ' --docker-image-version={} '.format(runner_version)
            elif supports_compose_tags:
                image_cmd += '--docker-compose-images-tags={} '.format(self.get_image_with_version(img))
        return image_cmd

    def _find_test_data_dirs(self, repo_path, test_names):
        """Snapshot the direct children of the directories of *test_names*.

        Returns:
            A dict mapping ``<test_dir>/<entry>`` (relative to
            ``tests/integration``) to the entry's modification time.
        """
        integration_dir = os.path.join(repo_path, "tests/integration")
        relpaths = {}
        for test_name in test_names:
            test_dir = test_name.split('/', 1)[0]
            test_dir_path = os.path.join(integration_dir, test_dir)
            if not os.path.isdir(test_dir_path):
                continue
            for name in os.listdir(test_dir_path):
                relpath = os.path.join(test_dir, name)
                relpaths[relpath] = os.path.getmtime(os.path.join(integration_dir, relpath))
        return relpaths

    def _get_test_data_dirs_difference(self, new_snapshot, old_snapshot):
        """Return the set of paths that are new or whose mtime has changed."""
        return {
            path
            for path, mtime in new_snapshot.items()
            if path not in old_snapshot or old_snapshot[path] != mtime
        }

    def try_run_test_group(self, repo_path, test_group, tests_in_group, num_tries, num_workers):
        """Run :meth:`run_test_group`, turning any exception into ERROR results.

        Returns:
            ``(counters, tests_times, log_paths)``. When the run raised,
            every test of the group is reported as ERROR with time 0 and no
            logs.
        """
        try:
            return self.run_test_group(repo_path, test_group, tests_in_group, num_tries, num_workers)
        except Exception as e:
            logging.info("Failed to run %s:\n%s", test_group, e)
            counters = self._new_counters()
            tests_times = defaultdict(float)
            for test in tests_in_group:
                counters["ERROR"].append(test)
                tests_times[test] = 0
            return counters, tests_times, []

    def run_test_group(self, repo_path, test_group, tests_in_group, num_tries, num_workers):
        """Run one group of tests, retrying the ones that have not passed.

        Args:
            repo_path: root of the ClickHouse repository checkout.
            test_group: group name, used to build log file names.
            tests_in_group: test ids (or test directories) to run.
            num_tries: maximum number of attempts.
            num_workers: value for ``--parallel``; not passed when <= 0.

        Returns:
            ``(counters, tests_times, log_paths)`` where ``counters`` maps
            each state to a list of test names, ``tests_times`` maps test
            name to seconds and ``log_paths`` lists the files copied or
            archived into the result directory.
        """
        counters = self._new_counters()
        tests_times = defaultdict(float)

        if self.soft_deadline_time < time.time():
            for test in tests_in_group:
                logging.info("Task timeout exceeded, skipping %s", test)
                counters["SKIPPED"].append(test)
                tests_times[test] = 0
            return counters, tests_times, []

        integration_dir = os.path.join(repo_path, "tests/integration")
        image_cmd = self._get_runner_image_cmd(repo_path)
        test_group_str = test_group.replace('/', '_').replace('.', '_')

        log_paths = []
        test_data_dirs = {}

        for i in range(num_tries):
            logging.info("Running test group %s for the %s retry", test_group, i)
            clear_ip_tables_and_restart_daemons()

            # Everything that has not passed yet is (re)run; parametrized ids
            # ("name[param]") are collapsed to the bare test name.
            test_names = {
                test_name.split('[', 1)[0]
                for test_name in tests_in_group
                if test_name not in counters["PASSED"]
            }

            if i == 0:
                test_data_dirs = self._find_test_data_dirs(repo_path, test_names)

            run_prefix = test_group_str + "_" + str(i)
            info_basename = run_prefix + ".nfo"
            info_path = os.path.join(integration_dir, info_basename)

            test_cmd = ' '.join(sorted(test_names))
            parallel_cmd = " --parallel {} ".format(num_workers) if num_workers > 0 else ""
            cmd = "cd {}/tests/integration && timeout -s 9 1h ./runner {} {} -t {} {} '-rfEp --run-id={} --color=no --durations=0 {}' | tee {}".format(
                repo_path, self._get_runner_opts(), image_cmd, test_cmd, parallel_cmd, i,
                _get_deselect_option(self.should_skip_tests()), info_path)

            log_basename = run_prefix + ".log"
            log_path = os.path.join(integration_dir, log_basename)
            with open(log_path, 'w') as log:
                logging.info("Executing cmd: %s", cmd)
                # NOTE(review): the command ends in "| tee", so the exit code
                # is presumably that of tee, not of the runner - confirm.
                retcode = subprocess.call(cmd, shell=True, stderr=log, stdout=log)
            if retcode == 0:
                logging.info("Run %s group successfully", test_group)
            else:
                logging.info("Some tests failed")

            extra_logs_names = [log_basename]
            log_result_path = os.path.join(str(self.path()), 'integration_run_' + log_basename)
            shutil.copy(log_path, log_result_path)
            log_paths.append(log_result_path)

            # Give the per-run logs a unique name so the next try does not
            # overwrite them.
            produced_logs = glob.glob(os.path.join(repo_path, "tests/integration/pytest*.log"))
            dockerd_log_path = os.path.join(repo_path, "tests/integration/dockerd.log")
            if os.path.exists(dockerd_log_path):
                produced_logs.append(dockerd_log_path)
            for produced_log in produced_logs:
                new_name = run_prefix + "_" + os.path.basename(produced_log)
                os.rename(produced_log, os.path.join(integration_dir, new_name))
                extra_logs_names.append(new_name)

            if os.path.exists(info_path):
                extra_logs_names.append(info_basename)
                new_counters = get_counters(parse_test_results_output(info_path))
                new_tests_times = get_test_times(parse_test_times(info_path))
                self._update_counters(counters, new_counters)
                for test_name, test_time in new_tests_times.items():
                    tests_times[test_name] = test_time

            test_data_dirs_new = self._find_test_data_dirs(repo_path, test_names)
            test_data_dirs_diff = self._get_test_data_dirs_difference(test_data_dirs_new, test_data_dirs)
            test_data_dirs = test_data_dirs_new

            if extra_logs_names or test_data_dirs_diff:
                extras_result_path = os.path.join(
                    str(self.path()), "integration_run_" + run_prefix + ".tar.gz")
                self._compress_logs(
                    integration_dir, extra_logs_names + list(test_data_dirs_diff), extras_result_path)
                log_paths.append(extras_result_path)

            if len(counters["PASSED"]) + len(counters["FLAKY"]) == len(tests_in_group):
                logging.info("All tests from group %s passed", test_group)
                break
            # The original condition also required "passed + flaky >= 0",
            # which is always true and therefore omitted here.
            if not counters["FAILED"] and not counters["ERROR"]:
                logging.info("Seems like all tests passed but some of them are skipped or deselected. Ignoring them and finishing group.")
                break
        else:
            # Mark all non tried tests as errors, with '::' in name
            # (example test_partition/test.py::test_partition_simple). For flaky check
            # we run whole test dirs like "test_odbc_interaction" and don't
            # want to mark them as error so we filter by '::'.
            # Tests already classified as FLAKY did run, so they are not
            # reported a second time as ERROR.
            already_reported = set(
                counters["PASSED"] + counters["ERROR"] + counters["FAILED"] + counters["FLAKY"])
            for test in tests_in_group:
                if '::' in test and test not in already_reported:
                    counters["ERROR"].append(test)
                    already_reported.add(test)

        return counters, tests_times, log_paths

    def run_flaky_check(self, repo_path, build_path):
        """Run the tests changed in the pull request repeatedly to detect flakiness.

        The changed tests are run up to ``TRIES_COUNT`` times with a single
        try and ``--parallel 1``; the check stops at the first pass with a
        FAILED or ERROR test, or once ``MAX_TIME_SECONDS`` have elapsed.

        Returns:
            ``(result_state, status_text, test_result, logs)``.
        """
        pr_info = self.params['pr_info']

        # pytest swears, if we require to run some tests which was renamed or deleted
        tests_to_run = filter_existing_tests(get_tests_to_run(pr_info), repo_path)
        if not tests_to_run:
            logging.info("No tests to run found")
            # Logs are a list on every return path.
            return 'success', 'Nothing to run', [('Nothing to run', 'OK')], []

        self._install_clickhouse(build_path)
        logging.info("Found '%s' tests to run", ' '.join(tests_to_run))
        result_state = "success"
        description_prefix = "No flaky tests: "
        start = time.time()
        logging.info("Starting check with retries")
        final_retry = 0
        logs = []
        for i in range(TRIES_COUNT):
            final_retry += 1
            logging.info("Running tests for the %s time", i)
            counters, tests_times, log_paths = self.try_run_test_group(repo_path, "flaky", tests_to_run, 1, 1)
            logs += log_paths
            if counters["FAILED"]:
                logging.info("Found failed tests: %s", ' '.join(counters["FAILED"]))
                description_prefix = "Flaky tests found: "
                result_state = "failure"
                break
            if counters["ERROR"]:
                description_prefix = "Flaky tests found: "
                logging.info("Found error tests: %s", ' '.join(counters["ERROR"]))
                # NOTE "error" result state will restart the whole test task, so we use "failure" here
                result_state = "failure"
                break
            assert len(counters["FLAKY"]) == 0
            logging.info("Try is OK, all tests passed, going to clear env")
            clear_ip_tables_and_restart_daemons()
            logging.info("And going to sleep for some time")
            if time.time() - start > MAX_TIME_SECONDS:
                logging.info("Timeout reached, going to finish flaky check")
                break
            time.sleep(5)

        # Only the counters and times of the last pass are reported.
        test_result = []
        for state in self._REPORT_STATES:
            text_state = self._STATE_TEXT.get(state, state)
            test_result += [
                (c + ' (✕' + str(final_retry) + ')', text_state, "{:.2f}".format(tests_times[c]))
                for c in counters[state]
            ]
        status_text = description_prefix + ', '.join(
            str(n).lower().replace('failed', 'fail') + ': ' + str(len(c)) for n, c in counters.items())

        return result_state, status_text, test_result, logs

    def run_impl(self, repo_path, build_path):
        """Run the configured check and return its outcome.

        For a flaky check this delegates to :meth:`run_flaky_check`.
        Otherwise all tests are listed, optionally filtered by file-name
        hash, and split into groups: tests named in ``parallel_skip.json``
        are grouped per file, the rest in chunks of ``PARALLEL_GROUP_SIZE``.
        Groups are run one after another until all are done or 20 tests
        have failed or errored.

        Returns:
            ``(result_state, status_text, test_result, logs)`` where
            ``test_result`` is a list of
            ``(test, state, time, log_paths)`` tuples.
        """
        if self.flaky_check:
            return self.run_flaky_check(repo_path, build_path)

        self._install_clickhouse(build_path)
        iptables_dump = subprocess.check_output("sudo iptables -L", shell=True)
        # Decode so the log shows the table itself rather than a bytes repr.
        logging.info("Dump iptables before run %s", iptables_dump.decode('utf-8', errors='replace'))
        all_tests = self._get_all_tests(repo_path)

        if self.run_by_hash_total != 0:
            # Keep only the test files whose name hash falls into our bucket.
            all_filtered_by_hash_tests = []
            for group, tests_in_group in self.group_test_by_file(all_tests).items():
                if stringhash(group) % self.run_by_hash_total == self.run_by_hash_num:
                    all_filtered_by_hash_tests += tests_in_group
            all_tests = all_filtered_by_hash_tests

        parallel_skip_tests = self._get_parallel_tests_skip_list(repo_path)
        logging.info("Found %s tests first 3 %s", len(all_tests), ' '.join(all_tests[:3]))
        # Sets give constant-time membership; list order is preserved by
        # iterating the original lists.
        all_tests_set = set(all_tests)
        parallel_skip_set = set(parallel_skip_tests)
        filtered_sequential_tests = [test for test in parallel_skip_tests if test in all_tests_set]
        filtered_parallel_tests = [test for test in all_tests if test not in parallel_skip_set]
        not_found_tests = [test for test in parallel_skip_tests if test not in all_tests_set]
        logging.info(
            "Found %s tests first 3 %s, parallel %s, other %s",
            len(all_tests), ' '.join(all_tests[:3]), len(filtered_parallel_tests), len(filtered_sequential_tests))
        logging.info("Not found %s tests first 3 %s", len(not_found_tests), ' '.join(not_found_tests[:3]))

        grouped_tests = self.group_test_by_file(filtered_sequential_tests)
        for i, par_group in enumerate(chunks(filtered_parallel_tests, PARALLEL_GROUP_SIZE)):
            grouped_tests["parallel{}".format(i)] = par_group
        logging.info("Found %s tests groups", len(grouped_tests))

        counters = self._new_counters()
        tests_times = defaultdict(float)
        tests_log_paths = defaultdict(list)

        items_to_run = list(grouped_tests.items())

        logging.info("Total test groups %s", len(items_to_run))
        if self.shuffle_test_groups():
            logging.info("Shuffling test groups")
            random.shuffle(items_to_run)

        for group, tests in items_to_run:
            logging.info("Running test group %s containing %s tests", group, len(tests))
            group_counters, group_test_times, log_paths = self.try_run_test_group(
                repo_path, group, tests, MAX_RETRY, NUM_WORKERS)
            total_tests = 0
            for counter, value in group_counters.items():
                logging.info("Tests from group %s stats, %s count %s", group, counter, len(value))
                counters[counter] += value
                logging.info("Totally have %s with status %s", len(counters[counter]), counter)
                total_tests += len(counters[counter])
            logging.info("Totally finished tests %s/%s", total_tests, len(all_tests))

            for test_name, test_time in group_test_times.items():
                tests_times[test_name] = test_time
                tests_log_paths[test_name] = log_paths

            if len(counters["FAILED"]) + len(counters["ERROR"]) >= 20:
                logging.info("Collected more than 20 failed/error tests, stopping")
                break

        if counters["FAILED"] or counters["ERROR"]:
            logging.info("Overall status failure, because we have tests in FAILED or ERROR state")
            result_state = "failure"
        else:
            logging.info("Overall success!")
            result_state = "success"

        test_result = []
        for state in self._REPORT_STATES:
            text_state = self._STATE_TEXT.get(state, state)
            test_result += [
                (c, text_state, "{:.2f}".format(tests_times[c]), tests_log_paths[c])
                for c in counters[state]
            ]

        failed_sum = len(counters['FAILED']) + len(counters['ERROR'])
        status_text = "fail: {}, passed: {}, flaky: {}".format(
            failed_sum, len(counters['PASSED']), len(counters['FLAKY']))

        if self.soft_deadline_time < time.time():
            status_text = "Timeout, " + status_text
            result_state = "failure"

        # Flaky tests are not counted when checking that anything ran at all.
        counters['FLAKY'] = []
        if sum(len(counter) for counter in counters.values()) == 0:
            status_text = "No tests found for some reason! It's a bug"
            result_state = "failure"

        # Checks whose context name contains "(memory)" always report success.
        if '(memory)' in self.params['context_name']:
            result_state = "success"

        return result_state, status_text, test_result, []
|
|
|
|
def write_results(results_file, status_file, results, status):
    """Write the test results and the overall status as tab-separated files.

    Args:
        results_file: path of the file receiving one row per item of *results*.
        status_file: path of the file receiving the single *status* row.
        results: iterable of rows (each an iterable of fields).
        status: one row (an iterable of fields).
    """
    # newline='' is required by the csv module so that the writer's own line
    # terminators are not translated a second time by the text layer.
    with open(results_file, 'w', newline='') as f:
        out = csv.writer(f, delimiter='\t')
        out.writerows(results)
    with open(status_file, 'w', newline='') as f:
        out = csv.writer(f, delimiter='\t')
        out.writerow(status)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: configuration comes from CLICKHOUSE_TESTS_*
    # environment variables; results go to two TSV files in the result dir.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

    repo_path = os.environ.get("CLICKHOUSE_TESTS_REPO_PATH")
    build_path = os.environ.get("CLICKHOUSE_TESTS_BUILD_PATH")
    result_path = os.environ.get("CLICKHOUSE_TESTS_RESULT_PATH")
    params_path = os.environ.get("CLICKHOUSE_TESTS_JSON_PARAMS_PATH")

    # Context manager so the parameters file is closed deterministically.
    with open(params_path, 'r') as params_file:
        params = json.load(params_file)
    runner = ClickhouseIntegrationTestsRunner(result_path, params)

    logging.info("Running tests")
    # The fourth element (log paths) is not written anywhere by this script.
    state, description, test_results, _ = runner.run_impl(repo_path, build_path)
    logging.info("Tests finished")

    status = (state, description)
    out_results_file = os.path.join(str(runner.path()), "test_results.tsv")
    out_status_file = os.path.join(str(runner.path()), "check_status.tsv")
    write_results(out_results_file, out_status_file, test_results, status)
    logging.info("Result written")
|