#!/usr/bin/env python3 import csv import json import logging import time from collections import defaultdict from dataclasses import asdict, dataclass from pathlib import Path from typing import Dict, List, Optional, Union from github import Github from github.Commit import Commit from github.CommitStatus import CommitStatus from github.GithubException import GithubException from github.GithubObject import NotSet from github.IssueComment import IssueComment from github.Repository import Repository from ci_config import CI from env_helper import GITHUB_REPOSITORY, GITHUB_UPSTREAM_REPOSITORY, TEMP_PATH from lambda_shared_package.lambda_shared.pr import Labels from pr_info import PRInfo from report import ( ERROR, FAILURE, PENDING, SUCCESS, StatusType, TestResult, TestResults, get_status, get_worst_status, ) from s3_helper import S3Helper from upload_result_helper import upload_results RETRY = 5 CommitStatuses = List[CommitStatus] GH_REPO = None # type: Optional[Repository] STATUS_FILE_PATH = Path(TEMP_PATH) / "status.json" class RerunHelper: def __init__(self, commit: Commit, check_name: str): self.check_name = check_name self.commit = commit self.statuses = get_commit_filtered_statuses(commit) def is_already_finished_by_status(self) -> bool: # currently we agree even for failed statuses for status in self.statuses: if self.check_name in status.context and status.state in ( SUCCESS, FAILURE, ): return True return False def get_finished_status(self) -> Optional[CommitStatus]: for status in self.statuses: if self.check_name in status.context: return status return None def get_commit(gh: Github, commit_sha: str, retry_count: int = RETRY) -> Commit: for i in range(retry_count): try: repo = get_repo(gh) commit = repo.get_commit(commit_sha) break except Exception as ex: if i == retry_count - 1: raise ex time.sleep(i) return commit def post_commit_status( commit: Commit, state: StatusType, # do not change it, it MUST be StatusType and nothing else report_url: Optional[str] = None, description: Optional[str] = None, check_name: Optional[str] = None, pr_info: Optional[PRInfo] = None, dump_to_file: bool = False, ) -> CommitStatus: """The parameters are given in the same order as for commit.create_status, if an optional parameter `pr_info` is given, the `set_status_comment` functions is invoked to add or update the comment with statuses overview""" for i in range(RETRY): try: commit_status = commit.create_status( state=state, target_url=report_url if report_url is not None else NotSet, description=description if description is not None else NotSet, context=check_name if check_name is not None else NotSet, ) break except Exception as ex: if i == RETRY - 1: raise ex time.sleep(i) if pr_info: status_updated = False for i in range(RETRY): try: set_status_comment(commit, pr_info) status_updated = True break except Exception as ex: logging.warning( "Failed to update the status commit, will retry %s times: %s", RETRY - i - 1, ex, ) if not status_updated: logging.error("Failed to update the status comment, continue anyway") if dump_to_file: assert pr_info CommitStatusData( status=state, description=description or "", report_url=report_url or "", sha=pr_info.sha, pr_num=pr_info.number, ).dump_status() return commit_status STATUS_ICON_MAP = defaultdict( str, { ERROR: "❌", FAILURE: "❌", PENDING: "⏳", SUCCESS: "✅", }, ) def set_status_comment(commit: Commit, pr_info: PRInfo) -> None: """It adds or updates the comment status to all Pull Requests but for release one, so the method does nothing for simple pushes and pull requests with `release`/`release-lts` labels""" if pr_info.is_merge_queue: # skip report creation for the MQ return # to reduce number of parameters, the Github is constructed on the fly gh = Github() gh.__requester = commit._requester # type:ignore #pylint:disable=protected-access repo = get_repo(gh) statuses = sorted(get_commit_filtered_statuses(commit), key=lambda x: x.context) if not statuses: return if not [status for status in statuses if status.context == CI.StatusNames.CI]: # This is the case, when some statuses already exist for the check, # but not the StatusNames.CI. We should create it as pending. # W/o pr_info to avoid recursion, and yes, one extra create_ci_report post_commit_status( commit, PENDING, create_ci_report(pr_info, statuses), "The report for running CI", CI.StatusNames.CI, ) # We update the report in generate_status_comment function, so do it each # run, even in the release PRs and normal pushes comment_body = generate_status_comment(pr_info, statuses) # We post the comment only to normal and backport PRs if pr_info.number == 0 or pr_info.labels.intersection({"release", "release-lts"}): return comment_service_header = comment_body.split("\n", 1)[0] comment = None # type: Optional[IssueComment] pr = repo.get_pull(pr_info.number) for ic in pr.get_issue_comments(): if ic.body.startswith(comment_service_header): comment = ic break if comment is None: pr.create_issue_comment(comment_body) return if comment.body == comment_body: logging.info("The status comment is already updated, no needs to change it") return comment.edit(comment_body) def generate_status_comment(pr_info: PRInfo, statuses: CommitStatuses) -> str: """The method generates the comment body, as well it updates the CI report""" report_url = create_ci_report(pr_info, statuses) worst_state = get_worst_state(statuses) comment_body = ( f"\n" f"*This is an automated comment for commit {pr_info.sha} with " f"description of existing statuses. It's updated for the latest CI running*\n\n" f"[{STATUS_ICON_MAP[worst_state]} Click here]({report_url}) to open a full report in a separate page\n" f"\n" ) # group checks by the name to get the worst one per each grouped_statuses = {} # type: Dict[CI.CheckDescription, CommitStatuses] for status in statuses: cd = None for c in CI.CHECK_DESCRIPTIONS: if c.match_func(status.context): cd = c break if cd is None or cd == CI.CHECK_DESCRIPTIONS[-1]: # This is the case for either non-found description or a fallback cd = CI.CheckDescription( status.context, CI.CHECK_DESCRIPTIONS[-1].description, CI.CHECK_DESCRIPTIONS[-1].match_func, ) if cd in grouped_statuses: grouped_statuses[cd].append(status) else: grouped_statuses[cd] = [status] table_header = ( "\n" "\n" "\n" ) table_footer = "\n
Check nameDescriptionStatus
\n" details_header = "
Successful checks\n" details_footer = "
\n" visible_table_rows = [] # type: List[str] hidden_table_rows = [] # type: List[str] for desc, gs in grouped_statuses.items(): state = get_worst_state(gs) state_text = f"{STATUS_ICON_MAP[state]} {state}" # take the first target_url with the worst state for status in gs: if status.target_url and status.state == state: state_text = f'{state_text}' break table_row = ( f"{desc.name}{desc.description}" f"{state_text}\n" ) if state == SUCCESS: hidden_table_rows.append(table_row) else: visible_table_rows.append(table_row) result = [comment_body] if visible_table_rows: visible_table_rows.sort() result.append(table_header) result.extend(visible_table_rows) result.append(table_footer) if hidden_table_rows: hidden_table_rows.sort() result.append(details_header) result.append(table_header) result.extend(hidden_table_rows) result.append(table_footer) result.append(details_footer) return "".join(result) def get_worst_state(statuses: CommitStatuses) -> StatusType: return get_worst_status(status.state for status in statuses) def create_ci_report(pr_info: PRInfo, statuses: CommitStatuses) -> str: """The function converst the statuses to TestResults and uploads the report to S3 tests bucket. Then it returns the URL""" test_results = [] # type: TestResults for status in statuses: log_urls = [] if status.target_url is not None: log_urls.append(status.target_url) raw_logs = status.description or None test_results.append( TestResult( status.context, status.state, log_urls=log_urls, raw_logs=raw_logs ) ) return upload_results( S3Helper(), pr_info.number, pr_info.sha, test_results, [], CI.StatusNames.CI ) def post_commit_status_to_file( file_path: Path, description: str, state: str, report_url: str ) -> None: if file_path.exists(): raise FileExistsError(f'File "{file_path}" already exists!') with open(file_path, "w", encoding="utf-8") as f: out = csv.writer(f, delimiter="\t") out.writerow([state, report_url, description]) @dataclass class CommitStatusData: """ if u about to add/remove fields in this class be causious that it dumps/loads to/from files (see it's method) - you might want to add default values for new fields so that it won't break with old files """ status: str report_url: str description: str sha: str = "deadbeaf" pr_num: int = -1 @classmethod def _filter_dict(cls, data: dict) -> Dict: return {k: v for k, v in data.items() if k in cls.__annotations__.keys()} @classmethod def load_from_file(cls, file_path: Union[Path, str]): # type: ignore res = {} with open(file_path, "r", encoding="utf-8") as json_file: res = json.load(json_file) return CommitStatusData(**cls._filter_dict(res)) @classmethod def load_status(cls): # type: ignore return cls.load_from_file(STATUS_FILE_PATH) @classmethod def exist(cls) -> bool: return STATUS_FILE_PATH.is_file() def dump_status(self) -> None: STATUS_FILE_PATH.parent.mkdir(parents=True, exist_ok=True) self.dump_to_file(STATUS_FILE_PATH) def dump_to_file(self, file_path: Union[Path, str]) -> None: file_path = Path(file_path) or STATUS_FILE_PATH with open(file_path, "w", encoding="utf-8") as json_file: json.dump(asdict(self), json_file) def is_ok(self): return self.status == SUCCESS def is_failure(self): return self.status == FAILURE @staticmethod def cleanup(): STATUS_FILE_PATH.unlink(missing_ok=True) def get_commit_filtered_statuses(commit: Commit) -> CommitStatuses: """ Squash statuses to latest state 1. context="first", state=SUCCESS, update_time=1 2. context="second", state=SUCCESS, update_time=2 3. context="first", stat=FAILURE, update_time=3 =========> 1. context="second", state=SUCCESS 2. context="first", stat=FAILURE """ filtered = {} for status in sorted(commit.get_statuses(), key=lambda x: x.updated_at): filtered[status.context] = status return list(filtered.values()) def get_repo(gh: Github) -> Repository: global GH_REPO if GH_REPO is not None: return GH_REPO GH_REPO = gh.get_repo(GITHUB_REPOSITORY) return GH_REPO def remove_labels(gh: Github, pr_info: PRInfo, labels_names: List[str]) -> None: repo = get_repo(gh) pull_request = repo.get_pull(pr_info.number) for label in labels_names: try: pull_request.remove_from_labels(label) except GithubException as exc: if not ( exc.status == 404 and isinstance(exc.data, dict) and exc.data.get("message", "") == "Label does not exist" ): raise logging.warning( "The label '%s' does not exist in PR #%s", pr_info.number, label ) pr_info.labels.remove(label) def post_labels(gh: Github, pr_info: PRInfo, labels_names: List[str]) -> None: repo = get_repo(gh) pull_request = repo.get_pull(pr_info.number) for label in labels_names: pull_request.add_to_labels(label) pr_info.labels.add(label) def format_description(description: str) -> str: if len(description) > 140: description = description[:137] + "..." return description def set_mergeable_check( commit: Commit, description: str = "", state: StatusType = SUCCESS, ) -> CommitStatus: report_url = "" return post_commit_status( commit, state, report_url, format_description(description), CI.StatusNames.MERGEABLE, ) def update_mergeable_check(commit: Commit, pr_info: PRInfo, check_name: str) -> None: "check if the check_name in REQUIRED_CHECKS and then trigger update" not_run = ( pr_info.labels.intersection({Labels.SKIP_MERGEABLE_CHECK, Labels.RELEASE}) or not CI.is_required(check_name) or pr_info.release_pr or pr_info.number == 0 ) if not_run: # Let's avoid unnecessary work return logging.info("Update Mergeable Check by %s", check_name) statuses = get_commit_filtered_statuses(commit) trigger_mergeable_check(commit, statuses) def trigger_mergeable_check( commit: Commit, statuses: CommitStatuses, set_if_green: bool = False, workflow_failed: bool = False, ) -> StatusType: """calculate and update StatusNames.MERGEABLE""" required_checks = [status for status in statuses if CI.is_required(status.context)] mergeable_status = None for status in statuses: if status.context == CI.StatusNames.MERGEABLE: mergeable_status = status break success = [] fail = [] pending = [] for status in required_checks: if status.state == SUCCESS: success.append(status.context) elif status.state == PENDING: pending.append(status.context) else: fail.append(status.context) state: StatusType = SUCCESS if fail: description = "failed: " + ", ".join(fail) state = FAILURE elif workflow_failed: description = "check workflow failures" state = FAILURE elif pending: description = "pending: " + ", ".join(pending) state = PENDING else: # all good description = ", ".join(success) description = format_description(description) if not set_if_green and state == SUCCESS: # do not set green Mergeable Check status pass else: if mergeable_status is None or mergeable_status.description != description: set_mergeable_check(commit, description, state) return state def update_upstream_sync_status( upstream_pr_number: int, sync_pr_number: int, gh: Github, state: StatusType, can_set_green_mergeable_status: bool = False, ) -> None: upstream_repo = gh.get_repo(GITHUB_UPSTREAM_REPOSITORY) upstream_pr = upstream_repo.get_pull(upstream_pr_number) sync_repo = gh.get_repo(GITHUB_REPOSITORY) sync_pr = sync_repo.get_pull(sync_pr_number) # Find the commit that is in both repos, upstream and cloud sync_commits = sync_pr.get_commits().reversed upstream_commits = upstream_pr.get_commits().reversed # Github objects are compared by _url attribute. We can't compare them directly and # should compare commits by SHA1 upstream_shas = [c.sha for c in upstream_commits] logging.info("Commits in upstream PR:\n %s", ", ".join(upstream_shas)) sync_shas = [c.sha for c in sync_commits] logging.info("Commits in sync PR:\n %s", ", ".join(reversed(sync_shas))) # find latest synced commit last_synced_upstream_commit = None for commit in upstream_commits: if commit.sha in sync_shas: last_synced_upstream_commit = commit break assert last_synced_upstream_commit sync_status = get_status(state) logging.info( "Using commit %s to post the %s status `%s`: [%s]", last_synced_upstream_commit.sha, sync_status, CI.StatusNames.SYNC, "", ) post_commit_status( last_synced_upstream_commit, sync_status, "", "", CI.StatusNames.SYNC, ) trigger_mergeable_check( last_synced_upstream_commit, get_commit_filtered_statuses(last_synced_upstream_commit), set_if_green=can_set_green_mergeable_status, )