mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Improve logging in integration tests.
This commit is contained in:
parent
92ace627d2
commit
618a77fafa
@ -3,6 +3,7 @@
|
||||
import logging
|
||||
import subprocess
|
||||
import os
|
||||
import glob
|
||||
import time
|
||||
import shutil
|
||||
from collections import defaultdict
|
||||
@ -17,7 +18,6 @@ SLEEP_BETWEEN_RETRIES = 5
|
||||
PARALLEL_GROUP_SIZE = 100
|
||||
CLICKHOUSE_BINARY_PATH = "/usr/bin/clickhouse"
|
||||
CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH = "/usr/bin/clickhouse-odbc-bridge"
|
||||
DOCKERD_LOGS_PATH = "/ClickHouse/tests/integration/dockerd.log"
|
||||
CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH = "/usr/bin/clickhouse-library-bridge"
|
||||
|
||||
TRIES_COUNT = 10
|
||||
@ -256,8 +256,8 @@ class ClickhouseIntegrationTestsRunner:
|
||||
shutil.copy(CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH, result_path_library_bridge)
|
||||
return None, None
|
||||
|
||||
def _compress_logs(self, path, result_path):
|
||||
subprocess.check_call("tar czf {} -C {} .".format(result_path, path), shell=True) # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
|
||||
def _compress_logs(self, dir, relpaths, result_path):
    """Pack selected entries of a directory into a gzip-compressed tar archive.

    Args:
        dir: Directory that ``tar`` switches to (``-C``) before adding entries.
        relpaths: Paths, relative to ``dir``, to put into the archive.
        result_path: Path of the archive file to create.

    Raises:
        subprocess.CalledProcessError: If ``tar`` exits with a non-zero status.
    """
    # An argument vector (instead of a shell command string) keeps paths with
    # spaces or shell metacharacters intact: nothing is split or interpreted
    # by a shell on the way to tar.
    command = ["tar", "czf", result_path, "-C", dir]
    command.extend(relpaths)
    subprocess.check_call(command)  # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
|
||||
|
||||
def _get_all_tests(self, repo_path):
|
||||
image_cmd = self._get_runner_image_cmd(repo_path)
|
||||
@ -336,6 +336,27 @@ class ClickhouseIntegrationTestsRunner:
|
||||
logging.info("Cannot run with custom docker image version :(")
|
||||
return image_cmd
|
||||
|
||||
def _find_test_data_dirs(self, repo_path, test_names):
    """Snapshot modification times of the entries inside the tests' directories.

    The directory of a test is the part of its name before the first '/'
    (or the whole name when it contains no '/'), looked up under
    ``<repo_path>/tests/integration``. Names whose directory does not exist
    are ignored.

    Args:
        repo_path: Root of the repository checkout.
        test_names: Iterable of test names.

    Returns:
        A dict mapping ``<test_dir>/<entry>`` (relative to
        ``tests/integration``) to the value of ``os.path.getmtime`` for
        that entry. Only direct children of each test directory are listed.
    """
    integration_dir = os.path.join(repo_path, "tests/integration")

    # Many test names share one directory; scan every directory only once
    # instead of re-listing it for each test.
    test_dirs = {test_name.split('/', 1)[0] for test_name in test_names}

    relpaths = {}
    for test_dir in test_dirs:
        test_dir_path = os.path.join(integration_dir, test_dir)
        if not os.path.isdir(test_dir_path):
            continue
        for name in os.listdir(test_dir_path):
            relpath = os.path.join(test_dir, name)
            relpaths[relpath] = os.path.getmtime(os.path.join(integration_dir, relpath))
    return relpaths
|
||||
|
||||
def _get_test_data_dirs_difference(self, new_snapshot, old_snapshot):
    """Return the paths of ``new_snapshot`` that are new or changed.

    Both arguments map a path to a value (the runner stores modification
    times). A path is reported when it is absent from ``old_snapshot`` or
    when its value there differs from the one in ``new_snapshot``. Paths
    present only in ``old_snapshot`` are not reported.

    Returns:
        A set of the differing paths.
    """
    return {
        entry
        for entry, stamp in new_snapshot.items()
        if entry not in old_snapshot or old_snapshot[entry] != stamp
    }
|
||||
|
||||
def run_test_group(self, repo_path, test_group, tests_in_group, num_tries, num_workers):
|
||||
counters = {
|
||||
"ERROR": [],
|
||||
@ -355,18 +376,14 @@ class ClickhouseIntegrationTestsRunner:
|
||||
|
||||
image_cmd = self._get_runner_image_cmd(repo_path)
|
||||
test_group_str = test_group.replace('/', '_').replace('.', '_')
|
||||
|
||||
log_paths = []
|
||||
test_data_dirs = {}
|
||||
|
||||
for i in range(num_tries):
|
||||
logging.info("Running test group %s for the %s retry", test_group, i)
|
||||
clear_ip_tables_and_restart_daemons()
|
||||
|
||||
output_path = os.path.join(str(self.path()), "test_output_" + test_group_str + "_" + str(i) + ".log")
|
||||
log_name = "integration_run_" + test_group_str + "_" + str(i) + ".txt"
|
||||
log_path = os.path.join(str(self.path()), log_name)
|
||||
log_paths.append(log_path)
|
||||
logging.info("Will wait output inside %s", output_path)
|
||||
|
||||
test_names = set([])
|
||||
for test_name in tests_in_group:
|
||||
if test_name not in counters["PASSED"]:
|
||||
@ -375,11 +392,19 @@ class ClickhouseIntegrationTestsRunner:
|
||||
else:
|
||||
test_names.add(test_name)
|
||||
|
||||
if i == 0:
|
||||
test_data_dirs = self._find_test_data_dirs(repo_path, test_names)
|
||||
|
||||
info_basename = test_group_str + "_" + str(i) + ".nfo"
|
||||
info_path = os.path.join(repo_path, "tests/integration", info_basename)
|
||||
|
||||
test_cmd = ' '.join([test for test in sorted(test_names)])
|
||||
parallel_cmd = " --parallel {} ".format(num_workers) if num_workers > 0 else ""
|
||||
cmd = "cd {}/tests/integration && ./runner --tmpfs {} -t {} {} '-ss -rfEp --run-id={} --color=no --durations=0 {}' | tee {}".format(
|
||||
repo_path, image_cmd, test_cmd, parallel_cmd, i, _get_deselect_option(self.should_skip_tests()), output_path)
|
||||
cmd = "cd {}/tests/integration && ./runner --tmpfs {} -t {} {} '-rfEp --run-id={} --color=no --durations=0 {}' | tee {}".format(
|
||||
repo_path, image_cmd, test_cmd, parallel_cmd, i, _get_deselect_option(self.should_skip_tests()), info_path)
|
||||
|
||||
log_basename = test_group_str + "_" + str(i) + ".log"
|
||||
log_path = os.path.join(repo_path, "tests/integration", log_basename)
|
||||
with open(log_path, 'w') as log:
|
||||
logging.info("Executing cmd: %s", cmd)
|
||||
retcode = subprocess.Popen(cmd, shell=True, stderr=log, stdout=log).wait()
|
||||
@ -388,15 +413,41 @@ class ClickhouseIntegrationTestsRunner:
|
||||
else:
|
||||
logging.info("Some tests failed")
|
||||
|
||||
if os.path.exists(output_path):
|
||||
lines = parse_test_results_output(output_path)
|
||||
extra_logs_names = [log_basename]
|
||||
log_result_path = os.path.join(str(self.path()), 'integration_run_' + log_basename)
|
||||
shutil.copy(log_path, log_result_path)
|
||||
log_paths.append(log_result_path)
|
||||
|
||||
for pytest_log_path in glob.glob(os.path.join(repo_path, "tests/integration/pytest*.log")):
|
||||
new_name = test_group_str + "_" + str(i) + "_" + os.path.basename(pytest_log_path)
|
||||
os.rename(pytest_log_path, os.path.join(repo_path, "tests/integration", new_name))
|
||||
extra_logs_names.append(new_name)
|
||||
|
||||
dockerd_log_path = os.path.join(repo_path, "tests/integration/dockerd.log")
|
||||
if os.path.exists(dockerd_log_path):
|
||||
new_name = test_group_str + "_" + str(i) + "_" + os.path.basename(dockerd_log_path)
|
||||
os.rename(dockerd_log_path, os.path.join(repo_path, "tests/integration", new_name))
|
||||
extra_logs_names.append(new_name)
|
||||
|
||||
if os.path.exists(info_path):
|
||||
extra_logs_names.append(info_basename)
|
||||
lines = parse_test_results_output(info_path)
|
||||
new_counters = get_counters(lines)
|
||||
times_lines = parse_test_times(output_path)
|
||||
times_lines = parse_test_times(info_path)
|
||||
new_tests_times = get_test_times(times_lines)
|
||||
self._update_counters(counters, new_counters)
|
||||
for test_name, test_time in new_tests_times.items():
|
||||
tests_times[test_name] = test_time
|
||||
os.remove(output_path)
|
||||
|
||||
test_data_dirs_new = self._find_test_data_dirs(repo_path, test_names)
|
||||
test_data_dirs_diff = self._get_test_data_dirs_difference(test_data_dirs_new, test_data_dirs)
|
||||
test_data_dirs = test_data_dirs_new
|
||||
|
||||
if extra_logs_names or test_data_dirs_diff:
|
||||
extras_result_path = os.path.join(str(self.path()), "integration_run_" + test_group_str + "_" + str(i) + ".tar.gz")
|
||||
self._compress_logs(os.path.join(repo_path, "tests/integration"), extra_logs_names + list(test_data_dirs_diff), extras_result_path)
|
||||
log_paths.append(extras_result_path)
|
||||
|
||||
if len(counters["PASSED"]) + len(counters["FLAKY"]) == len(tests_in_group):
|
||||
logging.info("All tests from group %s passed", test_group)
|
||||
break
|
||||
@ -459,15 +510,6 @@ class ClickhouseIntegrationTestsRunner:
|
||||
break
|
||||
time.sleep(5)
|
||||
|
||||
logging.info("Finally all tests done, going to compress test dir")
|
||||
test_logs = os.path.join(str(self.path()), "./test_dir.tar.gz")
|
||||
self._compress_logs("{}/tests/integration".format(repo_path), test_logs)
|
||||
logging.info("Compression finished")
|
||||
|
||||
result_path_dockerd_logs = os.path.join(str(self.path()), "dockerd.log")
|
||||
if os.path.exists(result_path_dockerd_logs):
|
||||
shutil.copy(DOCKERD_LOGS_PATH, result_path_dockerd_logs)
|
||||
|
||||
test_result = []
|
||||
for state in ("ERROR", "FAILED", "PASSED", "SKIPPED", "FLAKY"):
|
||||
if state == "PASSED":
|
||||
@ -479,7 +521,7 @@ class ClickhouseIntegrationTestsRunner:
|
||||
test_result += [(c + ' (✕' + str(final_retry) + ')', text_state, "{:.2f}".format(tests_times[c])) for c in counters[state]]
|
||||
status_text = description_prefix + ', '.join([str(n).lower().replace('failed', 'fail') + ': ' + str(len(c)) for n, c in counters.items()])
|
||||
|
||||
return result_state, status_text, test_result, [test_logs] + logs
|
||||
return result_state, status_text, test_result, logs
|
||||
|
||||
def run_impl(self, repo_path, build_path):
|
||||
if self.flaky_check:
|
||||
@ -539,15 +581,6 @@ class ClickhouseIntegrationTestsRunner:
|
||||
logging.info("Collected more than 20 failed/error tests, stopping")
|
||||
break
|
||||
|
||||
logging.info("Finally all tests done, going to compress test dir")
|
||||
test_logs = os.path.join(str(self.path()), "./test_dir.tar.gz")
|
||||
self._compress_logs("{}/tests/integration".format(repo_path), test_logs)
|
||||
logging.info("Compression finished")
|
||||
|
||||
result_path_dockerd_logs = os.path.join(str(self.path()), "dockerd.log")
|
||||
if os.path.exists(result_path_dockerd_logs):
|
||||
shutil.copy(DOCKERD_LOGS_PATH, result_path_dockerd_logs)
|
||||
|
||||
if counters["FAILED"] or counters["ERROR"]:
|
||||
logging.info("Overall status failure, because we have tests in FAILED or ERROR state")
|
||||
result_state = "failure"
|
||||
@ -580,7 +613,7 @@ class ClickhouseIntegrationTestsRunner:
|
||||
if '(memory)' in self.params['context_name']:
|
||||
result_state = "success"
|
||||
|
||||
return result_state, status_text, test_result, [test_logs]
|
||||
return result_state, status_text, test_result, []
|
||||
|
||||
def write_results(results_file, status_file, results, status):
|
||||
with open(results_file, 'w') as f:
|
||||
|
@ -30,6 +30,7 @@ from kazoo.client import KazooClient
|
||||
from kazoo.exceptions import KazooException
|
||||
from minio import Minio
|
||||
from helpers.test_tools import assert_eq_with_retry
|
||||
from helpers import pytest_xdist_logging_to_separate_files
|
||||
|
||||
import docker
|
||||
|
||||
@ -56,22 +57,22 @@ def run_and_check(args, env=None, shell=False, stdout=subprocess.PIPE, stderr=su
|
||||
subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=env, shell=shell)
|
||||
return
|
||||
|
||||
logging.debug(f"Command:{args}")
|
||||
res = subprocess.run(args, stdout=stdout, stderr=stderr, env=env, shell=shell, timeout=timeout)
|
||||
out = res.stdout.decode('utf-8')
|
||||
err = res.stderr.decode('utf-8')
|
||||
if res.returncode != 0:
|
||||
# check_call(...) from subprocess does not print stderr, so we do it manually
|
||||
logging.debug(f"Command:{args}")
|
||||
logging.debug(f"Stderr:{err}")
|
||||
# check_call(...) from subprocess does not print stderr, so we do it manually
|
||||
if out:
|
||||
logging.debug(f"Stdout:{out}")
|
||||
logging.debug(f"Env: {env}")
|
||||
if err:
|
||||
logging.debug(f"Stderr:{err}")
|
||||
if res.returncode != 0:
|
||||
logging.debug(f"Exitcode:{res.returncode}")
|
||||
if env:
|
||||
logging.debug(f"Env:{env}")
|
||||
if not nothrow:
|
||||
raise Exception(f"Command {args} return non-zero code {res.returncode}: {res.stderr.decode('utf-8')}")
|
||||
else:
|
||||
logging.debug(f"Command:{args}")
|
||||
logging.debug(f"Stderr: {err}")
|
||||
logging.debug(f"Stdout: {out}")
|
||||
return out
|
||||
return out
|
||||
|
||||
# Based on https://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
|
||||
def get_free_port():
|
||||
@ -192,6 +193,7 @@ class ClickHouseCluster:
|
||||
zookeeper_keyfile=None, zookeeper_certfile=None):
|
||||
for param in list(os.environ.keys()):
|
||||
logging.debug("ENV %40s %s" % (param, os.environ[param]))
|
||||
self.base_path = base_path
|
||||
self.base_dir = p.dirname(base_path)
|
||||
self.name = name if name is not None else ''
|
||||
|
||||
@ -1290,6 +1292,9 @@ class ClickHouseCluster:
|
||||
raise Exception("Can't wait Cassandra to start")
|
||||
|
||||
def start(self, destroy_dirs=True):
|
||||
pytest_xdist_logging_to_separate_files.setup()
|
||||
logging.info("Running tests in {}".format(self.base_path))
|
||||
|
||||
logging.debug("Cluster start called. is_up={}, destroy_dirs={}".format(self.is_up, destroy_dirs))
|
||||
if self.is_up:
|
||||
return
|
||||
@ -1771,12 +1776,14 @@ class ClickHouseInstance:
|
||||
# Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
|
||||
def query(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None,
          ignore_error=False):
    """Log the query at debug level, then run it through ``self.client.query``.

    Every option is forwarded to the client as a keyword argument and the
    client's result is returned unchanged.
    """
    logging.debug(f"Executing query {sql} on {self.name}")
    forwarded = dict(stdin=stdin, timeout=timeout, settings=settings, user=user, password=password,
                     database=database, ignore_error=ignore_error)
    return self.client.query(sql, **forwarded)
|
||||
|
||||
def query_with_retry(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None,
|
||||
ignore_error=False,
|
||||
retry_count=20, sleep_time=0.5, check_callback=lambda x: True):
|
||||
logging.debug(f"Executing query {sql} on {self.name}")
|
||||
result = None
|
||||
for i in range(retry_count):
|
||||
try:
|
||||
@ -1794,23 +1801,27 @@ class ClickHouseInstance:
|
||||
raise Exception("Can't execute query {}".format(sql))
|
||||
|
||||
# As query() but doesn't wait response and returns response handler
|
||||
def get_query_request(self, *args, **kwargs):
|
||||
return self.client.get_query_request(*args, **kwargs)
|
||||
def get_query_request(self, sql, *args, **kwargs):
    """Log the query at debug level and hand it to ``self.client.get_query_request``.

    ``sql`` and all extra positional and keyword arguments are passed through;
    whatever the client returns is returned as is.
    """
    logging.debug(f"Executing query {sql} on {self.name}")
    request = self.client.get_query_request(sql, *args, **kwargs)
    return request
|
||||
|
||||
# Connects to the instance via clickhouse-client, sends a query (1st argument), expects an error and returns its code
|
||||
def query_and_get_error(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None,
                        database=None):
    """Log the query at debug level, then delegate to ``self.client.query_and_get_error``.

    All options are forwarded as keyword arguments; the client's result is
    returned unchanged.
    """
    logging.debug(f"Executing query {sql} on {self.name}")
    forwarded = dict(stdin=stdin, timeout=timeout, settings=settings, user=user,
                     password=password, database=database)
    return self.client.query_and_get_error(sql, **forwarded)
|
||||
|
||||
# The same as query_and_get_error but ignores successful query.
|
||||
def query_and_get_answer_with_error(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None,
                                    database=None):
    """Log the query at debug level, then delegate to
    ``self.client.query_and_get_answer_with_error``.

    All options are forwarded as keyword arguments; the client's result is
    returned unchanged.
    """
    logging.debug(f"Executing query {sql} on {self.name}")
    forwarded = dict(stdin=stdin, timeout=timeout, settings=settings,
                     user=user, password=password, database=database)
    return self.client.query_and_get_answer_with_error(sql, **forwarded)
|
||||
|
||||
# Connects to the instance via HTTP interface, sends a query and returns the answer
|
||||
def http_query(self, sql, data=None, params=None, user=None, password=None, expect_fail_and_get_error=False):
|
||||
logging.debug(f"Executing query {sql} on {self.name} via HTTP interface")
|
||||
if params is None:
|
||||
params = {}
|
||||
else:
|
||||
@ -1845,11 +1856,13 @@ class ClickHouseInstance:
|
||||
|
||||
# Sends an HTTP request with the given method to the given URL path on the instance's HTTP port (8123) and returns the result of requests.request()
|
||||
def http_request(self, url, method='GET', params=None, data=None, headers=None):
    """Log the request at debug level and send it to this instance on port 8123.

    ``url`` is the path part only: it is appended to
    ``http://<self.ip_address>:8123/``. The value returned by
    ``requests.request`` is returned unchanged.
    """
    logging.debug(f"Sending HTTP request {url} to {self.name}")
    full_url = "".join(("http://", self.ip_address, ":8123/", url))
    return requests.request(method=method, url=full_url, params=params, data=data, headers=headers)
|
||||
|
||||
# Connects to the instance via HTTP interface, sends a query, expects an error and returns the error message
|
||||
def http_query_and_get_error(self, sql, data=None, params=None, user=None, password=None):
    """Log the query at debug level and run ``self.http_query`` in error-expecting mode.

    The arguments are forwarded by keyword together with
    ``expect_fail_and_get_error=True``; the result of ``http_query`` is
    returned unchanged.
    """
    logging.debug(f"Executing query {sql} on {self.name} via HTTP interface")
    forwarded = dict(sql=sql, data=data, params=params, user=user, password=password)
    return self.http_query(expect_fail_and_get_error=True, **forwarded)
|
||||
|
||||
|
@ -0,0 +1,28 @@
|
||||
import logging
|
||||
import os.path
|
||||
|
||||
# Makes the parallel workers of pytest-xdist log to separate files.
|
||||
# Without this function all workers will log to the same log file
|
||||
# and mix everything together making it much more difficult for troubleshooting.
|
||||
def setup():
    """Redirect the root logger's file handlers to per-worker log files.

    Does nothing unless the ``PYTEST_XDIST_WORKER`` environment variable is set
    to something other than ``'master'``. Otherwise every ``logging.FileHandler``
    on the root logger whose file name (without extension) does not already end
    with ``-<worker>`` is replaced by a new ``FileHandler`` writing to
    ``<name>-<worker><ext>``, keeping the old handler's formatter and level.
    Handlers that already carry the suffix are left alone, so repeated calls
    do not rename again.
    """
    worker_name = os.environ.get('PYTEST_XDIST_WORKER', 'master')
    if worker_name == 'master':
        return

    suffix = '-' + worker_name
    root_logger = logging.getLogger('')

    # Gather the (old, replacement) pairs first: the handler list must not be
    # modified while it is being iterated.
    replacements = []
    for old_handler in root_logger.handlers:
        if not isinstance(old_handler, logging.FileHandler):
            continue
        stem, ext = os.path.splitext(old_handler.baseFilename)
        if stem.endswith(suffix):
            continue
        per_worker_handler = logging.FileHandler(stem + suffix + ext)
        per_worker_handler.setFormatter(old_handler.formatter)
        per_worker_handler.setLevel(old_handler.level)
        replacements.append((old_handler, per_worker_handler))

    # Attach all replacements before detaching the originals.
    for _, per_worker_handler in replacements:
        root_logger.addHandler(per_worker_handler)
    for old_handler, _ in replacements:
        old_handler.flush()
        root_logger.removeHandler(old_handler)
