Merge pull request #67737 from azat/tests-processes-leftovers

Smart handling of processes leftovers in tests
This commit is contained in:
Alexey Milovidov 2024-08-09 05:21:12 +00:00 committed by GitHub
commit 917920c59e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 186 additions and 83 deletions

View File

@ -232,15 +232,26 @@ function run_tests()
set +e
TEST_ARGS=(
-j 2
--testname
--shard
--zookeeper
--check-zookeeper-session
--no-stateless
--hung-check
--print-time
--capture-client-stacktrace
"${ADDITIONAL_OPTIONS[@]}"
"$SKIP_TESTS_OPTION"
)
if [[ -n "$USE_PARALLEL_REPLICAS" ]] && [[ "$USE_PARALLEL_REPLICAS" -eq 1 ]]; then
clickhouse-test --client="clickhouse-client --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 \
--max_parallel_replicas=100 --cluster_for_parallel_replicas='parallel_replicas'" \
-j 2 --testname --shard --zookeeper --check-zookeeper-session --no-stateless --no-parallel-replicas --hung-check --print-time "${ADDITIONAL_OPTIONS[@]}" \
"$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt
else
clickhouse-test -j 2 --testname --shard --zookeeper --check-zookeeper-session --no-stateless --hung-check --print-time "${ADDITIONAL_OPTIONS[@]}" \
"$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt
TEST_ARGS+=(
--client="clickhouse-client --allow_experimental_parallel_reading_from_replicas=1 --parallel_replicas_for_non_replicated_merge_tree=1 --max_parallel_replicas=100 --cluster_for_parallel_replicas='parallel_replicas'"
--no-parallel-replicas
)
fi
clickhouse-test "${TEST_ARGS[@]}" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt
set -e
}

View File

@ -264,11 +264,22 @@ function run_tests()
TIMEOUT=$((MAX_RUN_TIME - 800 > 8400 ? 8400 : MAX_RUN_TIME - 800))
START_TIME=${SECONDS}
set +e
timeout --preserve-status --signal TERM --kill-after 60m ${TIMEOUT}s \
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --hung-check --print-time \
--no-drop-if-fail --test-runs "$NUM_TRIES" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \
| ts '%Y-%m-%d %H:%M:%S' \
| tee -a test_output/test_result.txt
TEST_ARGS=(
--testname
--shard
--zookeeper
--check-zookeeper-session
--hung-check
--print-time
--no-drop-if-fail
--capture-client-stacktrace
--test-runs "$NUM_TRIES"
"${ADDITIONAL_OPTIONS[@]}"
)
timeout --preserve-status --signal TERM --kill-after 60m ${TIMEOUT}s clickhouse-test "${TEST_ARGS[@]}" 2>&1 \
| ts '%Y-%m-%d %H:%M:%S' \
| tee -a test_output/test_result.txt
set -e
DURATION=$((SECONDS - START_TIME))

View File

@ -629,6 +629,7 @@ void HandledSignals::setupTerminateHandler()
void HandledSignals::setupCommonDeadlySignalHandlers()
{
/// SIGTSTP is added for debugging purposes. To output a stack trace of any running thread at anytime.
/// NOTE: that it is also used by clickhouse-test wrapper
addSignalHandler({SIGABRT, SIGSEGV, SIGILL, SIGBUS, SIGSYS, SIGFPE, SIGPIPE, SIGTSTP, SIGTRAP}, signalHandler, true);
#if defined(SANITIZER)

View File

@ -267,7 +267,7 @@ def clickhouse_execute_http(
max_http_retries=5,
retry_error_codes=False,
):
if args.secure:
if base_args.secure:
client = http.client.HTTPSConnection(
host=base_args.tcp_host, port=base_args.http_port, timeout=timeout
)
@ -358,14 +358,89 @@ def clickhouse_execute_json(
return rows
# Should we capture client's stacktraces via SIGTSTP
CAPTURE_CLIENT_STACKTRACE = False
def kill_process_group(pgid):
print(f"Killing process group {pgid}")
print(f"Processes in process group {pgid}:")
print(
subprocess.check_output(
f"pgrep --pgroup {pgid} -a", shell=True, stderr=subprocess.STDOUT
).decode("utf-8"),
end="",
)
try:
if CAPTURE_CLIENT_STACKTRACE:
# Let's try to dump stacktrace in client (useful to catch issues there)
os.killpg(pgid, signal.SIGTSTP)
# Wait some time for clickhouse utilities to gather stacktrace
if RELEASE_NON_SANITIZED:
sleep(0.5)
else:
sleep(10)
# NOTE: this still may leave some processes, that had been
# created by timeout(1), since it also creates new process
# group. But this should not be a problem with default
# options, since the default time for each test is 10min,
# and this is way more bigger then the timeout for each
# timeout(1) invocation.
#
# But as a workaround we are sending SIGTERM first, and
# only after SIGKILL, that way timeout(1) will have an
# ability to terminate childrens (though not always since
# signals are asynchronous).
os.killpg(pgid, signal.SIGTERM)
# We need minimal delay to let processes handle SIGTERM - 0.1 (this may
# not be enough, but at least something)
sleep(0.1)
os.killpg(pgid, signal.SIGKILL)
except OSError as e:
if e.errno == ESRCH:
print(f"Got ESRCH while killing {pgid}. Ignoring.")
else:
raise
print(f"Process group {pgid} should be killed")
def cleanup_child_processes(pid):
pgid = os.getpgid(os.getpid())
print(f"Child processes of {pid}:")
print(
subprocess.check_output(
f"pgrep --parent {pid} -a", shell=True, stderr=subprocess.STDOUT
).decode("utf-8"),
end="",
)
# Due to start_new_session=True, it is not enough to kill by PGID, we need
# to look at children processes as well.
# But we are hoping that nobody creates session in the tests (though it is
# possible via timeout(), but we are assuming that they will be killed by
# timeout).
processes = subprocess.check_output(
f"pgrep --parent {pid}", shell=True, stderr=subprocess.STDOUT
)
processes = processes.decode("utf-8")
processes = processes.strip()
processes = processes.split("\n")
processes = map(lambda x: int(x.strip()), processes)
processes = list(processes)
for child in processes:
child_pgid = os.getpgid(child)
if child_pgid != pgid:
kill_process_group(child_pgid)
# SIGKILL should not be sent, since this will kill the script itself
os.killpg(pgid, signal.SIGTERM)
# send signal to all processes in group to avoid hung check triggering
# (to avoid terminating clickhouse-test itself, the signal should be ignored)
def stop_tests():
# send signal to all processes in group to avoid hung check triggering
# (to avoid terminating clickhouse-test itself, the signal should be ignored)
print("Sending signals")
signal.signal(signal.SIGTERM, signal.SIG_IGN)
os.killpg(os.getpgid(os.getpid()), signal.SIGTERM)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
print("Sending signals DONE")
cleanup_child_processes(os.getpid())
signal.signal(signal.SIGTERM, signal_handler)
def get_db_engine(args, database_name):
@ -1248,39 +1323,35 @@ class TestCase:
return None
def process_result_impl(
self, proc, stdout: str, stderr: str, debug_log: str, total_time: float
):
def process_result_impl(self, proc, total_time: float):
if proc:
if proc.returncode is None:
kill_process_group(os.getpgid(proc.pid))
description = ""
debug_log = ""
if os.path.exists(self.testcase_args.debug_log_file):
with open(self.testcase_args.debug_log_file, "rb") as stream:
debug_log += self.testcase_args.debug_log_file + ":\n"
debug_log += str(stream.read(), errors="replace", encoding="utf-8")
debug_log += "\n"
stdout = ""
if os.path.exists(self.stdout_file):
with open(self.stdout_file, "rb") as stdfd:
stdout = str(stdfd.read(), errors="replace", encoding="utf-8")
stderr = ""
if os.path.exists(self.stderr_file):
with open(self.stderr_file, "rb") as stdfd:
stderr += str(stdfd.read(), errors="replace", encoding="utf-8")
if debug_log:
debug_log = "\n".join(debug_log.splitlines()[:100])
if proc:
if proc.returncode is None:
try:
pgid = os.getpgid(proc.pid)
# NOTE: this still may leave some processes, that had been
# created by timeout(1), since it also creates new process
# group. But this should not be a problem with default
# options, since the default time for each test is 10min,
# and this is way more bigger then the timeout for each
# timeout(1) invocation.
#
# But as a workaround we are sending SIGTERM first, and
# only after SIGKILL, that way timeout(1) will have an
# ability to terminate childrens (though not always since
# signals are asynchronous).
os.killpg(pgid, signal.SIGTERM)
# This may not be enough, but this is at least something
# (and anyway it is OK to spend 0.1 second more in case of
# test timeout).
sleep(0.1)
os.killpg(pgid, signal.SIGKILL)
except OSError as e:
if e.errno != ESRCH:
raise
if stderr:
description += stderr
if debug_log:
@ -1532,7 +1603,7 @@ class TestCase:
def run_single_test(
self, server_logs_level, client_options
) -> Tuple[Optional[Popen], str, str, str, float]:
) -> Tuple[Optional[Popen], float]:
args = self.testcase_args
client = args.testcase_client
start_time = args.testcase_start_time
@ -1609,13 +1680,6 @@ class TestCase:
# Whether the test timed out will be decided later
pass
debug_log = ""
if os.path.exists(self.testcase_args.debug_log_file):
with open(self.testcase_args.debug_log_file, "rb") as stream:
debug_log += self.testcase_args.debug_log_file + ":\n"
debug_log += str(stream.read(), errors="replace", encoding="utf-8")
debug_log += "\n"
total_time = (datetime.now() - start_time).total_seconds()
# Normalize randomized database names in stdout, stderr files.
@ -1667,17 +1731,7 @@ class TestCase:
"https://localhost:8443/",
)
stdout = ""
if os.path.exists(self.stdout_file):
with open(self.stdout_file, "rb") as stdfd:
stdout = str(stdfd.read(), errors="replace", encoding="utf-8")
stderr = ""
if os.path.exists(self.stderr_file):
with open(self.stderr_file, "rb") as stdfd:
stderr += str(stdfd.read(), errors="replace", encoding="utf-8")
return proc, stdout, stderr, debug_log, total_time
return proc, total_time
def run(self, args, suite, client_options, server_logs_level):
start_time = datetime.now()
@ -1709,14 +1763,14 @@ class TestCase:
if not is_valid_utf_8(self.case_file) or (
self.reference_file and not is_valid_utf_8(self.reference_file)
):
proc, stdout, stderr, debug_log, total_time = self.run_single_test(
proc, total_time = self.run_single_test(
server_logs_level, client_options
)
result = self.process_result_impl(
proc, stdout, stderr, debug_log, total_time
result = self.process_result_impl(proc, total_time)
result.check_if_need_retry(
args, result.description, result.description, self.runs_count
)
result.check_if_need_retry(args, stdout, stderr, self.runs_count)
# to avoid breaking CSV parser
result.description = result.description.replace("\0", "")
else:
@ -1734,17 +1788,16 @@ class TestCase:
):
(
proc,
stdout,
stderr,
debug_log,
total_time,
) = self.run_single_test(server_logs_level, client_options)
result = self.process_result_impl(
proc, stdout, stderr, debug_log, total_time
)
result = self.process_result_impl(proc, total_time)
result.check_if_need_retry(
args, stdout, stderr, self.runs_count
args,
result.description,
result.description,
self.runs_count,
)
# to avoid breaking CSV parser
result.description = result.description.replace("\0", "")
@ -2353,7 +2406,13 @@ class BuildFlags:
POLYMORPHIC_PARTS = "polymorphic-parts"
# Release and non-sanitizer build
RELEASE_NON_SANITIZED = False
def collect_build_flags(args):
global RELEASE_NON_SANITIZED
result = []
value = clickhouse_execute(
@ -2378,6 +2437,8 @@ def collect_build_flags(args):
elif b"RelWithDebInfo" in value or b"Release" in value:
result.append(BuildFlags.RELEASE)
RELEASE_NON_SANITIZED = result == [BuildFlags.RELEASE]
value = clickhouse_execute(
args,
"SELECT value FROM system.settings WHERE name = 'allow_deprecated_database_ordinary'",
@ -3390,29 +3451,36 @@ def parse_args():
default="./client.fatal.log",
help="Path to file for fatal logs from client",
)
parser.add_argument(
"--capture-client-stacktrace",
action="store_true",
help="Capture stacktraces from clickhouse-client/local on errors",
)
return parser.parse_args()
class Terminated(KeyboardInterrupt):
pass
def __init__(self, signal):
self.signal = signal
def signal_handler(sig, frame):
raise Terminated(f"Terminated with {sig} signal")
def signal_handler(signal, frame):
raise Terminated(signal)
if __name__ == "__main__":
# Move to a new process group and kill it at exit so that we don't have any
# infinite tests processes left
# (new process group is required to avoid killing some parent processes)
os.setpgid(0, 0)
stop_time = None
exit_code = multiprocessing.Value("i", 0)
server_died = multiprocessing.Event()
multiprocessing_manager = multiprocessing.Manager()
restarted_tests = multiprocessing_manager.list()
# Move to a new process group and kill it at exit so that we don't have any
# infinite tests processes left
# (new process group is required to avoid killing some parent processes)
os.setpgid(0, 0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)
@ -3430,6 +3498,8 @@ if __name__ == "__main__":
)
sys.exit(1)
CAPTURE_CLIENT_STACKTRACE = args.capture_client_stacktrace
# Autodetect the directory with queries if not specified
if args.queries is None:
args.queries = "queries"
@ -3551,4 +3621,14 @@ if __name__ == "__main__":
if args.replace_replicated_with_shared:
args.s3_storage = True
main(args)
try:
main(args)
except ServerDied as e:
print(f"{e}", file=sys.stderr)
sys.exit(1)
except Terminated as e:
print(f"Terminated with {e.signal} signal", file=sys.stderr)
sys.exit(128 + e.signal)
except KeyboardInterrupt:
print("Interrupted")
sys.exit(128 + signal.SIGINT)