Mirror of https://github.com/ClickHouse/ClickHouse.git
Merge pull request #66411 from ClickHouse/fix-hanging-on-gdb
Stateless tests: deal with hang-ups more roughly
Commit 258ae45aa8
@@ -6,8 +6,8 @@ source /setup_export_logs.sh
 # fail on errors, verbose and export all env variables
 set -e -x -a
 
-MAX_RUN_TIME=${MAX_RUN_TIME:-10800}
-MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 10800 : MAX_RUN_TIME))
+MAX_RUN_TIME=${MAX_RUN_TIME:-7200}
+MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 7200 : MAX_RUN_TIME))
 
 USE_DATABASE_REPLICATED=${USE_DATABASE_REPLICATED:=0}
 USE_SHARED_CATALOG=${USE_SHARED_CATALOG:=0}

@@ -320,7 +320,7 @@ export -f run_tests
 
 
 # This should be enough to setup job and collect artifacts
-TIMEOUT=$((MAX_RUN_TIME - 600))
+TIMEOUT=$((MAX_RUN_TIME - 700))
 if [ "$NUM_TRIES" -gt "1" ]; then
     # We don't run tests with Ordinary database in PRs, only in master.
     # So run new/changed tests with Ordinary at least once in flaky check.

@@ -11,6 +11,7 @@ TIMEOUT_SIGN = "[ Timeout! "
 UNKNOWN_SIGN = "[ UNKNOWN "
 SKIPPED_SIGN = "[ SKIPPED "
 HUNG_SIGN = "Found hung queries in processlist"
+SERVER_DIED_SIGN = "Server died, terminating all processes"
 DATABASE_SIGN = "Database: "
 
 SUCCESS_FINISH_SIGNS = ["All tests have finished", "No tests were run"]

@@ -25,6 +26,7 @@ def process_test_log(log_path, broken_tests):
     failed = 0
     success = 0
     hung = False
+    server_died = False
     retries = False
     success_finish = False
     test_results = []

@@ -41,6 +43,8 @@ def process_test_log(log_path, broken_tests):
         if HUNG_SIGN in line:
             hung = True
             break
+        if SERVER_DIED_SIGN in line:
+            server_died = True
         if RETRIES_SIGN in line:
             retries = True
         if any(

@@ -123,6 +127,7 @@ def process_test_log(log_path, broken_tests):
         failed,
         success,
         hung,
+        server_died,
         success_finish,
         retries,
         test_results,

@@ -150,6 +155,7 @@ def process_result(result_path, broken_tests):
             failed,
             success,
             hung,
+            server_died,
             success_finish,
             retries,
             test_results,

@@ -165,6 +171,10 @@ def process_result(result_path, broken_tests):
             description = "Some queries hung, "
             state = "failure"
             test_results.append(("Some queries hung", "FAIL", "0", ""))
+        elif server_died:
+            description = "Server died, "
+            state = "failure"
+            test_results.append(("Server died", "FAIL", "0", ""))
         elif not success_finish:
             description = "Tests are not finished, "
             state = "failure"

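For illustration only (not part of the patch): a minimal sketch of how the new SERVER_DIED_SIGN marker flows from log scanning to the final check state, condensing the process_test_log and process_result changes above. The helper name summarize_log and the sample log line are invented for this example.

HUNG_SIGN = "Found hung queries in processlist"
SERVER_DIED_SIGN = "Server died, terminating all processes"

def summarize_log(lines):
    # Scan the test log for the hang/death markers, as process_test_log does.
    hung = any(HUNG_SIGN in line for line in lines)
    server_died = any(SERVER_DIED_SIGN in line for line in lines)
    # Map the flags to a check state, as process_result does.
    if hung:
        return "failure", "Some queries hung, "
    if server_died:
        return "failure", "Server died, "
    return "success", ""

print(summarize_log(["Server died, terminating all processes"]))
# -> ('failure', 'Server died, ')
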
@@ -218,5 +228,20 @@ if __name__ == "__main__":
     state, description, test_results = process_result(args.in_results_dir, broken_tests)
     logging.info("Result parsed")
     status = (state, description)
+
+    def test_result_comparator(item):
+        # sort by status then by check name
+        order = {
+            "FAIL": 0,
+            "Timeout": 1,
+            "NOT_FAILED": 2,
+            "BROKEN": 3,
+            "OK": 4,
+            "SKIPPED": 5,
+        }
+        return order.get(item[1], 10), str(item[0]), item[1]
+
+    test_results.sort(key=test_result_comparator)
+
     write_results(args.out_results_file, args.out_status_file, test_results, status)
     logging.info("Result written")

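For reference, a small standalone example of the ordering the new test_result_comparator produces; the sample result tuples below are invented for the demonstration.

def test_result_comparator(item):
    # sort by status then by check name
    order = {"FAIL": 0, "Timeout": 1, "NOT_FAILED": 2, "BROKEN": 3, "OK": 4, "SKIPPED": 5}
    return order.get(item[1], 10), str(item[0]), item[1]

results = [
    ("02001_select", "OK", "0.1", ""),
    ("00001_join", "FAIL", "1.2", ""),
    ("01500_alter", "SKIPPED", "0", ""),
]
results.sort(key=test_result_comparator)
print([name for name, *_ in results])
# -> ['00001_join', '02001_select', '01500_alter']  (failures first, skipped tests last)
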
@@ -378,7 +378,7 @@ class CommonJobConfigs:
         ),
         run_command='functional_test_check.py "$CHECK_NAME"',
         runner_type=Runners.FUNC_TESTER,
-        timeout=10800,
+        timeout=7200,
     )
     STATEFUL_TEST = JobConfig(
         job_name_keyword="stateful",

@@ -1750,7 +1750,7 @@ class TestCase:
             return TestResult(
                 self.name,
                 TestStatus.FAIL,
-                FailureReason.INTERNAL_QUERY_FAIL,
+                FailureReason.TIMEOUT,
                 total_time,
                 self.add_info_about_settings(
                     self.get_description_from_exception_info(sys.exc_info())

@@ -2189,11 +2189,26 @@ def run_tests_array(all_tests_with_params: Tuple[List[str], int, TestSuite, bool
             sys.stdout.flush()
 
             while True:
-                test_result = test_case.run(
-                    args, test_suite, client_options, server_logs_level
-                )
-                test_result = test_case.process_result(test_result, MESSAGES)
-                if not test_result.need_retry:
+                # This is the upper level timeout
+                # It helps with completely frozen processes, like in case of gdb errors
+                def timeout_handler(signum, frame):
+                    raise TimeoutError("Test execution timed out")
+
+                signal.signal(signal.SIGALRM, timeout_handler)
+                signal.alarm(int(args.timeout * 1.1))
+                test_result = None
+                try:
+                    test_result = test_case.run(
+                        args, test_suite, client_options, server_logs_level
+                    )
+                    test_result = test_case.process_result(test_result, MESSAGES)
+                    break
+                except TimeoutError:
+                    break
+                finally:
+                    signal.alarm(0)
+
+                if not test_result or not test_result.need_retry:
                     break
                 restarted_tests.append(test_result)
 

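The pattern behind this upper-level timeout, as a self-contained sketch: arm SIGALRM a little above the per-test timeout so a completely frozen call (e.g. a stuck gdb) cannot wedge the whole runner. The names run_with_upper_timeout and slow_call are illustrative, not part of clickhouse-test.

import signal
import time

def run_with_upper_timeout(func, timeout_sec):
    def timeout_handler(signum, frame):
        raise TimeoutError("Test execution timed out")

    # Arm the alarm with a 10% margin over the normal timeout.
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(int(timeout_sec * 1.1))
    try:
        return func()
    except TimeoutError:
        return None  # give up on the frozen call instead of hanging forever
    finally:
        signal.alarm(0)  # always disarm the alarm

def slow_call():
    time.sleep(60)
    return "done"

print(run_with_upper_timeout(slow_call, 2))  # -> None after roughly 2 seconds
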
@@ -2452,6 +2467,10 @@ def override_envs(*args_, **kwargs):
     run_tests_array(*args_, **kwargs)
 
 
+def run_tests_process(*args, **kwargs):
+    return run_tests_array(*args, **kwargs)
+
+
 def do_run_tests(jobs, test_suite: TestSuite):
     if jobs > 1 and len(test_suite.parallel_tests) > 0:
         print(

@@ -2475,39 +2494,70 @@ def do_run_tests(jobs, test_suite: TestSuite):
         # of failures will be nearly the same for all tests from the group.
         random.shuffle(test_suite.parallel_tests)
 
-        batch_size = max(1, len(test_suite.parallel_tests) // jobs)
+        batch_size = max(1, (len(test_suite.parallel_tests) // jobs) + 1)
         parallel_tests_array = []
         for job in range(jobs):
             range_ = job * batch_size, job * batch_size + batch_size
             batch = test_suite.parallel_tests[range_[0] : range_[1]]
             parallel_tests_array.append((batch, batch_size, test_suite, True))
 
-        try:
-            with multiprocessing.Pool(processes=jobs + 1) as pool:
-                future = pool.map_async(run_tests_array, parallel_tests_array)
+        processes = []
 
-                if args.run_sequential_tests_in_parallel:
-                    # Run parallel tests and sequential tests at the same time
-                    # Sequential tests will use different ClickHouse instance
-                    # In this process we can safely override values in `args` and `os.environ`
-                    future_seq = pool.map_async(
-                        override_envs,
-                        [
-                            (
-                                test_suite.sequential_tests,
-                                len(test_suite.sequential_tests),
-                                test_suite,
-                                False,
-                            )
-                        ],
-                    )
-                    future_seq.wait()
+        for test_batch in parallel_tests_array:
+            process = multiprocessing.Process(
+                target=run_tests_process, args=(test_batch,)
+            )
+            processes.append(process)
+            process.start()
 
-                future.wait()
-        finally:
-            pool.terminate()
-            pool.close()
-            pool.join()
+        if args.run_sequential_tests_in_parallel:
+            # Run parallel tests and sequential tests at the same time
+            # Sequential tests will use different ClickHouse instance
+            # In this process we can safely override values in `args` and `os.environ`
+            process = multiprocessing.Process(
+                target=override_envs,
+                args=(
+                    (
+                        test_suite.sequential_tests,
+                        len(test_suite.sequential_tests),
+                        test_suite,
+                        False,
+                    ),
+                ),
+            )
+            processes.append(process)
+            process.start()
+
+        while processes:
+            sys.stdout.flush()
+            # Periodically check the server for hangs
+            # and stop all processes in this case
+            try:
+                clickhouse_execute(
+                    args,
+                    query="SELECT 1 /*hang up check*/",
+                    max_http_retries=5,
+                    timeout=20,
+                )
+            except Exception:
+                print("Hang up check failed")
+                server_died.set()
+
+            if server_died.is_set():
+                print("Server died, terminating all processes...")
+                kill_gdb_if_any()
+                # Wait for test results
+                sleep(args.timeout)
+                for p in processes:
+                    if p.is_alive():
+                        p.terminate()
+                break
+
+            for p in processes[:]:
+                if not p.is_alive():
+                    processes.remove(p)
+
+            sleep(5)
 
         if not args.run_sequential_tests_in_parallel:
             run_tests_array(

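The new supervision loop in do_run_tests, reduced to its core as a sketch: ping the server while worker processes run, and terminate them if it stops answering. Here ping_server stands in for the clickhouse_execute("SELECT 1 /*hang up check*/") call, and the ClickHouse-specific cleanup (kill_gdb_if_any, waiting out args.timeout) is omitted.

import sys
from time import sleep

def supervise(processes, ping_server, server_died, poll_interval=5):
    # processes: list of multiprocessing.Process workers
    # server_died: a multiprocessing.Event shared with the workers
    while processes:
        sys.stdout.flush()
        # Periodically check the server for hangs and stop everything if it fails.
        try:
            ping_server()
        except Exception:
            print("Hang up check failed")
            server_died.set()

        if server_died.is_set():
            print("Server died, terminating all processes...")
            for p in processes:
                if p.is_alive():
                    p.terminate()
            break

        # Drop workers that have already finished.
        for p in processes[:]:
            if not p.is_alive():
                processes.remove(p)

        sleep(poll_interval)
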
@@ -3358,6 +3408,14 @@ def parse_args():
     return parser.parse_args()
 
 
+class Terminated(KeyboardInterrupt):
+    pass
+
+
+def signal_handler(sig, frame):
+    raise Terminated(f"Terminated with {sig} signal")
+
+
 if __name__ == "__main__":
     stop_time = None
     exit_code = multiprocessing.Value("i", 0)

@@ -3369,6 +3427,9 @@ if __name__ == "__main__":
     # infinite tests processes left
     # (new process group is required to avoid killing some parent processes)
     os.setpgid(0, 0)
+    signal.signal(signal.SIGTERM, signal_handler)
+    signal.signal(signal.SIGINT, signal_handler)
+    signal.signal(signal.SIGHUP, signal_handler)
 
     try:
         args = parse_args()

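Finally, the signal-handling pattern added at the bottom of clickhouse-test, condensed into a runnable sketch: SIGTERM/SIGINT/SIGHUP are turned into a Terminated exception (a KeyboardInterrupt subclass, so existing Ctrl+C cleanup paths still apply), and the runner is moved into its own process group. signal.pause() below is only a stand-in for the real main loop.

import os
import signal

class Terminated(KeyboardInterrupt):
    pass

def signal_handler(sig, frame):
    raise Terminated(f"Terminated with {sig} signal")

if __name__ == "__main__":
    # New process group, so killing the group later does not hit parent processes.
    os.setpgid(0, 0)
    for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP):
        signal.signal(sig, signal_handler)

    try:
        signal.pause()  # stand-in for running the tests
    except Terminated as e:
        print(e)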