# -*- coding: utf-8 -*- import csv import datetime import json import logging import os from ast import literal_eval from dataclasses import asdict, dataclass from html import escape from pathlib import Path from typing import ( Dict, Final, Iterable, List, Literal, Optional, Sequence, Tuple, Union, ) from build_download_helper import APIException, get_gh_api from ci_config import CI from env_helper import ( GITHUB_JOB, GITHUB_REPOSITORY, GITHUB_RUN_ID, GITHUB_RUN_URL, GITHUB_WORKSPACE, REPORT_PATH, ) logger = logging.getLogger(__name__) ERROR: Final = "error" FAILURE: Final = "failure" PENDING: Final = "pending" SUCCESS: Final = "success" OK: Final = "OK" FAIL: Final = "FAIL" SKIPPED: Final = "SKIPPED" StatusType = Literal["error", "failure", "pending", "success"] STATUSES = [ERROR, FAILURE, PENDING, SUCCESS] # type: List[StatusType] # These parameters are set only on demand, and only once _GITHUB_JOB_ID = "" _GITHUB_JOB_URL = "" _GITHUB_JOB_API_URL = "" def GITHUB_JOB_ID(safe: bool = True) -> str: global _GITHUB_JOB_ID global _GITHUB_JOB_URL global _GITHUB_JOB_API_URL if _GITHUB_JOB_ID: return _GITHUB_JOB_ID try: _GITHUB_JOB_ID, _GITHUB_JOB_URL, _GITHUB_JOB_API_URL = get_job_id_url( GITHUB_JOB ) except APIException as e: logging.warning("Unable to retrieve the job info from GH API: %s", e) if not safe: raise e return _GITHUB_JOB_ID def GITHUB_JOB_URL(safe: bool = True) -> str: try: GITHUB_JOB_ID() except APIException: if safe: logging.warning("Using run URL as a fallback to not fail the job") return GITHUB_RUN_URL raise return _GITHUB_JOB_URL def GITHUB_JOB_API_URL(safe: bool = True) -> str: GITHUB_JOB_ID(safe) return _GITHUB_JOB_API_URL def get_job_id_url(job_name: str) -> Tuple[str, str, str]: job_id = "" job_url = "" job_api_url = "" if GITHUB_RUN_ID == "0": job_id = "0" if job_id: return job_id, job_url, job_api_url jobs = [] page = 1 while not job_id: response = get_gh_api( f"https://api.github.com/repos/{GITHUB_REPOSITORY}/" f"actions/runs/{GITHUB_RUN_ID}/jobs?per_page=100&page={page}" ) page += 1 data = response.json() jobs.extend(data["jobs"]) for job in data["jobs"]: if job["name"] != job_name: continue job_id = job["id"] job_url = job["html_url"] job_api_url = job["url"] return job_id, job_url, job_api_url if ( len(jobs) >= data["total_count"] # just in case of inconsistency or len(data["jobs"]) == 0 # if we excided pages ): job_id = "0" if not job_url: # This is a terrible workaround for the case of another broken part of # GitHub actions. For nested workflows it doesn't provide a proper job_name # value, but only the final one. So, for `OriginalJob / NestedJob / FinalJob` # full name, job_name contains only FinalJob matched_jobs = [] for job in jobs: nested_parts = job["name"].split(" / ") if len(nested_parts) <= 1: continue if nested_parts[-1] == job_name: matched_jobs.append(job) if len(matched_jobs) == 1: # The best case scenario job_id = matched_jobs[0]["id"] job_url = matched_jobs[0]["html_url"] job_api_url = matched_jobs[0]["url"] return job_id, job_url, job_api_url if matched_jobs: logging.error( "We could not get the ID and URL for the current job name %s, there " "are more than one jobs match it for the nested workflows. Please, " "refer to https://github.com/actions/runner/issues/2577", job_name, ) return job_id, job_url, job_api_url # The order of statuses from the worst to the best def _state_rank(status: str) -> int: "return the index of status or index of SUCCESS in case of wrong status" try: return STATUSES.index(status) # type: ignore except ValueError: return 3 def get_status(status: str) -> StatusType: "function to get the StatusType for a status or ERROR" try: ind = STATUSES.index(status) # type: ignore return STATUSES[ind] except ValueError: return ERROR def get_worst_status(statuses: Iterable[str]) -> StatusType: worst_status = SUCCESS # type: StatusType for status in statuses: ind = _state_rank(status) if ind < _state_rank(worst_status): worst_status = STATUSES[ind] if worst_status == ERROR: break return worst_status ### BEST FRONTEND PRACTICES BELOW HEAD_HTML_TEMPLATE = """ {title}
🌚🌞

{header}

""" FOOTER_HTML_TEMPLATE = """ """ HTML_BASE_TEST_TEMPLATE = ( f"{HEAD_HTML_TEMPLATE}" """ {test_part} """ f"{FOOTER_HTML_TEMPLATE}" ) HTML_TEST_PART = """ {headers} {rows}
""" BASE_HEADERS = ["Test name", "Test status"] # should not be in TEMP directory or any directory that may be cleaned during the job execution JOB_REPORT_FILE = Path(GITHUB_WORKSPACE) / "job_report.json" JOB_STARTED_TEST_NAME = "STARTED" JOB_FINISHED_TEST_NAME = "COMPLETED" JOB_TIMEOUT_TEST_NAME = "Job Timeout Expired" @dataclass class TestResult: name: str status: str # the following fields are optional time: Optional[float] = None log_files: Optional[Union[Sequence[str], Sequence[Path]]] = None raw_logs: Optional[str] = None # the field for uploaded logs URLs log_urls: Optional[Sequence[str]] = None def set_raw_logs(self, raw_logs: str) -> None: self.raw_logs = raw_logs def set_log_files(self, log_files_literal: str) -> None: self.log_files = [] # type: Optional[List[Path]] log_paths = literal_eval(log_files_literal) if not isinstance(log_paths, list): raise ValueError( f"Malformed input: must be a list literal: {log_files_literal}" ) for log_path in log_paths: assert Path(log_path).exists(), log_path self.log_files.append(log_path) @staticmethod def create_check_timeout_expired(duration: Optional[float] = None) -> "TestResult": return TestResult(JOB_TIMEOUT_TEST_NAME, "FAIL", time=duration) TestResults = List[TestResult] @dataclass class JobReport: status: str description: str test_results: TestResults start_time: str duration: float additional_files: Union[Sequence[str], Sequence[Path]] # ClickHouse version, build job only version: str = "" # check_name to be set in commit status, set it if it differs from the job name check_name: str = "" # directory with artifacts to upload on s3 build_dir_for_upload: Union[Path, str] = "" # if False no GH commit status will be created by CI need_commit_status: bool = True # indicates that this is not real job report but report for the job that was skipped by rerun check job_skipped: bool = False # indicates that report generated by CI script in order to check later if job was killed before real report is generated dummy: bool = False exit_code: int = -1 @staticmethod def get_start_time_from_current(): return datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") @classmethod def create_dummy(cls, status: str, job_skipped: bool) -> "JobReport": return JobReport( status=status, description="", test_results=[], start_time=cls.get_start_time_from_current(), duration=0.0, additional_files=[], job_skipped=job_skipped, dummy=True, ) def update_duration(self): if not self.start_time: self.duration = 0.0 else: start_time = datetime.datetime.strptime( self.start_time, "%Y-%m-%d %H:%M:%S" ) current_time = datetime.datetime.utcnow() self.duration = (current_time - start_time).total_seconds() def __post_init__(self): assert self.status in (SUCCESS, ERROR, FAILURE, PENDING) @classmethod def exist(cls) -> bool: return JOB_REPORT_FILE.is_file() @classmethod def load(cls, from_file=None): # type: ignore res = {} from_file = from_file or JOB_REPORT_FILE with open(from_file, "r", encoding="utf-8") as json_file: res = json.load(json_file) # Deserialize the nested lists of TestResult test_results_data = res.get("test_results", []) test_results = [TestResult(**result) for result in test_results_data] del res["test_results"] return JobReport(test_results=test_results, **res) @classmethod def cleanup(cls): if JOB_REPORT_FILE.exists(): JOB_REPORT_FILE.unlink() def dump(self, to_file=None): def path_converter(obj): if isinstance(obj, Path): return str(obj) raise TypeError("Type not serializable") to_file = to_file or JOB_REPORT_FILE with open(to_file, "w", encoding="utf-8") as json_file: json.dump(asdict(self), json_file, default=path_converter, indent=2) def read_test_results(results_path: Path, with_raw_logs: bool = True) -> TestResults: results = [] # type: TestResults with open(results_path, "r", encoding="utf-8") as descriptor: reader = csv.reader(descriptor, delimiter="\t") for line in reader: name = line[0] status = line[1] time = None if len(line) >= 3 and line[2] and line[2] != "\\N": # The value can be emtpy, but when it's not, # it's the time spent on the test try: time = float(line[2]) except ValueError: pass result = TestResult(name, status, time) if len(line) == 4 and line[3]: # The value can be emtpy, but when it's not, # the 4th value is a pythonic list, e.g. ['file1', 'file2'] if with_raw_logs: # Python does not support TSV, so we unescape manually result.set_raw_logs( line[3].replace("\\t", "\t").replace("\\n", "\n") ) else: result.set_log_files(line[3]) results.append(result) return results @dataclass class BuildResult: build_name: str log_url: str build_urls: List[str] version: str status: str elapsed_seconds: int job_api_url: str pr_number: int = 0 head_ref: str = "dummy_branch_name" _job_name: Optional[str] = None _job_html_url: Optional[str] = None _job_html_link: Optional[str] = None _grouped_urls: Optional[List[List[str]]] = None @classmethod def cleanup(cls): if Path(REPORT_PATH).exists(): for file in Path(REPORT_PATH).iterdir(): if "build_report" in file.name and file.name.endswith(".json"): file.unlink() @classmethod def load(cls, build_name: str, pr_number: int, head_ref: str): # type: ignore """ loads report from a report file matched with given @pr_number and/or a @head_ref """ report_path = Path(REPORT_PATH) / BuildResult.get_report_name( build_name, pr_number or head_ref ) return cls.load_from_file(report_path) @classmethod def load_any(cls, build_name: str, pr_number: int, head_ref: str): # type: ignore """ loads build report from one of all available report files (matching the job digest) with the following priority: 1. report for the current PR @pr_number (might happen in PR' wf with or without job reuse) 2. report for the current branch @head_ref (might happen in release/master' wf with or without job reuse) 3. report for master branch (might happen in any workflow in case of job reuse) 4. any other report (job reuse from another PR, if master report is not available yet) """ pr_report = None ref_report = None master_report = None any_report = None Path(REPORT_PATH).mkdir(parents=True, exist_ok=True) for file in Path(REPORT_PATH).iterdir(): if f"{build_name}.json" in file.name: any_report = file if "_master_" in file.name: master_report = file elif f"_{head_ref}_" in file.name: ref_report = file elif pr_number and f"_{pr_number}_" in file.name: pr_report = file if not any_report: return None if pr_report: file_path = pr_report elif ref_report: file_path = ref_report elif master_report: file_path = master_report else: file_path = any_report return cls.load_from_file(file_path) @classmethod def load_from_file(cls, file: Union[Path, str]): # type: ignore if not Path(file).exists(): return None with open(file, "r", encoding="utf-8") as json_file: res = json.load(json_file) return BuildResult(**res) def as_json(self) -> str: return json.dumps(asdict(self), indent=2) @property def build_config(self) -> Optional[CI.BuildConfig]: if self.build_name not in CI.JOB_CONFIGS: return None return CI.JOB_CONFIGS[self.build_name].build_config @property def comment(self) -> str: if self.build_config is None: return self._wrong_config_message return self.build_config.comment @property def compiler(self) -> str: if self.build_config is None: return self._wrong_config_message return self.build_config.compiler @property def debug_build(self) -> bool: if self.build_config is None: return False return self.build_config.debug_build @property def sanitizer(self) -> str: if self.build_config is None: return self._wrong_config_message return self.build_config.sanitizer @property def coverage(self) -> str: if self.build_config is None: return self._wrong_config_message return str(self.build_config.coverage) @property def grouped_urls(self) -> List[List[str]]: "Combine and preserve build_urls by artifact types" if self._grouped_urls is not None: return self._grouped_urls if not self.build_urls: self._grouped_urls = [[]] return self._grouped_urls artifacts_groups = { "apk": [], "deb": [], "binary": [], "tgz": [], "rpm": [], "performance": [], } # type: Dict[str, List[str]] for url in self.build_urls: if url.endswith("performance.tar.zst"): artifacts_groups["performance"].append(url) elif ( url.endswith(".deb") or url.endswith(".buildinfo") or url.endswith(".changes") or url.endswith(".tar.gz") ): artifacts_groups["deb"].append(url) elif url.endswith(".apk"): artifacts_groups["apk"].append(url) elif url.endswith(".rpm"): artifacts_groups["rpm"].append(url) elif url.endswith(".tgz") or url.endswith(".tgz.sha512"): artifacts_groups["tgz"].append(url) else: artifacts_groups["binary"].append(url) self._grouped_urls = [urls for urls in artifacts_groups.values() if urls] return self._grouped_urls @property def _wrong_config_message(self) -> str: return "missing" @property def is_missing(self) -> bool: "The report is created for missing json file" return not ( self.log_url or self.build_urls or self.version != "missing" or self.status != ERROR ) @property def job_link(self) -> str: if self._job_html_link is not None: return self._job_html_link self._job_html_link = f'{self.job_name}' return self._job_html_link @property def job_html_url(self) -> str: if self._job_html_url is not None: return self._job_html_url self._set_properties() return self._job_html_url or "" @property def job_name(self) -> str: if self._job_name is not None: return self._job_name self._set_properties() return self._job_name or "" @job_name.setter def job_name(self, job_name: str) -> None: self._job_name = job_name def _set_properties(self) -> None: if all(p is not None for p in (self._job_name, self._job_html_url)): return job_data = {} # quick check @self.job_api_url is valid url before request. it's set to "missing" for dummy BuildResult if "http" in self.job_api_url: try: job_data = get_gh_api(self.job_api_url).json() except Exception: pass # job_name can be set manually self._job_name = self._job_name or job_data.get("name", "unknown") self._job_html_url = job_data.get("html_url", "") @staticmethod def get_report_name(name: str, suffix: Union[str, int]) -> Path: assert "/" not in str(suffix) return Path(f"build_report_{suffix}_{name}.json") @staticmethod def missing_result(build_name: str) -> "BuildResult": return BuildResult(build_name, "", [], "missing", ERROR, 0, "missing") def write_json(self, directory: Union[Path, str] = REPORT_PATH) -> Path: path = Path(directory) / self.get_report_name( self.build_name, self.pr_number or CI.Utils.normalize_string(self.head_ref) ) path.write_text( json.dumps( { "build_name": self.build_name, "log_url": self.log_url, "build_urls": self.build_urls, "version": self.version, "status": self.status, "elapsed_seconds": self.elapsed_seconds, "job_api_url": self.job_api_url, "pr_number": self.pr_number, "head_ref": self.head_ref, } ), encoding="utf-8", ) # TODO: remove after the artifacts are in S3 completely env_path = Path(os.getenv("GITHUB_ENV", "/dev/null")) with env_path.open("a", encoding="utf-8") as ef: ef.write(f"BUILD_URLS={path.stem}") return path BuildResults = List[BuildResult] class ReportColorTheme: class ReportColor: yellow = "#FFB400" red = "#F00" green = "#0A0" blue = "#00B4FF" default = (ReportColor.green, ReportColor.red, ReportColor.yellow) ColorTheme = Tuple[str, str, str] def _format_header( header: str, branch_name: str, branch_url: Optional[str] = None ) -> str: result = header if "ClickHouse" not in result: result = f"ClickHouse {result}" if branch_url: result = f'{result} for {branch_name}' else: result = f"{result} for {branch_name}" return result def _get_status_style(status: str, colortheme: Optional[ColorTheme] = None) -> str: ok_statuses = (OK, SUCCESS, "PASSED") fail_statuses = (FAIL, FAILURE, ERROR, "FAILED", "Timeout", "NOT_FAILED") if colortheme is None: colortheme = ReportColorTheme.default style = "font-weight: bold;" if status in ok_statuses: style += f"color: {colortheme[0]};" elif status in fail_statuses: style += f"color: {colortheme[1]};" else: style += f"color: {colortheme[2]};" return style def _get_html_url_name(url): base_name = "" if isinstance(url, str): base_name = os.path.basename(url) if isinstance(url, tuple): base_name = url[1] if "?" in base_name: base_name = base_name.split("?")[0] if base_name is not None: return base_name.replace("%2B", "+").replace("%20", " ") return None def _get_html_url(url): href = None name = None if isinstance(url, str): href, name = url, _get_html_url_name(url) if isinstance(url, tuple): href, name = url[0], _get_html_url_name(url) if href and name: return f'{_get_html_url_name(url)}' return "" def create_test_html_report( header: str, test_results: TestResults, raw_log_url: str, task_url: str, job_url: str, branch_url: str, branch_name: str, commit_url: str, additional_urls: Optional[List[str]] = None, statuscolors: Optional[ColorTheme] = None, ) -> str: if additional_urls is None: additional_urls = [] if test_results: rows_part = [] num_fails = 0 has_test_time = any(tr.time is not None for tr in test_results) has_log_urls = False def sort_key(status): if "fail" in status.lower(): return 0 elif "error" in status.lower(): return 1 elif "not" in status.lower(): return 2 elif "ok" in status.lower(): return 10 elif "success" in status.lower(): return 9 else: return 5 test_results.sort(key=lambda result: sort_key(result.status)) for test_result in test_results: colspan = 0 if test_result.log_files is not None: has_log_urls = True row = [] if test_result.raw_logs is not None: row.append('') else: row.append("") row.append(f"{test_result.name}") colspan += 1 style = _get_status_style(test_result.status, colortheme=statuscolors) # Allow to quickly scroll to the first failure. fail_id = "" has_error = test_result.status in ("FAIL", "NOT_FAILED") if has_error: num_fails = num_fails + 1 fail_id = f'id="fail{num_fails}" ' row.append(f'{test_result.status}') colspan += 1 if has_test_time: if test_result.time is not None: row.append(f"{test_result.time}") else: row.append("") colspan += 1 if test_result.log_urls is not None: has_log_urls = True test_logs_html = "
".join( [_get_html_url(url) for url in test_result.log_urls] ) row.append(f"{test_logs_html}") colspan += 1 row.append("") rows_part.append("\n".join(row)) if test_result.raw_logs is not None: raw_logs = escape(test_result.raw_logs) row_raw_logs = ( '' f'
{raw_logs}
' "" ) rows_part.append(row_raw_logs) headers = BASE_HEADERS.copy() if has_test_time: headers.append("Test time, sec.") if has_log_urls: headers.append("Logs") headers_html = "".join(["" + h + "" for h in headers]) test_part = HTML_TEST_PART.format(headers=headers_html, rows="".join(rows_part)) else: test_part = "" additional_html_urls = " ".join( [_get_html_url(url) for url in sorted(additional_urls, key=_get_html_url_name)] ) raw_log_name = os.path.basename(raw_log_url) if "?" in raw_log_name: raw_log_name = raw_log_name.split("?")[0] html = HTML_BASE_TEST_TEMPLATE.format( title=_format_header(header, branch_name), header=_format_header(header, branch_name, branch_url), raw_log_name=raw_log_name, raw_log_url=raw_log_url, task_url=task_url, job_url=job_url, test_part=test_part, branch_name=branch_name, commit_url=commit_url, additional_urls=additional_html_urls, ) return html HTML_BASE_BUILD_TEMPLATE = ( f"{HEAD_HTML_TEMPLATE}" """ {rows}
Config/job name Compiler Build type Version Sanitizer Coverage Status Build log Build time Artifacts Comment
""" f"{FOOTER_HTML_TEMPLATE}" ) LINK_TEMPLATE = '{text}' def create_build_html_report( header: str, build_results: BuildResults, task_url: str, branch_url: str, branch_name: str, commit_url: str, ) -> str: rows = [] for build_result in build_results: for artifact_urls in build_result.grouped_urls: row = [""] row.append( f"{build_result.build_name}
{build_result.job_link}" ) row.append(f"{build_result.compiler}") if build_result.debug_build: row.append("debug") else: row.append("relwithdebuginfo") row.append(f"{build_result.version}") if build_result.sanitizer: row.append(f"{build_result.sanitizer}") else: row.append("none") row.append(f"{build_result.coverage}") if build_result.status: style = _get_status_style(build_result.status) row.append(f'{build_result.status}') else: style = _get_status_style(ERROR) row.append(f'error') row.append(f'link') delta = "unknown" if build_result.elapsed_seconds: delta = str(datetime.timedelta(seconds=build_result.elapsed_seconds)) row.append(f"{delta}") links = [] link_separator = "
" if artifact_urls: for artifact_url in artifact_urls: links.append( LINK_TEMPLATE.format( text=_get_html_url_name(artifact_url), url=artifact_url ) ) row.append(f"{link_separator.join(links)}") comment = build_result.comment if ( build_result.build_config is not None and build_result.build_config.sparse_checkout ): comment += " (note: sparse checkout is used, see update-submodules.sh)" row.append(f"{comment}") row.append("") rows.append("".join(row)) return HTML_BASE_BUILD_TEMPLATE.format( title=_format_header(header, branch_name), header=_format_header(header, branch_name, branch_url), rows="".join(rows), task_url=task_url, branch_name=branch_name, commit_url=commit_url, )