# -*- coding: utf-8 -*-
import csv
import datetime
import json
import logging
import os
from ast import literal_eval
from dataclasses import asdict, dataclass
from html import escape
from pathlib import Path
from typing import (
Dict,
Final,
Iterable,
List,
Literal,
Optional,
Sequence,
Tuple,
Union,
)
from build_download_helper import get_gh_api
from ci_config import CI_CONFIG, BuildConfig
from ci_utils import normalize_string
from env_helper import REPORT_PATH, TEMP_PATH
# Module-level logger named after this module
logger = logging.getLogger(__name__)

# Status strings; these four are the values accepted by JobReport.status
ERROR: Final = "error"
FAILURE: Final = "failure"
PENDING: Final = "pending"
SUCCESS: Final = "success"

# Result markers; OK and FAIL are recognised by _get_status_style()
OK: Final = "OK"
FAIL: Final = "FAIL"
SKIPPED: Final = "SKIPPED"

# Literal type spelling out the same four strings as the constants above
StatusType = Literal["error", "failure", "pending", "success"]
# Positions in this list are what _state_rank() returns
STATUSES = [ERROR, FAILURE, PENDING, SUCCESS] # type: List[StatusType]
# The order of statuses from the worst to the best
def _state_rank(status: str) -> int:
    """Return the position of `status` in STATUSES.

    A status that is not listed in STATUSES gets rank 3, which is the
    position of SUCCESS in that list.
    """
    if status in STATUSES:
        return STATUSES.index(status)  # type: ignore
    # Unknown status: rank it like SUCCESS
    return 3
def get_worst_status(statuses: Iterable[str]) -> StatusType:
    """Return the worst status among `statuses`.

    "Worst" means the lowest rank according to _state_rank(). An empty
    iterable yields SUCCESS. Iteration stops as soon as ERROR is reached,
    so the rest of the iterable is not consumed in that case.
    """
    lowest_rank = _state_rank(SUCCESS)
    for candidate in statuses:
        candidate_rank = _state_rank(candidate)
        if candidate_rank < lowest_rank:
            lowest_rank = candidate_rank
        if STATUSES[lowest_rank] == ERROR:
            # Nothing can be worse than ERROR
            break
    return STATUSES[lowest_rank]
### BEST FRONTEND PRACTICES BELOW

# Opening part shared by HTML_BASE_TEST_TEMPLATE and HTML_BASE_BUILD_TEMPLATE.
# The `{header}` field is filled later by str.format() on those templates.
HEAD_HTML_TEMPLATE = """
🌚🌞
{header}
"""

# Closing part shared by the same two templates
FOOTER_HTML_TEMPLATE = """
"""

# Page template for create_test_html_report(). Only the head and footer are
# interpolated here (f-strings); the middle literal is a plain string, so its
# `{...}` fields stay in place for the later str.format() call.
# NOTE(review): create_test_html_report() also passes title, raw_log_url,
# task_url, job_url, branch_name and commit_url, none of which appear as
# fields in this literal - confirm the template text is complete.
HTML_BASE_TEST_TEMPLATE = (
f"{HEAD_HTML_TEMPLATE}"
"""
{raw_log_name}
Commit
{additional_urls}
Task (github actions)
Job (github actions)
{test_part}
"""
f"{FOOTER_HTML_TEMPLATE}"
)

# Formatted with `headers` and `rows` in create_test_html_report().
# NOTE(review): neither field is present in this literal - confirm the
# template text is complete.
HTML_TEST_PART = """
"""

# Column titles that every test table has; create_test_html_report() copies
# this list and appends optional columns to the copy
BASE_HEADERS = ["Test name", "Test status"]

# Default file used by JobReport.exist(), load(), cleanup() and dump()
JOB_REPORT_FILE = Path(TEMP_PATH) / "job_report.json"
@dataclass
class TestResult:
    """One test outcome: a name, a status and optional timing/log details."""

    name: str
    status: str
    # the following fields are optional
    time: Optional[float] = None
    log_files: Optional[Union[Sequence[str], Sequence[Path]]] = None
    raw_logs: Optional[str] = None
    # the field for uploaded logs URLs
    log_urls: Optional[Sequence[str]] = None

    def set_raw_logs(self, raw_logs: str) -> None:
        """Store `raw_logs` as the raw log text of this result."""
        self.raw_logs = raw_logs

    def set_log_files(self, log_files_literal: str) -> None:
        """Fill `log_files` from a Python list literal such as "['a', 'b']".

        `log_files` is reset to an empty list before parsing. Raises
        ValueError when the literal is not a list, and fails an assertion
        for the first listed path that does not exist.
        """
        self.log_files = []  # type: Optional[List[Path]]
        parsed = literal_eval(log_files_literal)
        if not isinstance(parsed, list):
            raise ValueError(
                f"Malformed input: must be a list literal: {log_files_literal}"
            )
        for entry in parsed:
            assert Path(entry).exists(), entry
            self.log_files.append(entry)

    @staticmethod
    def create_check_timeout_expired(timeout: float) -> "TestResult":
        """Build the synthetic failed result reported when a check times out."""
        return TestResult(name="Check timeout expired", status="FAIL", time=timeout)


TestResults = List[TestResult]
@dataclass
class JobReport:
    """Summary of a CI job that can be saved to and restored from JSON.

    The default storage location is JOB_REPORT_FILE. `status` must be one of
    SUCCESS, ERROR, FAILURE or PENDING; this is asserted on construction.
    """

    status: str
    description: str
    test_results: TestResults
    start_time: str
    duration: float
    additional_files: Union[Sequence[str], Sequence[Path]]
    # clickhouse version, build job only
    version: str = ""
    # check name to set in commit status, set if it differs from the job name
    check_name: str = ""
    # directory with artifacts to upload on s3
    build_dir_for_upload: Union[Path, str] = ""
    # if False no GH commit status will be created by CI
    need_commit_status: bool = True

    def __post_init__(self):
        assert self.status in (SUCCESS, ERROR, FAILURE, PENDING)

    @classmethod
    def exist(cls) -> bool:
        """Tell whether the default report file is present."""
        return JOB_REPORT_FILE.is_file()

    @classmethod
    def load(cls, from_file=None):  # type: ignore
        """Read a report from `from_file` (JOB_REPORT_FILE when not given).

        The "test_results" entry, when present, is turned back into a list of
        TestResult objects; a file without that entry yields an empty list.
        """
        from_file = from_file or JOB_REPORT_FILE
        with open(from_file, "r", encoding="utf-8") as json_file:
            res = json.load(json_file)
        # Deserialize the nested list of TestResult. pop() removes the key so
        # that it is not passed a second time through **res, and tolerates a
        # report that has no "test_results" key at all.
        test_results = [TestResult(**result) for result in res.pop("test_results", [])]
        return cls(test_results=test_results, **res)

    @classmethod
    def cleanup(cls):
        """Remove the default report file if it exists."""
        if JOB_REPORT_FILE.exists():
            JOB_REPORT_FILE.unlink()

    def dump(self, to_file=None):
        """Write the report as indented JSON to `to_file` (JOB_REPORT_FILE when not given)."""

        def path_converter(obj):
            # json cannot serialize Path objects; store them as strings
            if isinstance(obj, Path):
                return str(obj)
            raise TypeError("Type not serializable")

        to_file = to_file or JOB_REPORT_FILE
        with open(to_file, "w", encoding="utf-8") as json_file:
            json.dump(asdict(self), json_file, default=path_converter, indent=2)
def read_test_results(results_path: Path, with_raw_logs: bool = True) -> TestResults:
    """Parse a tab-separated results file into a list of TestResult.

    Columns are: test name, status, optional time, optional 4th value.
    Blank lines are skipped.

    Args:
        results_path: the TSV file to read.
        with_raw_logs: when True, the 4th column is stored as raw log text
            with the "\\t" and "\\n" escapes expanded; when False, it is
            parsed as a Python list literal of log file paths
            (see TestResult.set_log_files).

    Returns:
        The parsed results in file order.
    """
    results = []  # type: TestResults
    with open(results_path, "r", encoding="utf-8") as descriptor:
        reader = csv.reader(descriptor, delimiter="\t")
        for line in reader:
            if not line:
                # csv.reader yields an empty list for a blank line
                continue
            name = line[0]
            status = line[1]
            time = None
            if len(line) >= 3 and line[2] and line[2] != "\\N":
                # The value can be empty, but when it's not,
                # it's the time spent on the test
                try:
                    time = float(line[2])
                except ValueError:
                    # An unparsable time is treated as unknown
                    pass

            result = TestResult(name, status, time)
            if len(line) == 4 and line[3]:
                # The value can be empty, but when it's not, the 4th value is
                # either the raw logs or a pythonic list of log files,
                # e.g. ['file1', 'file2']
                if with_raw_logs:
                    # Python does not support TSV, so we unescape manually
                    result.set_raw_logs(
                        line[3].replace("\\t", "\t").replace("\\n", "\n")
                    )
                else:
                    result.set_log_files(line[3])

            results.append(result)

    return results
