Some fixes for interactive clickhouse-test

* Don't hang indefinitely on Ctrl+C (still broken but somewhat better)

* Move to a separate process group and kill it at exit, so that we don't
leave endless test processes.
This commit is contained in:
Alexander Kuzmenkov 2020-03-26 08:31:16 +03:00
parent d5ada06d7f
commit 573983d407

View File

@ -1,9 +1,12 @@
#!/usr/bin/env python
from __future__ import print_function
import sys
import atexit
import os
import os.path
import re
import signal
import sys
from argparse import ArgumentParser
from argparse import FileType
@ -17,12 +20,16 @@ from subprocess import CalledProcessError
from datetime import datetime
from time import sleep
from errno import ESRCH
try:
import termcolor
except ImportError:
termcolor = None
from random import random
import commands
import concurrent.futures
import functools
import multiprocessing
from contextlib import closing
@ -137,10 +144,8 @@ def colored(text, args, color=None, on_color=None, attrs=None):
SERVER_DIED = False
exit_code = 0
# def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total):
def run_tests_array(all_tests_with_params):
all_tests, suite, suite_dir, suite_tmp_dir, run_total = all_tests_with_params
def run_tests_array(tests, suite, suite_dir, suite_tmp_dir, run_total):
global exit_code
global SERVER_DIED
@ -164,10 +169,10 @@ def run_tests_array(all_tests_with_params):
if args.print_time:
print(" {0:.2f} sec.".format(test_time), end='')
if len(all_tests):
print("\nRunning {} {} tests.".format(len(all_tests), suite) + "\n")
if len(tests):
print("\nRunning {} {} tests.".format(len(tests), suite) + "\n")
for case in all_tests:
for case in tests:
if SERVER_DIED:
break
@ -282,9 +287,10 @@ def run_tests_array(all_tests_with_params):
os.remove(stdout_file)
if os.path.exists(stderr_file):
os.remove(stderr_file)
except KeyboardInterrupt as e:
except KeyboardInterrupt:
print(colored("Break tests execution", args, "red"))
raise e
SERVER_DIED = 1
break
except:
import traceback
exc_type, exc_value, tb = sys.exc_info()
@ -306,6 +312,43 @@ def run_tests_array(all_tests_with_params):
server_logs_level = "warning"
def atexit_kill_process_group():
# Ignore the terminate signal in this process, so that the exit code is
# preserved. Don't use SIG_IGN because is broken:
# https://bugs.python.org/issue23395
def ignore_signal(signal, frame):
pass
signal.signal(signal.SIGTERM, ignore_signal)
os.killpg(0, signal.SIGTERM)
# SIG_DFL is broken: https://bugs.python.org/issue23395
original_signal_handlers = {}
def signal_kill_process_group(signal_number, frame):
handler = signal.signal(signal_number, original_signal_handlers[signal_number])
os.killpg(0, signal_number)
# Restore the handler back in case the signal was not deadly.
signal.signal(signal_number, handler)
def register_killgroup_callbacks():
atexit.register(atexit_kill_process_group)
original_signal_handlers[signal.SIGHUP] = signal.signal(signal.SIGHUP, signal_kill_process_group)
original_signal_handlers[signal.SIGINT] = signal.signal(signal.SIGINT, signal_kill_process_group)
original_signal_handlers[signal.SIGTERM] = signal.signal(signal.SIGTERM, signal_kill_process_group)
def unregister_killgroup_callbacks():
# Subprocesses will inherit the atexit callback and the signal handlers, so
# we have to unregister them there.
# GODHELPME atexit.unregister is somehow unavailable in my python 3.6.9 so
# the subprocess fails silently. Commented out for now.
# atexit.unregister(atexit_kill_process_group)
signal.signal(signal.SIGHUP, original_signal_handlers[signal.SIGHUP])
signal.signal(signal.SIGINT, original_signal_handlers[signal.SIGINT])
signal.signal(signal.SIGTERM, original_signal_handlers[signal.SIGTERM])
def run_tests_array_subprocess(params):
unregister_killgroup_callbacks()
run_tests_array(**params)
def main(args):
global SERVER_DIED
global exit_code
@ -428,9 +471,7 @@ def main(args):
all_tests = [t for t in all_tests if any([re.search(r, t) for r in args.test])]
all_tests.sort(key=key_func)
run_n, run_total = args.parallel.split('/')
run_n = float(run_n)
run_total = float(run_total)
run_n, run_total = [int(x) for x in args.parallel.split('/')]
tests_n = len(all_tests)
if run_total > tests_n:
run_total = tests_n
@ -443,18 +484,31 @@ def main(args):
if jobs > run_total:
run_total = jobs
tests_per_group = tests_n / run_total
all_tests_array = []
for n in range(1, 1 + int(run_total)):
start = int(tests_n / run_total * (n - 1))
end = int(tests_n / run_total * n)
all_tests_array.append([all_tests[start : end], suite, suite_dir, suite_tmp_dir, run_total])
for n in range(0, run_total):
start = tests_per_group * n
end = tests_per_group * (n + 1)
if n == run_total - 1:
end = len(all_tests)
all_tests_array.append(dict(
tests=all_tests[start : end],
suite=suite,
suite_dir=suite_dir,
suite_tmp_dir=suite_tmp_dir,
run_total=run_total))
assert(functools.reduce(lambda x, y: x + len(y['tests']), all_tests_array, 0) == len(all_tests))
if jobs > 1:
with closing(multiprocessing.Pool(processes=jobs)) as pool:
pool.map(run_tests_array, all_tests_array)
pool.terminate()
# Handling of KeyboardInterrupt in multiprocessing.Pool
# is broken: https://bugs.python.org/msg221549
with concurrent.futures.ProcessPoolExecutor() as pool:
pool.map(run_tests_array_subprocess, all_tests_array)
else:
run_tests_array(all_tests_array[int(run_n)-1])
run_tests_array(**all_tests_array[int(run_n)-1])
total_tests_run += tests_n
@ -521,6 +575,11 @@ def get_additional_client_options_url(args):
if __name__ == '__main__':
# Move to a new process group and kill it at exit so that we don't have any
# infinite tests processes left.
os.setpgid(0, 0)
register_killgroup_callbacks()
parser=ArgumentParser(description='ClickHouse functional tests')
parser.add_argument('-q', '--queries', help='Path to queries dir')
parser.add_argument('--tmp', help='Path to tmp dir')