mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Better parallelizm for fast tests.
This commit is contained in:
parent
333e291b5d
commit
fa88e8725a
@ -27,6 +27,7 @@ except ImportError:
|
||||
import random
|
||||
import string
|
||||
import multiprocessing
|
||||
import multiprocessing.queues
|
||||
from contextlib import closing
|
||||
|
||||
|
||||
@ -318,7 +319,7 @@ stop_time = None
|
||||
|
||||
# def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total):
|
||||
def run_tests_array(all_tests_with_params):
|
||||
all_tests, suite, suite_dir, suite_tmp_dir = all_tests_with_params
|
||||
tests_queue, num_tests, suite, suite_dir, suite_tmp_dir = all_tests_with_params
|
||||
global exit_code
|
||||
global SERVER_DIED
|
||||
global stop_time
|
||||
@ -348,10 +349,22 @@ def run_tests_array(all_tests_with_params):
|
||||
else:
|
||||
return ''
|
||||
|
||||
if all_tests:
|
||||
print(f"\nRunning {len(all_tests)} {suite} tests ({multiprocessing.current_process().name}).\n")
|
||||
if num_tests > 0:
|
||||
about = 'about ' if is_concurrent else ''
|
||||
print(f"\nRunning {about}{num_tests} {suite} tests ({multiprocessing.current_process().name}).\n")
|
||||
|
||||
while True:
|
||||
if isinstance(tests_queue, multiprocessing.queues.Queue):
|
||||
try:
|
||||
case = tests_queue.get()
|
||||
except (ValueError, OSError):
|
||||
break
|
||||
else:
|
||||
if tests_queue:
|
||||
case = tests_queue.pop(0)
|
||||
else:
|
||||
break
|
||||
|
||||
for case in all_tests:
|
||||
if SERVER_DIED:
|
||||
stop_tests()
|
||||
break
|
||||
@ -856,17 +869,22 @@ def main(args):
|
||||
|
||||
# Create two batches per process for more uniform execution time.
|
||||
batch_size = max(1, len(parallel_tests) // (jobs * 2))
|
||||
queue = multiprocessing.queues.Queue()
|
||||
parallel_tests_array = []
|
||||
for i in range(0, len(parallel_tests), batch_size):
|
||||
parallel_tests_array.append((parallel_tests[i:i+batch_size], suite, suite_dir, suite_tmp_dir))
|
||||
for i in range(jobs):
|
||||
parallel_tests_array.append((queue, batch_size, suite, suite_dir, suite_tmp_dir))
|
||||
|
||||
with closing(multiprocessing.Pool(processes=jobs)) as pool:
|
||||
pool.map(run_tests_array, parallel_tests_array)
|
||||
pool.map_async(run_tests_array, parallel_tests_array)
|
||||
|
||||
run_tests_array((sequential_tests, suite, suite_dir, suite_tmp_dir))
|
||||
for suit in parallel_tests:
|
||||
queue.put(suit)
|
||||
queue.close()
|
||||
|
||||
run_tests_array((sequential_tests, len(sequential_tests), suite, suite_dir, suite_tmp_dir))
|
||||
total_tests_run += len(sequential_tests) + len(parallel_tests)
|
||||
else:
|
||||
run_tests_array((all_tests, suite, suite_dir, suite_tmp_dir))
|
||||
run_tests_array((all_tests, len(all_tests), suite, suite_dir, suite_tmp_dir))
|
||||
total_tests_run += len(all_tests)
|
||||
|
||||
if args.hung_check:
|
||||
|
Loading…
Reference in New Issue
Block a user