ClickHouse/tests/ci/ci_utils.py

326 lines
9.9 KiB
Python
Raw Normal View History

import json
import logging
import os
import re
2024-07-11 11:37:26 +00:00
import subprocess
2024-08-01 09:57:54 +00:00
import sys
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union
import requests
from env_helper import IS_CI
logger = logging.getLogger(__name__)
class Envs:
GITHUB_REPOSITORY = os.getenv("GITHUB_REPOSITORY", "ClickHouse/ClickHouse")
WORKFLOW_RESULT_FILE = os.getenv(
"WORKFLOW_RESULT_FILE", "/tmp/workflow_results.json"
)
2024-08-02 07:23:40 +00:00
S3_BUILDS_BUCKET = os.getenv("S3_BUILDS_BUCKET", "clickhouse-builds")
GITHUB_WORKFLOW = os.getenv("GITHUB_WORKFLOW", "")
GITHUB_ACTOR = os.getenv("GITHUB_ACTOR", "")
class WithIter(type):
def __iter__(cls):
return (v for k, v in cls.__dict__.items() if not k.startswith("_"))
@contextmanager
def cd(path: Union[Path, str]) -> Iterator[None]:
oldpwd = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(oldpwd)
def kill_ci_runner(message: str) -> None:
"""The function to kill the current process with all parents when it's possible.
Works only when run with the set `CI` environment"""
if not IS_CI:
logger.info("Running outside the CI, won't kill the runner")
return
print(f"::error::{message}")
def get_ppid_name(pid: int) -> Tuple[int, str]:
# Avoid using psutil, it's not in stdlib
stats = Path(f"/proc/{pid}/stat").read_text(encoding="utf-8").split()
return int(stats[3]), stats[1]
pid = os.getpid()
pids = {} # type: Dict[str, str]
while pid:
ppid, name = get_ppid_name(pid)
pids[str(pid)] = name
pid = ppid
logger.error(
"Sleeping 5 seconds and killing all possible processes from following:\n %s",
"\n ".join(f"{p}: {n}" for p, n in pids.items()),
)
time.sleep(5)
# The current process will be killed too
subprocess.run(f"kill -9 {' '.join(pids.keys())}", check=False, shell=True)
2024-08-02 07:23:40 +00:00
class GH:
class ActionsNames:
RunConfig = "RunConfig"
class ActionStatuses:
ERROR = "error"
FAILURE = "failure"
PENDING = "pending"
SUCCESS = "success"
2024-07-25 09:32:59 +00:00
SKIPPED = "skipped"
2024-07-23 09:25:19 +00:00
@classmethod
2024-07-25 09:32:59 +00:00
def get_workflow_results(cls):
if not Path(Envs.WORKFLOW_RESULT_FILE).exists():
print(
f"ERROR: Failed to get workflow results from file [{Envs.WORKFLOW_RESULT_FILE}]"
)
2024-07-23 09:25:19 +00:00
return {}
with open(Envs.WORKFLOW_RESULT_FILE, "r", encoding="utf-8") as json_file:
2024-07-23 09:25:19 +00:00
try:
res = json.load(json_file)
except json.JSONDecodeError as e:
print(f"ERROR: json decoder exception {e}")
2024-07-23 18:42:21 +00:00
json_file.seek(0)
print(" File content:")
print(json_file.read())
2024-07-23 09:25:19 +00:00
return {}
return res
@classmethod
def print_workflow_results(cls):
2024-07-25 09:32:59 +00:00
res = cls.get_workflow_results()
2024-07-23 09:25:19 +00:00
results = [f"{job}: {data['result']}" for job, data in res.items()]
cls.print_in_group("Workflow results", results)
2024-08-02 07:23:40 +00:00
@classmethod
def is_workflow_ok(cls) -> bool:
2024-07-25 09:32:59 +00:00
res = cls.get_workflow_results()
2024-08-02 07:23:40 +00:00
for _job, data in res.items():
if data["result"] == "failure":
return False
return bool(res)
2024-07-23 09:25:19 +00:00
@classmethod
def get_workflow_job_result(cls, wf_job_name: str) -> Optional[str]:
2024-07-25 09:32:59 +00:00
res = cls.get_workflow_results()
if wf_job_name in res:
return res[wf_job_name]["result"] # type: ignore
return None
@staticmethod
2024-02-04 19:12:37 +00:00
def print_in_group(group_name: str, lines: Union[Any, List[Any]]) -> None:
lines = list(lines)
print(f"::group::{group_name}")
for line in lines:
print(line)
print("::endgroup::")
2024-07-11 11:37:26 +00:00
@staticmethod
def get_commit_status_by_name(
token: str, commit_sha: str, status_name: Union[str, Sequence]
2024-07-19 09:35:43 +00:00
) -> str:
assert len(token) == 40
assert len(commit_sha) == 40
2024-08-03 08:40:12 +00:00
assert Utils.is_hex(commit_sha)
assert not Utils.is_hex(token)
2024-08-16 09:11:11 +00:00
url = f"https://api.github.com/repos/{Envs.GITHUB_REPOSITORY}/commits/{commit_sha}/statuses"
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
}
if isinstance(status_name, str):
status_name = (status_name,)
2024-08-16 09:11:11 +00:00
while url:
response = requests.get(url, headers=headers, timeout=5)
if response.status_code == 200:
statuses = response.json()
for status in statuses:
if status["context"] in status_name:
return status["state"] # type: ignore
2024-08-16 09:11:11 +00:00
# Check if there is a next page
url = response.links.get("next", {}).get("url")
else:
break
2024-07-19 09:35:43 +00:00
return ""
@staticmethod
def check_wf_completed(token: str, commit_sha: str) -> bool:
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
}
url = f"https://api.github.com/repos/{Envs.GITHUB_REPOSITORY}/commits/{commit_sha}/check-runs?per_page={100}"
for i in range(3):
try:
response = requests.get(url, headers=headers, timeout=5)
response.raise_for_status()
# assert "next" not in response.links, "Response truncated"
data = response.json()
assert data["check_runs"], "?"
for check in data["check_runs"]:
if check["status"] != "completed":
print(
f" Check workflow status: Check not completed [{check['name']}]"
)
return False
2024-07-19 09:35:43 +00:00
return True
except Exception as e:
2024-07-19 09:35:43 +00:00
print(f"ERROR: exception after attempt [{i}]: {e}")
time.sleep(1)
return False
2024-07-19 18:43:14 +00:00
@staticmethod
2024-08-02 07:23:40 +00:00
def get_pr_url_by_branch(branch, repo=None):
repo = repo or Envs.GITHUB_REPOSITORY
get_url_cmd = f"gh pr list --repo {repo} --head {branch} --json url --jq '.[0].url' --state open"
2024-08-01 09:57:54 +00:00
url = Shell.get_output(get_url_cmd)
2024-08-02 07:23:40 +00:00
if not url:
print(f"WARNING: No open PR found, branch [{branch}] - search for merged")
get_url_cmd = f"gh pr list --repo {repo} --head {branch} --json url --jq '.[0].url' --state merged"
url = Shell.get_output(get_url_cmd)
2024-07-19 18:43:14 +00:00
if not url:
print(f"ERROR: PR nor found, branch [{branch}]")
return url
2024-08-02 07:23:40 +00:00
@staticmethod
def is_latest_release_branch(branch):
latest_branch = Shell.get_output(
'gh pr list --label release --repo ClickHouse/ClickHouse --search "sort:created" -L1 --json headRefName'
)
if latest_branch:
latest_branch = json.loads(latest_branch)[0]["headRefName"]
print(
f"Latest branch [{latest_branch}], release branch [{branch}], release latest [{latest_branch == branch}]"
)
2024-08-02 07:23:40 +00:00
return latest_branch == branch
2024-07-11 11:37:26 +00:00
class Shell:
@classmethod
2024-08-01 09:57:54 +00:00
def get_output_or_raise(cls, command):
return cls.get_output(command, strict=True)
@classmethod
def get_output(cls, command, strict=False):
2024-07-15 16:18:15 +00:00
res = subprocess.run(
command,
2024-07-11 11:37:26 +00:00
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
2024-08-01 09:57:54 +00:00
check=strict,
2024-07-11 11:37:26 +00:00
)
2024-07-15 16:18:15 +00:00
return res.stdout.strip()
2024-07-11 11:37:26 +00:00
@classmethod
2024-08-01 09:57:54 +00:00
def check(
cls,
command,
strict=False,
verbose=False,
dry_run=False,
stdin_str=None,
**kwargs,
):
2024-07-19 18:43:14 +00:00
if dry_run:
print(f"Dry-ryn. Would run command [{command}]")
2024-08-01 18:51:36 +00:00
return True
2024-08-01 09:57:54 +00:00
if verbose:
print(f"Run command [{command}]")
2024-09-26 14:09:46 +00:00
with subprocess.Popen(
command,
2024-07-11 11:37:26 +00:00
shell=True,
2024-07-31 18:14:22 +00:00
stderr=subprocess.STDOUT,
2024-08-01 09:57:54 +00:00
stdout=subprocess.PIPE,
stdin=subprocess.PIPE if stdin_str else None,
universal_newlines=True,
start_new_session=True,
bufsize=1,
errors="backslashreplace",
**kwargs,
2024-09-26 14:09:46 +00:00
) as proc:
if stdin_str:
proc.communicate(input=stdin_str)
elif proc.stdout:
for line in proc.stdout:
sys.stdout.write(line)
proc.wait()
retcode = proc.returncode
if strict:
assert retcode == 0
return retcode == 0
class Utils:
@staticmethod
def get_failed_tests_number(description: str) -> Optional[int]:
description = description.lower()
pattern = r"fail:\s*(\d+)\s*(?=,|$)"
match = re.search(pattern, description)
if match:
return int(match.group(1))
return None
@staticmethod
def is_killed_with_oom():
if Shell.check(
"sudo dmesg -T | grep -q -e 'Out of memory: Killed process' -e 'oom_reaper: reaped process' -e 'oom-kill:constraint=CONSTRAINT_NONE'"
):
return True
return False
@staticmethod
def clear_dmesg():
2024-08-01 09:57:54 +00:00
Shell.check("sudo dmesg --clear", verbose=True)
@staticmethod
2024-08-03 08:40:12 +00:00
def is_hex(s):
try:
int(s, 16)
return True
except ValueError:
return False
@staticmethod
def normalize_string(string: str) -> str:
res = string.lower()
for r in (
(" ", "_"),
("(", "_"),
(")", "_"),
(",", "_"),
("/", "_"),
("-", "_"),
):
res = res.replace(*r)
return res
@staticmethod
def is_job_triggered_manually():
2024-08-27 10:12:34 +00:00
return (
2024-08-27 10:19:19 +00:00
"robot" not in Envs.GITHUB_ACTOR
and "clickhouse-ci" not in Envs.GITHUB_ACTOR
2024-08-27 10:12:34 +00:00
)