# -*- coding: utf-8 -*-
"""Read CI test/build results and render them as HTML report pages."""
from ast import literal_eval
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Optional, Tuple
from html import escape
import csv
import os
import datetime


### BEST FRONTEND PRACTICES BELOW

# NOTE(review): only the structural markup of the page is defined here. Any
# stylesheet or theme-switching script that is meant to accompany the
# "themes" element is not part of this file - confirm whether it has to be
# restored. Literal braces added to these templates must be doubled, because
# they are rendered with str.format().
HTML_BASE_TEST_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <title>{title}</title>
</head>
<body>
<div class="main">
<span class="themes">🌚🌞</span>
<h1>{header}</h1>
<p class="links">
<a href="{raw_log_url}">{raw_log_name}</a>
<a href="{commit_url}">Commit</a>
{additional_urls}
<a href="{task_url}">Task</a>
<a href="{job_url}">Job</a>
</p>
{test_part}
</div>
</body>
</html>
"""

HTML_TEST_PART = """
<table>
<tr>
{headers}
</tr>
{rows}
</table>
"""

HTML_BASE_BUILD_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <title>{title}</title>
</head>
<body>
<div class="main">
<h1>{header}</h1>
<p class="links">
<a href="{commit_url}">Commit</a>
<a href="{task_url}">Task</a>
</p>
<table>
<tr>
<th>Compiler</th>
<th>Build type</th>
<th>Sanitizer</th>
<th>Status</th>
<th>Build log</th>
<th>Build time</th>
<th>Artifacts</th>
<th>Comment</th>
</tr>
{rows}
</table>
</div>
</body>
</html>
"""

LINK_TEMPLATE = '<a href="{url}">{text}</a>'

BASE_HEADERS = ["Test name", "Test status"]


@dataclass
class TestResult:
    """Outcome of a single test, optionally with timing and log information."""

    name: str
    status: str
    # the following fields are optional
    time: Optional[float] = None
    log_files: Optional[List[Path]] = None
    raw_logs: Optional[str] = None
    # the field for uploaded logs URLs
    log_urls: Optional[List[str]] = None

    def set_raw_logs(self, raw_logs: str) -> None:
        """Attach the raw log text of the test."""
        self.raw_logs = raw_logs

    def set_log_files(self, log_files_literal: str) -> None:
        """Parse a Python list literal of paths and store them as log files.

        Raises:
            ValueError: if the literal does not evaluate to a list.
            AssertionError: if one of the listed files does not exist.
        """
        self.log_files = []  # type: Optional[List[Path]]
        log_paths = literal_eval(log_files_literal)
        if not isinstance(log_paths, list):
            raise ValueError(
                f"Malformed input: must be a list literal: {log_files_literal}"
            )
        for log_path in log_paths:
            file = Path(log_path)
            # NOTE(review): this check disappears under `python -O`; it is kept
            # as an assert so that the raised exception type stays the same.
            assert file.exists(), file
            self.log_files.append(file)


TestResults = List[TestResult]


def read_test_results(results_path: Path, with_raw_logs: bool = True) -> TestResults:
    """Read test results from a tab-separated file.

    Each non-empty row is ``name, status[, time[, logs]]``. The third column
    is the test time in seconds; it is ignored when it is empty, ``\\N`` or
    not a number. A non-empty fourth column (only when the row has exactly
    four columns) is stored as raw logs when *with_raw_logs* is true,
    otherwise it is parsed as a Python list literal of log file paths,
    e.g. ``['file1', 'file2']``.
    """
    results = []  # type: TestResults
    # newline="" lets the csv module do its own line-ending handling
    with open(results_path, "r", encoding="utf-8", newline="") as descriptor:
        reader = csv.reader(descriptor, delimiter="\t")
        for line in reader:
            if not line:
                # csv.reader yields [] for a blank line; there is nothing to
                # report for it
                continue
            name = line[0]
            status = line[1]
            time = None
            if len(line) >= 3 and line[2] and line[2] != "\\N":
                # The value can be empty, but when it's not,
                # it's the time spent on the test
                try:
                    time = float(line[2])
                except ValueError:
                    # best effort: an unparsable time is treated as unknown
                    pass

            result = TestResult(name, status, time)

            if len(line) == 4 and line[3]:
                if with_raw_logs:
                    # Python does not support TSV, so we unescape manually
                    result.set_raw_logs(
                        line[3].replace("\\t", "\t").replace("\\n", "\n")
                    )
                else:
                    result.set_log_files(line[3])

            results.append(result)

    return results


