From 9d4927a5b97771341c2de828d18d48668e248aab Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 20 May 2021 19:44:35 +0300 Subject: [PATCH] Better parallelism for fast tests. --- tests/clickhouse-test | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/tests/clickhouse-test b/tests/clickhouse-test index 0b486e4614c..66831d4cd5c 100755 --- a/tests/clickhouse-test +++ b/tests/clickhouse-test @@ -27,7 +27,8 @@ except ImportError: import random import string import multiprocessing -import multiprocessing.queues +from collections.abc import Iterable +#import multiprocessing.queues from contextlib import closing @@ -315,11 +316,12 @@ def colored(text, args, color=None, on_color=None, attrs=None): SERVER_DIED = False exit_code = 0 stop_time = None +queue = multiprocessing.Queue(maxsize=1) # def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total): def run_tests_array(all_tests_with_params): - tests_queue, num_tests, suite, suite_dir, suite_tmp_dir = all_tests_with_params + all_tests, num_tests, suite, suite_dir, suite_tmp_dir = all_tests_with_params global exit_code global SERVER_DIED global stop_time @@ -354,14 +356,13 @@ def run_tests_array(all_tests_with_params): print(f"\nRunning {about}{num_tests} {suite} tests ({multiprocessing.current_process().name}).\n") while True: - if isinstance(tests_queue, multiprocessing.queues.Queue): - try: - case = tests_queue.get() - except (ValueError, OSError): + if is_concurrent: + case = queue.get() + if not case: break else: - if tests_queue: - case = tests_queue.pop(0) + if all_tests: + case = all_tests.pop(0) else: break @@ -867,20 +868,24 @@ def main(args): if jobs > run_total: run_total = jobs - # Create two batches per process for more uniform execution time. 
- batch_size = max(1, len(parallel_tests) // (jobs * 2)) - queue = multiprocessing.queues.Queue() + batch_size = max(1, len(parallel_tests) // jobs) parallel_tests_array = [] for i in range(jobs): - parallel_tests_array.append((queue, batch_size, suite, suite_dir, suite_tmp_dir)) + parallel_tests_array.append((None, batch_size, suite, suite_dir, suite_tmp_dir)) with closing(multiprocessing.Pool(processes=jobs)) as pool: pool.map_async(run_tests_array, parallel_tests_array) for suit in parallel_tests: queue.put(suit) + + for i in range(jobs): + queue.put(None) + queue.close() + pool.join() + run_tests_array((sequential_tests, len(sequential_tests), suite, suite_dir, suite_tmp_dir)) total_tests_run += len(sequential_tests) + len(parallel_tests) else: