Merge pull request #34556 from ClickHouse/hadoop-tests

Adjust HDFS tests to a new hadoop image
This commit is contained in:
Mikhail f. Shiryaev 2022-02-14 01:37:00 +01:00 committed by GitHub
commit cf2d3d91f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 388 additions and 166 deletions

View File

@ -15,9 +15,10 @@ RUN curl -o krb5-libs-1.10.3-65.el6.x86_64.rpm ftp://ftp.pbone.net/mirror/vault.
rm -fr *.rpm
RUN cd /tmp && \
curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \
tar xzf commons-daemon-1.0.15-src.tar.gz && \
cd commons-daemon-1.0.15-src/src/native/unix && \
./configure && \
make && \
cp ./jsvc /usr/local/hadoop-2.7.0/sbin
curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \
tar xzf commons-daemon-1.0.15-src.tar.gz && \
cd commons-daemon-1.0.15-src/src/native/unix && \
./configure && \
make && \
cp ./jsvc /usr/local/hadoop-2.7.0/sbin && \
[ -e /usr/local/hadoop ] || ln -s ./hadoop-2.7.0 /usr/local/hadoop

View File

@ -4,7 +4,7 @@ services:
kerberizedhdfs1:
cap_add:
- DAC_READ_SEARCH
image: clickhouse/kerberized-hadoop
image: clickhouse/kerberized-hadoop:${DOCKER_KERBERIZED_HADOOP_TAG:-latest}
hostname: kerberizedhdfs1
restart: always
volumes:

View File

@ -45,6 +45,7 @@ export DOCKER_MYSQL_JS_CLIENT_TAG=${DOCKER_MYSQL_JS_CLIENT_TAG:=latest}
export DOCKER_MYSQL_PHP_CLIENT_TAG=${DOCKER_MYSQL_PHP_CLIENT_TAG:=latest}
export DOCKER_POSTGRESQL_JAVA_CLIENT_TAG=${DOCKER_POSTGRESQL_JAVA_CLIENT_TAG:=latest}
export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
export DOCKER_KERBERIZED_HADOOP_TAG=${DOCKER_KERBERIZED_HADOOP_TAG:=latest}
cd /ClickHouse/tests/integration
exec "$@"

View File