|
@ -4,10 +4,14 @@ norecursedirs = _instances*
|
||||
timeout = 1800
|
||||
junit_duration_report = call
|
||||
junit_suite_name = integration
|
||||
log_cli = 1
|
||||
log_level = DEBUG
|
||||
log_format = %(asctime)s %(levelname)s : %(message)s (%(filename)s:%(lineno)s, %(funcName)s)
|
||||
log_date_format=%Y-%m-%d %H:%M:%S
|
||||
log_cli = true
|
||||
log_cli_level = CRITICAL
|
||||
log_cli_format = %%(asctime)s [%(levelname)8s] %(funcName)s %(message)s (%(filename)s:%(lineno)s)
|
||||
log_cli_format = %(asctime)s %(levelname)s : %(message)s (%(filename)s:%(lineno)s, %(funcName)s)
|
||||
log_cli_date_format=%Y-%m-%d %H:%M:%S
|
||||
log_file = pytest.log
|
||||
log_file_level = DEBUG
|
||||
log_file_format = %(asctime)s [%(levelname)8s] %(funcName)s %(message)s (%(filename)s:%(lineno)s)
|
||||
log_file_date_format=%Y-%m-%d %H:%M:%S
|
||||
log_file_format = %(asctime)s %(levelname)s : %(message)s (%(filename)s:%(lineno)s, %(funcName)s)
|
||||
log_file_date_format = %Y-%m-%d %H:%M:%S
|
||||
|
@ -3,6 +3,7 @@
|
||||
import subprocess
|
||||
import os
|
||||
import getpass
|
||||
import glob
|
||||
import argparse
|
||||
import logging
|
||||
import signal
|
||||
@ -99,7 +100,7 @@ signal.signal(signal.SIGINT, docker_kill_handler_handler)
|
||||
# 2) path of runner script is used to determine paths for trivial case, when we run it from repository
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s : %(message)s (%(filename)s:%(lineno)s, %(funcName)s)')
|
||||
parser = argparse.ArgumentParser(description="ClickHouse integration tests runner")
|
||||
|
||||
parser.add_argument(
|
||||
@ -257,6 +258,9 @@ if __name__ == "__main__":
|
||||
if sys.stdout.isatty() and sys.stdin.isatty():
|
||||
tty = "-it"
|
||||
|
||||
# Remove old logs.
|
||||
for old_log_path in glob.glob(args.cases_dir + "/pytest*.log"):
|
||||
os.remove(old_log_path)
|
||||
|
||||
cmd = "docker run {net} {tty} --rm --name {name} --privileged \
|
||||
--volume={odbc_bridge_bin}:/clickhouse-odbc-bridge --volume={bin}:/clickhouse \
|
||||
|
Loading…
Reference in New Issue
Block a user