From 618a77fafaa52912c37d652bbe3ade7f1d6ac306 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Thu, 1 Jul 2021 17:41:59 +0300 Subject: [PATCH] Improve logging in integration tests. --- tests/integration/ci-runner.py | 103 ++++++++++++------ tests/integration/helpers/cluster.py | 37 +++++-- .../pytest_xdist_logging_to_separate_files.py | 28 +++++ tests/integration/pytest.ini | 12 +- tests/integration/runner | 6 +- 5 files changed, 134 insertions(+), 52 deletions(-) create mode 100644 tests/integration/helpers/pytest_xdist_logging_to_separate_files.py diff --git a/tests/integration/ci-runner.py b/tests/integration/ci-runner.py index 0af76fe2648..97d076f698e 100755 --- a/tests/integration/ci-runner.py +++ b/tests/integration/ci-runner.py @@ -3,6 +3,7 @@ import logging import subprocess import os +import glob import time import shutil from collections import defaultdict @@ -17,7 +18,6 @@ SLEEP_BETWEEN_RETRIES = 5 PARALLEL_GROUP_SIZE = 100 CLICKHOUSE_BINARY_PATH = "/usr/bin/clickhouse" CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH = "/usr/bin/clickhouse-odbc-bridge" -DOCKERD_LOGS_PATH = "/ClickHouse/tests/integration/dockerd.log" CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH = "/usr/bin/clickhouse-library-bridge" TRIES_COUNT = 10 @@ -256,8 +256,8 @@ class ClickhouseIntegrationTestsRunner: shutil.copy(CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH, result_path_library_bridge) return None, None - def _compress_logs(self, path, result_path): - subprocess.check_call("tar czf {} -C {} .".format(result_path, path), shell=True) # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL + def _compress_logs(self, dir, relpaths, result_path): + subprocess.check_call("tar czf {} -C {} {}".format(result_path, dir, ' '.join(relpaths)), shell=True) # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL def _get_all_tests(self, repo_path): image_cmd = self._get_runner_image_cmd(repo_path) @@ -336,6 +336,27 @@ class ClickhouseIntegrationTestsRunner: logging.info("Cannot run with custom docker image version :(") return image_cmd + def _find_test_data_dirs(self, repo_path, test_names): + relpaths = {} + for test_name in test_names: + if '/' in test_name: + test_dir = test_name[:test_name.find('/')] + else: + test_dir = test_name + if os.path.isdir(os.path.join(repo_path, "tests/integration", test_dir)): + for name in os.listdir(os.path.join(repo_path, "tests/integration", test_dir)): + relpath = os.path.join(os.path.join(test_dir, name)) + mtime = os.path.getmtime(os.path.join(repo_path, "tests/integration", relpath)) + relpaths[relpath] = mtime + return relpaths + + def _get_test_data_dirs_difference(self, new_snapshot, old_snapshot): + res = set() + for path in new_snapshot: + if (not path in old_snapshot) or (old_snapshot[path] != new_snapshot[path]): + res.add(path) + return res + def run_test_group(self, repo_path, test_group, tests_in_group, num_tries, num_workers): counters = { "ERROR": [], @@ -355,18 +376,14 @@ class ClickhouseIntegrationTestsRunner: image_cmd = self._get_runner_image_cmd(repo_path) test_group_str = test_group.replace('/', '_').replace('.', '_') + log_paths = [] + test_data_dirs = {} for i in range(num_tries): logging.info("Running test group %s for the %s retry", test_group, i) clear_ip_tables_and_restart_daemons() - output_path = os.path.join(str(self.path()), "test_output_" + test_group_str + "_" + str(i) + ".log") - log_name = "integration_run_" + test_group_str + "_" + str(i) + ".txt" - log_path = os.path.join(str(self.path()), log_name) - log_paths.append(log_path) - logging.info("Will wait output inside %s", output_path) - test_names = set([]) for test_name in tests_in_group: if test_name not in counters["PASSED"]: @@ -375,11 +392,19 @@ class ClickhouseIntegrationTestsRunner: else: test_names.add(test_name) + if i == 0: + test_data_dirs = self._find_test_data_dirs(repo_path, test_names) + + info_basename = test_group_str + "_" + str(i) + ".nfo" + info_path = os.path.join(repo_path, "tests/integration", info_basename) + test_cmd = ' '.join([test for test in sorted(test_names)]) parallel_cmd = " --parallel {} ".format(num_workers) if num_workers > 0 else "" - cmd = "cd {}/tests/integration && ./runner --tmpfs {} -t {} {} '-ss -rfEp --run-id={} --color=no --durations=0 {}' | tee {}".format( - repo_path, image_cmd, test_cmd, parallel_cmd, i, _get_deselect_option(self.should_skip_tests()), output_path) + cmd = "cd {}/tests/integration && ./runner --tmpfs {} -t {} {} '-rfEp --run-id={} --color=no --durations=0 {}' | tee {}".format( + repo_path, image_cmd, test_cmd, parallel_cmd, i, _get_deselect_option(self.should_skip_tests()), info_path) + log_basename = test_group_str + "_" + str(i) + ".log" + log_path = os.path.join(repo_path, "tests/integration", log_basename) with open(log_path, 'w') as log: logging.info("Executing cmd: %s", cmd) retcode = subprocess.Popen(cmd, shell=True, stderr=log, stdout=log).wait() @@ -388,15 +413,41 @@ class ClickhouseIntegrationTestsRunner: else: logging.info("Some tests failed") - if os.path.exists(output_path): - lines = parse_test_results_output(output_path) + extra_logs_names = [log_basename] + log_result_path = os.path.join(str(self.path()), 'integration_run_' + log_basename) + shutil.copy(log_path, log_result_path) + log_paths.append(log_result_path) + + for pytest_log_path in glob.glob(os.path.join(repo_path, "tests/integration/pytest*.log")): + new_name = test_group_str + "_" + str(i) + "_" + os.path.basename(pytest_log_path) + os.rename(pytest_log_path, os.path.join(repo_path, "tests/integration", new_name)) + extra_logs_names.append(new_name) + + dockerd_log_path = os.path.join(repo_path, "tests/integration/dockerd.log") + if os.path.exists(dockerd_log_path): + new_name = test_group_str + "_" + str(i) + "_" + os.path.basename(dockerd_log_path) + os.rename(dockerd_log_path, os.path.join(repo_path, "tests/integration", new_name)) + extra_logs_names.append(new_name) + + if os.path.exists(info_path): + extra_logs_names.append(info_basename) + lines = parse_test_results_output(info_path) new_counters = get_counters(lines) - times_lines = parse_test_times(output_path) + times_lines = parse_test_times(info_path) new_tests_times = get_test_times(times_lines) self._update_counters(counters, new_counters) for test_name, test_time in new_tests_times.items(): tests_times[test_name] = test_time - os.remove(output_path) + + test_data_dirs_new = self._find_test_data_dirs(repo_path, test_names) + test_data_dirs_diff = self._get_test_data_dirs_difference(test_data_dirs_new, test_data_dirs) + test_data_dirs = test_data_dirs_new + + if extra_logs_names or test_data_dirs_diff: + extras_result_path = os.path.join(str(self.path()), "integration_run_" + test_group_str + "_" + str(i) + ".tar.gz") + self._compress_logs(os.path.join(repo_path, "tests/integration"), extra_logs_names + list(test_data_dirs_diff), extras_result_path) + log_paths.append(extras_result_path) + if len(counters["PASSED"]) + len(counters["FLAKY"]) == len(tests_in_group): logging.info("All tests from group %s passed", test_group) break @@ -459,15 +510,6 @@ class ClickhouseIntegrationTestsRunner: break time.sleep(5) - logging.info("Finally all tests done, going to compress test dir") - test_logs = os.path.join(str(self.path()), "./test_dir.tar.gz") - self._compress_logs("{}/tests/integration".format(repo_path), test_logs) - logging.info("Compression finished") - - result_path_dockerd_logs = os.path.join(str(self.path()), "dockerd.log") - if os.path.exists(result_path_dockerd_logs): - shutil.copy(DOCKERD_LOGS_PATH, result_path_dockerd_logs) - test_result = [] for state in ("ERROR", "FAILED", "PASSED", "SKIPPED", "FLAKY"): if state == "PASSED": @@ -479,7 +521,7 @@ class ClickhouseIntegrationTestsRunner: test_result += [(c + ' (✕' + str(final_retry) + ')', text_state, "{:.2f}".format(tests_times[c])) for c in counters[state]] status_text = description_prefix + ', '.join([str(n).lower().replace('failed', 'fail') + ': ' + str(len(c)) for n, c in counters.items()]) - return result_state, status_text, test_result, [test_logs] + logs + return result_state, status_text, test_result, logs def run_impl(self, repo_path, build_path): if self.flaky_check: @@ -539,15 +581,6 @@ class ClickhouseIntegrationTestsRunner: logging.info("Collected more than 20 failed/error tests, stopping") break - logging.info("Finally all tests done, going to compress test dir") - test_logs = os.path.join(str(self.path()), "./test_dir.tar.gz") - self._compress_logs("{}/tests/integration".format(repo_path), test_logs) - logging.info("Compression finished") - - result_path_dockerd_logs = os.path.join(str(self.path()), "dockerd.log") - if os.path.exists(result_path_dockerd_logs): - shutil.copy(DOCKERD_LOGS_PATH, result_path_dockerd_logs) - if counters["FAILED"] or counters["ERROR"]: logging.info("Overall status failure, because we have tests in FAILED or ERROR state") result_state = "failure" @@ -580,7 +613,7 @@ class ClickhouseIntegrationTestsRunner: if '(memory)' in self.params['context_name']: result_state = "success" - return result_state, status_text, test_result, [test_logs] + return result_state, status_text, test_result, [] def write_results(results_file, status_file, results, status): with open(results_file, 'w') as f: diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 8863492dd12..1db9e07a69e 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -30,6 +30,7 @@ from kazoo.client import KazooClient from kazoo.exceptions import KazooException from minio import Minio from helpers.test_tools import assert_eq_with_retry +from helpers import pytest_xdist_logging_to_separate_files import docker @@ -56,22 +57,22 @@ def run_and_check(args, env=None, shell=False, stdout=subprocess.PIPE, stderr=su subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=env, shell=shell) return + logging.debug(f"Command:{args}") res = subprocess.run(args, stdout=stdout, stderr=stderr, env=env, shell=shell, timeout=timeout) out = res.stdout.decode('utf-8') err = res.stderr.decode('utf-8') - if res.returncode != 0: - # check_call(...) from subprocess does not print stderr, so we do it manually - logging.debug(f"Command:{args}") - logging.debug(f"Stderr:{err}") + # check_call(...) from subprocess does not print stderr, so we do it manually + if out: logging.debug(f"Stdout:{out}") - logging.debug(f"Env: {env}") + if err: + logging.debug(f"Stderr:{err}") + if res.returncode != 0: + logging.debug(f"Exitcode:{res.returncode}") + if env: + logging.debug(f"Env:{env}") if not nothrow: raise Exception(f"Command {args} return non-zero code {res.returncode}: {res.stderr.decode('utf-8')}") - else: - logging.debug(f"Command:{args}") - logging.debug(f"Stderr: {err}") - logging.debug(f"Stdout: {out}") - return out + return out # Based on https://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309 def get_free_port(): @@ -192,6 +193,7 @@ class ClickHouseCluster: zookeeper_keyfile=None, zookeeper_certfile=None): for param in list(os.environ.keys()): logging.debug("ENV %40s %s" % (param, os.environ[param])) + self.base_path = base_path self.base_dir = p.dirname(base_path) self.name = name if name is not None else '' @@ -1290,6 +1292,9 @@ class ClickHouseCluster: raise Exception("Can't wait Cassandra to start") def start(self, destroy_dirs=True): + pytest_xdist_logging_to_separate_files.setup() + logging.info("Running tests in {}".format(self.base_path)) + logging.debug("Cluster start called. is_up={}, destroy_dirs={}".format(self.is_up, destroy_dirs)) if self.is_up: return @@ -1771,12 +1776,14 @@ class ClickHouseInstance: # Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer def query(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None, ignore_error=False): + logging.debug(f"Executing query {sql} on {self.name}") return self.client.query(sql, stdin=stdin, timeout=timeout, settings=settings, user=user, password=password, database=database, ignore_error=ignore_error) def query_with_retry(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None, ignore_error=False, retry_count=20, sleep_time=0.5, check_callback=lambda x: True): + logging.debug(f"Executing query {sql} on {self.name}") result = None for i in range(retry_count): try: @@ -1794,23 +1801,27 @@ class ClickHouseInstance: raise Exception("Can't execute query {}".format(sql)) # As query() but doesn't wait response and returns response handler - def get_query_request(self, *args, **kwargs): - return self.client.get_query_request(*args, **kwargs) + def get_query_request(self, sql, *args, **kwargs): + logging.debug(f"Executing query {sql} on {self.name}") + return self.client.get_query_request(sql, *args, **kwargs) # Connects to the instance via clickhouse-client, sends a query (1st argument), expects an error and return its code def query_and_get_error(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None): + logging.debug(f"Executing query {sql} on {self.name}") return self.client.query_and_get_error(sql, stdin=stdin, timeout=timeout, settings=settings, user=user, password=password, database=database) # The same as query_and_get_error but ignores successful query. def query_and_get_answer_with_error(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None): + logging.debug(f"Executing query {sql} on {self.name}") return self.client.query_and_get_answer_with_error(sql, stdin=stdin, timeout=timeout, settings=settings, user=user, password=password, database=database) # Connects to the instance via HTTP interface, sends a query and returns the answer def http_query(self, sql, data=None, params=None, user=None, password=None, expect_fail_and_get_error=False): + logging.debug(f"Executing query {sql} on {self.name} via HTTP interface") if params is None: params = {} else: @@ -1845,11 +1856,13 @@ class ClickHouseInstance: # Connects to the instance via HTTP interface, sends a query and returns the answer def http_request(self, url, method='GET', params=None, data=None, headers=None): + logging.debug(f"Sending HTTP request {url} to {self.name}") url = "http://" + self.ip_address + ":8123/" + url return requests.request(method=method, url=url, params=params, data=data, headers=headers) # Connects to the instance via HTTP interface, sends a query, expects an error and return the error message def http_query_and_get_error(self, sql, data=None, params=None, user=None, password=None): + logging.debug(f"Executing query {sql} on {self.name} via HTTP interface") return self.http_query(sql=sql, data=data, params=params, user=user, password=password, expect_fail_and_get_error=True) diff --git a/tests/integration/helpers/pytest_xdist_logging_to_separate_files.py b/tests/integration/helpers/pytest_xdist_logging_to_separate_files.py new file mode 100644 index 00000000000..ee9a52e042c --- /dev/null +++ b/tests/integration/helpers/pytest_xdist_logging_to_separate_files.py @@ -0,0 +1,28 @@ +import logging +import os.path + +# Makes the parallel workers of pytest-xdist to log to separate files. +# Without this function all workers will log to the same log file +# and mix everything together making it much more difficult for troubleshooting. +def setup(): + worker_name = os.environ.get('PYTEST_XDIST_WORKER', 'master') + if worker_name == 'master': + return + logger = logging.getLogger('') + new_handlers = [] + handlers_to_remove = [] + for handler in logger.handlers: + if isinstance(handler, logging.FileHandler): + filename, ext = os.path.splitext(handler.baseFilename) + if not filename.endswith('-' + worker_name): + new_filename = filename + '-' + worker_name + new_handler = logging.FileHandler(new_filename + ext) + new_handler.setFormatter(handler.formatter) + new_handler.setLevel(handler.level) + new_handlers.append(new_handler) + handlers_to_remove.append(handler) + for new_handler in new_handlers: + logger.addHandler(new_handler) + for handler in handlers_to_remove: + handler.flush() + logger.removeHandler(handler) diff --git a/tests/integration/pytest.ini b/tests/integration/pytest.ini index 6d451adf7eb..9a6ddd1be4b 100644 --- a/tests/integration/pytest.ini +++ b/tests/integration/pytest.ini @@ -4,10 +4,14 @@ norecursedirs = _instances* timeout = 1800 junit_duration_report = call junit_suite_name = integration -log_cli = 1 +log_level = DEBUG +log_format = %(asctime)s %(levelname)s : %(message)s (%(filename)s:%(lineno)s, %(funcName)s) +log_date_format=%Y-%m-%d %H:%M:%S +log_cli = true log_cli_level = CRITICAL -log_cli_format = %%(asctime)s [%(levelname)8s] %(funcName)s %(message)s (%(filename)s:%(lineno)s) +log_cli_format = %(asctime)s %(levelname)s : %(message)s (%(filename)s:%(lineno)s, %(funcName)s) +log_cli_date_format=%Y-%m-%d %H:%M:%S log_file = pytest.log log_file_level = DEBUG -log_file_format = %(asctime)s [%(levelname)8s] %(funcName)s %(message)s (%(filename)s:%(lineno)s) -log_file_date_format=%Y-%m-%d %H:%M:%S +log_file_format = %(asctime)s %(levelname)s : %(message)s (%(filename)s:%(lineno)s, %(funcName)s) +log_file_date_format = %Y-%m-%d %H:%M:%S diff --git a/tests/integration/runner b/tests/integration/runner index 160c4a23652..1bef8f60db9 100755 --- a/tests/integration/runner +++ b/tests/integration/runner @@ -3,6 +3,7 @@ import subprocess import os import getpass +import glob import argparse import logging import signal @@ -99,7 +100,7 @@ signal.signal(signal.SIGINT, docker_kill_handler_handler) # 2) path of runner script is used to determine paths for trivial case, when we run it from repository if __name__ == "__main__": - logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') + logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s : %(message)s (%(filename)s:%(lineno)s, %(funcName)s)') parser = argparse.ArgumentParser(description="ClickHouse integration tests runner") parser.add_argument( @@ -257,6 +258,9 @@ if __name__ == "__main__": if sys.stdout.isatty() and sys.stdin.isatty(): tty = "-it" + # Remove old logs. + for old_log_path in glob.glob(args.cases_dir + "/pytest*.log"): + os.remove(old_log_path) cmd = "docker run {net} {tty} --rm --name {name} --privileged \ --volume={odbc_bridge_bin}:/clickhouse-odbc-bridge --volume={bin}:/clickhouse \