@dataclass
class BuildResult:
    """Outcome of one build job, stored as a JSON report file in REPORT_PATH.

    Report files are named by get_report_name(). The job name and job page
    URL are fetched lazily from `job_api_url` on first access.
    """

    build_name: str
    log_url: str
    build_urls: List[str]
    version: str
    status: str
    elapsed_seconds: int
    job_api_url: str
    pr_number: int = 0
    head_ref: str = "dummy_branch_name"
    # Caches filled lazily by the job_name, job_html_url, job_link and
    # grouped_urls properties; write_json() does not store them
    _job_name: Optional[str] = None
    _job_html_url: Optional[str] = None
    _job_html_link: Optional[str] = None
    _grouped_urls: Optional[List[List[str]]] = None

    @classmethod
    def cleanup(cls):
        """Delete the build report JSON files located directly in REPORT_PATH."""
        report_dir = Path(REPORT_PATH)
        if not report_dir.exists():
            return
        for file in report_dir.iterdir():
            if "build_report" in file.name and file.name.endswith(".json"):
                file.unlink()

    @classmethod
    def load(cls, build_name: str, pr_number: int, head_ref: str):  # type: ignore
        """
        loads report from a report file matched with given @pr_number and/or a @head_ref

        Returns None when the file does not exist.
        """
        # The suffix is built the same way as in write_json(), otherwise a
        # report written for a branch could not be found again
        report_path = Path(REPORT_PATH) / BuildResult.get_report_name(
            build_name, pr_number or normalize_string(head_ref)
        )
        return cls.load_from_file(report_path)

    @classmethod
    def load_any(cls, build_name: str, pr_number: int, head_ref: str):  # type: ignore
        """
        loads report from suitable report file with the following priority:
            1. report from PR with the same @pr_number
            2. report from branch with the same @head_ref
            3. report from the master
            4. any other report

        Returns None when there is no report for @build_name at all.
        """
        reports = [
            file
            for file in Path(REPORT_PATH).iterdir()
            if f"{build_name}.json" in file.name
        ]
        if not reports:
            return None

        # Markers in priority order. Every marker is tried against all files
        # before falling to the next one, so a lower-priority match that
        # happens to be listed first cannot win.
        markers = []
        if pr_number:
            markers.append(f"_{pr_number}_")
        markers.append(f"_{head_ref}_")
        markers.append("_master_")
        for marker in markers:
            for file in reports:
                if marker in file.name:
                    return cls.load_from_file(file)

        return cls.load_from_file(reports[-1])

    @classmethod
    def load_from_file(cls, file: Union[Path, str]):  # type: ignore
        """Build an instance from the JSON in `file`; None if it does not exist."""
        if not Path(file).exists():
            return None
        with open(file, "r", encoding="utf-8") as json_file:
            res = json.load(json_file)
        return cls(**res)

    def as_json(self) -> str:
        """Serialize all dataclass fields, including the caches, as indented JSON."""
        return json.dumps(asdict(self), indent=2)

    @property
    def build_config(self) -> Optional[BuildConfig]:
        """The CI_CONFIG entry for `build_name`, or None if there is none."""
        return CI_CONFIG.build_config.get(self.build_name, None)

    @property
    def comment(self) -> str:
        if self.build_config is None:
            return self._wrong_config_message
        return self.build_config.comment

    @property
    def compiler(self) -> str:
        if self.build_config is None:
            return self._wrong_config_message
        return self.build_config.compiler

    @property
    def debug_build(self) -> bool:
        if self.build_config is None:
            return False
        return self.build_config.debug_build

    @property
    def sanitizer(self) -> str:
        if self.build_config is None:
            return self._wrong_config_message
        return self.build_config.sanitizer

    @property
    def coverage(self) -> str:
        if self.build_config is None:
            return self._wrong_config_message
        return str(self.build_config.coverage)

    @property
    def grouped_urls(self) -> List[List[str]]:
        "Combine and preserve build_urls by artifact types"
        if self._grouped_urls is not None:
            return self._grouped_urls
        if not self.build_urls:
            # A single empty group, so that iterating still yields one item
            self._grouped_urls = [[]]
            return self._grouped_urls
        # The insertion order of this dict is the order of the returned groups
        artifacts_groups = {
            "apk": [],
            "deb": [],
            "binary": [],
            "tgz": [],
            "rpm": [],
            "performance": [],
        }  # type: Dict[str, List[str]]
        for url in self.build_urls:
            if url.endswith("performance.tar.zst"):
                group = "performance"
            elif url.endswith((".deb", ".buildinfo", ".changes", ".tar.gz")):
                group = "deb"
            elif url.endswith(".apk"):
                group = "apk"
            elif url.endswith(".rpm"):
                group = "rpm"
            elif url.endswith((".tgz", ".tgz.sha512")):
                group = "tgz"
            else:
                group = "binary"
            artifacts_groups[group].append(url)
        self._grouped_urls = [urls for urls in artifacts_groups.values() if urls]
        return self._grouped_urls

    @property
    def _wrong_config_message(self) -> str:
        return "missing"

    @property
    def is_missing(self) -> bool:
        "The report is created for missing json file"
        return not (
            self.log_url
            or self.build_urls
            or self.version != "missing"
            or self.status != ERROR
        )

    @property
    def job_link(self) -> str:
        """HTML anchor pointing at the job page, labelled with the job name."""
        if self._job_html_link is not None:
            return self._job_html_link
        self._job_html_link = f'<a href="{self.job_html_url}">{self.job_name}</a>'
        return self._job_html_link

    @property
    def job_html_url(self) -> str:
        if self._job_html_url is not None:
            return self._job_html_url
        self._set_properties()
        return self._job_html_url or ""

    @property
    def job_name(self) -> str:
        if self._job_name is not None:
            return self._job_name
        self._set_properties()
        return self._job_name or ""

    @job_name.setter
    def job_name(self, job_name: str) -> None:
        self._job_name = job_name

    def _set_properties(self) -> None:
        """Fill `_job_name` and `_job_html_url` from the job API, best effort."""
        if all(p is not None for p in (self._job_name, self._job_html_url)):
            return
        job_data = {}
        # quick check @self.job_api_url is valid url before request. it's set to "missing" for dummy BuildResult
        if "http" in self.job_api_url:
            try:
                job_data = get_gh_api(self.job_api_url).json()
            except Exception:
                # Deliberately not fatal: the fallbacks below are used
                # instead, but the failure is no longer silent
                logger.warning(
                    "Failed to get the job data from %s",
                    self.job_api_url,
                    exc_info=True,
                )
        # job_name can be set manually
        self._job_name = self._job_name or job_data.get("name", "unknown")
        self._job_html_url = job_data.get("html_url", "")

    @staticmethod
    def get_report_name(name: str, suffix: Union[str, int]) -> Path:
        """Return the report file name for build `name` and a PR/branch `suffix`."""
        assert "/" not in str(suffix)
        return Path(f"build_report_{suffix}_{name}.json")

    @staticmethod
    def missing_result(build_name: str) -> "BuildResult":
        """Return the placeholder result for which `is_missing` is True."""
        return BuildResult(build_name, "", [], "missing", ERROR, 0, "missing")

    def write_json(self, directory: Union[Path, str] = REPORT_PATH) -> Path:
        """Write the report into `directory` and return the path of the file.

        Only the public fields are stored. The stem of the file name is also
        appended as BUILD_URLS to the file named by the GITHUB_ENV variable.
        """
        path = Path(directory) / self.get_report_name(
            self.build_name, self.pr_number or normalize_string(self.head_ref)
        )
        path.write_text(
            json.dumps(
                {
                    "build_name": self.build_name,
                    "log_url": self.log_url,
                    "build_urls": self.build_urls,
                    "version": self.version,
                    "status": self.status,
                    "elapsed_seconds": self.elapsed_seconds,
                    "job_api_url": self.job_api_url,
                    "pr_number": self.pr_number,
                    "head_ref": self.head_ref,
                }
            ),
            encoding="utf-8",
        )
        # TODO: remove after the artifacts are in S3 completely
        env_path = Path(os.getenv("GITHUB_ENV", "/dev/null"))
        with env_path.open("a", encoding="utf-8") as ef:
            # The file holds one NAME=value entry per line, so terminate the
            # line to keep a later append from being glued to this value
            ef.write(f"BUILD_URLS={path.stem}\n")

        return path


BuildResults = List[BuildResult]
class ReportColorTheme:
    """Namespace for the colors used to paint statuses in the reports."""

    class ReportColor:
        """Named hex colors."""

        green = "#0A0"
        red = "#F00"
        yellow = "#FFB400"
        blue = "#00B4FF"

    # Order matters: _get_status_style() reads index 0 for ok statuses,
    # index 1 for failed ones and index 2 for everything else
    default = (ReportColor.green, ReportColor.red, ReportColor.yellow)


ColorTheme = Tuple[str, str, str]
def _format_header(
header: str, branch_name: str, branch_url: Optional[str] = None
) -> str:
# Following line does not lower CI->Ci and SQLancer->Sqlancer. It only
# capitalizes the first letter and doesn't touch the rest of the word
result = " ".join([w[0].upper() + w[1:] for w in header.split(" ") if w])
result = result.replace("Clickhouse", "ClickHouse")
result = result.replace("clickhouse", "ClickHouse")
if "ClickHouse" not in result:
result = f"ClickHouse {result}"
if branch_url:
result = f'{result} for
{branch_name}'
else:
result = f"{result} for {branch_name}"
return result
def _get_status_style(status: str, colortheme: Optional[ColorTheme] = None) -> str:
    """Return the inline CSS (bold + color) used to display `status`.

    `colortheme` is an (ok, fail, other) triple of colors;
    ReportColorTheme.default is used when it is not given.
    """
    palette = ReportColorTheme.default if colortheme is None else colortheme
    if status in (OK, SUCCESS, "PASSED"):
        color = palette[0]
    elif status in (FAIL, FAILURE, ERROR, "FAILED", "Timeout", "NOT_FAILED"):
        color = palette[1]
    else:
        # Anything unrecognised gets the third color of the theme
        color = palette[2]
    return f"font-weight: bold;color: {color};"
def _get_html_url_name(url):
base_name = ""
if isinstance(url, str):
base_name = os.path.basename(url)
if isinstance(url, tuple):
base_name = url[1]
if "?" in base_name:
base_name = base_name.split("?")[0]
if base_name is not None:
return base_name.replace("%2B", "+").replace("%20", " ")
return None
def _get_html_url(url):
    """Return an HTML anchor for `url`, or "" when no link can be built.

    `url` is either a string (used as the href) or a tuple whose first item
    is the href. The link text comes from _get_html_url_name(). An empty
    href or name, or any other input type, gives "".
    """
    if isinstance(url, str):
        href = url
    elif isinstance(url, tuple):
        href = url[0]
    else:
        return ""

    name = _get_html_url_name(url)
    if href and name:
        return f'<a href="{href}">{name}</a>'
    return ""
