diff --git a/tests/clickhouse-test b/tests/clickhouse-test index 21034931bc4..e1231a61ad9 100755 --- a/tests/clickhouse-test +++ b/tests/clickhouse-test @@ -689,6 +689,46 @@ def collect_build_flags(client): return result +def do_run_tests(jobs, suite, suite_dir, suite_tmp_dir, all_tests, parallel_tests, sequential_tests, parallel): + if jobs > 1 and len(parallel_tests) > 0: + print("Found", len(parallel_tests), "parallel tests and", len(sequential_tests), "sequential tests") + run_n, run_total = parallel.split('/') + run_n = float(run_n) + run_total = float(run_total) + tests_n = len(parallel_tests) + if run_total > tests_n: + run_total = tests_n + + if jobs > tests_n: + jobs = tests_n + if jobs > run_total: + run_total = jobs + + batch_size = max(1, len(parallel_tests) // jobs) + parallel_tests_array = [] + for _ in range(jobs): + parallel_tests_array.append((None, batch_size, suite, suite_dir, suite_tmp_dir)) + + with closing(multiprocessing.Pool(processes=jobs)) as pool: + pool.map_async(run_tests_array, parallel_tests_array) + + for suit in parallel_tests: + queue.put(suit) + + for _ in range(jobs): + queue.put(None) + + queue.close() + + pool.join() + + run_tests_array((sequential_tests, len(sequential_tests), suite, suite_dir, suite_tmp_dir)) + total_tests_run += len(sequential_tests) + len(parallel_tests) + else: + run_tests_array((all_tests, len(all_tests), suite, suite_dir, suite_tmp_dir)) + total_tests_run += len(all_tests) + + def main(args): global SERVER_DIED global stop_time @@ -852,43 +892,7 @@ def main(args): else: parallel_tests.append(test) - if jobs > 1 and len(parallel_tests) > 0: - print("Found", len(parallel_tests), "parallel tests and", len(sequential_tests), "sequential tests") - run_n, run_total = args.parallel.split('/') - run_n = float(run_n) - run_total = float(run_total) - tests_n = len(parallel_tests) - if run_total > tests_n: - run_total = tests_n - - if jobs > tests_n: - jobs = tests_n - if jobs > run_total: - run_total = jobs - - batch_size = max(1, len(parallel_tests) // jobs) - parallel_tests_array = [] - for i in range(jobs): - parallel_tests_array.append((None, batch_size, suite, suite_dir, suite_tmp_dir)) - - with closing(multiprocessing.Pool(processes=jobs)) as pool: - pool.map_async(run_tests_array, parallel_tests_array) - - for suit in parallel_tests: - queue.put(suit) - - for i in range(jobs): - queue.put(None) - - queue.close() - - pool.join() - - run_tests_array((sequential_tests, len(sequential_tests), suite, suite_dir, suite_tmp_dir)) - total_tests_run += len(sequential_tests) + len(parallel_tests) - else: - run_tests_array((all_tests, len(all_tests), suite, suite_dir, suite_tmp_dir)) - total_tests_run += len(all_tests) + do_run_tests(jobs, suite, suite_dir, suite_tmp_dir, all_tests, parallel_tests, sequential_tests, args.parallel) if args.hung_check: