From 0ae948866a1105569176e30b4e8667270ff9b656 Mon Sep 17 00:00:00 2001
From: "Mikhail f. Shiryaev"
Date: Sun, 13 Feb 2022 01:04:07 +0100
Subject: [PATCH 1/4] Create symlink hadoop -> ./hadoop-2.7.0
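
The last step of the RUN chain is guarded so the layer stays idempotent:
"[ -e /usr/local/hadoop ] ||" skips the "ln -s" when the path already
exists, so a repeated or cached build does not fail with "File exists".
The link target is relative, resolved against the link's own directory.
A minimal sketch of the idiom (paths illustrative):

    [ -e /usr/local/hadoop ] || ln -s ./hadoop-2.7.0 /usr/local/hadoop
    readlink -f /usr/local/hadoop   # resolves to /usr/local/hadoop-2.7.0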
---
 .../test/integration/kerberized_hadoop/Dockerfile | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/docker/test/integration/kerberized_hadoop/Dockerfile b/docker/test/integration/kerberized_hadoop/Dockerfile
index e42d115999a..592c3e36ef7 100644
--- a/docker/test/integration/kerberized_hadoop/Dockerfile
+++ b/docker/test/integration/kerberized_hadoop/Dockerfile
@@ -15,9 +15,10 @@ RUN curl -o krb5-libs-1.10.3-65.el6.x86_64.rpm ftp://ftp.pbone.net/mirror/vault.
     rm -fr *.rpm
 
 RUN cd /tmp && \
-  curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \
-  tar xzf commons-daemon-1.0.15-src.tar.gz && \
-  cd commons-daemon-1.0.15-src/src/native/unix && \
-  ./configure && \
-  make && \
-  cp ./jsvc /usr/local/hadoop-2.7.0/sbin
+    curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \
+    tar xzf commons-daemon-1.0.15-src.tar.gz && \
+    cd commons-daemon-1.0.15-src/src/native/unix && \
+    ./configure && \
+    make && \
+    cp ./jsvc /usr/local/hadoop-2.7.0/sbin && \
+    [ -e /usr/local/hadoop ] || ln -s ./hadoop-2.7.0 /usr/local/hadoop

From 23546ab825707a8e2aabe912b3faafd098031d41 Mon Sep 17 00:00:00 2001
From: "Mikhail f. Shiryaev"
Date: Sun, 13 Feb 2022 14:39:30 +0100
Subject: [PATCH 2/4] Use the current kerberized-hadoop image
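
The compose file now derives the image tag from the environment with a
"latest" fallback, and the runner plumbs the tag through end to end.
Two expansion forms are involved: compose's "${VAR:-default}" substitutes
a default without touching VAR, while the entrypoint's "${VAR:=latest}"
also assigns it, so child processes inherit the value. A rough sketch of
the difference (variable name illustrative):

    unset TAG
    echo "clickhouse/kerberized-hadoop:${TAG:-latest}"  # falls back, TAG stays unset
    export TAG="${TAG:=latest}"                         # assigns and exports the default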
---
 .../runner/compose/docker_compose_kerberized_hdfs.yml     | 2 +-
 docker/test/integration/runner/dockerd-entrypoint.sh      | 1 +
 tests/ci/integration_test_check.py                        | 3 +++
 tests/integration/ci-runner.py                            | 2 +-
 tests/integration/runner                                  | 2 ++
 .../test_storage_kerberized_hdfs/kerberos_image_config.sh | 1 +
 6 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml b/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml
index 88be3e45085..e1b4d393169 100644
--- a/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml
+++ b/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml
@@ -4,7 +4,7 @@ services:
   kerberizedhdfs1:
     cap_add:
       - DAC_READ_SEARCH
-    image: clickhouse/kerberized-hadoop
+    image: clickhouse/kerberized-hadoop:${DOCKER_KERBERIZED_HADOOP_TAG:-latest}
     hostname: kerberizedhdfs1
     restart: always
     volumes:

diff --git a/docker/test/integration/runner/dockerd-entrypoint.sh b/docker/test/integration/runner/dockerd-entrypoint.sh
index 8109ef7ae64..34414abc3f5 100755
--- a/docker/test/integration/runner/dockerd-entrypoint.sh
+++ b/docker/test/integration/runner/dockerd-entrypoint.sh
@@ -45,6 +45,7 @@ export DOCKER_MYSQL_JS_CLIENT_TAG=${DOCKER_MYSQL_JS_CLIENT_TAG:=latest}
 export DOCKER_MYSQL_PHP_CLIENT_TAG=${DOCKER_MYSQL_PHP_CLIENT_TAG:=latest}
 export DOCKER_POSTGRESQL_JAVA_CLIENT_TAG=${DOCKER_POSTGRESQL_JAVA_CLIENT_TAG:=latest}
 export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
+export DOCKER_KERBERIZED_HADOOP_TAG=${DOCKER_KERBERIZED_HADOOP_TAG:=latest}
 
 cd /ClickHouse/tests/integration
 exec "$@"

diff --git a/tests/ci/integration_test_check.py b/tests/ci/integration_test_check.py
index e87528dd528..786521db418 100644
--- a/tests/ci/integration_test_check.py
+++ b/tests/ci/integration_test_check.py
@@ -23,6 +23,8 @@ from rerun_helper import RerunHelper
 from tee_popen import TeePopen
 
+# When updating, also update
+# integration/ci-runner.py:ClickhouseIntegrationTestsRunner.get_images_names
 IMAGES = [
     "clickhouse/integration-tests-runner",
     "clickhouse/mysql-golang-client",
@@ -32,6 +34,7 @@ IMAGES = [
     "clickhouse/postgresql-java-client",
     "clickhouse/integration-test",
     "clickhouse/kerberos-kdc",
+    "clickhouse/kerberized-hadoop",
     "clickhouse/integration-helper",
     "clickhouse/dotnet-client",
 ]

diff --git a/tests/integration/ci-runner.py b/tests/integration/ci-runner.py
index 40cb2c6fdd7..c1a2e6cf0b1 100755
--- a/tests/integration/ci-runner.py
+++ b/tests/integration/ci-runner.py
@@ -235,7 +235,7 @@ class ClickhouseIntegrationTestsRunner:
                 "clickhouse/mysql-java-client", "clickhouse/mysql-js-client",
                 "clickhouse/mysql-php-client", "clickhouse/postgresql-java-client",
                 "clickhouse/integration-test", "clickhouse/kerberos-kdc",
-                "clickhouse/dotnet-client",
+                "clickhouse/kerberized-hadoop", "clickhouse/dotnet-client",
                 "clickhouse/integration-helper", ]

diff --git a/tests/integration/runner b/tests/integration/runner
index 3687ca4068c..737eaeef683 100755
--- a/tests/integration/runner
+++ b/tests/integration/runner
@@ -238,6 +238,8 @@ if __name__ == "__main__":
                 env_tags += "-e {}={} ".format("DOCKER_POSTGRESQL_JAVA_CLIENT_TAG", tag)
             elif image == "clickhouse/integration-test":
                 env_tags += "-e {}={} ".format("DOCKER_BASE_TAG", tag)
+            elif image == "clickhouse/kerberized-hadoop":
+                env_tags += "-e {}={} ".format("DOCKER_KERBERIZED_HADOOP_TAG", tag)
             elif image == "clickhouse/kerberos-kdc":
                 env_tags += "-e {}={} ".format("DOCKER_KERBEROS_KDC_TAG", tag)
             else:

diff --git a/tests/integration/test_storage_kerberized_hdfs/kerberos_image_config.sh b/tests/integration/test_storage_kerberized_hdfs/kerberos_image_config.sh
index 0a746eb1a67..45fb93792e0 100644
--- a/tests/integration/test_storage_kerberized_hdfs/kerberos_image_config.sh
+++ b/tests/integration/test_storage_kerberized_hdfs/kerberos_image_config.sh
@@ -90,6 +90,7 @@ create_admin_user() {
 }
 
 create_keytabs() {
+    rm /tmp/keytab/*.keytab
 
     # kadmin.local -q "addprinc -randkey hdfs/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"

From 9340aab154b6acc4d04e78d7cda7b8eed5d83d2f Mon Sep 17 00:00:00 2001
From: "Mikhail f. Shiryaev"
Date: Sun, 13 Feb 2022 14:42:51 +0100
Subject: [PATCH 3/4] Clean kerberos keytabs between runs
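
Principals are re-created with random keys on every container start, and
each "ktadd" writes fresh key material, so a keytab surviving from a
previous run can hold keys that no longer match the KDC database and
authentication fails on stale keys. Wiping the directory first keeps
every run self-consistent. The pattern, roughly (principal and paths
illustrative):

    rm -f /tmp/keytab/*.keytab
    kadmin.local -q "addprinc -randkey service/host@${REALM}"
    kadmin.local -q "ktadd -norandkey -k /tmp/keytab/service.keytab service/host@${REALM}"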
---
 .../test_storage_kerberized_kafka/kerberos_image_config.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/integration/test_storage_kerberized_kafka/kerberos_image_config.sh b/tests/integration/test_storage_kerberized_kafka/kerberos_image_config.sh
index 723868ec68a..07437c42359 100644
--- a/tests/integration/test_storage_kerberized_kafka/kerberos_image_config.sh
+++ b/tests/integration/test_storage_kerberized_kafka/kerberos_image_config.sh
@@ -90,6 +90,7 @@ create_admin_user() {
 }
 
 create_keytabs() {
+    rm /tmp/keytab/*.keytab
 
     kadmin.local -q "addprinc -randkey zookeeper/kafka_kerberized_zookeeper@${REALM}"
     kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kafka_kerberized_zookeeper.keytab zookeeper/kafka_kerberized_zookeeper@${REALM}"

From 1e82b04de30b307482f4c4b0f96f8aa5d354425d Mon Sep 17 00:00:00 2001
From: "Mikhail f. Shiryaev"
Date: Sun, 13 Feb 2022 23:54:18 +0100
Subject: [PATCH 4/4] Apply black formatter to ci-runner to trigger license/cla
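
No functional changes: black normalizes strings to double quotes, splits
long call sites one argument per line, separates top-level definitions
with two blank lines, and rewraps long conditions in parentheses. The
result should be reproducible locally with something like:

    pip install black
    black tests/integration/ci-runner.py                  # rewrite in place
    black --check --diff tests/integration/ci-runner.py   # verify only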
---
 tests/integration/ci-runner.py | 531 +++++++++++++++++++++++----------
 1 file changed, 372 insertions(+), 159 deletions(-)

diff --git a/tests/integration/ci-runner.py b/tests/integration/ci-runner.py
index c1a2e6cf0b1..8f228d91e9e 100755
--- a/tests/integration/ci-runner.py
+++ b/tests/integration/ci-runner.py
@@ -1,17 +1,16 @@
 #!/usr/bin/env python3
 
-import logging
-import subprocess
-import os
-import glob
-import time
-import shutil
 from collections import defaultdict
-import random
-import json
 import csv
 
-# for crc32
-import zlib
+import glob
+import json
+import logging
+import os
+import random
+import shutil
+import subprocess
+import time
+import zlib  # for crc32
 
 MAX_RETRY = 3
@@ -25,54 +24,62 @@ CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH = "usr/bin/clickhouse-library-bridge"
 
 TRIES_COUNT = 10
 MAX_TIME_SECONDS = 3600
-MAX_TIME_IN_SANDBOX = 20 * 60 # 20 minutes
-TASK_TIMEOUT = 8 * 60 * 60 # 8 hours
+MAX_TIME_IN_SANDBOX = 20 * 60  # 20 minutes
+TASK_TIMEOUT = 8 * 60 * 60  # 8 hours
+
 
 def stringhash(s):
-    return zlib.crc32(s.encode('utf-8'))
+    return zlib.crc32(s.encode("utf-8"))
+
 
 def get_tests_to_run(pr_info):
     result = set([])
-    changed_files = pr_info['changed_files']
+    changed_files = pr_info["changed_files"]
 
     if changed_files is None:
         return []
 
     for fpath in changed_files:
-        if 'tests/integration/test_' in fpath:
-            logging.info('File %s changed and seems like integration test', fpath)
-            result.add(fpath.split('/')[2])
+        if "tests/integration/test_" in fpath:
+            logging.info("File %s changed and seems like integration test", fpath)
+            result.add(fpath.split("/")[2])
     return list(result)
 
 
 def filter_existing_tests(tests_to_run, repo_path):
     result = []
     for relative_test_path in tests_to_run:
-        if os.path.exists(os.path.join(repo_path, 'tests/integration', relative_test_path)):
+        if os.path.exists(
+            os.path.join(repo_path, "tests/integration", relative_test_path)
+        ):
             result.append(relative_test_path)
         else:
-            logging.info("Skipping test %s, seems like it was removed", relative_test_path)
+            logging.info(
+                "Skipping test %s, seems like it was removed", relative_test_path
+            )
     return result
 
 
 def _get_deselect_option(tests):
-    return ' '.join(['--deselect {}'.format(t) for t in tests])
+    return " ".join([f"--deselect {t}" for t in tests])
+
 
 # https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
 def chunks(lst, n):
     """Yield successive n-sized chunks from lst."""
     for i in range(0, len(lst), n):
-        yield lst[i:i + n]
+        yield lst[i : i + n]
+
 
 def get_counters(fname):
     counters = {
-        "ERROR":   set([]),
-        "PASSED":  set([]),
-        "FAILED":  set([]),
+        "ERROR": set([]),
+        "PASSED": set([]),
+        "FAILED": set([]),
         "SKIPPED": set([]),
     }
-    with open(fname, 'r') as out:
+    with open(fname, "r") as out:
         for line in out:
             line = line.strip()
             # Example of log:
@@ -81,10 +88,10 @@ def get_counters(fname):
             #
             # [gw0] [ 7%] ERROR test_mysql_protocol/test.py::test_golang_client
             #
             # And only the line with test status should be matched
-            if not('.py::' in line and ' ' in line):
+            if not (".py::" in line and " " in line):
                 continue
 
-            line_arr = line.strip().split(' ')
+            line_arr = line.strip().split(" ")
             if len(line_arr) < 2:
                 logging.debug("Strange line %s", line)
                 continue
@@ -97,9 +104,9 @@ def get_counters(fname):
             if state in counters:
                 counters[state].add(test_name)
             else:
-                # will skip lines line:
-                # 30.76s call test_host_ip_change/test.py::test_ip_change_drop_dns_cache
-                # 5.71s teardown test_host_ip_change/test.py::test_user_access_ip_change[node1]
+                # will skip lines like:
+                # 30.76s call test_host_ip_change/test.py::test_ip_drop_cache
+                # 5.71s teardown test_host_ip_change/test.py::test_ip_change[node1]
                 # and similar
                 logging.debug("Strange state in line %s", line)
 
     return {k: list(v) for k, v in counters.items()}
@@ -109,13 +116,13 @@
 def parse_test_times(fname):
     read = False
     description_output = []
-    with open(fname, 'r') as out:
+    with open(fname, "r") as out:
         for line in out:
-            if read and '==' in line:
+            if read and "==" in line:
                 break
             if read and line.strip():
                 description_output.append(line.strip())
-            if 'slowest durations' in line:
+            if "slowest durations" in line:
                 read = True
     return description_output
@@ -123,10 +130,10 @@
 def get_test_times(output):
     result = defaultdict(float)
     for line in output:
-        if '.py' in line:
-            line_arr = line.strip().split(' ')
+        if ".py" in line:
+            line_arr = line.strip().split(" ")
             test_time = line_arr[0]
-            test_name = ' '.join([elem for elem in line_arr[2:] if elem])
+            test_name = " ".join([elem for elem in line_arr[2:] if elem])
             if test_name not in result:
                 result[test_name] = 0.0
             result[test_name] += float(test_time[:-1])
@@ -134,21 +141,28 @@
     return result
 
 
 def clear_ip_tables_and_restart_daemons():
-    logging.info("Dump iptables after run %s", subprocess.check_output("sudo iptables -L", shell=True))
+    logging.info(
+        "Dump iptables after run %s",
+        subprocess.check_output("sudo iptables -L", shell=True),
+    )
     try:
         logging.info("Killing all alive docker containers")
-        subprocess.check_output("timeout -s 9 10m docker kill $(docker ps -q)", shell=True)
+        subprocess.check_output(
+            "timeout -s 9 10m docker kill $(docker ps -q)", shell=True
+        )
     except subprocess.CalledProcessError as err:
         logging.info("docker kill excepted: " + str(err))
 
     try:
         logging.info("Removing all docker containers")
-        subprocess.check_output("timeout -s 9 10m docker rm $(docker ps -a -q) --force", shell=True)
+        subprocess.check_output(
+            "timeout -s 9 10m docker rm $(docker ps -a -q) --force", shell=True
+        )
    except subprocess.CalledProcessError as err:
         logging.info("docker rm excepted: " + str(err))
 
    # don't restart docker if it's disabled
-    if os.environ.get("CLICKHOUSE_TESTS_RUNNER_RESTART_DOCKER", '1') == '1':
+    if os.environ.get("CLICKHOUSE_TESTS_RUNNER_RESTART_DOCKER", "1") == "1":
         try:
             logging.info("Stopping docker daemon")
             subprocess.check_output("service docker stop", shell=True)
@@ -177,27 +191,33 @@ def clear_ip_tables_and_restart_daemons():
             # when rules will be empty, it will raise exception
             subprocess.check_output("sudo iptables -D DOCKER-USER 1", shell=True)
     except subprocess.CalledProcessError as err:
-        logging.info("All iptables rules cleared, " + str(iptables_iter) + "iterations, last error: " + str(err))
+        logging.info(
+            "All iptables rules cleared, "
+            + str(iptables_iter)
+            + "iterations, last error: "
+            + str(err)
+        )
 
 
 class ClickhouseIntegrationTestsRunner:
-
     def __init__(self, result_path, params):
         self.result_path = result_path
         self.params = params
 
-        self.image_versions = self.params['docker_images_with_versions']
-        self.shuffle_groups = self.params['shuffle_test_groups']
-        self.flaky_check = 'flaky check' in self.params['context_name']
+        self.image_versions = self.params["docker_images_with_versions"]
+        self.shuffle_groups = self.params["shuffle_test_groups"]
+        self.flaky_check = "flaky check" in self.params["context_name"]
         # if use_tmpfs is not set we assume it to be true, otherwise check
-        self.use_tmpfs = 'use_tmpfs' not in self.params or self.params['use_tmpfs']
-        self.disable_net_host = 'disable_net_host' in self.params and self.params['disable_net_host']
+        self.use_tmpfs = "use_tmpfs" not in self.params or self.params["use_tmpfs"]
+        self.disable_net_host = (
+            "disable_net_host" in self.params and self.params["disable_net_host"]
+        )
 
         self.start_time = time.time()
         self.soft_deadline_time = self.start_time + (TASK_TIMEOUT - MAX_TIME_IN_SANDBOX)
 
-        if 'run_by_hash_total' in self.params:
-            self.run_by_hash_total = self.params['run_by_hash_total']
-            self.run_by_hash_num = self.params['run_by_hash_num']
+        if "run_by_hash_total" in self.params:
+            self.run_by_hash_total = self.params["run_by_hash_total"]
+            self.run_by_hash_num = self.params["run_by_hash_num"]
         else:
             self.run_by_hash_total = 0
             self.run_by_hash_num = 0
@@ -206,7 +226,7 @@ class ClickhouseIntegrationTestsRunner:
         return self.result_path
 
     def base_path(self):
-        return os.path.join(str(self.result_path), '../')
+        return os.path.join(str(self.result_path), "../")
 
     def should_skip_tests(self):
         return []
@@ -214,8 +234,10 @@ class ClickhouseIntegrationTestsRunner:
     def get_image_with_version(self, name):
         if name in self.image_versions:
             return name + ":" + self.image_versions[name]
-        logging.warn("Cannot find image %s in params list %s", name, self.image_versions)
-        if ':' not in name:
+        logging.warn(
+            "Cannot find image %s in params list %s", name, self.image_versions
+        )
+        if ":" not in name:
             return name + ":latest"
         return name
@@ -223,31 +245,44 @@ class ClickhouseIntegrationTestsRunner:
         name = self.get_images_names()[0]
         if name in self.image_versions:
             return self.image_versions[name]
-        logging.warn("Cannot find image %s in params list %s", name, self.image_versions)
-        return 'latest'
+        logging.warn(
+            "Cannot find image %s in params list %s", name, self.image_versions
+        )
+        return "latest"
 
     def shuffle_test_groups(self):
         return self.shuffle_groups != 0
 
     @staticmethod
     def get_images_names():
-        return ["clickhouse/integration-tests-runner", "clickhouse/mysql-golang-client",
-                "clickhouse/mysql-java-client", "clickhouse/mysql-js-client",
-                "clickhouse/mysql-php-client", "clickhouse/postgresql-java-client",
-                "clickhouse/integration-test", "clickhouse/kerberos-kdc",
-                "clickhouse/kerberized-hadoop", "clickhouse/dotnet-client",
-                "clickhouse/integration-helper", ]
-
+        return [
+            "clickhouse/dotnet-client",
+            "clickhouse/integration-helper",
+            "clickhouse/integration-test",
+            "clickhouse/integration-tests-runner",
+            "clickhouse/kerberized-hadoop",
+            "clickhouse/kerberos-kdc",
+            "clickhouse/mysql-golang-client",
+            "clickhouse/mysql-java-client",
+            "clickhouse/mysql-js-client",
+            "clickhouse/mysql-php-client",
+            "clickhouse/postgresql-java-client",
+        ]
 
     def _can_run_with(self, path, opt):
-        with open(path, 'r') as script:
+        with open(path, "r") as script:
             for line in script:
                 if opt in line:
                     return True
         return False
 
     def _install_clickhouse(self, debs_path):
-        for package in ('clickhouse-common-static_', 'clickhouse-server_', 'clickhouse-client', 'clickhouse-common-static-dbg_'): # order matters
+        for package in (
+            "clickhouse-common-static_",
+            "clickhouse-server_",
+            "clickhouse-client",
+            "clickhouse-common-static-dbg_",
+        ):  # order matters
             logging.info("Installing package %s", package)
             for f in os.listdir(debs_path):
                 if package in f:
@@ -255,10 +290,12 @@ class ClickhouseIntegrationTestsRunner:
                     full_path = os.path.join(debs_path, f)
                     logging.info("Package found in %s", full_path)
                     log_name = "install_" + f + ".log"
                     log_path = os.path.join(str(self.path()), log_name)
-                    with open(log_path, 'w') as log:
+                    with open(log_path, "w") as log:
                         cmd = "dpkg -x {} .".format(full_path)
                         logging.info("Executing installation cmd %s", cmd)
-                        retcode = subprocess.Popen(cmd, shell=True, stderr=log, stdout=log).wait()
+                        retcode = subprocess.Popen(
+                            cmd, shell=True, stderr=log, stdout=log
+                        ).wait()
                         if retcode == 0:
                             logging.info("Installation of %s successfull", full_path)
                         else:
@@ -267,18 +304,35 @@ class ClickhouseIntegrationTestsRunner:
                     break
             else:
                 raise Exception("Package with {} not found".format(package))
         logging.info("Unstripping binary")
-        # logging.info("Unstring %s", subprocess.check_output("eu-unstrip /usr/bin/clickhouse {}".format(CLICKHOUSE_BINARY_PATH), shell=True))
+        # logging.info(
+        #     "Unstring %s",
+        #     subprocess.check_output(
+        #         "eu-unstrip /usr/bin/clickhouse {}".format(CLICKHOUSE_BINARY_PATH),
+        #         shell=True,
+        #     ),
+        # )
 
         logging.info("All packages installed")
         os.chmod(CLICKHOUSE_BINARY_PATH, 0o777)
         os.chmod(CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH, 0o777)
         os.chmod(CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH, 0o777)
-        shutil.copy(CLICKHOUSE_BINARY_PATH, os.getenv("CLICKHOUSE_TESTS_SERVER_BIN_PATH"))
-        shutil.copy(CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH, os.getenv("CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH"))
-        shutil.copy(CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH, os.getenv("CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH"))
+        shutil.copy(
+            CLICKHOUSE_BINARY_PATH, os.getenv("CLICKHOUSE_TESTS_SERVER_BIN_PATH")
+        )
+        shutil.copy(
+            CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH,
+            os.getenv("CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH"),
+        )
+        shutil.copy(
+            CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH,
+            os.getenv("CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH"),
+        )
 
     def _compress_logs(self, dir, relpaths, result_path):
-        subprocess.check_call("tar czf {} -C {} {}".format(result_path, dir, ' '.join(relpaths)), shell=True) # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
+        subprocess.check_call(  # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
+            "tar czf {} -C {} {}".format(result_path, dir, " ".join(relpaths)),
+            shell=True,
+        )
 
     def _get_runner_opts(self):
         result = []
@@ -292,22 +346,40 @@ class ClickhouseIntegrationTestsRunner:
         image_cmd = self._get_runner_image_cmd(repo_path)
         out_file = "all_tests.txt"
         out_file_full = "all_tests_full.txt"
-        cmd = "cd {repo_path}/tests/integration && " \
-            "timeout -s 9 1h ./runner {runner_opts} {image_cmd} ' --setup-plan' " \
-            "| tee {out_file_full} | grep '::' | sed 's/ (fixtures used:.*//g' | sed 's/^ *//g' | sed 's/ *$//g' " \
-            "| grep -v 'SKIPPED' | sort -u > {out_file}".format(
-                repo_path=repo_path, runner_opts=self._get_runner_opts(), image_cmd=image_cmd, out_file=out_file, out_file_full=out_file_full)
+        cmd = (
+            "cd {repo_path}/tests/integration && "
+            "timeout -s 9 1h ./runner {runner_opts} {image_cmd} ' --setup-plan' "
+            "| tee {out_file_full} | grep '::' | sed 's/ (fixtures used:.*//g' | sed 's/^ *//g' | sed 's/ *$//g' "
+            "| grep -v 'SKIPPED' | sort -u > {out_file}".format(
+                repo_path=repo_path,
+                runner_opts=self._get_runner_opts(),
+                image_cmd=image_cmd,
+                out_file=out_file,
+                out_file_full=out_file_full,
+            )
+        )
 
         logging.info("Getting all tests with cmd '%s'", cmd)
-        subprocess.check_call(cmd, shell=True) # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
+        subprocess.check_call(  # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
+            cmd, shell=True
+        )
 
-        all_tests_file_path = "{repo_path}/tests/integration/{out_file}".format(repo_path=repo_path, out_file=out_file)
-        if not os.path.isfile(all_tests_file_path) or os.path.getsize(all_tests_file_path) == 0:
-            all_tests_full_file_path = "{repo_path}/tests/integration/{out_file}".format(repo_path=repo_path, out_file=out_file_full)
+        all_tests_file_path = "{repo_path}/tests/integration/{out_file}".format(
+            repo_path=repo_path, out_file=out_file
+        )
+        if (
+            not os.path.isfile(all_tests_file_path)
+            or os.path.getsize(all_tests_file_path) == 0
+        ):
+            all_tests_full_file_path = (
+                "{repo_path}/tests/integration/{out_file}".format(
+                    repo_path=repo_path, out_file=out_file_full
+                )
+            )
             if os.path.isfile(all_tests_full_file_path):
                 # log runner output
                 logging.info("runner output:")
-                with open(all_tests_full_file_path, 'r') as all_tests_full_file:
+                with open(all_tests_full_file_path, "r") as all_tests_full_file:
                     for line in all_tests_full_file:
                         line = line.rstrip()
                        if line:
@@ -315,7 +387,11 @@ class ClickhouseIntegrationTestsRunner:
             else:
                 logging.info("runner output '%s' is empty", all_tests_full_file_path)
 
-            raise Exception("There is something wrong with getting all tests list: file '{}' is empty or does not exist.".format(all_tests_file_path))
+            raise Exception(
+                "There is something wrong with getting all tests list: file '{}' is empty or does not exist.".format(
+                    all_tests_file_path
+                )
+            )
 
         all_tests = []
         with open(all_tests_file_path, "r") as all_tests_file:
@@ -324,9 +400,18 @@ class ClickhouseIntegrationTestsRunner:
            for line in all_tests_file:
                all_tests.append(line.strip())
         return list(sorted(all_tests))
 
     def _get_parallel_tests_skip_list(self, repo_path):
-        skip_list_file_path = "{}/tests/integration/parallel_skip.json".format(repo_path)
-        if not os.path.isfile(skip_list_file_path) or os.path.getsize(skip_list_file_path) == 0:
-            raise Exception("There is something wrong with getting all tests list: file '{}' is empty or does not exist.".format(skip_list_file_path))
+        skip_list_file_path = "{}/tests/integration/parallel_skip.json".format(
+            repo_path
+        )
+        if (
+            not os.path.isfile(skip_list_file_path)
+            or os.path.getsize(skip_list_file_path) == 0
+        ):
+            raise Exception(
+                "There is something wrong with getting all tests list: file '{}' is empty or does not exist.".format(
+                    skip_list_file_path
+                )
+            )
 
         skip_list_tests = []
         with open(skip_list_file_path, "r") as skip_list_file:
@@ -336,7 +421,7 @@ class ClickhouseIntegrationTestsRunner:
            skip_list_tests = json.load(skip_list_file)
        return list(sorted(skip_list_tests))

     def group_test_by_file(self, tests):
         result = {}
         for test in tests:
-            test_file = test.split('::')[0]
+            test_file = test.split("::")[0]
             if test_file not in result:
                 result[test_file] = []
             result[test_file].append(test)
         return result
@@ -344,7 +429,10 @@ class ClickhouseIntegrationTestsRunner:
     def _update_counters(self, main_counters, current_counters):
         for test in current_counters["PASSED"]:
-            if test not in main_counters["PASSED"] and test not in main_counters["FLAKY"]:
+            if (
+                test not in main_counters["PASSED"]
+                and test not in main_counters["FLAKY"]
+            ):
                 is_flaky = False
                 if test in main_counters["FAILED"]:
                     main_counters["FAILED"].remove(test)
@@ -369,45 +457,63 @@ class ClickhouseIntegrationTestsRunner:
                 main_counters[state].append(test)
 
     def _get_runner_image_cmd(self, repo_path):
-        image_cmd = ''
-        if self._can_run_with(os.path.join(repo_path, "tests/integration", "runner"), '--docker-image-version'):
+        image_cmd = ""
+        if self._can_run_with(
+            os.path.join(repo_path, "tests/integration", "runner"),
+            "--docker-image-version",
+        ):
             for img in self.get_images_names():
                 if img == "clickhouse/integration-tests-runner":
                     runner_version = self.get_single_image_version()
-                    logging.info("Can run with custom docker image version %s", runner_version)
-                    image_cmd += ' --docker-image-version={} '.format(runner_version)
+                    logging.info(
+                        "Can run with custom docker image version %s", runner_version
+                    )
+                    image_cmd += " --docker-image-version={} ".format(runner_version)
                 else:
-                    if self._can_run_with(os.path.join(repo_path, "tests/integration", "runner"), '--docker-compose-images-tags'):
-                        image_cmd += '--docker-compose-images-tags={} '.format(self.get_image_with_version(img))
+                    if self._can_run_with(
+                        os.path.join(repo_path, "tests/integration", "runner"),
+                        "--docker-compose-images-tags",
+                    ):
+                        image_cmd += "--docker-compose-images-tags={} ".format(
+                            self.get_image_with_version(img)
+                        )
         else:
-            image_cmd = ''
+            image_cmd = ""
             logging.info("Cannot run with custom docker image version :(")
         return image_cmd
 
     def _find_test_data_dirs(self, repo_path, test_names):
         relpaths = {}
         for test_name in test_names:
-            if '/' in test_name:
-                test_dir = test_name[:test_name.find('/')]
+            if "/" in test_name:
+                test_dir = test_name[: test_name.find("/")]
             else:
                 test_dir = test_name
             if os.path.isdir(os.path.join(repo_path, "tests/integration", test_dir)):
-                for name in os.listdir(os.path.join(repo_path, "tests/integration", test_dir)):
+                for name in os.listdir(
+                    os.path.join(repo_path, "tests/integration", test_dir)
+                ):
                     relpath = os.path.join(os.path.join(test_dir, name))
-                    mtime = os.path.getmtime(os.path.join(repo_path, "tests/integration", relpath))
+                    mtime = os.path.getmtime(
+                        os.path.join(repo_path, "tests/integration", relpath)
+                    )
                     relpaths[relpath] = mtime
         return relpaths
 
     def _get_test_data_dirs_difference(self, new_snapshot, old_snapshot):
         res = set()
         for path in new_snapshot:
-            if (not path in old_snapshot) or (old_snapshot[path] != new_snapshot[path]):
+            if (path not in old_snapshot) or (old_snapshot[path] != new_snapshot[path]):
                 res.add(path)
         return res
 
-    def try_run_test_group(self, repo_path, test_group, tests_in_group, num_tries, num_workers):
+    def try_run_test_group(
+        self, repo_path, test_group, tests_in_group, num_tries, num_workers
+    ):
         try:
-            return self.run_test_group(repo_path, test_group, tests_in_group, num_tries, num_workers)
+            return self.run_test_group(
+                repo_path, test_group, tests_in_group, num_tries, num_workers
+            )
         except Exception as e:
             logging.info("Failed to run {}:\n{}".format(str(test_group), str(e)))
             counters = {
                 "ERROR": [],
                 "PASSED": [],
                 "FAILED": [],
                 "SKIPPED": [],
                 "FLAKY": [],
             }
             tests_times = defaultdict(float)
             for test in tests_in_group:
                 counters["ERROR"].append(test)
@@ -423,7 +529,9 @@ class ClickhouseIntegrationTestsRunner:
                 tests_times[test] = 0
             return counters, tests_times, []
 
-    def run_test_group(self, repo_path, test_group, tests_in_group, num_tries, num_workers):
+    def run_test_group(
+        self, repo_path, test_group, tests_in_group, num_tries, num_workers
+    ):
         counters = {
             "ERROR": [],
             "PASSED": [],
             "FAILED": [],
             "SKIPPED": [],
             "FLAKY": [],
         }
         tests_times = defaultdict(float)
@@ -441,7 +549,7 @@ class ClickhouseIntegrationTestsRunner:
             return counters, tests_times, []
 
         image_cmd = self._get_runner_image_cmd(repo_path)
-        test_group_str = test_group.replace('/', '_').replace('.', '_')
+        test_group_str = test_group.replace("/", "_").replace(".", "_")
 
         log_paths = []
         test_data_dirs = {}
@@ -453,8 +561,8 @@ class ClickhouseIntegrationTestsRunner:
             test_names = set([])
             for test_name in tests_in_group:
                 if test_name not in counters["PASSED"]:
-                    if '[' in test_name:
-                        test_names.add(test_name[:test_name.find('[')])
+                    if "[" in test_name:
+                        test_names.add(test_name[: test_name.find("[")])
                     else:
                         test_names.add(test_name)
@@ -464,47 +572,83 @@ class ClickhouseIntegrationTestsRunner:
             info_basename = test_group_str + "_" + str(i) + ".nfo"
             info_path = os.path.join(repo_path, "tests/integration", info_basename)
 
-            test_cmd = ' '.join([test for test in sorted(test_names)])
-            parallel_cmd = " --parallel {} ".format(num_workers) if num_workers > 0 else ""
+            test_cmd = " ".join([test for test in sorted(test_names)])
+            parallel_cmd = (
+                " --parallel {} ".format(num_workers) if num_workers > 0 else ""
+            )
             # -r -- show extra test summary:
             # -f -- (f)ailed
             # -E -- (E)rror
             # -p -- (p)assed
             # -s -- (s)kipped
             cmd = "cd {}/tests/integration && timeout -s 9 1h ./runner {} {} -t {} {} '-rfEps --run-id={} --color=no --durations=0 {}' | tee {}".format(
-                repo_path, self._get_runner_opts(), image_cmd, test_cmd, parallel_cmd, i, _get_deselect_option(self.should_skip_tests()), info_path)
+                repo_path,
+                self._get_runner_opts(),
+                image_cmd,
+                test_cmd,
+                parallel_cmd,
+                i,
+                _get_deselect_option(self.should_skip_tests()),
+                info_path,
+            )
 
             log_basename = test_group_str + "_" + str(i) + ".log"
             log_path = os.path.join(repo_path, "tests/integration", log_basename)
-            with open(log_path, 'w') as log:
+            with open(log_path, "w") as log:
                 logging.info("Executing cmd: %s", cmd)
-                retcode = subprocess.Popen(cmd, shell=True, stderr=log, stdout=log).wait()
+                retcode = subprocess.Popen(
+                    cmd, shell=True, stderr=log, stdout=log
+                ).wait()
                 if retcode == 0:
                     logging.info("Run %s group successfully", test_group)
                 else:
                     logging.info("Some tests failed")
 
             extra_logs_names = [log_basename]
-            log_result_path = os.path.join(str(self.path()), 'integration_run_' + log_basename)
+            log_result_path = os.path.join(
+                str(self.path()), "integration_run_" + log_basename
+            )
             shutil.copy(log_path, log_result_path)
             log_paths.append(log_result_path)
 
-            for pytest_log_path in glob.glob(os.path.join(repo_path, "tests/integration/pytest*.log")):
-                new_name = test_group_str + "_" + str(i) + "_" + os.path.basename(pytest_log_path)
-                os.rename(pytest_log_path, os.path.join(repo_path, "tests/integration", new_name))
+            for pytest_log_path in glob.glob(
+                os.path.join(repo_path, "tests/integration/pytest*.log")
+            ):
+                new_name = (
+                    test_group_str
+                    + "_"
+                    + str(i)
+                    + "_"
+                    + os.path.basename(pytest_log_path)
+                )
+                os.rename(
+                    pytest_log_path,
+                    os.path.join(repo_path, "tests/integration", new_name),
+                )
                 extra_logs_names.append(new_name)
 
             dockerd_log_path = os.path.join(repo_path, "tests/integration/dockerd.log")
             if os.path.exists(dockerd_log_path):
-                new_name = test_group_str + "_" + str(i) + "_" + os.path.basename(dockerd_log_path)
-                os.rename(dockerd_log_path, os.path.join(repo_path, "tests/integration", new_name))
+                new_name = (
+                    test_group_str
+                    + "_"
+                    + str(i)
+                    + "_"
+                    + os.path.basename(dockerd_log_path)
+                )
+                os.rename(
+                    dockerd_log_path,
+                    os.path.join(repo_path, "tests/integration", new_name),
+                )
                 extra_logs_names.append(new_name)
 
             if os.path.exists(info_path):
                 extra_logs_names.append(info_basename)
                 new_counters = get_counters(info_path)
                 for state, tests in new_counters.items():
-                    logging.info("Tests with %s state (%s): %s", state, len(tests), tests)
+                    logging.info(
+                        "Tests with %s state (%s): %s", state, len(tests), tests
+                    )
                 times_lines = parse_test_times(info_path)
                 new_tests_times = get_test_times(times_lines)
                 self._update_counters(counters, new_counters)
@@ -512,19 +656,35 @@ class ClickhouseIntegrationTestsRunner:
                for test_name, test_time in new_tests_times.items():
                    tests_times[test_name] = test_time
 
             test_data_dirs_new = self._find_test_data_dirs(repo_path, test_names)
-            test_data_dirs_diff = self._get_test_data_dirs_difference(test_data_dirs_new, test_data_dirs)
+            test_data_dirs_diff = self._get_test_data_dirs_difference(
+                test_data_dirs_new, test_data_dirs
+            )
             test_data_dirs = test_data_dirs_new
 
             if extra_logs_names or test_data_dirs_diff:
-                extras_result_path = os.path.join(str(self.path()), "integration_run_" + test_group_str + "_" + str(i) + ".tar.gz")
-                self._compress_logs(os.path.join(repo_path, "tests/integration"), extra_logs_names + list(test_data_dirs_diff), extras_result_path)
+                extras_result_path = os.path.join(
+                    str(self.path()),
+                    "integration_run_" + test_group_str + "_" + str(i) + ".tar.gz",
+                )
+                self._compress_logs(
+                    os.path.join(repo_path, "tests/integration"),
+                    extra_logs_names + list(test_data_dirs_diff),
+                    extras_result_path,
+                )
                 log_paths.append(extras_result_path)
 
             if len(counters["PASSED"]) + len(counters["FLAKY"]) == len(tests_in_group):
                 logging.info("All tests from group %s passed", test_group)
                 break
-            if len(counters["PASSED"]) + len(counters["FLAKY"]) >= 0 and len(counters["FAILED"]) == 0 and len(counters["ERROR"]) == 0:
-                logging.info("Seems like all tests passed but some of them are skipped or deselected. Ignoring them and finishing group.")
+            if (
+                len(counters["PASSED"]) + len(counters["FLAKY"]) >= 0
+                and len(counters["FAILED"]) == 0
+                and len(counters["ERROR"]) == 0
+            ):
+                logging.info(
+                    "Seems like all tests passed but some of them are skipped or "
+                    "deselected. Ignoring them and finishing group."
+                )
                 break
@@ -532,26 +692,28 @@ class ClickhouseIntegrationTestsRunner:
         else:
             # Mark all non tried tests as errors, with '::' in name
             # (example test_partition/test.py::test_partition_simple)
             # we run whole test dirs like "test_odbc_interaction" and don't
             # want to mark them as error so we filter by '::'.
             for test in tests_in_group:
-                if (test not in counters["PASSED"] and
-                    test not in counters["ERROR"] and
-                    test not in counters["SKIPPED"] and
-                    test not in counters["FAILED"] and
-                    '::' in test):
+                if (
+                    test not in counters["PASSED"]
+                    and test not in counters["ERROR"]
+                    and test not in counters["SKIPPED"]
+                    and test not in counters["FAILED"]
+                    and "::" in test
+                ):
                     counters["ERROR"].append(test)
 
         return counters, tests_times, log_paths
 
     def run_flaky_check(self, repo_path, build_path):
-        pr_info = self.params['pr_info']
+        pr_info = self.params["pr_info"]
 
         # pytest swears, if we require to run some tests which was renamed or deleted
         tests_to_run = filter_existing_tests(get_tests_to_run(pr_info), repo_path)
         if not tests_to_run:
             logging.info("No tests to run found")
-            return 'success', 'Nothing to run', [('Nothing to run', 'OK')], ''
+            return "success", "Nothing to run", [("Nothing to run", "OK")], ""
 
         self._install_clickhouse(build_path)
-        logging.info("Found '%s' tests to run", ' '.join(tests_to_run))
+        logging.info("Found '%s' tests to run", " ".join(tests_to_run))
         result_state = "success"
         description_prefix = "No flaky tests: "
         start = time.time()
@@ -561,17 +723,20 @@ class ClickhouseIntegrationTestsRunner:
         for i in range(TRIES_COUNT):
             final_retry += 1
             logging.info("Running tests for the %s time", i)
-            counters, tests_times, log_paths = self.try_run_test_group(repo_path, "flaky", tests_to_run, 1, 1)
+            counters, tests_times, log_paths = self.try_run_test_group(
+                repo_path, "flaky", tests_to_run, 1, 1
+            )
             logs += log_paths
             if counters["FAILED"]:
-                logging.info("Found failed tests: %s", ' '.join(counters["FAILED"]))
+                logging.info("Found failed tests: %s", " ".join(counters["FAILED"]))
                 description_prefix = "Flaky tests found: "
                 result_state = "failure"
                 break
             if counters["ERROR"]:
                 description_prefix = "Flaky tests found: "
-                logging.info("Found error tests: %s", ' '.join(counters["ERROR"]))
-                # NOTE "error" result state will restart the whole test task, so we use "failure" here
+                logging.info("Found error tests: %s", " ".join(counters["ERROR"]))
+                # NOTE "error" result state will restart the whole test task,
+                # so we use "failure" here
                 result_state = "failure"
                 break
             assert len(counters["FLAKY"]) == 0
@@ -591,8 +756,20 @@ class ClickhouseIntegrationTestsRunner:
                 text_state = "FAIL"
             else:
                 text_state = state
-            test_result += [(c + ' (✕' + str(final_retry) + ')', text_state, "{:.2f}".format(tests_times[c])) for c in counters[state]]
-        status_text = description_prefix + ', '.join([str(n).lower().replace('failed', 'fail') + ': ' + str(len(c)) for n, c in counters.items()])
+            test_result += [
+                (
+                    c + " (✕" + str(final_retry) + ")",
+                    text_state,
+                    "{:.2f}".format(tests_times[c]),
+                )
+                for c in counters[state]
+            ]
+        status_text = description_prefix + ", ".join(
+            [
+                str(n).lower().replace("failed", "fail") + ": " + str(len(c))
+                for n, c in counters.items()
+            ]
+        )
 
         return result_state, status_text, test_result, logs
@@ -601,7 +778,10 @@ class ClickhouseIntegrationTestsRunner:
             return self.run_flaky_check(repo_path, build_path)
 
         self._install_clickhouse(build_path)
-        logging.info("Dump iptables before run %s", subprocess.check_output("sudo iptables -L", shell=True))
+        logging.info(
+            "Dump iptables before run %s",
+            subprocess.check_output("sudo iptables -L", shell=True),
+        )
         all_tests = self._get_all_tests(repo_path)
@@ -613,18 +793,36 @@ class ClickhouseIntegrationTestsRunner:
         if self.run_by_hash_total != 0:
             all_filtered_by_hash_tests = []
             for test in all_tests:
                 if stringhash(test) % self.run_by_hash_total == self.run_by_hash_num:
                     all_filtered_by_hash_tests.append(test)
             all_tests = all_filtered_by_hash_tests
 
         parallel_skip_tests = self._get_parallel_tests_skip_list(repo_path)
-        logging.info("Found %s tests first 3 %s", len(all_tests), ' '.join(all_tests[:3]))
-        filtered_sequential_tests = list(filter(lambda test: test in all_tests, parallel_skip_tests))
-        filtered_parallel_tests = list(filter(lambda test: test not in parallel_skip_tests, all_tests))
-        not_found_tests = list(filter(lambda test: test not in all_tests, parallel_skip_tests))
-        logging.info("Found %s tests first 3 %s, parallel %s, other %s", len(all_tests), ' '.join(all_tests[:3]), len(filtered_parallel_tests), len(filtered_sequential_tests))
-        logging.info("Not found %s tests first 3 %s", len(not_found_tests), ' '.join(not_found_tests[:3]))
+        logging.info(
+            "Found %s tests first 3 %s", len(all_tests), " ".join(all_tests[:3])
+        )
+        filtered_sequential_tests = list(
+            filter(lambda test: test in all_tests, parallel_skip_tests)
+        )
+        filtered_parallel_tests = list(
+            filter(lambda test: test not in parallel_skip_tests, all_tests)
+        )
+        not_found_tests = list(
+            filter(lambda test: test not in all_tests, parallel_skip_tests)
+        )
+        logging.info(
+            "Found %s tests first 3 %s, parallel %s, other %s",
+            len(all_tests),
+            " ".join(all_tests[:3]),
+            len(filtered_parallel_tests),
+            len(filtered_sequential_tests),
+        )
+        logging.info(
+            "Not found %s tests first 3 %s",
+            len(not_found_tests),
+            " ".join(not_found_tests[:3]),
+        )
 
         grouped_tests = self.group_test_by_file(filtered_sequential_tests)
         i = 0
         for par_group in chunks(filtered_parallel_tests, PARALLEL_GROUP_SIZE):
-            grouped_tests["parallel{}".format(i)] = par_group
-            i+=1
+            grouped_tests[f"parallel{i}"] = par_group
+            i += 1
         logging.info("Found %s tests groups", len(grouped_tests))
 
         counters = {
             "ERROR": [],
             "PASSED": [],
             "FAILED": [],
             "SKIPPED": [],
             "FLAKY": [],
         }
         tests_times = defaultdict(float)
         tests_log_paths = defaultdict(list)
@@ -646,12 +844,18 @@ class ClickhouseIntegrationTestsRunner:
         for group, tests in items_to_run:
             logging.info("Running test group %s containing %s tests", group, len(tests))
-            group_counters, group_test_times, log_paths = self.try_run_test_group(repo_path, group, tests, MAX_RETRY, NUM_WORKERS)
+            group_counters, group_test_times, log_paths = self.try_run_test_group(
+                repo_path, group, tests, MAX_RETRY, NUM_WORKERS
+            )
             total_tests = 0
             for counter, value in group_counters.items():
-                logging.info("Tests from group %s stats, %s count %s", group, counter, len(value))
+                logging.info(
+                    "Tests from group %s stats, %s count %s", group, counter, len(value)
+                )
                 counters[counter] += value
-                logging.info("Totally have %s with status %s", len(counters[counter]), counter)
+                logging.info(
+                    "Totally have %s with status %s", len(counters[counter]), counter
+                )
                 total_tests += len(counters[counter])
             logging.info("Totally finished tests %s/%s", total_tests, len(all_tests))
@@ -664,7 +868,9 @@ class ClickhouseIntegrationTestsRunner:
                 break
 
         if counters["FAILED"] or counters["ERROR"]:
-            logging.info("Overall status failure, because we have tests in FAILED or ERROR state")
+            logging.info(
+                "Overall status failure, because we have tests in FAILED or ERROR state"
+            )
             result_state = "failure"
         else:
             logging.info("Overall success!")
@@ -678,42 +884,49 @@ class ClickhouseIntegrationTestsRunner:
                 text_state = "FAIL"
             else:
                 text_state = state
-            test_result += [(c, text_state, "{:.2f}".format(tests_times[c]), tests_log_paths[c]) for c in counters[state]]
+            test_result += [
+                (c, text_state, "{:.2f}".format(tests_times[c]), tests_log_paths[c])
+                for c in counters[state]
+            ]
 
-        failed_sum = len(counters['FAILED']) + len(counters['ERROR'])
-        status_text = "fail: {}, passed: {}, flaky: {}".format(failed_sum, len(counters['PASSED']), len(counters['FLAKY']))
+        failed_sum = len(counters["FAILED"]) + len(counters["ERROR"])
+        status_text = "fail: {}, passed: {}, flaky: {}".format(
+            failed_sum, len(counters["PASSED"]), len(counters["FLAKY"])
+        )
 
         if self.soft_deadline_time < time.time():
             status_text = "Timeout, " + status_text
             result_state = "failure"
-            counters['FLAKY'] = []
+            counters["FLAKY"] = []
 
         if not counters or sum(len(counter) for counter in counters.values()) == 0:
             status_text = "No tests found for some reason! It's a bug"
             result_state = "failure"
 
-        if '(memory)' in self.params['context_name']:
+        if "(memory)" in self.params["context_name"]:
             result_state = "success"
 
         return result_state, status_text, test_result, []
 
+
 def write_results(results_file, status_file, results, status):
-    with open(results_file, 'w') as f:
-        out = csv.writer(f, delimiter='\t')
+    with open(results_file, "w") as f:
+        out = csv.writer(f, delimiter="\t")
         out.writerows(results)
-    with open(status_file, 'w') as f:
-        out = csv.writer(f, delimiter='\t')
+    with open(status_file, "w") as f:
+        out = csv.writer(f, delimiter="\t")
         out.writerow(status)
 
+
 if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
 
     repo_path = os.environ.get("CLICKHOUSE_TESTS_REPO_PATH")
     build_path = os.environ.get("CLICKHOUSE_TESTS_BUILD_PATH")
     result_path = os.environ.get("CLICKHOUSE_TESTS_RESULT_PATH")
     params_path = os.environ.get("CLICKHOUSE_TESTS_JSON_PARAMS_PATH")
 
-    params = json.loads(open(params_path, 'r').read())
+    params = json.loads(open(params_path, "r").read())
     runner = ClickhouseIntegrationTestsRunner(result_path, params)
 
     logging.info("Running tests")