diff --git a/docker/test/stateless/run.sh b/docker/test/stateless/run.sh index 5f2cb95de75..2d32d188561 100755 --- a/docker/test/stateless/run.sh +++ b/docker/test/stateless/run.sh @@ -6,8 +6,8 @@ source /setup_export_logs.sh # fail on errors, verbose and export all env variables set -e -x -a -MAX_RUN_TIME=${MAX_RUN_TIME:-10800} -MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 10800 : MAX_RUN_TIME)) +MAX_RUN_TIME=${MAX_RUN_TIME:-7200} +MAX_RUN_TIME=$((MAX_RUN_TIME == 0 ? 7200 : MAX_RUN_TIME)) USE_DATABASE_REPLICATED=${USE_DATABASE_REPLICATED:=0} USE_SHARED_CATALOG=${USE_SHARED_CATALOG:=0} @@ -320,7 +320,7 @@ export -f run_tests # This should be enough to setup job and collect artifacts -TIMEOUT=$((MAX_RUN_TIME - 600)) +TIMEOUT=$((MAX_RUN_TIME - 700)) if [ "$NUM_TRIES" -gt "1" ]; then # We don't run tests with Ordinary database in PRs, only in master. # So run new/changed tests with Ordinary at least once in flaky check. diff --git a/docker/test/util/process_functional_tests_result.py b/docker/test/util/process_functional_tests_result.py index fd4cc9f4bf7..4442c9d7d9e 100755 --- a/docker/test/util/process_functional_tests_result.py +++ b/docker/test/util/process_functional_tests_result.py @@ -11,6 +11,7 @@ TIMEOUT_SIGN = "[ Timeout! 
" UNKNOWN_SIGN = "[ UNKNOWN " SKIPPED_SIGN = "[ SKIPPED " HUNG_SIGN = "Found hung queries in processlist" +SERVER_DIED_SIGN = "Server died, terminating all processes" DATABASE_SIGN = "Database: " SUCCESS_FINISH_SIGNS = ["All tests have finished", "No tests were run"] @@ -25,6 +26,7 @@ def process_test_log(log_path, broken_tests): failed = 0 success = 0 hung = False + server_died = False retries = False success_finish = False test_results = [] @@ -41,6 +43,8 @@ def process_test_log(log_path, broken_tests): if HUNG_SIGN in line: hung = True break + if SERVER_DIED_SIGN in line: + server_died = True if RETRIES_SIGN in line: retries = True if any( @@ -123,6 +127,7 @@ def process_test_log(log_path, broken_tests): failed, success, hung, + server_died, success_finish, retries, test_results, @@ -150,6 +155,7 @@ def process_result(result_path, broken_tests): failed, success, hung, + server_died, success_finish, retries, test_results, @@ -165,6 +171,10 @@ def process_result(result_path, broken_tests): description = "Some queries hung, " state = "failure" test_results.append(("Some queries hung", "FAIL", "0", "")) + elif server_died: + description = "Server died, " + state = "failure" + test_results.append(("Server died", "FAIL", "0", "")) elif not success_finish: description = "Tests are not finished, " state = "failure" @@ -218,5 +228,20 @@ if __name__ == "__main__": state, description, test_results = process_result(args.in_results_dir, broken_tests) logging.info("Result parsed") status = (state, description) + + def test_result_comparator(item): + # sort by status then by check name + order = { + "FAIL": 0, + "Timeout": 1, + "NOT_FAILED": 2, + "BROKEN": 3, + "OK": 4, + "SKIPPED": 5, + } + return order.get(item[1], 10), str(item[0]), item[1] + + test_results.sort(key=test_result_comparator) + write_results(args.out_results_file, args.out_status_file, test_results, status) logging.info("Result written") diff --git a/tests/ci/ci_definitions.py b/tests/ci/ci_definitions.py 
index 48e1280d939..4ae252560e9 100644 --- a/tests/ci/ci_definitions.py +++ b/tests/ci/ci_definitions.py @@ -378,7 +378,7 @@ class CommonJobConfigs: ), run_command='functional_test_check.py "$CHECK_NAME"', runner_type=Runners.FUNC_TESTER, - timeout=10800, + timeout=7200, ) STATEFUL_TEST = JobConfig( job_name_keyword="stateful", diff --git a/tests/clickhouse-test b/tests/clickhouse-test index 958dde0606f..79f6b5d71d3 100755 --- a/tests/clickhouse-test +++ b/tests/clickhouse-test @@ -1750,7 +1750,7 @@ class TestCase: return TestResult( self.name, TestStatus.FAIL, - FailureReason.INTERNAL_QUERY_FAIL, + FailureReason.TIMEOUT, total_time, self.add_info_about_settings( self.get_description_from_exception_info(sys.exc_info()) @@ -2189,11 +2189,26 @@ def run_tests_array(all_tests_with_params: Tuple[List[str], int, TestSuite, bool sys.stdout.flush() while True: - test_result = test_case.run( - args, test_suite, client_options, server_logs_level - ) - test_result = test_case.process_result(test_result, MESSAGES) - if not test_result.need_retry: + # This is the upper level timeout + # It helps with completely frozen processes, like in case of gdb errors + def timeout_handler(signum, frame): + raise TimeoutError("Test execution timed out") + + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(int(args.timeout * 1.1)) + test_result = None + try: + test_result = test_case.run( + args, test_suite, client_options, server_logs_level + ) + test_result = test_case.process_result(test_result, MESSAGES) + except TimeoutError: + break + finally: + signal.alarm(0) + + # Fall through on success so need_retry is honored + if not test_result or not test_result.need_retry: break restarted_tests.append(test_result) @@ -2452,6 +2467,10 @@ def override_envs(*args_, **kwargs): run_tests_array(*args_, **kwargs) +def run_tests_process(*args, **kwargs): + return run_tests_array(*args, **kwargs) + + def do_run_tests(jobs, test_suite: TestSuite): if jobs > 1 and len(test_suite.parallel_tests) > 0: print( @@ -2475,39 +2494,70 @@ def 
do_run_tests(jobs, test_suite: TestSuite): # of failures will be nearly the same for all tests from the group. random.shuffle(test_suite.parallel_tests) - batch_size = max(1, len(test_suite.parallel_tests) // jobs) + batch_size = max(1, (len(test_suite.parallel_tests) // jobs) + 1) parallel_tests_array = [] for job in range(jobs): range_ = job * batch_size, job * batch_size + batch_size batch = test_suite.parallel_tests[range_[0] : range_[1]] parallel_tests_array.append((batch, batch_size, test_suite, True)) - try: - with multiprocessing.Pool(processes=jobs + 1) as pool: - future = pool.map_async(run_tests_array, parallel_tests_array) + processes = [] - if args.run_sequential_tests_in_parallel: - # Run parallel tests and sequential tests at the same time - # Sequential tests will use different ClickHouse instance - # In this process we can safely override values in `args` and `os.environ` - future_seq = pool.map_async( - override_envs, - [ - ( - test_suite.sequential_tests, - len(test_suite.sequential_tests), - test_suite, - False, - ) - ], - ) - future_seq.wait() + for test_batch in parallel_tests_array: + process = multiprocessing.Process( + target=run_tests_process, args=(test_batch,) + ) + processes.append(process) + process.start() - future.wait() - finally: - pool.terminate() - pool.close() - pool.join() + if args.run_sequential_tests_in_parallel: + # Run parallel tests and sequential tests at the same time + # Sequential tests will use different ClickHouse instance + # In this process we can safely override values in `args` and `os.environ` + process = multiprocessing.Process( + target=override_envs, + args=( + ( + test_suite.sequential_tests, + len(test_suite.sequential_tests), + test_suite, + False, + ), + ), + ) + processes.append(process) + process.start() + + while processes: + sys.stdout.flush() + # Periodically check the server for hangs + # and stop all processes in this case + try: + clickhouse_execute( + args, + query="SELECT 1 /*hang up check*/", 
+ max_http_retries=5, + timeout=20, + ) + except Exception: + print("Hang up check failed") + server_died.set() + + if server_died.is_set(): + print("Server died, terminating all processes...") + kill_gdb_if_any() + # Wait for test results + sleep(args.timeout) + for p in processes: + if p.is_alive(): + p.terminate() + break + + for p in processes[:]: + if not p.is_alive(): + processes.remove(p) + + sleep(5) if not args.run_sequential_tests_in_parallel: run_tests_array( @@ -3358,6 +3408,14 @@ def parse_args(): return parser.parse_args() +class Terminated(KeyboardInterrupt): + pass + + +def signal_handler(sig, frame): + raise Terminated(f"Terminated with {sig} signal") + + if __name__ == "__main__": stop_time = None exit_code = multiprocessing.Value("i", 0) @@ -3369,6 +3427,9 @@ if __name__ == "__main__": # infinite tests processes left # (new process group is required to avoid killing some parent processes) os.setpgid(0, 0) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGHUP, signal_handler) try: args = parse_args()