def create_test_html_report(
    header: str,
    test_results: TestResults,
    raw_log_url: str,
    task_url: str,
    job_url: str,
    branch_url: str,
    branch_name: str,
    commit_url: str,
    additional_urls: Optional[List[str]] = None,
    statuscolors: Optional[ColorTheme] = None,
) -> str:
    """Render the HTML page with the results of a test job.

    Args:
        header: report title, formatted by _format_header().
        test_results: results to list; NOTE: the list is sorted in place so
            that entries having logs come first.
        raw_log_url: URL of the raw log; its base name is shown in the page.
        task_url, job_url, branch_url, commit_url: values passed to the page
            template.
        branch_name: branch shown in the title and header.
        additional_urls: extra links, rendered sorted by their display name.
        statuscolors: optional color theme for the status cells.

    Returns:
        HTML_BASE_TEST_TEMPLATE filled with the values above.
    """
    if additional_urls is None:
        additional_urls = []

    if test_results:
        rows_part = []
        num_fails = 0
        has_test_time = False
        has_log_urls = False

        # Display entries with logs at the top (they correspond to failed tests)
        test_results.sort(
            key=lambda result: result.raw_logs is None and result.log_files is None
        )

        for test_result in test_results:
            # Number of cells in this row; the raw logs row below spans them
            colspan = 0
            if test_result.log_files is not None:
                has_log_urls = True

            row = []
            # NOTE(review): the class names here and on the raw logs row must
            # match the ones used by the page's stylesheet/script - confirm
            if test_result.raw_logs is not None:
                row.append('<tr class="expandable">')
            else:
                row.append("<tr>")
            row.append(f"<td>{test_result.name}</td>")
            colspan += 1
            style = _get_status_style(test_result.status, colortheme=statuscolors)

            # Allow to quickly scroll to the first failure.
            fail_id = ""
            has_error = test_result.status in ("FAIL", "NOT_FAILED")
            if has_error:
                num_fails = num_fails + 1
                fail_id = f'id="fail{num_fails}" '

            row.append(f'<td {fail_id}style="{style}">{test_result.status}</td>')
            colspan += 1

            if test_result.time is not None:
                has_test_time = True
                row.append(f"<td>{test_result.time}</td>")
                colspan += 1

            if test_result.log_urls is not None:
                has_log_urls = True
                test_logs_html = "<br>".join(
                    [_get_html_url(url) for url in test_result.log_urls]
                )
                row.append(f"<td>{test_logs_html}</td>")
                colspan += 1

            row.append("</tr>")
            rows_part.append("\n".join(row))
            if test_result.raw_logs is not None:
                # The logs are arbitrary text, so they are escaped for HTML
                raw_logs = escape(test_result.raw_logs)
                row_raw_logs = (
                    '<tr class="expandable-content">'
                    f'<td colspan="{colspan}"><pre>{raw_logs}</pre></td>'
                    "</tr>"
                )
                rows_part.append(row_raw_logs)

        headers = BASE_HEADERS.copy()
        if has_test_time:
            headers.append("Test time, sec.")
        if has_log_urls:
            headers.append("Logs")

        headers_html = "".join(["<th>" + h + "</th>" for h in headers])
        test_part = HTML_TEST_PART.format(headers=headers_html, rows="".join(rows_part))
    else:
        test_part = ""

    additional_html_urls = " ".join(
        [_get_html_url(url) for url in sorted(additional_urls, key=_get_html_url_name)]
    )

    # Show only the file name of the raw log, without the query string
    raw_log_name = os.path.basename(raw_log_url)
    if "?" in raw_log_name:
        raw_log_name = raw_log_name.split("?")[0]

    html = HTML_BASE_TEST_TEMPLATE.format(
        title=_format_header(header, branch_name),
        header=_format_header(header, branch_name, branch_url),
        raw_log_name=raw_log_name,
        raw_log_url=raw_log_url,
        task_url=task_url,
        job_url=job_url,
        test_part=test_part,
        branch_name=branch_name,
        commit_url=commit_url,
        additional_urls=additional_html_urls,
    )
    return html
