ClickHouse/tests/ci/bugfix_validate_check.py
2024-05-29 17:22:45 +02:00

158 lines
4.8 KiB
Python

#!/usr/bin/env python3
import csv
import logging
import subprocess
import sys
from pathlib import Path
from typing import List, Sequence, Tuple
from ci_config import JobNames
from ci_utils import normalize_string
from env_helper import TEMP_PATH
from functional_test_check import NO_CHANGES_MSG
from report import (
ERROR,
FAIL,
FAILURE,
OK,
SKIPPED,
SUCCESS,
JobReport,
TestResult,
TestResults,
)
from stopwatch import Stopwatch
def post_commit_status_from_file(file_path: Path) -> List[str]:
with open(file_path, "r", encoding="utf-8") as f:
res = list(csv.reader(f, delimiter="\t"))
if len(res) < 1:
raise IndexError(f'Can\'t read from "{file_path}"')
if len(res[0]) != 3:
raise IndexError(f'Can\'t read from "{file_path}"')
return res[0]
def get_failed_test_cases(file_path: Path) -> List[TestResult]:
job_report = JobReport.load(from_file=file_path)
test_results = [] # type: List[TestResult]
for tr in job_report.test_results:
if tr.status == FAIL:
if tr.name == NO_CHANGES_MSG:
tr.status = SKIPPED
else:
tr.name = "[with NOT_OK] " + tr.name
tr.status = OK
elif tr.status == OK:
tr.name = "[with NOT_OK] " + tr.name
tr.status = FAIL
else:
# do not invert error status
pass
test_results.append(tr)
return test_results
def process_all_results(
file_paths: Sequence[Path],
) -> Tuple[str, str, TestResults]:
all_results = [] # type: TestResults
has_fail = False
has_error = False
has_ok = False
for job_report_path in file_paths:
test_results = get_failed_test_cases(job_report_path)
for tr in test_results:
if tr.status == FAIL:
has_fail = True
elif tr.status == ERROR:
has_error = True
elif tr.status == OK:
has_ok = True
all_results.extend(test_results)
if has_error:
status = ERROR
description = "Some error(s) occured in tests"
elif has_ok:
status = SUCCESS
description = "New test(s) reproduced a bug"
elif has_fail:
status = FAILURE
description = "New test(s) failed to reproduce a bug"
else:
status = ERROR
description = "Invalid job results"
return status, description, all_results
def main():
logging.basicConfig(level=logging.INFO)
# args = parse_args()
stopwatch = Stopwatch()
jobs_to_validate = [JobNames.STATELESS_TEST_RELEASE, JobNames.INTEGRATION_TEST]
functional_job_report_file = Path(TEMP_PATH) / "functional_test_job_report.json"
integration_job_report_file = Path(TEMP_PATH) / "integration_test_job_report.json"
jobs_report_files = {
JobNames.STATELESS_TEST_RELEASE: functional_job_report_file,
JobNames.INTEGRATION_TEST: integration_job_report_file,
}
jobs_scripts = {
JobNames.STATELESS_TEST_RELEASE: "functional_test_check.py",
JobNames.INTEGRATION_TEST: "integration_test_check.py",
}
for test_job in jobs_to_validate:
report_file = jobs_report_files[test_job]
test_script = jobs_scripts[test_job]
if report_file.exists():
report_file.unlink()
# "bugfix" must be present in checkname, as integration test runner checks this
check_name = f"Validate bugfix: {test_job}"
command = (
f"python3 {test_script} '{check_name}' "
f"--validate-bugfix --report-to-file {report_file}"
)
print(f"Going to validate job [{test_job}], command [{command}]")
_ = subprocess.run(
command,
stdout=sys.stdout,
stderr=sys.stderr,
text=True,
check=False,
shell=True,
)
assert (
report_file.is_file()
), f"No job report [{report_file}] found after job execution"
status, description, test_results = process_all_results(
list(jobs_report_files.values())
)
additional_files = []
for job_id, report_file in jobs_report_files.items():
jr = JobReport.load(from_file=report_file)
additional_files.append(report_file)
for file in set(jr.additional_files):
file_ = Path(file)
file_name = file_.name
file_name = file_name.replace(".", "__" + normalize_string(job_id) + ".", 1)
file_ = file_.rename(file_.parent / file_name)
additional_files.append(file_)
JobReport(
description=description,
test_results=test_results,
status=status,
start_time=stopwatch.start_time_str,
duration=stopwatch.duration_seconds,
additional_files=additional_files,
).dump()
if __name__ == "__main__":
main()