Make env_helper importable from any module

This commit is contained in:
Mikhail f. Shiryaev 2024-09-18 22:04:50 +02:00
parent c0c83236b6
commit cb503ec2ec
No known key found for this signature in database
GPG Key ID: 4B02ED204C7D93F4
7 changed files with 133 additions and 139 deletions

View File

@ -7,28 +7,28 @@ import sys
from pathlib import Path
from typing import List
from ci_config import CI
from env_helper import (
GITHUB_JOB_URL,
CI_CONFIG_PATH,
GITHUB_REPOSITORY,
GITHUB_SERVER_URL,
IS_CI,
REPORT_PATH,
TEMP_PATH,
CI_CONFIG_PATH,
IS_CI,
)
from pr_info import PRInfo
from report import (
ERROR,
FAILURE,
GITHUB_JOB_URL,
PENDING,
SUCCESS,
BuildResult,
JobReport,
create_build_html_report,
get_worst_status,
FAILURE,
)
from stopwatch import Stopwatch
from ci_config import CI
# Old way to read the neads_data
NEEDS_DATA_PATH = os.getenv("NEEDS_DATA_PATH", "")

View File

@ -14,9 +14,12 @@ from typing import Any, Dict, List, Optional
import docker_images_helper
import upload_result_helper
from build_check import get_release_or_pr
from ci_buddy import CIBuddy
from ci_cache import CiCache
from ci_config import CI
from ci_metadata import CiMetadata
from ci_utils import GH, Utils, Envs
from ci_settings import CiSettings
from ci_utils import GH, Envs, Utils
from clickhouse_helper import (
CiLogsCredentials,
ClickHouseHelper,
@ -30,19 +33,12 @@ from commit_status_helper import (
RerunHelper,
format_description,
get_commit,
get_commit_filtered_statuses,
post_commit_status,
set_status_comment,
get_commit_filtered_statuses,
)
from digest_helper import DockerDigester
from env_helper import (
IS_CI,
GITHUB_JOB_API_URL,
GITHUB_REPOSITORY,
GITHUB_RUN_ID,
REPO_COPY,
TEMP_PATH,
)
from env_helper import GITHUB_REPOSITORY, GITHUB_RUN_ID, IS_CI, REPO_COPY, TEMP_PATH
from get_robot_token import get_best_robot_token
from git_helper import GIT_PREFIX, Git
from git_helper import Runner as GitRunner
@ -50,22 +46,20 @@ from github_helper import GitHub
from pr_info import PRInfo
from report import (
ERROR,
FAIL,
GITHUB_JOB_API_URL,
JOB_FINISHED_TEST_NAME,
JOB_STARTED_TEST_NAME,
OK,
PENDING,
SUCCESS,
BuildResult,
JobReport,
TestResult,
OK,
JOB_STARTED_TEST_NAME,
JOB_FINISHED_TEST_NAME,
FAIL,
)
from s3_helper import S3Helper
from tee_popen import TeePopen
from ci_cache import CiCache
from ci_settings import CiSettings
from ci_buddy import CIBuddy
from stopwatch import Stopwatch
from tee_popen import TeePopen
from version_helper import get_version_from_repo
# pylint: disable=too-many-lines

View File

@ -11,6 +11,8 @@ from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union
import requests
from env_helper import IS_CI
logger = logging.getLogger(__name__)
@ -42,7 +44,7 @@ def cd(path: Union[Path, str]) -> Iterator[None]:
def kill_ci_runner(message: str) -> None:
"""The function to kill the current process with all parents when it's possible.
Works only when run with the set `CI` environment"""
if not os.getenv("CI", ""): # cycle import env_helper
if not IS_CI:
logger.info("Running outside the CI, won't kill the runner")
return
print(f"::error::{message}")

View File

@ -7,7 +7,7 @@ import time
from collections import defaultdict
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Dict, List, Optional, Union, Callable
from typing import Callable, Dict, List, Optional, Union
from github import Github
from github.Commit import Commit

View File

@ -1,11 +1,7 @@
#!/usr/bin/env python
import logging
import os
from os import path as p
from typing import Tuple
from build_download_helper import APIException, get_gh_api
module_dir = p.abspath(p.dirname(__file__))
git_root = p.abspath(p.join(module_dir, "..", ".."))
@ -41,102 +37,3 @@ S3_ARTIFACT_DOWNLOAD_TEMPLATE = (
"{pr_or_release}/{commit}/{build_name}/{artifact}"
)
CI_CONFIG_PATH = f"{TEMP_PATH}/ci_config.json"
# These parameters are set only on demand, and only once
_GITHUB_JOB_ID = ""
_GITHUB_JOB_URL = ""
_GITHUB_JOB_API_URL = ""
def GITHUB_JOB_ID(safe: bool = True) -> str:
global _GITHUB_JOB_ID
global _GITHUB_JOB_URL
global _GITHUB_JOB_API_URL
if _GITHUB_JOB_ID:
return _GITHUB_JOB_ID
try:
_GITHUB_JOB_ID, _GITHUB_JOB_URL, _GITHUB_JOB_API_URL = get_job_id_url(
GITHUB_JOB
)
except APIException as e:
logging.warning("Unable to retrieve the job info from GH API: %s", e)
if not safe:
raise e
return _GITHUB_JOB_ID
def GITHUB_JOB_URL(safe: bool = True) -> str:
try:
GITHUB_JOB_ID()
except APIException:
if safe:
logging.warning("Using run URL as a fallback to not fail the job")
return GITHUB_RUN_URL
raise
return _GITHUB_JOB_URL
def GITHUB_JOB_API_URL(safe: bool = True) -> str:
GITHUB_JOB_ID(safe)
return _GITHUB_JOB_API_URL
def get_job_id_url(job_name: str) -> Tuple[str, str, str]:
job_id = ""
job_url = ""
job_api_url = ""
if GITHUB_RUN_ID == "0":
job_id = "0"
if job_id:
return job_id, job_url, job_api_url
jobs = []
page = 1
while not job_id:
response = get_gh_api(
f"https://api.github.com/repos/{GITHUB_REPOSITORY}/"
f"actions/runs/{GITHUB_RUN_ID}/jobs?per_page=100&page={page}"
)
page += 1
data = response.json()
jobs.extend(data["jobs"])
for job in data["jobs"]:
if job["name"] != job_name:
continue
job_id = job["id"]
job_url = job["html_url"]
job_api_url = job["url"]
return job_id, job_url, job_api_url
if (
len(jobs) >= data["total_count"] # just in case of inconsistency
or len(data["jobs"]) == 0 # if we excided pages
):
job_id = "0"
if not job_url:
# This is a terrible workaround for the case of another broken part of
# GitHub actions. For nested workflows it doesn't provide a proper job_name
# value, but only the final one. So, for `OriginalJob / NestedJob / FinalJob`
# full name, job_name contains only FinalJob
matched_jobs = []
for job in jobs:
nested_parts = job["name"].split(" / ")
if len(nested_parts) <= 1:
continue
if nested_parts[-1] == job_name:
matched_jobs.append(job)
if len(matched_jobs) == 1:
# The best case scenario
job_id = matched_jobs[0]["id"]
job_url = matched_jobs[0]["html_url"]
job_api_url = matched_jobs[0]["url"]
return job_id, job_url, job_api_url
if matched_jobs:
logging.error(
"We could not get the ID and URL for the current job name %s, there "
"are more than one jobs match it for the nested workflows. Please, "
"refer to https://github.com/actions/runner/issues/2577",
job_name,
)
return job_id, job_url, job_api_url

View File

@ -20,9 +20,16 @@ from typing import (
Union,
)
from build_download_helper import get_gh_api
from build_download_helper import APIException, get_gh_api
from ci_config import CI
from env_helper import REPORT_PATH, GITHUB_WORKSPACE
from env_helper import (
GITHUB_JOB,
GITHUB_REPOSITORY,
GITHUB_RUN_ID,
GITHUB_RUN_URL,
GITHUB_WORKSPACE,
REPORT_PATH,
)
logger = logging.getLogger(__name__)
@ -38,6 +45,105 @@ SKIPPED: Final = "SKIPPED"
StatusType = Literal["error", "failure", "pending", "success"]
STATUSES = [ERROR, FAILURE, PENDING, SUCCESS] # type: List[StatusType]
# These parameters are set only on demand, and only once
_GITHUB_JOB_ID = ""
_GITHUB_JOB_URL = ""
_GITHUB_JOB_API_URL = ""
def GITHUB_JOB_ID(safe: bool = True) -> str:
global _GITHUB_JOB_ID
global _GITHUB_JOB_URL
global _GITHUB_JOB_API_URL
if _GITHUB_JOB_ID:
return _GITHUB_JOB_ID
try:
_GITHUB_JOB_ID, _GITHUB_JOB_URL, _GITHUB_JOB_API_URL = get_job_id_url(
GITHUB_JOB
)
except APIException as e:
logging.warning("Unable to retrieve the job info from GH API: %s", e)
if not safe:
raise e
return _GITHUB_JOB_ID
def GITHUB_JOB_URL(safe: bool = True) -> str:
try:
GITHUB_JOB_ID()
except APIException:
if safe:
logging.warning("Using run URL as a fallback to not fail the job")
return GITHUB_RUN_URL
raise
return _GITHUB_JOB_URL
def GITHUB_JOB_API_URL(safe: bool = True) -> str:
GITHUB_JOB_ID(safe)
return _GITHUB_JOB_API_URL
def get_job_id_url(job_name: str) -> Tuple[str, str, str]:
job_id = ""
job_url = ""
job_api_url = ""
if GITHUB_RUN_ID == "0":
job_id = "0"
if job_id:
return job_id, job_url, job_api_url
jobs = []
page = 1
while not job_id:
response = get_gh_api(
f"https://api.github.com/repos/{GITHUB_REPOSITORY}/"
f"actions/runs/{GITHUB_RUN_ID}/jobs?per_page=100&page={page}"
)
page += 1
data = response.json()
jobs.extend(data["jobs"])
for job in data["jobs"]:
if job["name"] != job_name:
continue
job_id = job["id"]
job_url = job["html_url"]
job_api_url = job["url"]
return job_id, job_url, job_api_url
if (
len(jobs) >= data["total_count"] # just in case of inconsistency
or len(data["jobs"]) == 0 # if we excided pages
):
job_id = "0"
if not job_url:
# This is a terrible workaround for the case of another broken part of
# GitHub actions. For nested workflows it doesn't provide a proper job_name
# value, but only the final one. So, for `OriginalJob / NestedJob / FinalJob`
# full name, job_name contains only FinalJob
matched_jobs = []
for job in jobs:
nested_parts = job["name"].split(" / ")
if len(nested_parts) <= 1:
continue
if nested_parts[-1] == job_name:
matched_jobs.append(job)
if len(matched_jobs) == 1:
# The best case scenario
job_id = matched_jobs[0]["id"]
job_url = matched_jobs[0]["html_url"]
job_api_url = matched_jobs[0]["url"]
return job_id, job_url, job_api_url
if matched_jobs:
logging.error(
"We could not get the ID and URL for the current job name %s, there "
"are more than one jobs match it for the nested workflows. Please, "
"refer to https://github.com/actions/runner/issues/2577",
job_name,
)
return job_id, job_url, job_api_url
# The order of statuses from the worst to the best
def _state_rank(status: str) -> int:

View File

@ -1,15 +1,10 @@
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Union
import os
import logging
from env_helper import (
GITHUB_JOB_URL,
GITHUB_REPOSITORY,
GITHUB_RUN_URL,
GITHUB_SERVER_URL,
)
from report import TestResults, create_test_html_report
from env_helper import GITHUB_REPOSITORY, GITHUB_RUN_URL, GITHUB_SERVER_URL
from report import GITHUB_JOB_URL, TestResults, create_test_html_report
from s3_helper import S3Helper