# Page template for create_build_html_report(). Only the head and footer are
# interpolated here (f-strings); the middle literal is a plain string, so its
# `{rows}` field stays in place for the later str.format() call. The column
# titles below are in the order in which create_build_html_report() emits
# the cells of a row.
# NOTE(review): create_build_html_report() also passes title, task_url,
# branch_name and commit_url, none of which appear as fields in this
# literal - confirm the template text is complete.
HTML_BASE_BUILD_TEMPLATE = (
f"{HEAD_HTML_TEMPLATE}"
"""
Commit
Task (github actions)
Config/job name |
Compiler |
Build type |
Version |
Sanitizer |
Coverage |
Status |
Build log |
Build time |
Artifacts |
Comment |
{rows}
"""
f"{FOOTER_HTML_TEMPLATE}"
)
# Anchor used for the artifact links; filled with str.format(url=..., text=...)
LINK_TEMPLATE = '<a href="{url}">{text}</a>'
def create_build_html_report(
    header: str,
    build_results: BuildResults,
    task_url: str,
    branch_url: str,
    branch_name: str,
    commit_url: str,
) -> str:
    """Render the HTML page summarizing the given build results.

    One table row is produced per group of artifact URLs of every build
    (see BuildResult.grouped_urls), so a build may occupy several rows.

    Args:
        header: report title, formatted by _format_header().
        build_results: the builds to list.
        task_url, branch_url, commit_url: values passed to the page template.
        branch_name: branch shown in the title and header.

    Returns:
        HTML_BASE_BUILD_TEMPLATE filled with the values above.
    """
    rows = []
    for build_result in build_results:
        for artifact_urls in build_result.grouped_urls:
            row = ["<tr>"]
            row.append(
                f"<td>{build_result.build_name}<br/>{build_result.job_link}</td>"
            )
            row.append(f"<td>{build_result.compiler}</td>")
            if build_result.debug_build:
                row.append("<td>debug</td>")
            else:
                row.append("<td>relwithdebuginfo</td>")
            row.append(f"<td>{build_result.version}</td>")
            if build_result.sanitizer:
                row.append(f"<td>{build_result.sanitizer}</td>")
            else:
                row.append("<td>none</td>")

            row.append(f"<td>{build_result.coverage}</td>")

            # A result without a status is displayed as an error
            if build_result.status:
                style = _get_status_style(build_result.status)
                row.append(f'<td style="{style}">{build_result.status}</td>')
            else:
                style = _get_status_style(ERROR)
                row.append(f'<td style="{style}">error</td>')

            row.append(f'<td><a href="{build_result.log_url}">link</a></td>')

            delta = "unknown"
            if build_result.elapsed_seconds:
                delta = str(datetime.timedelta(seconds=build_result.elapsed_seconds))

            row.append(f"<td>{delta}</td>")

            link_separator = "<br/>"
            links = [
                LINK_TEMPLATE.format(
                    text=_get_html_url_name(artifact_url), url=artifact_url
                )
                for artifact_url in artifact_urls
            ]
            row.append(f"<td>{link_separator.join(links)}</td>")

            comment = build_result.comment
            if (
                build_result.build_config is not None
                and build_result.build_config.sparse_checkout
            ):
                comment += " (note: sparse checkout is used, see update-submodules.sh)"
            row.append(f"<td>{comment}</td>")

            row.append("</tr>")
            rows.append("".join(row))
    return HTML_BASE_BUILD_TEMPLATE.format(
        title=_format_header(header, branch_name),
        header=_format_header(header, branch_name, branch_url),
        rows="".join(rows),
        task_url=task_url,
        branch_name=branch_name,
        commit_url=commit_url,
    )