ClickHouse/tests/ci/ci_utils.py
2024-08-02 18:23:52 +02:00

377 lines
12 KiB
Python

import json
import os
import re
import subprocess
import sys
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator, List, Union, Optional, Sequence, Tuple
import requests
class Envs:
GITHUB_REPOSITORY = os.getenv("GITHUB_REPOSITORY", "ClickHouse/ClickHouse")
WORKFLOW_RESULT_FILE = os.getenv(
"WORKFLOW_RESULT_FILE", "/tmp/workflow_results.json"
)
S3_BUILDS_BUCKET = os.getenv("S3_BUILDS_BUCKET", "clickhouse-builds")
GITHUB_WORKFLOW = os.getenv("GITHUB_WORKFLOW", "")
LABEL_CATEGORIES = {
"pr-backward-incompatible": ["Backward Incompatible Change"],
"pr-bugfix": [
"Bug Fix",
"Bug Fix (user-visible misbehavior in an official stable release)",
"Bug Fix (user-visible misbehaviour in official stable or prestable release)",
"Bug Fix (user-visible misbehavior in official stable or prestable release)",
],
"pr-critical-bugfix": ["Critical Bug Fix (crash, LOGICAL_ERROR, data loss, RBAC)"],
"pr-build": [
"Build/Testing/Packaging Improvement",
"Build Improvement",
"Build/Testing Improvement",
"Build",
"Packaging Improvement",
],
"pr-documentation": [
"Documentation (changelog entry is not required)",
"Documentation",
],
"pr-feature": ["New Feature"],
"pr-improvement": ["Improvement"],
"pr-not-for-changelog": [
"Not for changelog (changelog entry is not required)",
"Not for changelog",
],
"pr-performance": ["Performance Improvement"],
"pr-ci": ["CI Fix or Improvement (changelog entry is not required)"],
}
CATEGORY_TO_LABEL = {
c: lb for lb, categories in LABEL_CATEGORIES.items() for c in categories
}
class WithIter(type):
def __iter__(cls):
return (v for k, v in cls.__dict__.items() if not k.startswith("_"))
@contextmanager
def cd(path: Union[Path, str]) -> Iterator[None]:
oldpwd = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(oldpwd)
def is_hex(s):
try:
int(s, 16)
return True
except ValueError:
return False
def normalize_string(string: str) -> str:
res = string.lower()
for r in ((" ", "_"), ("(", "_"), (")", "_"), (",", "_"), ("/", "_"), ("-", "_")):
res = res.replace(*r)
return res
class GH:
class ActionsNames:
RunConfig = "RunConfig"
class ActionStatuses:
ERROR = "error"
FAILURE = "failure"
PENDING = "pending"
SUCCESS = "success"
@classmethod
def _get_workflow_results(cls):
if not Path(Envs.WORKFLOW_RESULT_FILE).exists():
print(
f"ERROR: Failed to get workflow results from file [{Envs.WORKFLOW_RESULT_FILE}]"
)
return {}
with open(Envs.WORKFLOW_RESULT_FILE, "r", encoding="utf-8") as json_file:
try:
res = json.load(json_file)
except json.JSONDecodeError as e:
print(f"ERROR: json decoder exception {e}")
json_file.seek(0)
print(" File content:")
print(json_file.read())
return {}
return res
@classmethod
def print_workflow_results(cls):
res = cls._get_workflow_results()
results = [f"{job}: {data['result']}" for job, data in res.items()]
cls.print_in_group("Workflow results", results)
@classmethod
def is_workflow_ok(cls) -> bool:
res = cls._get_workflow_results()
for _job, data in res.items():
if data["result"] == "failure":
return False
return bool(res)
@classmethod
def get_workflow_job_result(cls, wf_job_name: str) -> Optional[str]:
res = cls._get_workflow_results()
if wf_job_name in res:
return res[wf_job_name]["result"] # type: ignore
else:
return None
@staticmethod
def print_in_group(group_name: str, lines: Union[Any, List[Any]]) -> None:
lines = list(lines)
print(f"::group::{group_name}")
for line in lines:
print(line)
print("::endgroup::")
@staticmethod
def get_commit_status_by_name(
token: str, commit_sha: str, status_name: Union[str, Sequence]
) -> str:
assert len(token) == 40
assert len(commit_sha) == 40
assert is_hex(commit_sha)
assert not is_hex(token)
url = f"https://api.github.com/repos/{Envs.GITHUB_REPOSITORY}/commits/{commit_sha}/statuses?per_page={200}"
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(url, headers=headers, timeout=5)
if isinstance(status_name, str):
status_name = (status_name,)
if response.status_code == 200:
assert "next" not in response.links, "Response truncated"
statuses = response.json()
for status in statuses:
if status["context"] in status_name:
return status["state"] # type: ignore
return ""
@staticmethod
def check_wf_completed(token: str, commit_sha: str) -> bool:
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
}
url = f"https://api.github.com/repos/{Envs.GITHUB_REPOSITORY}/commits/{commit_sha}/check-runs?per_page={100}"
for i in range(3):
try:
response = requests.get(url, headers=headers, timeout=5)
response.raise_for_status()
# assert "next" not in response.links, "Response truncated"
data = response.json()
assert data["check_runs"], "?"
for check in data["check_runs"]:
if check["status"] != "completed":
print(
f" Check workflow status: Check not completed [{check['name']}]"
)
return False
return True
except Exception as e:
print(f"ERROR: exception after attempt [{i}]: {e}")
time.sleep(1)
return False
@staticmethod
def get_pr_url_by_branch(branch, repo=None):
repo = repo or Envs.GITHUB_REPOSITORY
get_url_cmd = f"gh pr list --repo {repo} --head {branch} --json url --jq '.[0].url' --state open"
url = Shell.get_output(get_url_cmd)
if not url:
print(f"WARNING: No open PR found, branch [{branch}] - search for merged")
get_url_cmd = f"gh pr list --repo {repo} --head {branch} --json url --jq '.[0].url' --state merged"
url = Shell.get_output(get_url_cmd)
if not url:
print(f"ERROR: PR nor found, branch [{branch}]")
return url
@staticmethod
def is_latest_release_branch(branch):
latest_branch = Shell.get_output(
'gh pr list --label release --repo ClickHouse/ClickHouse --search "sort:created" -L1 --json headRefName'
)
return latest_branch == branch
class Shell:
@classmethod
def get_output_or_raise(cls, command):
return cls.get_output(command, strict=True)
@classmethod
def get_output(cls, command, strict=False):
res = subprocess.run(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=strict,
)
return res.stdout.strip()
@classmethod
def check(
cls,
command,
strict=False,
verbose=False,
dry_run=False,
stdin_str=None,
**kwargs,
):
if dry_run:
print(f"Dry-ryn. Would run command [{command}]")
return True
if verbose:
print(f"Run command [{command}]")
proc = subprocess.Popen(
command,
shell=True,
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE if stdin_str else None,
universal_newlines=True,
start_new_session=True,
bufsize=1,
errors="backslashreplace",
**kwargs,
)
if stdin_str:
proc.communicate(input=stdin_str)
elif proc.stdout:
for line in proc.stdout:
sys.stdout.write(line)
proc.wait()
if strict:
assert proc.returncode == 0
return proc.returncode == 0
class Utils:
@staticmethod
def get_failed_tests_number(description: str) -> Optional[int]:
description = description.lower()
pattern = r"fail:\s*(\d+)\s*(?=,|$)"
match = re.search(pattern, description)
if match:
return int(match.group(1))
return None
@staticmethod
def is_killed_with_oom():
if Shell.check(
"sudo dmesg -T | grep -q -e 'Out of memory: Killed process' -e 'oom_reaper: reaped process' -e 'oom-kill:constraint=CONSTRAINT_NONE'"
):
return True
return False
@staticmethod
def clear_dmesg():
Shell.check("sudo dmesg --clear", verbose=True)
@staticmethod
def check_pr_description(pr_body: str, repo_name: str) -> Tuple[str, str]:
"""The function checks the body to being properly formatted according to
.github/PULL_REQUEST_TEMPLATE.md, if the first returned string is not empty,
then there is an error."""
lines = list(map(lambda x: x.strip(), pr_body.split("\n") if pr_body else []))
lines = [re.sub(r"\s+", " ", line) for line in lines]
# Check if body contains "Reverts ClickHouse/ClickHouse#36337"
if [
True for line in lines if re.match(rf"\AReverts {repo_name}#[\d]+\Z", line)
]:
return "", LABEL_CATEGORIES["pr-not-for-changelog"][0]
category = ""
entry = ""
description_error = ""
i = 0
while i < len(lines):
if re.match(r"(?i)^[#>*_ ]*change\s*log\s*category", lines[i]):
i += 1
if i >= len(lines):
break
# Can have one empty line between header and the category
# itself. Filter it out.
if not lines[i]:
i += 1
if i >= len(lines):
break
category = re.sub(r"^[-*\s]*", "", lines[i])
i += 1
# Should not have more than one category. Require empty line
# after the first found category.
if i >= len(lines):
break
if lines[i]:
second_category = re.sub(r"^[-*\s]*", "", lines[i])
description_error = (
"More than one changelog category specified: "
f"'{category}', '{second_category}'"
)
return description_error, category
elif re.match(
r"(?i)^[#>*_ ]*(short\s*description|change\s*log\s*entry)", lines[i]
):
i += 1
# Can have one empty line between header and the entry itself.
# Filter it out.
if i < len(lines) and not lines[i]:
i += 1
# All following lines until empty one are the changelog entry.
entry_lines = []
while i < len(lines) and lines[i]:
entry_lines.append(lines[i])
i += 1
entry = " ".join(entry_lines)
# Don't accept changelog entries like '...'.
entry = re.sub(r"[#>*_.\- ]", "", entry)
# Don't accept changelog entries like 'Close #12345'.
entry = re.sub(r"^[\w\-\s]{0,10}#?\d{5,6}\.?$", "", entry)
else:
i += 1
if not category:
description_error = "Changelog category is empty"
# Filter out the PR categories that are not for changelog.
elif "(changelog entry is not required)" in category:
pass # to not check the rest of the conditions
elif category not in CATEGORY_TO_LABEL:
description_error, category = f"Category '{category}' is not valid", ""
elif not entry:
description_error = f"Changelog entry required for category '{category}'"
return description_error, category