diff --git a/dbms/tests/clickhouse-test b/dbms/tests/clickhouse-test index 27a71f9949c..9a29dce0a76 100755 --- a/dbms/tests/clickhouse-test +++ b/dbms/tests/clickhouse-test @@ -1,9 +1,12 @@ #!/usr/bin/env python from __future__ import print_function -import sys + +import atexit import os import os.path import re +import signal +import sys from argparse import ArgumentParser from argparse import FileType @@ -17,12 +20,16 @@ from subprocess import CalledProcessError from datetime import datetime from time import sleep from errno import ESRCH + try: import termcolor except ImportError: termcolor = None + from random import random import commands +import concurrent.futures +import functools import multiprocessing from contextlib import closing @@ -137,10 +144,8 @@ def colored(text, args, color=None, on_color=None, attrs=None): SERVER_DIED = False exit_code = 0 - # def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total): -def run_tests_array(all_tests_with_params): - all_tests, suite, suite_dir, suite_tmp_dir, run_total = all_tests_with_params +def run_tests_array(tests, suite, suite_dir, suite_tmp_dir, run_total): global exit_code global SERVER_DIED @@ -164,10 +169,10 @@ def run_tests_array(all_tests_with_params): if args.print_time: print(" {0:.2f} sec.".format(test_time), end='') - if len(all_tests): - print("\nRunning {} {} tests.".format(len(all_tests), suite) + "\n") + if len(tests): + print("\nRunning {} {} tests.".format(len(tests), suite) + "\n") - for case in all_tests: + for case in tests: if SERVER_DIED: break @@ -282,9 +287,10 @@ def run_tests_array(all_tests_with_params): os.remove(stdout_file) if os.path.exists(stderr_file): os.remove(stderr_file) - except KeyboardInterrupt as e: + except KeyboardInterrupt: print(colored("Break tests execution", args, "red")) - raise e + SERVER_DIED = 1 + break except: import traceback exc_type, exc_value, tb = sys.exc_info() @@ -306,6 +312,43 @@ def run_tests_array(all_tests_with_params): 
server_logs_level = "warning" +def atexit_kill_process_group(): + # Ignore the terminate signal in this process, so that the exit code is + # preserved. Don't use SIG_IGN because it is broken: + # https://bugs.python.org/issue23395 + def ignore_signal(signal, frame): + pass + signal.signal(signal.SIGTERM, ignore_signal) + os.killpg(0, signal.SIGTERM) + +# SIG_DFL is broken: https://bugs.python.org/issue23395 +original_signal_handlers = {} +def signal_kill_process_group(signal_number, frame): + handler = signal.signal(signal_number, original_signal_handlers[signal_number]) + os.killpg(0, signal_number) + # Restore our handler in case the signal was not fatal. + signal.signal(signal_number, handler) + +def register_killgroup_callbacks(): + atexit.register(atexit_kill_process_group) + original_signal_handlers[signal.SIGHUP] = signal.signal(signal.SIGHUP, signal_kill_process_group) + original_signal_handlers[signal.SIGINT] = signal.signal(signal.SIGINT, signal_kill_process_group) + original_signal_handlers[signal.SIGTERM] = signal.signal(signal.SIGTERM, signal_kill_process_group) + +def unregister_killgroup_callbacks(): + # Subprocesses will inherit the atexit callback and the signal handlers, so + # we have to unregister them there. + # FIXME: atexit.unregister was unavailable when tested (seen with Python 3.6.9), so + # the subprocess failed silently. Commented out for now. 
+ # atexit.unregister(atexit_kill_process_group) + signal.signal(signal.SIGHUP, original_signal_handlers[signal.SIGHUP]) + signal.signal(signal.SIGINT, original_signal_handlers[signal.SIGINT]) + signal.signal(signal.SIGTERM, original_signal_handlers[signal.SIGTERM]) + +def run_tests_array_subprocess(params): + unregister_killgroup_callbacks() + run_tests_array(**params) + def main(args): global SERVER_DIED global exit_code @@ -428,9 +471,7 @@ def main(args): all_tests = [t for t in all_tests if any([re.search(r, t) for r in args.test])] all_tests.sort(key=key_func) - run_n, run_total = args.parallel.split('/') - run_n = float(run_n) - run_total = float(run_total) + run_n, run_total = [int(x) for x in args.parallel.split('/')] tests_n = len(all_tests) if run_total > tests_n: run_total = tests_n @@ -443,18 +484,31 @@ def main(args): if jobs > run_total: run_total = jobs + tests_per_group = tests_n / run_total + all_tests_array = [] - for n in range(1, 1 + int(run_total)): - start = int(tests_n / run_total * (n - 1)) - end = int(tests_n / run_total * n) - all_tests_array.append([all_tests[start : end], suite, suite_dir, suite_tmp_dir, run_total]) + for n in range(0, run_total): + start = tests_per_group * n + end = tests_per_group * (n + 1) + if n == run_total - 1: + end = len(all_tests) + + all_tests_array.append(dict( + tests=all_tests[start : end], + suite=suite, + suite_dir=suite_dir, + suite_tmp_dir=suite_tmp_dir, + run_total=run_total)) + + assert(functools.reduce(lambda x, y: x + len(y['tests']), all_tests_array, 0) == len(all_tests)) if jobs > 1: - with closing(multiprocessing.Pool(processes=jobs)) as pool: - pool.map(run_tests_array, all_tests_array) - pool.terminate() + # Handling of KeyboardInterrupt in multiprocessing.Pool + # is broken: https://bugs.python.org/msg221549 + with concurrent.futures.ProcessPoolExecutor() as pool: + pool.map(run_tests_array_subprocess, all_tests_array) else: - run_tests_array(all_tests_array[int(run_n)-1]) + 
run_tests_array(**all_tests_array[int(run_n)-1]) total_tests_run += tests_n @@ -521,6 +575,11 @@ def get_additional_client_options_url(args): if __name__ == '__main__': + # Move to a new process group and kill it at exit so that we don't leave any + # endlessly running test processes behind. + os.setpgid(0, 0) + register_killgroup_callbacks() + parser=ArgumentParser(description='ClickHouse functional tests') parser.add_argument('-q', '--queries', help='Path to queries dir') parser.add_argument('--tmp', help='Path to tmp dir')