ClickHouse/tests/ci/commit_status_helper.py
2022-11-10 13:12:22 +01:00

161 lines
4.6 KiB
Python

#!/usr/bin/env python3
import csv
import os
import time
from typing import List
import logging
from ci_config import CI_CONFIG, REQUIRED_CHECKS
from env_helper import GITHUB_REPOSITORY, GITHUB_RUN_URL
from github import Github
from github.Commit import Commit
from github.CommitStatus import CommitStatus
from pr_info import PRInfo, SKIP_MERGEABLE_CHECK_LABEL
RETRY = 5
CommitStatuses = List[CommitStatus]
def override_status(status: str, check_name: str, invert=False) -> str:
if CI_CONFIG["tests_config"].get(check_name, {}).get("force_tests", False):
return "success"
if invert:
if status == "success":
return "error"
return "success"
return status
def get_commit(gh: Github, commit_sha: str, retry_count: int = RETRY) -> Commit:
for i in range(retry_count):
try:
repo = gh.get_repo(GITHUB_REPOSITORY)
commit = repo.get_commit(commit_sha)
break
except Exception as ex:
if i == retry_count - 1:
raise ex
time.sleep(i)
return commit
def post_commit_status(
gh: Github, sha: str, check_name: str, description: str, state: str, report_url: str
):
for i in range(RETRY):
try:
commit = get_commit(gh, sha, 1)
commit.create_status(
context=check_name,
description=description,
state=state,
target_url=report_url,
)
break
except Exception as ex:
if i == RETRY - 1:
raise ex
time.sleep(i)
def post_commit_status_to_file(
file_path: str, description: str, state: str, report_url: str
):
if os.path.exists(file_path):
raise Exception(f'File "{file_path}" already exists!')
with open(file_path, "w", encoding="utf-8") as f:
out = csv.writer(f, delimiter="\t")
out.writerow([state, report_url, description])
def get_commit_filtered_statuses(commit: Commit) -> CommitStatuses:
"""
Squash statuses to latest state
1. context="first", state="success", update_time=1
2. context="second", state="success", update_time=2
3. context="first", stat="failure", update_time=3
=========>
1. context="second", state="success"
2. context="first", stat="failure"
"""
filtered = {}
for status in sorted(commit.get_statuses(), key=lambda x: x.updated_at):
filtered[status.context] = status
return list(filtered.values())
def remove_labels(gh: Github, pr_info: PRInfo, labels_names: List[str]):
repo = gh.get_repo(GITHUB_REPOSITORY)
pull_request = repo.get_pull(pr_info.number)
for label in labels_names:
pull_request.remove_from_labels(label)
def post_labels(gh: Github, pr_info: PRInfo, labels_names: List[str]):
repo = gh.get_repo(GITHUB_REPOSITORY)
pull_request = repo.get_pull(pr_info.number)
for label in labels_names:
pull_request.add_to_labels(label)
def fail_mergeable_check(commit: Commit, description: str):
commit.create_status(
context="Mergeable Check",
description=description,
state="failure",
target_url=GITHUB_RUN_URL,
)
def reset_mergeable_check(commit: Commit, description: str = ""):
commit.create_status(
context="Mergeable Check",
description=description,
state="success",
target_url=GITHUB_RUN_URL,
)
def update_mergeable_check(gh: Github, pr_info: PRInfo, check_name: str):
if SKIP_MERGEABLE_CHECK_LABEL in pr_info.labels:
return
logging.info("Update Mergeable Check by %s", check_name)
commit = get_commit(gh, pr_info.sha)
checks = {
check.context: check.state
for check in filter(
lambda check: (check.context in REQUIRED_CHECKS),
# get_statuses() returns generator, which cannot be reversed - we need comprehension
# pylint: disable=unnecessary-comprehension
reversed([status for status in commit.get_statuses()]),
)
}
success = []
fail = []
for name, state in checks.items():
if state == "success":
success.append(name)
else:
fail.append(name)
if fail:
description = "failed: " + ", ".join(fail)
if success:
description += "; succeeded: " + ", ".join(success)
if len(description) > 140:
description = description[:137] + "..."
fail_mergeable_check(commit, description)
return
description = ", ".join(success)
if len(description) > 140:
description = description[:137] + "..."
reset_mergeable_check(commit, description)