@dataclass
class BuildResult:
    """Outcome of a single build configuration."""

    compiler: str
    build_type: str
    sanitizer: str
    status: str
    elapsed_seconds: int
    comment: str


BuildResults = List[BuildResult]


class ReportColorTheme:
    """Color triples used for (ok, fail, other) statuses."""

    class ReportColor:
        yellow = "#FFB400"
        red = "#F00"
        green = "#0A0"
        blue = "#00B4FF"

    default = (ReportColor.green, ReportColor.red, ReportColor.yellow)
    bugfixcheck = (ReportColor.yellow, ReportColor.blue, ReportColor.blue)


ColorTheme = Tuple[str, str, str]


def _text(value: object) -> str:
    """Return *value* as HTML-escaped text."""
    return escape(str(value))


def _make_link(url: object, text: object) -> str:
    """Render an anchor from LINK_TEMPLATE with both parts HTML-escaped."""
    return LINK_TEMPLATE.format(url=escape(str(url), quote=True), text=_text(text))


def _format_header(
    header: str, branch_name: str, branch_url: Optional[str] = None
) -> str:
    """Build the report heading, linking the branch name if a URL is given."""
    # Following line does not lower CI->Ci and SQLancer->Sqlancer. It only
    # capitalizes the first letter and doesn't touch the rest of the word
    result = " ".join([w[0].upper() + w[1:] for w in header.split(" ") if w])
    result = result.replace("Clickhouse", "ClickHouse")
    result = result.replace("clickhouse", "ClickHouse")
    if "ClickHouse" not in result:
        result = f"ClickHouse {result}"
    if branch_url:
        result = f'{result} for <a href="{branch_url}">{branch_name}</a>'
    else:
        result = f"{result} for {branch_name}"
    return result


def _get_status_style(status: str, colortheme: Optional[ColorTheme] = None) -> str:
    """Return the inline CSS for a status cell, colored by status class."""
    ok_statuses = ("OK", "success", "PASSED")
    fail_statuses = ("FAIL", "failure", "error", "FAILED", "Timeout", "NOT_FAILED")

    if colortheme is None:
        colortheme = ReportColorTheme.default

    if status in ok_statuses:
        color = colortheme[0]
    elif status in fail_statuses:
        color = colortheme[1]
    else:
        color = colortheme[2]
    return f"font-weight: bold;color: {color};"


def _get_html_url_name(url):
    """Return the display name for a URL.

    A string URL is named after its basename; a ``(href, name)`` tuple uses
    its second item. ``%2B`` and ``%20`` are shown as ``+`` and a space.
    Anything else yields None.
    """
    if isinstance(url, str):
        return os.path.basename(url).replace("%2B", "+").replace("%20", " ")
    if isinstance(url, tuple):
        return url[1].replace("%2B", "+").replace("%20", " ")
    return None


def _get_html_url(url):
    """Render a string URL or ``(href, name)`` tuple as a link, or "" if unusable."""
    if isinstance(url, str):
        href = url
    elif isinstance(url, tuple):
        href = url[0]
    else:
        return ""
    name = _get_html_url_name(url)
    if href and name:
        return _make_link(href, name)
    return ""


def _render_test_table(
    test_results: Iterable[TestResult], statuscolors: Optional[ColorTheme]
) -> str:
    """Render the results table: one row per test plus a row for its raw logs."""
    # Display entries with logs at the top (they correspond to failed tests).
    # sorted() is stable and leaves the caller's list untouched.
    ordered = sorted(
        test_results,
        key=lambda result: result.raw_logs is None and result.log_files is None,
    )

    # The set of columns is decided once, so that every row has exactly one
    # cell per header.
    has_test_time = any(result.time is not None for result in ordered)
    has_log_urls = any(
        result.log_urls is not None or result.log_files is not None
        for result in ordered
    )
    headers = BASE_HEADERS.copy()
    if has_test_time:
        headers.append("Test time, sec.")
    if has_log_urls:
        headers.append("Logs")
    colspan = len(headers)

    rows = []  # type: List[str]
    num_fails = 0
    for test_result in ordered:
        has_error = test_result.status in ("FAIL", "NOT_FAILED")

        # Allow to quickly scroll to the first failure.
        fail_id = ""
        if has_error:
            num_fails += 1
            fail_id = f'id="fail{num_fails}" '

        style = _get_status_style(test_result.status, colortheme=statuscolors)
        cells = [
            f"<td>{_text(test_result.name)}</td>",
            f'<td {fail_id}style="{style}">{_text(test_result.status)}</td>',
        ]
        if has_test_time:
            time_text = "" if test_result.time is None else str(test_result.time)
            cells.append(f"<td>{time_text}</td>")
        if has_log_urls:
            test_logs_html = "<br>".join(
                _get_html_url(url) for url in test_result.log_urls or []
            )
            cells.append(f"<td>{test_logs_html}</td>")

        if has_error and test_result.raw_logs is not None:
            row_open = '<tr class="failed">'
        else:
            row_open = "<tr>"
        rows.append(row_open + "".join(cells) + "</tr>")

        if test_result.raw_logs is not None:
            raw_logs = escape(test_result.raw_logs)
            rows.append(
                '<tr class="failed-content">'
                f'<td colspan="{colspan}"><pre>{raw_logs}</pre></td>'
                "</tr>"
            )

    headers_html = "".join(f"<th>{h}</th>" for h in headers)
    return HTML_TEST_PART.format(headers=headers_html, rows="".join(rows))


