import argparse import concurrent.futures import json import logging import os import re import subprocess import sys from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional import docker_images_helper import upload_result_helper from build_check import get_release_or_pr from ci_config import CI from ci_metadata import CiMetadata from ci_utils import GHActions, normalize_string from clickhouse_helper import ( CiLogsCredentials, ClickHouseHelper, InsertException, get_instance_id, get_instance_type, prepare_tests_results_for_clickhouse, ) from commit_status_helper import ( CommitStatusData, RerunHelper, format_description, get_commit, post_commit_status, set_status_comment, get_commit_filtered_statuses, ) from digest_helper import DockerDigester from env_helper import ( IS_CI, GITHUB_JOB_API_URL, GITHUB_REPOSITORY, GITHUB_RUN_ID, REPO_COPY, TEMP_PATH, ) from get_robot_token import get_best_robot_token from git_helper import GIT_PREFIX, Git from git_helper import Runner as GitRunner from github_helper import GitHub from pr_info import PRInfo from report import ERROR, FAILURE, PENDING, SUCCESS, BuildResult, JobReport, TestResult from s3_helper import S3Helper from stopwatch import Stopwatch from tee_popen import TeePopen from ci_cache import CiCache from ci_settings import CiSettings from version_helper import get_version_from_repo # pylint: disable=too-many-lines def get_check_name(check_name: str, batch: int, num_batches: int) -> str: res = check_name if num_batches > 1: res = f"{check_name} [{batch+1}/{num_batches}]" return res def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace: parser.add_argument( "--cancel-previous-run", action="store_true", help="Action that cancels previous running PR workflow if PR added into the Merge Queue", ) parser.add_argument( "--set-pending-status", action="store_true", help="Action to set needed pending statuses in the beginning of CI workflow, e.g. for Sync wf", ) parser.add_argument( "--configure", action="store_true", help="Action that configures ci run. Calculates digests, checks job to be executed, generates json output", ) parser.add_argument( "--update-gh-statuses", action="store_true", help="Action that recreate success GH statuses for jobs that finished successfully in past and will be " "skipped this time", ) parser.add_argument( "--pre", action="store_true", help="Action that executes prerequisites for the job provided in --job-name", ) parser.add_argument( "--run", action="store_true", help="Action that executes run action for specified --job-name. run_command must be configured for a given " "job name.", ) parser.add_argument( "--post", action="store_true", help="Action that executes post actions for the job provided in --job-name", ) parser.add_argument( "--mark-success", action="store_true", help="Action that marks job provided in --job-name (with batch provided in --batch) as successful", ) parser.add_argument( "--job-name", default="", type=str, help="Job name as in config", ) parser.add_argument( "--run-command", default="", type=str, help="A run command to run in --run action. Will override run_command from a job config if any", ) parser.add_argument( "--batch", default=-1, type=int, help="Current batch number (required for --mark-success), -1 or omit for single-batch job", ) parser.add_argument( "--infile", default="", type=str, help="Input json file or json string with ci run config", ) parser.add_argument( "--outfile", default="", type=str, required=False, help="output file to write json result to, if not set - stdout", ) parser.add_argument( "--pretty", action="store_true", default=False, help="makes json output pretty formatted", ) parser.add_argument( "--skip-docker", action="store_true", default=False, help="skip fetching docker data from dockerhub, used in --configure action (for debugging)", ) parser.add_argument( "--docker-digest-or-latest", action="store_true", default=False, help="temporary hack to fallback to latest if image with digest as a tag is not on docker hub", ) parser.add_argument( "--skip-jobs", action="store_true", default=False, help="skip fetching data about job runs, used in --configure action (for debugging and nightly ci)", ) parser.add_argument( "--force", action="store_true", default=False, help="Used with --run, force the job to run, omitting the ci cache", ) # FIXME: remove, not used parser.add_argument( "--rebuild-all-binaries", action="store_true", default=False, help="[DEPRECATED. to be removed, once no wf use it] will create run config without skipping build jobs in " "any case, used in --configure action (for release branches)", ) parser.add_argument( "--commit-message", default="", help="debug option to test commit message processing", ) return parser.parse_args() # FIXME: rewrite the docker job as regular reusable_test job and move interaction with docker hub inside job script # that way run config will be more clean, workflow more generic and less api calls to dockerhub def check_missing_images_on_dockerhub( image_name_tag: Dict[str, str], arch: Optional[str] = None ) -> Dict[str, str]: """ Checks missing images on dockerhub. Works concurrently for all given images. Docker must be logged in. """ def run_docker_command( image: str, image_digest: str, arch: Optional[str] = None ) -> Dict: """ aux command for fetching single docker manifest """ command = [ "docker", "manifest", "inspect", f"{image}:{image_digest}" if not arch else f"{image}:{image_digest}-{arch}", ] process = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, ) return { "image": image, "image_digest": image_digest, "arch": arch, "stdout": process.stdout, "stderr": process.stderr, "return_code": process.returncode, } result: Dict[str, str] = {} with concurrent.futures.ThreadPoolExecutor() as executor: futures = [ executor.submit(run_docker_command, image, tag, arch) for image, tag in image_name_tag.items() ] responses = [ future.result() for future in concurrent.futures.as_completed(futures) ] for resp in responses: name, stdout, stderr, digest, arch = ( resp["image"], resp["stdout"], resp["stderr"], resp["image_digest"], resp["arch"], ) if stderr: if stderr.startswith("no such manifest"): result[name] = digest else: print(f"Error: Unknown error: {stderr}, {name}, {arch}") elif stdout: if "mediaType" in stdout: pass else: print(f"Error: Unknown response: {stdout}") assert False, "FIXME" else: print(f"Error: No response for {name}, {digest}, {arch}") assert False, "FIXME" return result def _pre_action(s3, indata, pr_info): CommitStatusData.cleanup() JobReport.cleanup() BuildResult.cleanup() ci_cache = CiCache(s3, indata["jobs_data"]["digests"]) # for release/master branches reports must be from the same branch report_prefix = "" if pr_info.is_master or pr_info.is_release: report_prefix = normalize_string(pr_info.head_ref) print( f"Use report prefix [{report_prefix}], pr_num [{pr_info.number}], head_ref [{pr_info.head_ref}]" ) reports_files = ci_cache.download_build_reports(file_prefix=report_prefix) ci_cache.dump_run_config(indata) print(f"Pre action done. Report files [{reports_files}] have been downloaded") def _mark_success_action( s3: S3Helper, indata: Dict[str, Any], pr_info: PRInfo, job: str, batch: int, ) -> None: ci_cache = CiCache(s3, indata["jobs_data"]["digests"]) job_config = CI.get_job_config(job) num_batches = job_config.num_batches # if batch is not provided - set to 0 batch = 0 if batch == -1 else batch assert ( 0 <= batch < num_batches ), f"--batch must be provided and in range [0, {num_batches}) for {job}" # FIXME: find generic design for propagating and handling job status (e.g. stop using statuses in GH api) # now job ca be build job w/o status data, any other job that exit with 0 with or w/o status data if CI.is_build_job(job): # there is no CommitStatus for build jobs # create dummy status relying on JobReport # FIXME: consider creating commit status for build jobs too, to treat everything the same way job_report = JobReport.load() if JobReport.exist() else None if job_report and job_report.status == SUCCESS: CommitStatusData( SUCCESS, "dummy description", "dummy_url", pr_num=pr_info.number, sha=pr_info.sha, ).dump_status() job_status = None if CommitStatusData.exist(): # normal scenario job_status = CommitStatusData.load_status() else: # apparently exit after rerun-helper check # do nothing, exit without failure print(f"ERROR: no status file for job [{job}]") if job_config.run_always or job_config.run_by_label: print(f"Job [{job}] runs always or by label in CI - do not cache") else: if pr_info.is_master: pass # delete method is disabled for ci_cache. need it? # pending enabled for master branch jobs only # ci_cache.delete_pending(job, batch, num_batches, release_branch=True) if job_status and job_status.is_ok(): ci_cache.push_successful( job, batch, num_batches, job_status, pr_info.is_release ) print(f"Job [{job}] is ok") elif job_status and not job_status.is_ok(): ci_cache.push_failed( job, batch, num_batches, job_status, pr_info.is_release ) print(f"Job [{job}] is failed with status [{job_status.status}]") else: job_status = CommitStatusData( description="dummy description", status=ERROR, report_url="dummy url" ) ci_cache.push_failed( job, batch, num_batches, job_status, pr_info.is_release ) print(f"No CommitStatusData for [{job}], push dummy failure to ci_cache") def _print_results(result: Any, outfile: Optional[str], pretty: bool = False) -> None: if outfile: with open(outfile, "w", encoding="utf-8") as f: if isinstance(result, str): print(result, file=f) elif isinstance(result, dict): print(json.dumps(result, indent=2 if pretty else None), file=f) else: raise AssertionError(f"Unexpected type for 'res': {type(result)}") else: if isinstance(result, str): print(result) elif isinstance(result, dict): print(json.dumps(result, indent=2 if pretty else None)) else: raise AssertionError(f"Unexpected type for 'res': {type(result)}") def _configure_docker_jobs(docker_digest_or_latest: bool) -> Dict: print("::group::Docker images check") # generate docker jobs data docker_digester = DockerDigester() imagename_digest_dict = ( docker_digester.get_all_digests() ) # 'image name - digest' mapping images_info = docker_images_helper.get_images_info() # FIXME: we need login as docker manifest inspect goes directly to one of the *.docker.com hosts instead of "registry-mirrors" : ["http://dockerhub-proxy.dockerhub-proxy-zone:5000"] # find if it's possible to use the setting of /etc/docker/daemon.json (https://github.com/docker/cli/issues/4484#issuecomment-1688095463) docker_images_helper.docker_login() missing_multi_dict = check_missing_images_on_dockerhub(imagename_digest_dict) missing_multi = list(missing_multi_dict) missing_amd64 = [] missing_aarch64 = [] if not docker_digest_or_latest: # look for missing arm and amd images only among missing multi-arch manifests @missing_multi_dict # to avoid extra dockerhub api calls missing_amd64 = list( check_missing_images_on_dockerhub(missing_multi_dict, "amd64") ) # FIXME: WA until full arm support: skip not supported arm images missing_aarch64 = list( check_missing_images_on_dockerhub( { im: digest for im, digest in missing_multi_dict.items() if not images_info[im]["only_amd64"] }, "aarch64", ) ) else: if missing_multi: assert False, f"Missing images [{missing_multi}], cannot proceed" print("::endgroup::") return { "images": imagename_digest_dict, "missing_aarch64": missing_aarch64, "missing_amd64": missing_amd64, "missing_multi": missing_multi, } def _configure_jobs( s3: S3Helper, pr_info: PRInfo, ci_settings: CiSettings, skip_jobs: bool, dry_run: bool = False, ) -> CiCache: """ returns CICache instance with configured job's data :param s3: :param pr_info: :param ci_settings: :return: """ # get all jobs if not skip_jobs: job_configs = CI.get_workflow_jobs_with_configs( is_mq=pr_info.is_merge_queue, is_docs_only=pr_info.has_changes_in_documentation_only(), is_master=pr_info.is_master, is_pr=pr_info.is_pr, ) else: job_configs = {} # filter jobs in accordance with ci settings job_configs = ci_settings.apply( job_configs, pr_info.is_release, is_pr=pr_info.is_pr, is_mq=pr_info.is_merge_queue, labels=pr_info.labels, ) # check jobs in ci cache ci_cache = CiCache.calc_digests_and_create( s3, job_configs, cache_enabled=not ci_settings.no_ci_cache and not skip_jobs and IS_CI, dry_run=dry_run, ) ci_cache.update() ci_cache.apply(job_configs, is_release=pr_info.is_release) return ci_cache def _generate_ci_stage_config( jobs_data: Dict[str, Any], non_blocking_mode: bool = False ) -> Dict[str, Dict[str, Any]]: """ populates GH Actions' workflow with real jobs "Builds_1": [{"job_name": NAME, "runner_type": RUNNER_TYPE}] "Tests_1": [{"job_name": NAME, "runner_type": RUNNER_TYPE}] ... """ result = {} # type: Dict[str, Any] stages_to_do = [] for job in jobs_data: stage_type = CI.get_job_ci_stage(job, non_blocking_ci=non_blocking_mode) if stage_type == CI.WorkflowStages.NA: continue if stage_type not in result: result[stage_type] = [] stages_to_do.append(stage_type) result[stage_type].append( {"job_name": job, "runner_type": CI.JOB_CONFIGS[job].runner_type} ) result["stages_to_do"] = stages_to_do return result def _create_gh_status( commit: Any, job: str, batch: int, num_batches: int, job_status: CommitStatusData ) -> None: print(f"Going to re-create GH status for job [{job}]") assert job_status.status == SUCCESS, "BUG!" commit.create_status( state=job_status.status, target_url=job_status.report_url, description=format_description( f"Reused from [{job_status.pr_num}-{job_status.sha[0:8]}]: " f"{job_status.description}" ), context=get_check_name(job, batch=batch, num_batches=num_batches), ) def _update_gh_statuses_action(indata: Dict, s3: S3Helper) -> None: if CiSettings.create_from_run_config(indata).no_ci_cache: print("CI cache is disabled - skip restoring commit statuses from CI cache") return job_digests = indata["jobs_data"]["digests"] jobs_to_skip = indata["jobs_data"]["jobs_to_skip"] jobs_to_do = indata["jobs_data"]["jobs_to_do"] ci_cache = CiCache(s3, job_digests).update().fetch_records_data().print_status() # create GH status pr_info = PRInfo() commit = get_commit(GitHub(get_best_robot_token(), per_page=100), pr_info.sha) def _concurrent_create_status(job: str, batch: int, num_batches: int) -> None: job_status = ci_cache.get_successful(job, batch, num_batches) if not job_status: return _create_gh_status(commit, job, batch, num_batches, job_status) with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for job in job_digests: if job not in jobs_to_skip and job not in jobs_to_do: # no need to create status for job that are not supposed to be executed continue if CI.is_build_job(job): # no GH status for build jobs continue job_config = CI.get_job_config(job) if not job_config: # there might be a new job that does not exist on this branch - skip it continue for batch in range(job_config.num_batches): future = executor.submit( _concurrent_create_status, job, batch, job_config.num_batches ) futures.append(future) done, _ = concurrent.futures.wait(futures) for future in done: try: _ = future.result() except Exception as e: raise e print("Going to update overall CI report") set_status_comment(commit, pr_info) print("... CI report update - done") def _fetch_commit_tokens(message: str, pr_info: PRInfo) -> List[str]: pattern = r"(#|- \[x\] +