# -*- coding: utf-8 -*- import csv import datetime import json import logging import os from ast import literal_eval from dataclasses import asdict, dataclass from html import escape from pathlib import Path from typing import ( Dict, Final, Iterable, List, Literal, Optional, Sequence, Tuple, Union, ) from build_download_helper import get_gh_api from ci_config import CI_CONFIG, BuildConfig from env_helper import REPORT_PATH, TEMP_PATH from ci_utils import normalize_string logger = logging.getLogger(__name__) ERROR: Final = "error" FAILURE: Final = "failure" PENDING: Final = "pending" SUCCESS: Final = "success" OK: Final = "OK" FAIL: Final = "FAIL" SKIPPED: Final = "SKIPPED" StatusType = Literal["error", "failure", "pending", "success"] STATUSES = [ERROR, FAILURE, PENDING, SUCCESS] # type: List[StatusType] # The order of statuses from the worst to the best def _state_rank(status: str) -> int: "return the index of status or index of SUCCESS in case of wrong status" try: return STATUSES.index(status) # type: ignore except ValueError: return 3 def get_worst_status(statuses: Iterable[str]) -> StatusType: worst_status = SUCCESS # type: StatusType for status in statuses: ind = _state_rank(status) if ind < _state_rank(worst_status): worst_status = STATUSES[ind] if worst_status == ERROR: break return worst_status ### BEST FRONTEND PRACTICES BELOW HEAD_HTML_TEMPLATE = """ {title}
🌚🌞

{header}

""" FOOTER_HTML_TEMPLATE = """ """ HTML_BASE_TEST_TEMPLATE = ( f"{HEAD_HTML_TEMPLATE}" """ {test_part} """ f"{FOOTER_HTML_TEMPLATE}" ) HTML_TEST_PART = """ {headers} {rows}
""" BASE_HEADERS = ["Test name", "Test status"] JOB_REPORT_FILE = Path(TEMP_PATH) / "job_report.json" @dataclass class TestResult: name: str status: str # the following fields are optional time: Optional[float] = None log_files: Optional[Union[Sequence[str], Sequence[Path]]] = None raw_logs: Optional[str] = None # the field for uploaded logs URLs log_urls: Optional[Sequence[str]] = None def set_raw_logs(self, raw_logs: str) -> None: self.raw_logs = raw_logs def set_log_files(self, log_files_literal: str) -> None: self.log_files = [] # type: Optional[List[Path]] log_paths = literal_eval(log_files_literal) if not isinstance(log_paths, list): raise ValueError( f"Malformed input: must be a list literal: {log_files_literal}" ) for log_path in log_paths: assert Path(log_path).exists(), log_path self.log_files.append(log_path) @staticmethod def create_check_timeout_expired(timeout: float) -> "TestResult": return TestResult("Check timeout expired", "FAIL", timeout) TestResults = List[TestResult] @dataclass class JobReport: status: str description: str test_results: TestResults start_time: str duration: float additional_files: Union[Sequence[str], Sequence[Path]] # clcikhouse version, build job only version: str = "" # checkname to set in commit status, set if differs from jjob name check_name: str = "" # directory with artifacts to upload on s3 build_dir_for_upload: Union[Path, str] = "" # if False no GH commit status will be created by CI need_commit_status: bool = True @classmethod def exist(cls) -> bool: return JOB_REPORT_FILE.is_file() @classmethod def load(cls, from_file=None): # type: ignore res = {} from_file = from_file or JOB_REPORT_FILE with open(from_file, "r") as json_file: res = json.load(json_file) # Deserialize the nested lists of TestResult test_results_data = res.get("test_results", []) test_results = [TestResult(**result) for result in test_results_data] del res["test_results"] return JobReport(test_results=test_results, **res) @classmethod def cleanup(cls): if JOB_REPORT_FILE.exists(): JOB_REPORT_FILE.unlink() def dump(self, to_file=None): def path_converter(obj): if isinstance(obj, Path): return str(obj) raise TypeError("Type not serializable") to_file = to_file or JOB_REPORT_FILE with open(to_file, "w") as json_file: json.dump(asdict(self), json_file, default=path_converter, indent=2) def read_test_results(results_path: Path, with_raw_logs: bool = True) -> TestResults: results = [] # type: TestResults with open(results_path, "r", encoding="utf-8") as descriptor: reader = csv.reader(descriptor, delimiter="\t") for line in reader: name = line[0] status = line[1] time = None if len(line) >= 3 and line[2] and line[2] != "\\N": # The value can be emtpy, but when it's not, # it's the time spent on the test try: time = float(line[2]) except ValueError: pass result = TestResult(name, status, time) if len(line) == 4 and line[3]: # The value can be emtpy, but when it's not, # the 4th value is a pythonic list, e.g. ['file1', 'file2'] if with_raw_logs: # Python does not support TSV, so we unescape manually result.set_raw_logs( line[3].replace("\\t", "\t").replace("\\n", "\n") ) else: result.set_log_files(line[3]) results.append(result) return results @dataclass class BuildResult: build_name: str log_url: str build_urls: List[str] version: str status: str elapsed_seconds: int job_api_url: str pr_number: int = 0 head_ref: str = "dummy_branch_name" _job_name: Optional[str] = None _job_html_url: Optional[str] = None _job_html_link: Optional[str] = None _grouped_urls: Optional[List[List[str]]] = None @classmethod def cleanup(cls): if Path(REPORT_PATH).exists(): for file in Path(REPORT_PATH).iterdir(): if "build_report" in file.name and file.name.endswith(".json"): file.unlink() @classmethod def load(cls, build_name: str, pr_number: int, head_ref: str): # type: ignore """ loads report from a report file matched with given @pr_number and/or a @head_ref """ report_path = Path(REPORT_PATH) / BuildResult.get_report_name( build_name, pr_number or head_ref ) return cls.load_from_file(report_path) @classmethod def load_any(cls, build_name: str, pr_number: int, head_ref: str): # type: ignore """ loads report from suitable report file with the following priority: 1. report from PR with the same @pr_number 2. report from branch with the same @head_ref 3. report from the master 4. any other report """ reports = [] for file in Path(REPORT_PATH).iterdir(): if f"{build_name}.json" in file.name: reports.append(file) if not reports: return None file_path = None for file in reports: if pr_number and f"_{pr_number}_" in file.name: file_path = file break if f"_{head_ref}_" in file.name: file_path = file break if "_master_" in file.name: file_path = file break return cls.load_from_file(file_path or reports[-1]) @classmethod def load_from_file(cls, file: Union[Path, str]): # type: ignore if not Path(file).exists(): return None with open(file, "r") as json_file: res = json.load(json_file) return BuildResult(**res) def as_json(self) -> str: return json.dumps(asdict(self), indent=2) @property def build_config(self) -> Optional[BuildConfig]: return CI_CONFIG.build_config.get(self.build_name, None) @property def comment(self) -> str: if self.build_config is None: return self._wrong_config_message return self.build_config.comment @property def compiler(self) -> str: if self.build_config is None: return self._wrong_config_message return self.build_config.compiler @property def debug_build(self) -> bool: if self.build_config is None: return False return self.build_config.debug_build @property def sanitizer(self) -> str: if self.build_config is None: return self._wrong_config_message return self.build_config.sanitizer @property def coverage(self) -> str: if self.build_config is None: return self._wrong_config_message return str(self.build_config.coverage) @property def grouped_urls(self) -> List[List[str]]: "Combine and preserve build_urls by artifact types" if self._grouped_urls is not None: return self._grouped_urls if not self.build_urls: self._grouped_urls = [[]] return self._grouped_urls artifacts_groups = { "apk": [], "deb": [], "binary": [], "tgz": [], "rpm": [], "performance": [], } # type: Dict[str, List[str]] for url in self.build_urls: if url.endswith("performance.tar.zst"): artifacts_groups["performance"].append(url) elif ( url.endswith(".deb") or url.endswith(".buildinfo") or url.endswith(".changes") or url.endswith(".tar.gz") ): artifacts_groups["deb"].append(url) elif url.endswith(".apk"): artifacts_groups["apk"].append(url) elif url.endswith(".rpm"): artifacts_groups["rpm"].append(url) elif url.endswith(".tgz") or url.endswith(".tgz.sha512"): artifacts_groups["tgz"].append(url) else: artifacts_groups["binary"].append(url) self._grouped_urls = [urls for urls in artifacts_groups.values() if urls] return self._grouped_urls @property def _wrong_config_message(self) -> str: return "missing" @property def is_missing(self) -> bool: "The report is created for missing json file" return not ( self.log_url or self.build_urls or self.version != "missing" or self.status != ERROR ) @property def job_link(self) -> str: if self._job_html_link is not None: return self._job_html_link self._job_html_link = f'{self.job_name}' return self._job_html_link @property def job_html_url(self) -> str: if self._job_html_url is not None: return self._job_html_url self._set_properties() return self._job_html_url or "" @property def job_name(self) -> str: if self._job_name is not None: return self._job_name self._set_properties() return self._job_name or "" @job_name.setter def job_name(self, job_name: str) -> None: self._job_name = job_name def _set_properties(self) -> None: if all(p is not None for p in (self._job_name, self._job_html_url)): return job_data = {} # quick check @self.job_api_url is valid url before request. it's set to "missing" for dummy BuildResult if "http" in self.job_api_url: try: job_data = get_gh_api(self.job_api_url).json() except Exception: pass # job_name can be set manually self._job_name = self._job_name or job_data.get("name", "unknown") self._job_html_url = job_data.get("html_url", "") @staticmethod def get_report_name(name: str, suffix: Union[str, int]) -> Path: assert "/" not in str(suffix) return Path(f"build_report_{suffix}_{name}.json") @staticmethod def missing_result(build_name: str) -> "BuildResult": return BuildResult(build_name, "", [], "missing", ERROR, 0, "missing") def write_json(self, directory: Union[Path, str] = REPORT_PATH) -> Path: path = Path(directory) / self.get_report_name( self.build_name, self.pr_number or normalize_string(self.head_ref) ) path.write_text( json.dumps( { "build_name": self.build_name, "log_url": self.log_url, "build_urls": self.build_urls, "version": self.version, "status": self.status, "elapsed_seconds": self.elapsed_seconds, "job_api_url": self.job_api_url, "pr_number": self.pr_number, "head_ref": self.head_ref, } ), encoding="utf-8", ) # TODO: remove after the artifacts are in S3 completely env_path = Path(os.getenv("GITHUB_ENV", "/dev/null")) with env_path.open("a", encoding="utf-8") as ef: ef.write(f"BUILD_URLS={path.stem}") return path BuildResults = List[BuildResult] class ReportColorTheme: class ReportColor: yellow = "#FFB400" red = "#F00" green = "#0A0" blue = "#00B4FF" default = (ReportColor.green, ReportColor.red, ReportColor.yellow) ColorTheme = Tuple[str, str, str] def _format_header( header: str, branch_name: str, branch_url: Optional[str] = None ) -> str: # Following line does not lower CI->Ci and SQLancer->Sqlancer. It only # capitalizes the first letter and doesn't touch the rest of the word result = " ".join([w[0].upper() + w[1:] for w in header.split(" ") if w]) result = result.replace("Clickhouse", "ClickHouse") result = result.replace("clickhouse", "ClickHouse") if "ClickHouse" not in result: result = f"ClickHouse {result}" if branch_url: result = f'{result} for {branch_name}' else: result = f"{result} for {branch_name}" return result def _get_status_style(status: str, colortheme: Optional[ColorTheme] = None) -> str: ok_statuses = (OK, SUCCESS, "PASSED") fail_statuses = (FAIL, FAILURE, ERROR, "FAILED", "Timeout", "NOT_FAILED") if colortheme is None: colortheme = ReportColorTheme.default style = "font-weight: bold;" if status in ok_statuses: style += f"color: {colortheme[0]};" elif status in fail_statuses: style += f"color: {colortheme[1]};" else: style += f"color: {colortheme[2]};" return style def _get_html_url_name(url): base_name = "" if isinstance(url, str): base_name = os.path.basename(url) if isinstance(url, tuple): base_name = url[1] if "?" in base_name: base_name = base_name.split("?")[0] if base_name is not None: return base_name.replace("%2B", "+").replace("%20", " ") return None def _get_html_url(url): href = None name = None if isinstance(url, str): href, name = url, _get_html_url_name(url) if isinstance(url, tuple): href, name = url[0], _get_html_url_name(url) if href and name: return f'{_get_html_url_name(url)}' return "" def create_test_html_report( header: str, test_results: TestResults, raw_log_url: str, task_url: str, job_url: str, branch_url: str, branch_name: str, commit_url: str, additional_urls: Optional[List[str]] = None, statuscolors: Optional[ColorTheme] = None, ) -> str: if additional_urls is None: additional_urls = [] if test_results: rows_part = [] num_fails = 0 has_test_time = False has_log_urls = False # Display entires with logs at the top (they correspond to failed tests) test_results.sort( key=lambda result: result.raw_logs is None and result.log_files is None ) for test_result in test_results: colspan = 0 if test_result.log_files is not None: has_log_urls = True row = [] if test_result.raw_logs is not None: row.append('') else: row.append("") row.append(f"{test_result.name}") colspan += 1 style = _get_status_style(test_result.status, colortheme=statuscolors) # Allow to quickly scroll to the first failure. fail_id = "" has_error = test_result.status in ("FAIL", "NOT_FAILED") if has_error: num_fails = num_fails + 1 fail_id = f'id="fail{num_fails}" ' row.append(f'{test_result.status}') colspan += 1 if test_result.time is not None: has_test_time = True row.append(f"{test_result.time}") colspan += 1 if test_result.log_urls is not None: has_log_urls = True test_logs_html = "
".join( [_get_html_url(url) for url in test_result.log_urls] ) row.append(f"{test_logs_html}") colspan += 1 row.append("") rows_part.append("\n".join(row)) if test_result.raw_logs is not None: raw_logs = escape(test_result.raw_logs) row_raw_logs = ( '' f'
{raw_logs}
' "" ) rows_part.append(row_raw_logs) headers = BASE_HEADERS.copy() if has_test_time: headers.append("Test time, sec.") if has_log_urls: headers.append("Logs") headers_html = "".join(["" + h + "" for h in headers]) test_part = HTML_TEST_PART.format(headers=headers_html, rows="".join(rows_part)) else: test_part = "" additional_html_urls = " ".join( [_get_html_url(url) for url in sorted(additional_urls, key=_get_html_url_name)] ) raw_log_name = os.path.basename(raw_log_url) if "?" in raw_log_name: raw_log_name = raw_log_name.split("?")[0] html = HTML_BASE_TEST_TEMPLATE.format( title=_format_header(header, branch_name), header=_format_header(header, branch_name, branch_url), raw_log_name=raw_log_name, raw_log_url=raw_log_url, task_url=task_url, job_url=job_url, test_part=test_part, branch_name=branch_name, commit_url=commit_url, additional_urls=additional_html_urls, ) return html HTML_BASE_BUILD_TEMPLATE = ( f"{HEAD_HTML_TEMPLATE}" """ {rows}
Config/job name Compiler Build type Version Sanitizer Coverage Status Build log Build time Artifacts Comment
""" f"{FOOTER_HTML_TEMPLATE}" ) LINK_TEMPLATE = '{text}' def create_build_html_report( header: str, build_results: BuildResults, task_url: str, branch_url: str, branch_name: str, commit_url: str, ) -> str: rows = [] for build_result in build_results: for artifact_urls in build_result.grouped_urls: row = [""] row.append( f"{build_result.build_name}
{build_result.job_link}" ) row.append(f"{build_result.compiler}") if build_result.debug_build: row.append("debug") else: row.append("relwithdebuginfo") row.append(f"{build_result.version}") if build_result.sanitizer: row.append(f"{build_result.sanitizer}") else: row.append("none") row.append(f"{build_result.coverage}") if build_result.status: style = _get_status_style(build_result.status) row.append(f'{build_result.status}') else: style = _get_status_style(ERROR) row.append(f'error') row.append(f'link') delta = "unknown" if build_result.elapsed_seconds: delta = str(datetime.timedelta(seconds=build_result.elapsed_seconds)) row.append(f"{delta}") links = [] link_separator = "
" if artifact_urls: for artifact_url in artifact_urls: links.append( LINK_TEMPLATE.format( text=_get_html_url_name(artifact_url), url=artifact_url ) ) row.append(f"{link_separator.join(links)}") comment = build_result.comment if ( build_result.build_config is not None and build_result.build_config.sparse_checkout ): comment += " (note: sparse checkout is used, see update-submodules.sh)" row.append(f"{comment}") row.append("") rows.append("".join(row)) return HTML_BASE_BUILD_TEMPLATE.format( title=_format_header(header, branch_name), header=_format_header(header, branch_name, branch_url), rows="".join(rows), task_url=task_url, branch_name=branch_name, commit_url=commit_url, )