def create_test_html_report(
    header: str,
    test_results: TestResults,
    raw_log_url: str,
    task_url: str,
    job_url: str,
    branch_url: str,
    branch_name: str,
    commit_url: str,
    additional_urls: Optional[List[str]] = None,
    statuscolors: Optional[ColorTheme] = None,
) -> str:
    """Render the HTML report page for a list of test results.

    Tests that carry raw logs or log files are listed first. The results
    table is omitted when *test_results* is empty. *additional_urls* are
    rendered as links sorted by their display name. *test_results* itself is
    not modified.
    """
    if additional_urls is None:
        additional_urls = []

    if test_results:
        test_part = _render_test_table(test_results, statuscolors)
    else:
        test_part = ""

    additional_html_urls = " ".join(
        _get_html_url(url) for url in sorted(additional_urls, key=_get_html_url_name)
    )

    # The link text is the file name without the query string
    raw_log_name = os.path.basename(raw_log_url).split("?")[0]

    return HTML_BASE_TEST_TEMPLATE.format(
        title=_format_header(header, branch_name),
        header=_format_header(header, branch_name, branch_url),
        raw_log_name=raw_log_name,
        raw_log_url=raw_log_url,
        task_url=task_url,
        job_url=job_url,
        test_part=test_part,
        branch_name=branch_name,
        commit_url=commit_url,
        additional_urls=additional_html_urls,
    )


def create_build_html_report(
    header: str,
    build_results: BuildResults,
    build_logs_urls: List[str],
    artifact_urls_list: List[List[str]],
    task_url: str,
    branch_url: str,
    branch_name: str,
    commit_url: str,
) -> str:
    """Render the HTML report page for a list of build results.

    *build_results*, *build_logs_urls* and *artifact_urls_list* are walked in
    parallel; iteration stops at the shortest of them. An empty build type is
    shown as ``relwithdebuginfo``, an empty sanitizer as ``none``, an empty
    status as ``error`` and a zero/empty elapsed time as ``unknown``.
    """
    rows = []  # type: List[str]
    for build_result, build_log_url, artifact_urls in zip(
        build_results, build_logs_urls, artifact_urls_list
    ):
        status = build_result.status or "error"
        style = _get_status_style(status)

        if build_result.elapsed_seconds:
            delta = str(datetime.timedelta(seconds=build_result.elapsed_seconds))
        else:
            delta = "unknown"

        links = "<br>".join(
            _make_link(artifact_url, _get_html_url_name(artifact_url))
            for artifact_url in artifact_urls or []
        )

        cells = [
            f"<td>{_text(build_result.compiler)}</td>",
            f"<td>{_text(build_result.build_type or 'relwithdebuginfo')}</td>",
            f"<td>{_text(build_result.sanitizer or 'none')}</td>",
            f'<td style="{style}">{_text(status)}</td>',
            f"<td>{_make_link(build_log_url, 'link')}</td>",
            f"<td>{delta}</td>",
            f"<td>{links}</td>",
            f"<td>{_text(build_result.comment)}</td>",
        ]
        rows.append("<tr>" + "".join(cells) + "</tr>")

    return HTML_BASE_BUILD_TEMPLATE.format(
        title=_format_header(header, branch_name),
        header=_format_header(header, branch_name, branch_url),
        rows="".join(rows),
        task_url=task_url,
        branch_name=branch_name,
        commit_url=commit_url,
    )