Parallel tests runner (#5076)

* Parallel tests runner

* Slightly better style

* Fixes
This commit is contained in:
proller 2019-04-23 02:40:40 +03:00 committed by GitHub
parent 720f911379
commit ba312c138b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,6 +20,8 @@ from errno import ESRCH
import termcolor
from random import random
import commands
from multiprocessing import Pool
from contextlib import closing
MESSAGES_TO_RETRY = [
@ -87,23 +89,227 @@ def get_server_pid(server_tcp_port):
except Exception as ex:
return None
def dump_report(destination, suite, test_case, report):
    """Write one test case report to ``<destination>/<suite>/<test_case>.xml``.

    The report element is wrapped in a ``testsuite`` element named after
    ``suite``, which in turn is wrapped in a ``testsuites`` root.

    Args:
        destination: root directory for reports; when None nothing is written.
        suite: suite name, used as sub-directory and ``testsuite`` name.
        test_case: test case name, used as the file name (without ``.xml``).
        report: the ``testcase`` element to store.

    Raises:
        OSError: if the report directory cannot be created for a reason
            other than it already existing.
    """
    if destination is None:
        return

    import errno

    destination_file = os.path.join(destination, suite, test_case + ".xml")
    destination_dir = os.path.dirname(destination_file)
    # Create the directory unconditionally and tolerate "already exists":
    # several worker processes may report into the same suite directory at
    # once, so a separate exists() check followed by makedirs() can race.
    try:
        os.makedirs(destination_dir)
    except OSError as e:
        if e.errno != errno.EEXIST or not os.path.isdir(destination_dir):
            raise

    with open(destination_file, 'w') as report_file:
        report_root = et.Element("testsuites", attrib = {'name': 'ClickHouse Tests'})
        report_suite = et.Element("testsuite", attrib = {"name": suite})
        report_suite.append(report)
        report_root.append(report_suite)
        report_file.write(et.tostring(report_root, encoding = "UTF-8", xml_declaration=True, pretty_print=True))
def colored(text, args, color=None, on_color=None, attrs=None):
    """Return ``text`` wrapped in terminal colour codes when colour is enabled.

    Colour is enabled when stdout is a terminal or ``args.force_color`` is
    set; otherwise ``text`` is returned unchanged.
    """
    use_color = sys.stdout.isatty() or args.force_color
    if not use_color:
        return text
    return termcolor.colored(text, color, on_color, attrs)
# Set to True by run_tests_array() when, with args.stop given, a failed test's
# stderr reports a refused connection or a read after EOF; checked before each
# test case so the remaining cases are not started.
SERVER_DIED = False
# Exit status that main() finally passes to sys.exit(); 0 means success.
exit_code = 0
#def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total):
def run_tests_array(all_tests_with_params):
    """Run one chunk of a suite's test cases and print a summary for it.

    All parameters arrive packed in one sequence so the function can be
    handed directly to ``Pool.map``.

    Args:
        all_tests_with_params: five items, in this order:
            all_tests -- file names of the test cases to run;
            suite -- suite name, used in messages and report paths;
            suite_dir -- directory with the cases and their ``.reference``
                and ``.disabled`` files;
            suite_tmp_dir -- directory for ``.stdout`` / ``.stderr`` files;
            run_total -- number of chunks; stdout is flushed right after the
                test name only when this equals 1.

    Reads the global names ``args`` and ``server_logs_level``. Sets the
    global ``SERVER_DIED`` when the server looks dead and ``exit_code`` to 1
    when at least one test failed.

    NOTE: when this runs inside a ``multiprocessing`` worker, those global
    assignments happen in the worker process and are not visible to the
    parent process.

    Raises:
        KeyboardInterrupt: re-raised after printing a notice.
    """
    global SERVER_DIED
    # Without this declaration ``exit_code = 1`` below would only bind a
    # local name and failures would never reach sys.exit() in main().
    global exit_code

    all_tests, suite, suite_dir, suite_tmp_dir, run_total = all_tests_with_params

    OP_SQUARE_BRACKET = colored("[", args, attrs=['bold'])
    CL_SQUARE_BRACKET = colored("]", args, attrs=['bold'])

    MSG_FAIL = OP_SQUARE_BRACKET + colored(" FAIL ", args, "red", attrs=['bold']) + CL_SQUARE_BRACKET
    MSG_UNKNOWN = OP_SQUARE_BRACKET + colored(" UNKNOWN ", args, "yellow", attrs=['bold']) + CL_SQUARE_BRACKET
    MSG_OK = OP_SQUARE_BRACKET + colored(" OK ", args, "green", attrs=['bold']) + CL_SQUARE_BRACKET
    MSG_SKIPPED = OP_SQUARE_BRACKET + colored(" SKIPPED ", args, "cyan", attrs=['bold']) + CL_SQUARE_BRACKET

    passed_total = 0
    skipped_total = 0
    failures_total = 0
    failures = 0
    # Number of consecutive failures; reset by every passing test.
    failures_chain = 0

    if all_tests:
        print("\nRunning {} {} tests.".format(len(all_tests), suite) + "\n")

    for case in all_tests:
        if SERVER_DIED:
            break

        case_file = os.path.join(suite_dir, case)
        (name, ext) = os.path.splitext(case)

        report_testcase = et.Element("testcase", attrib = {"name": name})

        try:
            sys.stdout.write("{0:72}".format(name + ": "))
            if run_total == 1:
                sys.stdout.flush()

            if args.skip and any(s in name for s in args.skip):
                report_testcase.append(et.Element("skipped", attrib = {"message": "skip"}))
                print(MSG_SKIPPED + " - skip")
                skipped_total += 1
            elif not args.zookeeper and 'zookeeper' in name:
                report_testcase.append(et.Element("skipped", attrib = {"message": "no zookeeper"}))
                print(MSG_SKIPPED + " - no zookeeper")
                skipped_total += 1
            elif not args.shard and 'shard' in name:
                report_testcase.append(et.Element("skipped", attrib = {"message": "no shard"}))
                print(MSG_SKIPPED + " - no shard")
                skipped_total += 1
            elif not args.no_long and 'long' in name:
                report_testcase.append(et.Element("skipped", attrib = {"message": "no long"}))
                print(MSG_SKIPPED + " - no long")
                skipped_total += 1
            else:
                disabled_file = os.path.join(suite_dir, name) + '.disabled'

                if os.path.exists(disabled_file) and not args.disabled:
                    # The file content is the human-readable reason for disabling.
                    with open(disabled_file, 'r') as disabled:
                        message = disabled.read()
                    report_testcase.append(et.Element("skipped", attrib = {"message": message}))
                    print(MSG_SKIPPED + " - " + message)
                else:
                    if args.testname:
                        # Send a marker query naming the test that is about to run.
                        clickhouse_proc = Popen(shlex.split(args.client_with_database), stdin=PIPE, stdout=PIPE, stderr=PIPE)
                        clickhouse_proc.communicate("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite))

                    reference_file = os.path.join(suite_dir, name) + '.reference'
                    stdout_file = os.path.join(suite_tmp_dir, name) + '.stdout'
                    stderr_file = os.path.join(suite_tmp_dir, name) + '.stderr'

                    proc, stdout, stderr = run_single_test(args, ext, server_logs_level, case_file, stdout_file, stderr_file)
                    if proc.returncode is None:
                        # No return code yet: the test is still running, treat as timeout.
                        try:
                            proc.kill()
                        except OSError as e:
                            # ESRCH: the process is already gone, nothing to kill.
                            if e.errno != ESRCH:
                                raise

                        failure = et.Element("failure", attrib = {"message": "Timeout"})
                        report_testcase.append(failure)

                        failures += 1
                        print("{0} - Timeout!".format(MSG_FAIL))
                    else:
                        # Re-run with a growing pause while the failure is one
                        # that need_retry() accepts; give up after the counter
                        # passes 6.
                        counter = 1
                        while proc.returncode != 0 and need_retry(stderr):
                            proc, stdout, stderr = run_single_test(args, ext, server_logs_level, case_file, stdout_file, stderr_file)
                            sleep(2**counter)
                            counter += 1
                            if counter > 6:
                                break

                        if proc.returncode != 0:
                            failure = et.Element("failure", attrib = {"message": "return code {}".format(proc.returncode)})
                            report_testcase.append(failure)

                            stdout_element = et.Element("system-out")
                            stdout_element.text = et.CDATA(stdout)
                            report_testcase.append(stdout_element)

                            failures += 1
                            failures_chain += 1
                            print("{0} - return code {1}".format(MSG_FAIL, proc.returncode))

                            if stderr:
                                stderr_element = et.Element("system-err")
                                stderr_element.text = et.CDATA(stderr)
                                report_testcase.append(stderr_element)
                                print(stderr.encode('utf-8'))

                            if args.stop and ('Connection refused' in stderr or 'Attempt to read after eof' in stderr) and 'Received exception from server' not in stderr:
                                SERVER_DIED = True

                        elif stderr:
                            failure = et.Element("failure", attrib = {"message": "having stderror"})
                            report_testcase.append(failure)

                            stderr_element = et.Element("system-err")
                            stderr_element.text = et.CDATA(stderr)
                            report_testcase.append(stderr_element)

                            failures += 1
                            failures_chain += 1
                            print("{0} - having stderror:\n{1}".format(MSG_FAIL, stderr.encode('utf-8')))
                        elif 'Exception' in stdout:
                            failure = et.Element("error", attrib = {"message": "having exception"})
                            report_testcase.append(failure)

                            stdout_element = et.Element("system-out")
                            stdout_element.text = et.CDATA(stdout)
                            report_testcase.append(stdout_element)

                            failures += 1
                            failures_chain += 1
                            print("{0} - having exception:\n{1}".format(MSG_FAIL, stdout.encode('utf-8')))
                        elif not os.path.isfile(reference_file):
                            skipped = et.Element("skipped", attrib = {"message": "no reference file"})
                            report_testcase.append(skipped)
                            print("{0} - no reference file".format(MSG_UNKNOWN))
                        else:
                            result_is_different = subprocess.call(['diff', '-q', reference_file, stdout_file], stdout = PIPE)

                            if result_is_different:
                                diff = Popen(['diff', '--unified', reference_file, stdout_file], stdout = PIPE).communicate()[0]
                                diff = unicode(diff, errors='replace', encoding='utf-8')
                                # 'cat -vet' makes non-printing characters and line ends visible.
                                cat = Popen(['cat', '-vet'], stdin=PIPE, stdout=PIPE).communicate(input=diff.encode(encoding='utf-8', errors='replace'))[0]

                                failure = et.Element("failure", attrib = {"message": "result differs with reference"})
                                report_testcase.append(failure)

                                stdout_element = et.Element("system-out")
                                try:
                                    stdout_element.text = et.CDATA(diff)
                                except Exception:
                                    # Retry with control characters stripped from the diff.
                                    stdout_element.text = et.CDATA(remove_control_characters(diff))

                                report_testcase.append(stdout_element)
                                failures += 1
                                print("{0} - result differs with reference:\n{1}".format(MSG_FAIL, cat.encode('utf-8')))
                            else:
                                passed_total += 1
                                failures_chain = 0
                                print(MSG_OK)
                                # NOTE(review): nesting was reconstructed from an
                                # unindented listing; output files are assumed to be
                                # removed only for passing tests -- confirm.
                                if os.path.exists(stdout_file):
                                    os.remove(stdout_file)
                                if os.path.exists(stderr_file):
                                    os.remove(stderr_file)
        except KeyboardInterrupt:
            print(colored("Break tests execution", args, "red"))
            raise
        except Exception:
            # Any other error is recorded as a failure of this test case and
            # the run continues with the next one.
            import traceback
            exc_type, exc_value, tb = sys.exc_info()
            error = et.Element("error", attrib = {"type": exc_type.__name__, "message": str(exc_value)})
            report_testcase.append(error)

            failures += 1
            print("{0} - Test internal error: {1}\n{2}\n{3}".format(MSG_FAIL, exc_type.__name__, exc_value, "\n".join(traceback.format_tb(tb, 10))))
        finally:
            dump_report(args.output, suite, name, report_testcase)

        # Stop after 20 failures in a row.
        if failures_chain >= 20:
            break

    failures_total = failures_total + failures

    if failures_total > 0:
        print(colored("\nHaving {failures_total} errors! {passed_total} tests passed. {skipped_total} tests skipped.".format(passed_total = passed_total, skipped_total = skipped_total, failures_total = failures_total), args, "red", attrs=["bold"]))
        exit_code = 1
    else:
        print(colored("\n{passed_total} tests passed. {skipped_total} tests skipped.".format(passed_total = passed_total, skipped_total = skipped_total), args, "green", attrs=["bold"]))
# Server log level handed to run_single_test() for every test; main() also
# uses it as the default for CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL.
server_logs_level = "warning"
def main(args):
SERVER_DIED = False
def colored(text, color=None, on_color=None, attrs=None):
if sys.stdout.isatty() or args.force_color:
return termcolor.colored(text, color, on_color, attrs)
else:
return text
OP_SQUARE_BRACKET = colored("[", attrs=['bold'])
CL_SQUARE_BRACKET = colored("]", attrs=['bold'])
MSG_FAIL = OP_SQUARE_BRACKET + colored(" FAIL ", "red", attrs=['bold']) + CL_SQUARE_BRACKET
MSG_UNKNOWN = OP_SQUARE_BRACKET + colored(" UNKNOWN ", "yellow", attrs=['bold']) + CL_SQUARE_BRACKET
MSG_OK = OP_SQUARE_BRACKET + colored(" OK ", "green", attrs=['bold']) + CL_SQUARE_BRACKET
MSG_SKIPPED = OP_SQUARE_BRACKET + colored(" SKIPPED ", "cyan", attrs=['bold']) + CL_SQUARE_BRACKET
global SERVER_DIED
global exit_code
global server_logs_level
def is_data_present():
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
@ -113,20 +319,6 @@ def main(args):
return stdout.startswith('1')
def dump_report(destination, suite, test_case, report):
if destination is not None:
destination_file = os.path.join(destination, suite, test_case + ".xml")
destination_dir = os.path.dirname(destination_file)
if not os.path.exists(destination_dir):
os.makedirs(destination_dir)
with open(destination_file, 'w') as report_file:
report_root = et.Element("testsuites", attrib = {'name': 'ClickHouse Tests'})
report_suite = et.Element("testsuite", attrib = {"name": suite})
report_suite.append(report)
report_root.append(report_suite)
report_file.write(et.tostring(report_root, encoding = "UTF-8", xml_declaration=True, pretty_print=True))
base_dir = os.path.abspath(args.queries)
tmp_dir = os.path.abspath(args.tmp)
@ -141,7 +333,6 @@ def main(args):
# Force to print server warnings in stderr
# Shell scripts could change logging level
server_logs_level = "warning"
os.environ.setdefault("CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL", server_logs_level)
if args.zookeeper is None:
@ -161,10 +352,6 @@ def main(args):
else:
args.shard = False
passed_total = 0
skipped_total = 0
failures_total = 0
clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
clickhouse_proc_create.communicate("CREATE DATABASE IF NOT EXISTS " + args.database)
if args.database != "test":
@ -206,8 +393,6 @@ def main(args):
suite = suite_re_obj.group(1)
if os.path.isdir(suite_dir):
failures = 0
failures_chain = 0
if 'stateful' in suite and not is_data_present():
print("Won't run stateful tests because test data wasn't loaded.")
continue
@ -236,203 +421,48 @@ def main(args):
except ValueError:
return 99997
run_n, run_total = args.parallel.split('/')
run_n = float(run_n)
run_total = float(run_total)
all_tests = os.listdir(suite_dir)
all_tests = filter(lambda case: is_test_from_dir(suite_dir, case), all_tests)
all_tests = sorted(filter(lambda case: re.search(args.test, case) if args.test else True, all_tests), key=key_func)
run_n, run_total = args.parallel.split('/')
run_n = float(run_n)
run_total = float(run_total)
tests_n = len(all_tests)
start = int(tests_n / run_total * (run_n - 1))
if start > 0:
start = start + 1
end = int(tests_n / run_total * (run_n))
all_tests = all_tests[start : end]
if run_total > tests_n:
run_total = tests_n
if run_n > run_total:
continue
print("\nRunning {} {} tests.".format(tests_n, suite) + (" {} .. {} ".format(start, end) if run_total > 1 else "") + "\n")
jobs = args.jobs
if jobs > run_total:
run_total = jobs
for case in all_tests:
if SERVER_DIED:
break
all_tests_array = []
for n in range(1, 1 + int(run_total)):
start = int(tests_n / run_total * (n - 1))
end = int(tests_n / run_total * n)
all_tests_array.append([all_tests[start : end], suite, suite_dir, suite_tmp_dir, run_total])
case_file = os.path.join(suite_dir, case)
(name, ext) = os.path.splitext(case)
report_testcase = et.Element("testcase", attrib = {"name": name})
try:
sys.stdout.write("{0:72}".format(name + ": "))
if run_total == 1:
sys.stdout.flush()
if args.skip and any(s in name for s in args.skip):
report_testcase.append(et.Element("skipped", attrib = {"message": "skip"}))
print(MSG_SKIPPED + " - skip")
skipped_total += 1
elif not args.zookeeper and 'zookeeper' in name:
report_testcase.append(et.Element("skipped", attrib = {"message": "no zookeeper"}))
print(MSG_SKIPPED + " - no zookeeper")
skipped_total += 1
elif not args.shard and 'shard' in name:
report_testcase.append(et.Element("skipped", attrib = {"message": "no shard"}))
print(MSG_SKIPPED + " - no shard")
skipped_total += 1
elif not args.no_long and 'long' in name:
report_testcase.append(et.Element("skipped", attrib = {"message": "no long"}))
print(MSG_SKIPPED + " - no long")
skipped_total += 1
else:
disabled_file = os.path.join(suite_dir, name) + '.disabled'
if os.path.exists(disabled_file) and not args.disabled:
message = open(disabled_file, 'r').read()
report_testcase.append(et.Element("skipped", attrib = {"message": message}))
print(MSG_SKIPPED + " - " + message)
else:
if args.testname:
clickhouse_proc = Popen(shlex.split(args.client_with_database), stdin=PIPE, stdout=PIPE, stderr=PIPE)
clickhouse_proc.communicate("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite))
reference_file = os.path.join(suite_dir, name) + '.reference'
stdout_file = os.path.join(suite_tmp_dir, name) + '.stdout'
stderr_file = os.path.join(suite_tmp_dir, name) + '.stderr'
proc, stdout, stderr = run_single_test(args, ext, server_logs_level, case_file, stdout_file, stderr_file)
if proc.returncode is None:
try:
proc.kill()
except OSError as e:
if e.errno != ESRCH:
raise
failure = et.Element("failure", attrib = {"message": "Timeout"})
report_testcase.append(failure)
failures += 1
print("{0} - Timeout!".format(MSG_FAIL))
else:
counter = 1
while proc.returncode != 0 and need_retry(stderr):
proc, stdout, stderr = run_single_test(args, ext, server_logs_level, case_file, stdout_file, stderr_file)
sleep(2**counter)
counter += 1
if counter > 6:
break
if proc.returncode != 0:
failure = et.Element("failure", attrib = {"message": "return code {}".format(proc.returncode)})
report_testcase.append(failure)
stdout_element = et.Element("system-out")
stdout_element.text = et.CDATA(stdout)
report_testcase.append(stdout_element)
failures += 1
failures_chain += 1
print("{0} - return code {1}".format(MSG_FAIL, proc.returncode))
if stderr:
stderr_element = et.Element("system-err")
stderr_element.text = et.CDATA(stderr)
report_testcase.append(stderr_element)
print(stderr.encode('utf-8'))
if args.stop and ('Connection refused' in stderr or 'Attempt to read after eof' in stderr) and not 'Received exception from server' in stderr:
SERVER_DIED = True
elif stderr:
failure = et.Element("failure", attrib = {"message": "having stderror"})
report_testcase.append(failure)
stderr_element = et.Element("system-err")
stderr_element.text = et.CDATA(stderr)
report_testcase.append(stderr_element)
failures += 1
failures_chain += 1
print("{0} - having stderror:\n{1}".format(MSG_FAIL, stderr.encode('utf-8')))
elif 'Exception' in stdout:
failure = et.Element("error", attrib = {"message": "having exception"})
report_testcase.append(failure)
stdout_element = et.Element("system-out")
stdout_element.text = et.CDATA(stdout)
report_testcase.append(stdout_element)
failures += 1
failures_chain += 1
print("{0} - having exception:\n{1}".format(MSG_FAIL, stdout.encode('utf-8')))
elif not os.path.isfile(reference_file):
skipped = et.Element("skipped", attrib = {"message": "no reference file"})
report_testcase.append(skipped)
print("{0} - no reference file".format(MSG_UNKNOWN))
else:
result_is_different = subprocess.call(['diff', '-q', reference_file, stdout_file], stdout = PIPE)
if result_is_different:
diff = Popen(['diff', '--unified', reference_file, stdout_file], stdout = PIPE).communicate()[0]
diff = unicode(diff, errors='replace', encoding='utf-8')
cat = Popen(['cat', '-vet'], stdin=PIPE, stdout=PIPE).communicate(input=diff.encode(encoding='utf-8', errors='replace'))[0]
failure = et.Element("failure", attrib = {"message": "result differs with reference"})
report_testcase.append(failure)
stdout_element = et.Element("system-out")
try:
stdout_element.text = et.CDATA(diff)
except:
stdout_element.text = et.CDATA(remove_control_characters(diff))
report_testcase.append(stdout_element)
failures += 1
print("{0} - result differs with reference:\n{1}".format(MSG_FAIL, cat.encode('utf-8')))
else:
passed_total += 1
failures_chain = 0
print(MSG_OK)
if os.path.exists(stdout_file):
os.remove(stdout_file)
if os.path.exists(stderr_file):
os.remove(stderr_file)
except KeyboardInterrupt as e:
print(colored("Break tests execution", "red"))
raise e
except:
import traceback
exc_type, exc_value, tb = sys.exc_info()
error = et.Element("error", attrib = {"type": exc_type.__name__, "message": str(exc_value)})
report_testcase.append(error)
failures += 1
print("{0} - Test internal error: {1}\n{2}\n{3}".format(MSG_FAIL, exc_type.__name__, exc_value, "\n".join(traceback.format_tb(tb, 10))))
finally:
dump_report(args.output, suite, name, report_testcase)
if failures_chain >= 20:
break
failures_total = failures_total + failures
exit_code = 0
if failures_total > 0:
print(colored("\nHaving {failures_total} errors! {passed_total} tests passed. {skipped_total} tests skipped.".format(passed_total = passed_total, skipped_total = skipped_total, failures_total = failures_total), "red", attrs=["bold"]))
exit_code = 1
else:
print(colored("\n{passed_total} tests passed. {skipped_total} tests skipped.".format(passed_total = passed_total, skipped_total = skipped_total), "green", attrs=["bold"]))
if jobs > 1:
with closing(Pool(processes=jobs)) as pool:
pool.map(run_tests_array, all_tests_array)
pool.terminate()
else:
run_tests_array(all_tests_array[int(run_n)-1])
if args.hung_check:
processlist = get_processlist(args.client_with_database)
if processlist:
server_pid = get_server_pid(os.getenv("CLICKHOUSE_PORT_TCP", '9000'))
print(colored("\nFound hung queries in processlist:", "red", attrs=["bold"]))
print(colored("\nFound hung queries in processlist:", args, "red", attrs=["bold"]))
print(processlist)
if server_pid:
print("\nStacktraces of all threads:")
print(get_stacktraces(server_pid))
exit_code = 1
else:
print(colored("\nNo queries hung.", "green", attrs=["bold"]))
print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))
sys.exit(exit_code)
@ -467,7 +497,8 @@ if __name__ == '__main__':
parser.add_argument('--hung-check', action='store_true', default=False)
parser.add_argument('--force-color', action='store_true', default=False)
parser.add_argument('--database', default='test', help='Default database for tests')
parser.add_argument('--parallel', default='1/1', help='Parralel test run number/total')
parser.add_argument('--parallel', default='1/1', help='One parallel test run number/total')
parser.add_argument('-j', '--jobs', default=1, help='Run all tests in parallel', type=int)
parser.add_argument('--no-stateless', action='store_true', help='Disable all stateless tests')
parser.add_argument('--no-stateful', action='store_true', help='Disable all stateful tests')