@ -23,6 +23,8 @@ from rerun_helper import RerunHelper
from tee_popen import TeePopen
# When update, update
# integration/ci-runner.py:ClickhouseIntegrationTestsRunner.get_images_names too
IMAGES = [
"clickhouse/integration-tests-runner",
"clickhouse/mysql-golang-client",
@ -32,6 +34,7 @@ IMAGES = [
"clickhouse/postgresql-java-client",
"clickhouse/integration-test",
"clickhouse/kerberos-kdc",
"clickhouse/kerberized-hadoop",
"clickhouse/integration-helper",
"clickhouse/dotnet-client",
]

View File

@ -1,17 +1,16 @@
#!/usr/bin/env python3
import logging
import subprocess
import os
import glob
import time
import shutil
from collections import defaultdict
import random
import json
import csv
# for crc32
import zlib
import glob
import json
import logging
import os
import random
import shutil
import subprocess
import time
import zlib # for crc32
MAX_RETRY = 3
@ -25,54 +24,62 @@ CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH = "usr/bin/clickhouse-library-bridge"
TRIES_COUNT = 10
MAX_TIME_SECONDS = 3600
MAX_TIME_IN_SANDBOX = 20 * 60 # 20 minutes
TASK_TIMEOUT = 8 * 60 * 60 # 8 hours
MAX_TIME_IN_SANDBOX = 20 * 60 # 20 minutes
TASK_TIMEOUT = 8 * 60 * 60 # 8 hours
def stringhash(s):
return zlib.crc32(s.encode('utf-8'))
return zlib.crc32(s.encode("utf-8"))
def get_tests_to_run(pr_info):
result = set([])
changed_files = pr_info['changed_files']
changed_files = pr_info["changed_files"]
if changed_files is None:
return []
for fpath in changed_files:
if 'tests/integration/test_' in fpath:
logging.info('File %s changed and seems like integration test', fpath)
result.add(fpath.split('/')[2])
if "tests/integration/test_" in fpath:
logging.info("File %s changed and seems like integration test", fpath)
result.add(fpath.split("/")[2])
return list(result)
def filter_existing_tests(tests_to_run, repo_path):
result = []
for relative_test_path in tests_to_run:
if os.path.exists(os.path.join(repo_path, 'tests/integration', relative_test_path)):
if os.path.exists(
os.path.join(repo_path, "tests/integration", relative_test_path)
):
result.append(relative_test_path)
else:
logging.info("Skipping test %s, seems like it was removed", relative_test_path)
logging.info(
"Skipping test %s, seems like it was removed", relative_test_path
)
return result
def _get_deselect_option(tests):
return ' '.join(['--deselect {}'.format(t) for t in tests])
return " ".join([f"--deselect {t}" for t in tests])
# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
yield lst[i : i + n]
def get_counters(fname):
counters = {
"ERROR": set([]),
"PASSED": set([]),
"FAILED": set([]),
"ERROR": set([]),
"PASSED": set([]),
"FAILED": set([]),
"SKIPPED": set([]),
}
with open(fname, 'r') as out:
with open(fname, "r") as out:
for line in out:
line = line.strip()
# Example of log:
@ -81,10 +88,10 @@ def get_counters(fname):
# [gw0] [ 7%] ERROR test_mysql_protocol/test.py::test_golang_client
#
# And only the line with test status should be matched
if not('.py::' in line and ' ' in line):
if not (".py::" in line and " " in line):
continue
line_arr = line.strip().split(' ')
line_arr = line.strip().split(" ")
if len(line_arr) < 2:
logging.debug("Strange line %s", line)
continue
@ -97,9 +104,9 @@ def get_counters(fname):
if state in counters:
counters[state].add(test_name)
else:
# will skip lines line:
# 30.76s call test_host_ip_change/test.py::test_ip_change_drop_dns_cache
# 5.71s teardown test_host_ip_change/test.py::test_user_access_ip_change[node1]
# will skip lines like:
# 30.76s call test_host_ip_change/test.py::test_ip_drop_cache
# 5.71s teardown test_host_ip_change/test.py::test_ip_change[node1]
# and similar
logging.debug("Strange state in line %s", line)
@ -109,13 +116,13 @@ def get_counters(fname):
def parse_test_times(fname):
read = False
description_output = []
with open(fname, 'r') as out:
with open(fname, "r") as out:
for line in out:
if read and '==' in line:
if read and "==" in line:
break
if read and line.strip():
description_output.append(line.strip())
if 'slowest durations' in line:
if "slowest durations" in line:
read = True
return description_output
@ -123,10 +130,10 @@ def parse_test_times(fname):
def get_test_times(output):
result = defaultdict(float)
for line in output:
if '.py' in line:
line_arr = line.strip().split(' ')
if ".py" in line:
line_arr = line.strip().split(" ")
test_time = line_arr[0]
test_name = ' '.join([elem for elem in line_arr[2:] if elem])
test_name = " ".join([elem for elem in line_arr[2:] if elem])
if test_name not in result:
result[test_name] = 0.0
result[test_name] += float(test_time[:-1])
@ -134,21 +141,28 @@ def get_test_times(output):
def clear_ip_tables_and_restart_daemons():
logging.info("Dump iptables after run %s", subprocess.check_output("sudo iptables -L", shell=True))
logging.info(
"Dump iptables after run %s",
subprocess.check_output("sudo iptables -L", shell=True),
)
try:
logging.info("Killing all alive docker containers")
subprocess.check_output("timeout -s 9 10m docker kill $(docker ps -q)", shell=True)
subprocess.check_output(
"timeout -s 9 10m docker kill $(docker ps -q)", shell=True
)
except subprocess.CalledProcessError as err:
logging.info("docker kill excepted: " + str(err))
try:
logging.info("Removing all docker containers")
subprocess.check_output("timeout -s 9 10m docker rm $(docker ps -a -q) --force", shell=True)
subprocess.check_output(
"timeout -s 9 10m docker rm $(docker ps -a -q) --force", shell=True
)
except subprocess.CalledProcessError as err:
logging.info("docker rm excepted: " + str(err))
# don't restart docker if it's disabled
if os.environ.get("CLICKHOUSE_TESTS_RUNNER_RESTART_DOCKER", '1') == '1':
if os.environ.get("CLICKHOUSE_TESTS_RUNNER_RESTART_DOCKER", "1") == "1":
try:
logging.info("Stopping docker daemon")
subprocess.check_output("service docker stop", shell=True)
@ -177,27 +191,33 @@ def clear_ip_tables_and_restart_daemons():
# when rules will be empty, it will raise exception
subprocess.check_output("sudo iptables -D DOCKER-USER 1", shell=True)
except subprocess.CalledProcessError as err:
logging.info("All iptables rules cleared, " + str(iptables_iter) + "iterations, last error: " + str(err))
logging.info(
"All iptables rules cleared, "
+ str(iptables_iter)
+ "iterations, last error: "
+ str(err)
)
class ClickhouseIntegrationTestsRunner:
def __init__(self, result_path, params):
self.result_path = result_path
self.params = params
self.image_versions = self.params['docker_images_with_versions']
self.shuffle_groups = self.params['shuffle_test_groups']
self.flaky_check = 'flaky check' in self.params['context_name']
self.image_versions = self.params["docker_images_with_versions"]
self.shuffle_groups = self.params["shuffle_test_groups"]
self.flaky_check = "flaky check" in self.params["context_name"]
# if use_tmpfs is not set we assume it to be true, otherwise check
self.use_tmpfs = 'use_tmpfs' not in self.params or self.params['use_tmpfs']
self.disable_net_host = 'disable_net_host' in self.params and self.params['disable_net_host']
self.use_tmpfs = "use_tmpfs" not in self.params or self.params["use_tmpfs"]
self.disable_net_host = (
"disable_net_host" in self.params and self.params["disable_net_host"]
)
self.start_time = time.time()
self.soft_deadline_time = self.start_time + (TASK_TIMEOUT - MAX_TIME_IN_SANDBOX)
if 'run_by_hash_total' in self.params:
self.run_by_hash_total = self.params['run_by_hash_total']
self.run_by_hash_num = self.params['run_by_hash_num']
if "run_by_hash_total" in self.params:
self.run_by_hash_total = self.params["run_by_hash_total"]
self.run_by_hash_num = self.params["run_by_hash_num"]
else:
self.run_by_hash_total = 0
self.run_by_hash_num = 0
@ -206,7 +226,7 @@ class ClickhouseIntegrationTestsRunner:
return self.result_path
def base_path(self):
return os.path.join(str(self.result_path), '../')
return os.path.join(str(self.result_path), "../")
def should_skip_tests(self):
return []
@ -214,8 +234,10 @@ class ClickhouseIntegrationTestsRunner:
def get_image_with_version(self, name):
if name in self.image_versions:
return name + ":" + self.image_versions[name]
logging.warn("Cannot find image %s in params list %s", name, self.image_versions)
if ':' not in name:
logging.warn(
"Cannot find image %s in params list %s", name, self.image_versions
)
if ":" not in name:
return name + ":latest"
return name
@ -223,31 +245,44 @@ class ClickhouseIntegrationTestsRunner:
name = self.get_images_names()[0]
if name in self.image_versions:
return self.image_versions[name]
logging.warn("Cannot find image %s in params list %s", name, self.image_versions)
return 'latest'
logging.warn(
"Cannot find image %s in params list %s", name, self.image_versions
)
return "latest"
def shuffle_test_groups(self):
return self.shuffle_groups != 0
@staticmethod
def get_images_names():
return ["clickhouse/integration-tests-runner", "clickhouse/mysql-golang-client",
"clickhouse/mysql-java-client", "clickhouse/mysql-js-client",
"clickhouse/mysql-php-client", "clickhouse/postgresql-java-client",
"clickhouse/integration-test", "clickhouse/kerberos-kdc",
"clickhouse/dotnet-client",
"clickhouse/integration-helper", ]
return [
"clickhouse/dotnet-client",
"clickhouse/integration-helper",
"clickhouse/integration-test",
"clickhouse/integration-tests-runner",
"clickhouse/kerberized-hadoop",
"clickhouse/kerberos-kdc",
"clickhouse/mysql-golang-client",
"clickhouse/mysql-java-client",
"clickhouse/mysql-js-client",
"clickhouse/mysql-php-client",
"clickhouse/postgresql-java-client",
]
def _can_run_with(self, path, opt):
with open(path, 'r') as script:
with open(path, "r") as script:
for line in script:
if opt in line:
return True
return False
def _install_clickhouse(self, debs_path):
for package in ('clickhouse-common-static_', 'clickhouse-server_', 'clickhouse-client', 'clickhouse-common-static-dbg_'): # order matters
for package in (
"clickhouse-common-static_",
"clickhouse-server_",
"clickhouse-client",
"clickhouse-common-static-dbg_",
): # order matters
logging.info("Installing package %s", package)
for f in os.listdir(debs_path):
if package in f:
@ -255,10 +290,12 @@ class ClickhouseIntegrationTestsRunner:
logging.info("Package found in %s", full_path)
log_name = "install_" + f + ".log"
log_path = os.path.join(str(self.path()), log_name)
with open(log_path, 'w') as log:
with open(log_path, "w") as log:
cmd = "dpkg -x {} .".format(full_path)
logging.info("Executing installation cmd %s", cmd)
retcode = subprocess.Popen(cmd, shell=True, stderr=log, stdout=log).wait()
retcode = subprocess.Popen(
cmd, shell=True, stderr=log, stdout=log
).wait()
if retcode == 0:
logging.info("Installation of %s successfull", full_path)
else:
@ -267,18 +304,35 @@ class ClickhouseIntegrationTestsRunner:
else:
raise Exception("Package with {} not found".format(package))
logging.info("Unstripping binary")
# logging.info("Unstring %s", subprocess.check_output("eu-unstrip /usr/bin/clickhouse {}".format(CLICKHOUSE_BINARY_PATH), shell=True))
# logging.info(
# "Unstring %s",
# subprocess.check_output(
# "eu-unstrip /usr/bin/clickhouse {}".format(CLICKHOUSE_BINARY_PATH),
# shell=True,
# ),
# )
logging.info("All packages installed")
os.chmod(CLICKHOUSE_BINARY_PATH, 0o777)
os.chmod(CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH, 0o777)
os.chmod(CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH, 0o777)
shutil.copy(CLICKHOUSE_BINARY_PATH, os.getenv("CLICKHOUSE_TESTS_SERVER_BIN_PATH"))
shutil.copy(CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH, os.getenv("CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH"))
shutil.copy(CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH, os.getenv("CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH"))
shutil.copy(
CLICKHOUSE_BINARY_PATH, os.getenv("CLICKHOUSE_TESTS_SERVER_BIN_PATH")
)
shutil.copy(
CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH,
os.getenv("CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH"),
)
shutil.copy(
CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH,
os.getenv("CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH"),
)
def _compress_logs(self, dir, relpaths, result_path):
subprocess.check_call("tar czf {} -C {} {}".format(result_path, dir, ' '.join(relpaths)), shell=True) # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
subprocess.check_call( # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
"tar czf {} -C {} {}".format(result_path, dir, " ".join(relpaths)),
shell=True,
)
def _get_runner_opts(self):
result = []
@ -292,22 +346,40 @@ class ClickhouseIntegrationTestsRunner:
image_cmd = self._get_runner_image_cmd(repo_path)
out_file = "all_tests.txt"
out_file_full = "all_tests_full.txt"
cmd = "cd {repo_path}/tests/integration && " \
"timeout -s 9 1h ./runner {runner_opts} {image_cmd} ' --setup-plan' " \
"| tee {out_file_full} | grep '::' | sed 's/ (fixtures used:.*//g' | sed 's/^ *//g' | sed 's/ *$//g' " \
cmd = (
"cd {repo_path}/tests/integration && "
"timeout -s 9 1h ./runner {runner_opts} {image_cmd} ' --setup-plan' "
"| tee {out_file_full} | grep '::' | sed 's/ (fixtures used:.*//g' | sed 's/^ *//g' | sed 's/ *$//g' "
"| grep -v 'SKIPPED' | sort -u > {out_file}".format(
repo_path=repo_path, runner_opts=self._get_runner_opts(), image_cmd=image_cmd, out_file=out_file, out_file_full=out_file_full)
repo_path=repo_path,
runner_opts=self._get_runner_opts(),
image_cmd=image_cmd,
out_file=out_file,
out_file_full=out_file_full,
)
)
logging.info("Getting all tests with cmd '%s'", cmd)
subprocess.check_call(cmd, shell=True) # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
subprocess.check_call( # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
cmd, shell=True
)
all_tests_file_path = "{repo_path}/tests/integration/{out_file}".format(repo_path=repo_path, out_file=out_file)
if not os.path.isfile(all_tests_file_path) or os.path.getsize(all_tests_file_path) == 0:
all_tests_full_file_path = "{repo_path}/tests/integration/{out_file}".format(repo_path=repo_path, out_file=out_file_full)
all_tests_file_path = "{repo_path}/tests/integration/{out_file}".format(
repo_path=repo_path, out_file=out_file
)
if (
not os.path.isfile(all_tests_file_path)
or os.path.getsize(all_tests_file_path) == 0
):
all_tests_full_file_path = (
"{repo_path}/tests/integration/{out_file}".format(
repo_path=repo_path, out_file=out_file_full
)
)
if os.path.isfile(all_tests_full_file_path):
# log runner output
logging.info("runner output:")
with open(all_tests_full_file_path, 'r') as all_tests_full_file:
with open(all_tests_full_file_path, "r") as all_tests_full_file:
for line in all_tests_full_file:
line = line.rstrip()
if line:
@ -315,7 +387,11 @@ class ClickhouseIntegrationTestsRunner:
else:
logging.info("runner output '%s' is empty", all_tests_full_file_path)
raise Exception("There is something wrong with getting all tests list: file '{}' is empty or does not exist.".format(all_tests_file_path))
raise Exception(
"There is something wrong with getting all tests list: file '{}' is empty or does not exist.".format(
all_tests_file_path
)
)
all_tests = []
with open(all_tests_file_path, "r") as all_tests_file:
@ -324,9 +400,18 @@ class ClickhouseIntegrationTestsRunner:
return list(sorted(all_tests))
def _get_parallel_tests_skip_list(self, repo_path):
skip_list_file_path = "{}/tests/integration/parallel_skip.json".format(repo_path)
if not os.path.isfile(skip_list_file_path) or os.path.getsize(skip_list_file_path) == 0:
raise Exception("There is something wrong with getting all tests list: file '{}' is empty or does not exist.".format(skip_list_file_path))
skip_list_file_path = "{}/tests/integration/parallel_skip.json".format(
repo_path
)
if (
not os.path.isfile(skip_list_file_path)
or os.path.getsize(skip_list_file_path) == 0
):
raise Exception(
"There is something wrong with getting all tests list: file '{}' is empty or does not exist.".format(
skip_list_file_path
)
)
skip_list_tests = []
with open(skip_list_file_path, "r") as skip_list_file:
@ -336,7 +421,7 @@ class ClickhouseIntegrationTestsRunner:
def group_test_by_file(self, tests):
result = {}
for test in tests:
test_file = test.split('::')[0]
test_file = test.split("::")[0]
if test_file not in result:
result[test_file] = []
result[test_file].append(test)
@ -344,7 +429,10 @@ class ClickhouseIntegrationTestsRunner:
def _update_counters(self, main_counters, current_counters):
for test in current_counters["PASSED"]:
if test not in main_counters["PASSED"] and test not in main_counters["FLAKY"]:
if (
test not in main_counters["PASSED"]
and test not in main_counters["FLAKY"]
):
is_flaky = False
if test in main_counters["FAILED"]:
main_counters["FAILED"].remove(test)
@ -369,45 +457,63 @@ class ClickhouseIntegrationTestsRunner:
main_counters[state].append(test)
def _get_runner_image_cmd(self, repo_path):
image_cmd = ''
if self._can_run_with(os.path.join(repo_path, "tests/integration", "runner"), '--docker-image-version'):
image_cmd = ""
if self._can_run_with(
os.path.join(repo_path, "tests/integration", "runner"),
"--docker-image-version",
):
for img in self.get_images_names():
if img == "clickhouse/integration-tests-runner":
runner_version = self.get_single_image_version()
logging.info("Can run with custom docker image version %s", runner_version)
image_cmd += ' --docker-image-version={} '.format(runner_version)
logging.info(
"Can run with custom docker image version %s", runner_version
)
image_cmd += " --docker-image-version={} ".format(runner_version)
else:
if self._can_run_with(os.path.join(repo_path, "tests/integration", "runner"), '--docker-compose-images-tags'):
image_cmd += '--docker-compose-images-tags={} '.format(self.get_image_with_version(img))
if self._can_run_with(
os.path.join(repo_path, "tests/integration", "runner"),
"--docker-compose-images-tags",
):
image_cmd += "--docker-compose-images-tags={} ".format(
self.get_image_with_version(img)
)
else:
image_cmd = ''
image_cmd = ""
logging.info("Cannot run with custom docker image version :(")
return image_cmd
def _find_test_data_dirs(self, repo_path, test_names):
relpaths = {}
for test_name in test_names:
if '/' in test_name:
test_dir = test_name[:test_name.find('/')]
if "/" in test_name:
test_dir = test_name[: test_name.find("/")]
else:
test_dir = test_name
if os.path.isdir(os.path.join(repo_path, "tests/integration", test_dir)):
for name in os.listdir(os.path.join(repo_path, "tests/integration", test_dir)):
for name in os.listdir(
os.path.join(repo_path, "tests/integration", test_dir)
):
relpath = os.path.join(os.path.join(test_dir, name))
mtime = os.path.getmtime(os.path.join(repo_path, "tests/integration", relpath))
mtime = os.path.getmtime(
os.path.join(repo_path, "tests/integration", relpath)
)
relpaths[relpath] = mtime
return relpaths
def _get_test_data_dirs_difference(self, new_snapshot, old_snapshot):
res = set()
for path in new_snapshot:
if (not path in old_snapshot) or (old_snapshot[path] != new_snapshot[path]):
if (path not in old_snapshot) or (old_snapshot[path] != new_snapshot[path]):
res.add(path)
return res
def try_run_test_group(self, repo_path, test_group, tests_in_group, num_tries, num_workers):
def try_run_test_group(
self, repo_path, test_group, tests_in_group, num_tries, num_workers
):
try:
return self.run_test_group(repo_path, test_group, tests_in_group, num_tries, num_workers)
return self.run_test_group(
repo_path, test_group, tests_in_group, num_tries, num_workers
)
except Exception as e:
logging.info("Failed to run {}:\n{}".format(str(test_group), str(e)))
counters = {
@ -423,7 +529,9 @@ class ClickhouseIntegrationTestsRunner:
tests_times[test] = 0
return counters, tests_times, []
def run_test_group(self, repo_path, test_group, tests_in_group, num_tries, num_workers):
def run_test_group(
self, repo_path, test_group, tests_in_group, num_tries, num_workers
):
counters = {
"ERROR": [],
"PASSED": [],
@ -441,7 +549,7 @@ class ClickhouseIntegrationTestsRunner:
return counters, tests_times, []
image_cmd = self._get_runner_image_cmd(repo_path)
test_group_str = test_group.replace('/', '_').replace('.', '_')
test_group_str = test_group.replace("/", "_").replace(".", "_")
log_paths = []
test_data_dirs = {}
@ -453,8 +561,8 @@ class ClickhouseIntegrationTestsRunner:
test_names = set([])
for test_name in tests_in_group:
if test_name not in counters["PASSED"]:
if '[' in test_name:
test_names.add(test_name[:test_name.find('[')])
if "[" in test_name:
test_names.add(test_name[: test_name.find("[")])
else:
test_names.add(test_name)
@ -464,47 +572,83 @@ class ClickhouseIntegrationTestsRunner:
info_basename = test_group_str + "_" + str(i) + ".nfo"
info_path = os.path.join(repo_path, "tests/integration", info_basename)
test_cmd = ' '.join([test for test in sorted(test_names)])
parallel_cmd = " --parallel {} ".format(num_workers) if num_workers > 0 else ""
test_cmd = " ".join([test for test in sorted(test_names)])
parallel_cmd = (
" --parallel {} ".format(num_workers) if num_workers > 0 else ""
)
# -r -- show extra test summary:
# -f -- (f)ailed
# -E -- (E)rror
# -p -- (p)assed
# -s -- (s)kipped
cmd = "cd {}/tests/integration && timeout -s 9 1h ./runner {} {} -t {} {} '-rfEps --run-id={} --color=no --durations=0 {}' | tee {}".format(
repo_path, self._get_runner_opts(), image_cmd, test_cmd, parallel_cmd, i, _get_deselect_option(self.should_skip_tests()), info_path)
repo_path,
self._get_runner_opts(),
image_cmd,
test_cmd,
parallel_cmd,
i,
_get_deselect_option(self.should_skip_tests()),
info_path,
)
log_basename = test_group_str + "_" + str(i) + ".log"
log_path = os.path.join(repo_path, "tests/integration", log_basename)
with open(log_path, 'w') as log:
with open(log_path, "w") as log:
logging.info("Executing cmd: %s", cmd)
retcode = subprocess.Popen(cmd, shell=True, stderr=log, stdout=log).wait()
retcode = subprocess.Popen(
cmd, shell=True, stderr=log, stdout=log
).wait()
if retcode == 0:
logging.info("Run %s group successfully", test_group)
else:
logging.info("Some tests failed")
extra_logs_names = [log_basename]
log_result_path = os.path.join(str(self.path()), 'integration_run_' + log_basename)
log_result_path = os.path.join(
str(self.path()), "integration_run_" + log_basename
)
shutil.copy(log_path, log_result_path)
log_paths.append(log_result_path)
for pytest_log_path in glob.glob(os.path.join(repo_path, "tests/integration/pytest*.log")):
new_name = test_group_str + "_" + str(i) + "_" + os.path.basename(pytest_log_path)
os.rename(pytest_log_path, os.path.join(repo_path, "tests/integration", new_name))
for pytest_log_path in glob.glob(
os.path.join(repo_path, "tests/integration/pytest*.log")
):
new_name = (
test_group_str
+ "_"
+ str(i)
+ "_"
+ os.path.basename(pytest_log_path)
)
os.rename(
pytest_log_path,
os.path.join(repo_path, "tests/integration", new_name),
)
extra_logs_names.append(new_name)
dockerd_log_path = os.path.join(repo_path, "tests/integration/dockerd.log")
if os.path.exists(dockerd_log_path):
new_name = test_group_str + "_" + str(i) + "_" + os.path.basename(dockerd_log_path)
os.rename(dockerd_log_path, os.path.join(repo_path, "tests/integration", new_name))
new_name = (
test_group_str
+ "_"
+ str(i)
+ "_"
+ os.path.basename(dockerd_log_path)
)
os.rename(
dockerd_log_path,
os.path.join(repo_path, "tests/integration", new_name),
)
extra_logs_names.append(new_name)
if os.path.exists(info_path):
extra_logs_names.append(info_basename)
new_counters = get_counters(info_path)
for state, tests in new_counters.items():
logging.info("Tests with %s state (%s): %s", state, len(tests), tests)
logging.info(
"Tests with %s state (%s): %s", state, len(tests), tests
)
times_lines = parse_test_times(info_path)
new_tests_times = get_test_times(times_lines)
self._update_counters(counters, new_counters)
@ -512,19 +656,35 @@ class ClickhouseIntegrationTestsRunner:
tests_times[test_name] = test_time
test_data_dirs_new = self._find_test_data_dirs(repo_path, test_names)
test_data_dirs_diff = self._get_test_data_dirs_difference(test_data_dirs_new, test_data_dirs)
test_data_dirs_diff = self._get_test_data_dirs_difference(
test_data_dirs_new, test_data_dirs
)
test_data_dirs = test_data_dirs_new
if extra_logs_names or test_data_dirs_diff:
extras_result_path = os.path.join(str(self.path()), "integration_run_" + test_group_str + "_" + str(i) + ".tar.gz")
self._compress_logs(os.path.join(repo_path, "tests/integration"), extra_logs_names + list(test_data_dirs_diff), extras_result_path)
extras_result_path = os.path.join(
str(self.path()),
"integration_run_" + test_group_str + "_" + str(i) + ".tar.gz",
)
self._compress_logs(
os.path.join(repo_path, "tests/integration"),
extra_logs_names + list(test_data_dirs_diff),
extras_result_path,
)
log_paths.append(extras_result_path)
if len(counters["PASSED"]) + len(counters["FLAKY"]) == len(tests_in_group):
logging.info("All tests from group %s passed", test_group)
break
if len(counters["PASSED"]) + len(counters["FLAKY"]) >= 0 and len(counters["FAILED"]) == 0 and len(counters["ERROR"]) == 0:
logging.info("Seems like all tests passed but some of them are skipped or deselected. Ignoring them and finishing group.")
if (
len(counters["PASSED"]) + len(counters["FLAKY"]) >= 0
and len(counters["FAILED"]) == 0
and len(counters["ERROR"]) == 0
):
logging.info(
"Seems like all tests passed but some of them are skipped or "
"deselected. Ignoring them and finishing group."
)
break
else:
# Mark all non tried tests as errors, with '::' in name
@ -532,26 +692,28 @@ class ClickhouseIntegrationTestsRunner:
# we run whole test dirs like "test_odbc_interaction" and don't
# want to mark them as error so we filter by '::'.
for test in tests_in_group:
if (test not in counters["PASSED"] and
test not in counters["ERROR"] and
test not in counters["SKIPPED"] and
test not in counters["FAILED"] and
'::' in test):
if (
test not in counters["PASSED"]
and test not in counters["ERROR"]
and test not in counters["SKIPPED"]
and test not in counters["FAILED"]
and "::" in test
):
counters["ERROR"].append(test)
return counters, tests_times, log_paths
def run_flaky_check(self, repo_path, build_path):
pr_info = self.params['pr_info']
pr_info = self.params["pr_info"]
# pytest swears, if we require to run some tests which was renamed or deleted
tests_to_run = filter_existing_tests(get_tests_to_run(pr_info), repo_path)
if not tests_to_run:
logging.info("No tests to run found")
return 'success', 'Nothing to run', [('Nothing to run', 'OK')], ''
return "success", "Nothing to run", [("Nothing to run", "OK")], ""
self._install_clickhouse(build_path)
logging.info("Found '%s' tests to run", ' '.join(tests_to_run))
logging.info("Found '%s' tests to run", " ".join(tests_to_run))
result_state = "success"
description_prefix = "No flaky tests: "
start = time.time()
@ -561,17 +723,20 @@ class ClickhouseIntegrationTestsRunner:
for i in range(TRIES_COUNT):
final_retry += 1
logging.info("Running tests for the %s time", i)
counters, tests_times, log_paths = self.try_run_test_group(repo_path, "flaky", tests_to_run, 1, 1)
counters, tests_times, log_paths = self.try_run_test_group(
repo_path, "flaky", tests_to_run, 1, 1
)
logs += log_paths
if counters["FAILED"]:
logging.info("Found failed tests: %s", ' '.join(counters["FAILED"]))
logging.info("Found failed tests: %s", " ".join(counters["FAILED"]))
description_prefix = "Flaky tests found: "
result_state = "failure"
break
if counters["ERROR"]:
description_prefix = "Flaky tests found: "
logging.info("Found error tests: %s", ' '.join(counters["ERROR"]))
# NOTE "error" result state will restart the whole test task, so we use "failure" here
logging.info("Found error tests: %s", " ".join(counters["ERROR"]))
# NOTE "error" result state will restart the whole test task,
# so we use "failure" here
result_state = "failure"
break
assert len(counters["FLAKY"]) == 0
@ -591,8 +756,20 @@ class ClickhouseIntegrationTestsRunner:
text_state = "FAIL"
else:
text_state = state
test_result += [(c + ' (✕' + str(final_retry) + ')', text_state, "{:.2f}".format(tests_times[c])) for c in counters[state]]
status_text = description_prefix + ', '.join([str(n).lower().replace('failed', 'fail') + ': ' + str(len(c)) for n, c in counters.items()])
test_result += [
(
c + " (✕" + str(final_retry) + ")",
text_state,
"{:.2f}".format(tests_times[c]),
)
for c in counters[state]
]
status_text = description_prefix + ", ".join(
[
str(n).lower().replace("failed", "fail") + ": " + str(len(c))
for n, c in counters.items()
]
)
return result_state, status_text, test_result, logs
@ -601,7 +778,10 @@ class ClickhouseIntegrationTestsRunner:
return self.run_flaky_check(repo_path, build_path)
self._install_clickhouse(build_path)
logging.info("Dump iptables before run %s", subprocess.check_output("sudo iptables -L", shell=True))
logging.info(
"Dump iptables before run %s",
subprocess.check_output("sudo iptables -L", shell=True),
)
all_tests = self._get_all_tests(repo_path)
if self.run_by_hash_total != 0:
@ -613,18 +793,36 @@ class ClickhouseIntegrationTestsRunner:
all_tests = all_filtered_by_hash_tests
parallel_skip_tests = self._get_parallel_tests_skip_list(repo_path)
logging.info("Found %s tests first 3 %s", len(all_tests), ' '.join(all_tests[:3]))
filtered_sequential_tests = list(filter(lambda test: test in all_tests, parallel_skip_tests))
filtered_parallel_tests = list(filter(lambda test: test not in parallel_skip_tests, all_tests))
not_found_tests = list(filter(lambda test: test not in all_tests, parallel_skip_tests))
logging.info("Found %s tests first 3 %s, parallel %s, other %s", len(all_tests), ' '.join(all_tests[:3]), len(filtered_parallel_tests), len(filtered_sequential_tests))
logging.info("Not found %s tests first 3 %s", len(not_found_tests), ' '.join(not_found_tests[:3]))
logging.info(
"Found %s tests first 3 %s", len(all_tests), " ".join(all_tests[:3])
)
filtered_sequential_tests = list(
filter(lambda test: test in all_tests, parallel_skip_tests)
)
filtered_parallel_tests = list(
filter(lambda test: test not in parallel_skip_tests, all_tests)
)
not_found_tests = list(
filter(lambda test: test not in all_tests, parallel_skip_tests)
)
logging.info(
"Found %s tests first 3 %s, parallel %s, other %s",
len(all_tests),
" ".join(all_tests[:3]),
len(filtered_parallel_tests),
len(filtered_sequential_tests),
)
logging.info(
"Not found %s tests first 3 %s",
len(not_found_tests),
" ".join(not_found_tests[:3]),
)
grouped_tests = self.group_test_by_file(filtered_sequential_tests)
i = 0
for par_group in chunks(filtered_parallel_tests, PARALLEL_GROUP_SIZE):
grouped_tests["parallel{}".format(i)] = par_group
i+=1
grouped_tests[f"parallel{i}"] = par_group
i += 1
logging.info("Found %s tests groups", len(grouped_tests))
counters = {
@ -646,12 +844,18 @@ class ClickhouseIntegrationTestsRunner:
for group, tests in items_to_run:
logging.info("Running test group %s containing %s tests", group, len(tests))
group_counters, group_test_times, log_paths = self.try_run_test_group(repo_path, group, tests, MAX_RETRY, NUM_WORKERS)
group_counters, group_test_times, log_paths = self.try_run_test_group(
repo_path, group, tests, MAX_RETRY, NUM_WORKERS
)
total_tests = 0
for counter, value in group_counters.items():
logging.info("Tests from group %s stats, %s count %s", group, counter, len(value))
logging.info(
"Tests from group %s stats, %s count %s", group, counter, len(value)
)
counters[counter] += value
logging.info("Totally have %s with status %s", len(counters[counter]), counter)
logging.info(
"Totally have %s with status %s", len(counters[counter]), counter
)
total_tests += len(counters[counter])
logging.info("Totally finished tests %s/%s", total_tests, len(all_tests))
@ -664,7 +868,9 @@ class ClickhouseIntegrationTestsRunner:
break
if counters["FAILED"] or counters["ERROR"]:
logging.info("Overall status failure, because we have tests in FAILED or ERROR state")
logging.info(
"Overall status failure, because we have tests in FAILED or ERROR state"
)
result_state = "failure"
else:
logging.info("Overall success!")
@ -678,42 +884,49 @@ class ClickhouseIntegrationTestsRunner:
text_state = "FAIL"
else:
text_state = state
test_result += [(c, text_state, "{:.2f}".format(tests_times[c]), tests_log_paths[c]) for c in counters[state]]
test_result += [
(c, text_state, "{:.2f}".format(tests_times[c]), tests_log_paths[c])
for c in counters[state]
]
failed_sum = len(counters['FAILED']) + len(counters['ERROR'])
status_text = "fail: {}, passed: {}, flaky: {}".format(failed_sum, len(counters['PASSED']), len(counters['FLAKY']))
failed_sum = len(counters["FAILED"]) + len(counters["ERROR"])
status_text = "fail: {}, passed: {}, flaky: {}".format(
failed_sum, len(counters["PASSED"]), len(counters["FLAKY"])
)
if self.soft_deadline_time < time.time():
status_text = "Timeout, " + status_text
result_state = "failure"
counters['FLAKY'] = []
counters["FLAKY"] = []
if not counters or sum(len(counter) for counter in counters.values()) == 0:
status_text = "No tests found for some reason! It's a bug"
result_state = "failure"
if '(memory)' in self.params['context_name']:
if "(memory)" in self.params["context_name"]:
result_state = "success"
return result_state, status_text, test_result, []
def write_results(results_file, status_file, results, status):
with open(results_file, 'w') as f:
out = csv.writer(f, delimiter='\t')
with open(results_file, "w") as f:
out = csv.writer(f, delimiter="\t")
out.writerows(results)
with open(status_file, 'w') as f:
out = csv.writer(f, delimiter='\t')
with open(status_file, "w") as f:
out = csv.writer(f, delimiter="\t")
out.writerow(status)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
repo_path = os.environ.get("CLICKHOUSE_TESTS_REPO_PATH")
build_path = os.environ.get("CLICKHOUSE_TESTS_BUILD_PATH")
result_path = os.environ.get("CLICKHOUSE_TESTS_RESULT_PATH")
params_path = os.environ.get("CLICKHOUSE_TESTS_JSON_PARAMS_PATH")
params = json.loads(open(params_path, 'r').read())
params = json.loads(open(params_path, "r").read())
runner = ClickhouseIntegrationTestsRunner(result_path, params)
logging.info("Running tests")

View File

@ -238,6 +238,8 @@ if __name__ == "__main__":
env_tags += "-e {}={} ".format("DOCKER_POSTGRESQL_JAVA_CLIENT_TAG", tag)
elif image == "clickhouse/integration-test":
env_tags += "-e {}={} ".format("DOCKER_BASE_TAG", tag)
elif image == "clickhouse/kerberized-hadoop":
env_tags += "-e {}={} ".format("DOCKER_KERBERIZED_HADOOP_TAG", tag)
elif image == "clickhouse/kerberos-kdc":
env_tags += "-e {}={} ".format("DOCKER_KERBEROS_KDC_TAG", tag)
else:

View File

@ -90,6 +90,7 @@ create_admin_user() {
}
create_keytabs() {
rm /tmp/keytab/*.keytab
# kadmin.local -q "addprinc -randkey hdfs/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"

View File

@ -90,6 +90,7 @@ create_admin_user() {
}
create_keytabs() {
rm /tmp/keytab/*.keytab
kadmin.local -q "addprinc -randkey zookeeper/kafka_kerberized_zookeeper@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kafka_kerberized_zookeeper.keytab zookeeper/kafka_kerberized_zookeeper@${REALM}"