Better parallelizm for fast tests.

This commit is contained in:
Nikolai Kochetov 2021-05-20 19:44:35 +03:00
parent fa88e8725a
commit 9d4927a5b9

View File

@ -27,7 +27,8 @@ except ImportError:
import random
import string
import multiprocessing
import multiprocessing.queues
from collections.abc import Iterable
#import multiprocessing.queues
from contextlib import closing
@ -315,11 +316,12 @@ def colored(text, args, color=None, on_color=None, attrs=None):
SERVER_DIED = False
exit_code = 0
stop_time = None
queue = multiprocessing.Queue(maxsize=1)
# def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total):
def run_tests_array(all_tests_with_params):
tests_queue, num_tests, suite, suite_dir, suite_tmp_dir = all_tests_with_params
all_tests, num_tests, suite, suite_dir, suite_tmp_dir = all_tests_with_params
global exit_code
global SERVER_DIED
global stop_time
@ -354,14 +356,13 @@ def run_tests_array(all_tests_with_params):
print(f"\nRunning {about}{num_tests} {suite} tests ({multiprocessing.current_process().name}).\n")
while True:
if isinstance(tests_queue, multiprocessing.queues.Queue):
try:
case = tests_queue.get()
except (ValueError, OSError):
if is_concurrent:
case = queue.get()
if not case:
break
else:
if tests_queue:
case = tests_queue.pop(0)
if all_tests:
case = all_tests.pop(0)
else:
break
@ -867,20 +868,24 @@ def main(args):
if jobs > run_total:
run_total = jobs
# Create two batches per process for more uniform execution time.
batch_size = max(1, len(parallel_tests) // (jobs * 2))
queue = multiprocessing.queues.Queue()
batch_size = max(1, len(parallel_tests) // jobs)
parallel_tests_array = []
for i in range(jobs):
parallel_tests_array.append((queue, batch_size, suite, suite_dir, suite_tmp_dir))
parallel_tests_array.append((None, batch_size, suite, suite_dir, suite_tmp_dir))
with closing(multiprocessing.Pool(processes=jobs)) as pool:
pool.map_async(run_tests_array, parallel_tests_array)
for suit in parallel_tests:
queue.put(suit)
for i in range(jobs):
queue.put(None)
queue.close()
pool.join()
run_tests_array((sequential_tests, len(sequential_tests), suite, suite_dir, suite_tmp_dir))
total_tests_run += len(sequential_tests) + len(parallel_tests)
else: