import argparse import concurrent.futures from dataclasses import asdict, dataclass from enum import Enum import json import logging import os import re import subprocess import sys from pathlib import Path import time from typing import Any, Dict, List, Optional, Sequence, Union import docker_images_helper import upload_result_helper from build_check import get_release_or_pr from ci_config import CI_CONFIG, Build, Labels, JobNames from ci_utils import GHActions, is_hex from clickhouse_helper import ( CiLogsCredentials, ClickHouseHelper, get_instance_id, get_instance_type, prepare_tests_results_for_clickhouse, ) from commit_status_helper import ( CommitStatusData, RerunHelper, format_description, get_commit, post_commit_status, set_status_comment, update_mergeable_check, ) from digest_helper import DockerDigester, JobDigester from env_helper import ( CI, GITHUB_JOB_API_URL, GITHUB_RUN_URL, REPO_COPY, REPORT_PATH, S3_BUILDS_BUCKET, TEMP_PATH, ) from get_robot_token import get_best_robot_token from git_helper import GIT_PREFIX, Git from git_helper import Runner as GitRunner from github import Github from pr_info import PRInfo from report import SUCCESS, BuildResult, JobReport from s3_helper import S3Helper from version_helper import get_version_from_repo @dataclass class PendingState: updated_at: float run_url: str class CiCache: """ CI cache is a bunch of records. Record is a file stored under special location on s3. The file name has following format _[]--___.ci RECORD_TYPE: SUCCESSFUL - for successfuly finished jobs PENDING - for pending jobs ATTRIBUTES: release - for jobs being executed on the release branch including master branch (not a PR branch) """ _S3_CACHE_PREFIX = "CI_cache_v1" _CACHE_BUILD_REPORT_PREFIX = "build_report" _RECORD_FILE_EXTENSION = ".ci" _LOCAL_CACHE_PATH = Path(TEMP_PATH) / "ci_cache" _ATTRIBUTE_RELEASE = "release" # divider symbol 1 _DIV1 = "--" # divider symbol 2 _DIV2 = "_" assert _DIV1 != _DIV2 class RecordType(Enum): SUCCESSFUL = "successful" PENDING = "pending" @dataclass class Record: record_type: "CiCache.RecordType" job_name: str job_digest: str batch: int num_batches: int release_branch: bool file: str = "" def to_str_key(self): """other fields must not be included in the hash str""" return "_".join( [self.job_name, self.job_digest, str(self.batch), str(self.num_batches)] ) class JobType(Enum): DOCS = "DOCS" SRCS = "SRCS" @classmethod def is_docs_job(cls, job_name: str) -> bool: return job_name == JobNames.DOCS_CHECK @classmethod def is_srcs_job(cls, job_name: str) -> bool: return not cls.is_docs_job(job_name) @classmethod def get_type_by_name(cls, job_name: str) -> "CiCache.JobType": res = cls.SRCS if cls.is_docs_job(job_name): res = cls.DOCS elif cls.is_srcs_job(job_name): res = cls.SRCS else: assert False return res def __init__( self, s3: S3Helper, job_digests: Dict[str, str], ): self.s3 = s3 self.job_digests = job_digests self.cache_s3_paths = { job_type: f"{self._S3_CACHE_PREFIX}/{job_type.value}-{self.job_digests[self._get_reference_job_name(job_type)]}/" for job_type in self.JobType } self.s3_record_prefixes = { record_type: record_type.value for record_type in self.RecordType } self.records: Dict["CiCache.RecordType", Dict[str, "CiCache.Record"]] = { record_type: {} for record_type in self.RecordType } self.cache_updated = False self.cache_data_fetched = True if not self._LOCAL_CACHE_PATH.exists(): self._LOCAL_CACHE_PATH.mkdir(parents=True, exist_ok=True) def _get_reference_job_name(self, job_type: JobType) -> str: res = Build.PACKAGE_RELEASE if job_type == self.JobType.DOCS: res = JobNames.DOCS_CHECK elif job_type == self.JobType.SRCS: res = Build.PACKAGE_RELEASE else: assert False return res def _get_record_file_name( self, record_type: RecordType, job_name: str, batch: int, num_batches: int, release_branch: bool, ) -> str: prefix = self.s3_record_prefixes[record_type] prefix_extended = ( self._DIV2.join([prefix, self._ATTRIBUTE_RELEASE]) if release_branch else prefix ) assert self._DIV1 not in job_name, f"Invalid job name {job_name}" job_name = self._DIV2.join( [job_name, self.job_digests[job_name], str(batch), str(num_batches)] ) file_name = self._DIV1.join([prefix_extended, job_name]) file_name += self._RECORD_FILE_EXTENSION return file_name def _get_record_s3_path(self, job_name: str) -> str: return self.cache_s3_paths[self.JobType.get_type_by_name(job_name)] def _parse_record_file_name( self, record_type: RecordType, file_name: str ) -> Optional["CiCache.Record"]: # validate filename if ( not file_name.endswith(self._RECORD_FILE_EXTENSION) or not len(file_name.split(self._DIV1)) == 2 ): print("ERROR: wrong file name format") return None file_name = file_name.removesuffix(self._RECORD_FILE_EXTENSION) release_branch = False prefix_extended, job_suffix = file_name.split(self._DIV1) record_type_and_attribute = prefix_extended.split(self._DIV2) # validate filename prefix failure = False if not 0 < len(record_type_and_attribute) <= 2: print("ERROR: wrong file name prefix") failure = True if ( len(record_type_and_attribute) > 1 and record_type_and_attribute[1] != self._ATTRIBUTE_RELEASE ): print("ERROR: wrong record attribute") failure = True if record_type_and_attribute[0] != self.s3_record_prefixes[record_type]: print("ERROR: wrong record type") failure = True if failure: return None if ( len(record_type_and_attribute) > 1 and record_type_and_attribute[1] == self._ATTRIBUTE_RELEASE ): release_branch = True job_properties = job_suffix.split(self._DIV2) job_name, job_digest, batch, num_batches = ( self._DIV2.join(job_properties[:-3]), job_properties[-3], int(job_properties[-2]), int(job_properties[-1]), ) if not is_hex(job_digest): print("ERROR: wrong record job digest") return None record = self.Record( record_type, job_name, job_digest, batch, num_batches, release_branch, file="", ) return record def update(self): """ Pulls cache records from s3. Only records name w/o content. """ for record_type in self.RecordType: prefix = self.s3_record_prefixes[record_type] cache_list = self.records[record_type] for job_type in self.JobType: path = self.cache_s3_paths[job_type] records = self.s3.list_prefix(f"{path}{prefix}", S3_BUILDS_BUCKET) records = [record.split("/")[-1] for record in records] GHActions.print_in_group( f"Cache records: [{record_type}] in [{job_type.value}]", records ) for file in records: record = self._parse_record_file_name( record_type=record_type, file_name=file ) if not record: print(f"ERROR: failed to parse cache record [{file}]") continue if ( record.job_name not in self.job_digests or self.job_digests[record.job_name] != record.job_digest ): # skip records we are not interested in continue if record.to_str_key() not in cache_list: cache_list[record.to_str_key()] = record self.cache_data_fetched = False elif ( not cache_list[record.to_str_key()].release_branch and record.release_branch ): # replace a non-release record with a release one cache_list[record.to_str_key()] = record self.cache_data_fetched = False self.cache_updated = True return self def fetch_records_data(self): """ Pulls CommitStatusData for all cached jobs from s3 """ if not self.cache_updated: self.update() if self.cache_data_fetched: # there are no record w/o underling data - no need to fetch return self # clean up for file in self._LOCAL_CACHE_PATH.glob("*.ci"): file.unlink() # download all record files for job_type in self.JobType: path = self.cache_s3_paths[job_type] for record_type in self.RecordType: prefix = self.s3_record_prefixes[record_type] _ = self.s3.download_files( bucket=S3_BUILDS_BUCKET, s3_path=f"{path}{prefix}", file_suffix=self._RECORD_FILE_EXTENSION, local_directory=self._LOCAL_CACHE_PATH, ) # validate we have files for all records and save file names meanwhile for record_type in self.RecordType: record_list = self.records[record_type] for _, record in record_list.items(): record_file_name = self._get_record_file_name( record_type, record.job_name, record.batch, record.num_batches, record.release_branch, ) assert ( self._LOCAL_CACHE_PATH / record_file_name ).is_file(), f"BUG. Record file must be present: {self._LOCAL_CACHE_PATH / record_file_name}" record.file = record_file_name self.cache_data_fetched = True return self def exist( self, record_type: "CiCache.RecordType", job: str, batch: int, num_batches: int, release_branch: bool, ) -> bool: if not self.cache_updated: self.update() record_key = self.Record( record_type, job, self.job_digests[job], batch, num_batches, release_branch, ).to_str_key() res = record_key in self.records[record_type] if release_branch: return res and self.records[record_type][record_key].release_branch else: return res def push( self, record_type: "CiCache.RecordType", job: str, batches: Union[int, Sequence[int]], num_batches: int, status: Union[CommitStatusData, PendingState], release_branch: bool = False, ) -> None: """ Pushes a cache record (CommitStatusData) @release_branch adds "release" attribute to a record """ if isinstance(batches, int): batches = [batches] for batch in batches: record_file = self._LOCAL_CACHE_PATH / self._get_record_file_name( record_type, job, batch, num_batches, release_branch ) record_s3_path = self._get_record_s3_path(job) if record_type == self.RecordType.SUCCESSFUL: assert isinstance(status, CommitStatusData) status.dump_to_file(record_file) elif record_type == self.RecordType.PENDING: assert isinstance(status, PendingState) with open(record_file, "w") as json_file: json.dump(asdict(status), json_file) else: assert False _ = self.s3.upload_file( bucket=S3_BUILDS_BUCKET, file_path=record_file, s3_path=record_s3_path + record_file.name, ) record = self.Record( record_type, job, self.job_digests[job], batch, num_batches, release_branch, file=record_file.name, ) if ( record.release_branch or record.to_str_key() not in self.records[record_type] ): self.records[record_type][record.to_str_key()] = record def get( self, record_type: "CiCache.RecordType", job: str, batch: int, num_batches: int ) -> Optional[Union[CommitStatusData, PendingState]]: """ Gets a cache record data for a job, or None if a cache miss """ if not self.cache_data_fetched: self.fetch_records_data() record_key = self.Record( record_type, job, self.job_digests[job], batch, num_batches, release_branch=False, ).to_str_key() if record_key not in self.records[record_type]: return None record_file_name = self.records[record_type][record_key].file res = CommitStatusData.load_from_file( self._LOCAL_CACHE_PATH / record_file_name ) # type: CommitStatusData return res def delete( self, record_type: "CiCache.RecordType", job: str, batch: int, num_batches: int, release_branch: bool, ) -> None: """ deletes record from the cache """ raise NotImplementedError("Let's try make cache push-and-read-only") # assert ( # record_type == self.RecordType.PENDING # ), "FIXME: delete is supported for pending records only" # record_file_name = self._get_record_file_name( # self.RecordType.PENDING, # job, # batch, # num_batches, # release_branch=release_branch, # ) # record_s3_path = self._get_record_s3_path(job) # self.s3.delete_file_from_s3(S3_BUILDS_BUCKET, record_s3_path + record_file_name) # record_key = self.Record( # record_type, # job, # self.job_digests[job], # batch, # num_batches, # release_branch=False, # ).to_str_key() # if record_key in self.records[record_type]: # del self.records[record_type][record_key] def is_successful( self, job: str, batch: int, num_batches: int, release_branch: bool ) -> bool: """ checks if a given job have already been done successfuly """ return self.exist( self.RecordType.SUCCESSFUL, job, batch, num_batches, release_branch ) def is_pending( self, job: str, batch: int, num_batches: int, release_branch: bool ) -> bool: """ check pending record in the cache for a given job @release_branch - checks that "release" attribute is set for a record """ if self.is_successful(job, batch, num_batches, release_branch): # successful record is present - not pending return False return self.exist( self.RecordType.PENDING, job, batch, num_batches, release_branch ) def push_successful( self, job: str, batch: int, num_batches: int, job_status: CommitStatusData, release_branch: bool = False, ) -> None: """ Pushes a cache record (CommitStatusData) @release_branch adds "release" attribute to a record """ self.push( self.RecordType.SUCCESSFUL, job, [batch], num_batches, job_status, release_branch, ) def push_pending( self, job: str, batches: List[int], num_batches: int, release_branch: bool ) -> None: """ pushes pending record for a job to the cache """ pending_state = PendingState(time.time(), run_url=GITHUB_RUN_URL) self.push( self.RecordType.PENDING, job, batches, num_batches, pending_state, release_branch, ) def get_successful( self, job: str, batch: int, num_batches: int ) -> Optional[CommitStatusData]: """ Gets a cache record (CommitStatusData) for a job, or None if a cache miss """ res = self.get(self.RecordType.SUCCESSFUL, job, batch, num_batches) assert res is None or isinstance(res, CommitStatusData) return res def delete_pending( self, job: str, batch: int, num_batches: int, release_branch: bool ) -> None: """ deletes pending record from the cache """ self.delete(self.RecordType.PENDING, job, batch, num_batches, release_branch) def download_build_reports(self, file_prefix: str = "") -> List[str]: """ not ideal class for this method, but let it be as we store build reports in CI cache directory on s3 and CiCache knows where exactly @file_prefix allows to filter out reports by git head_ref """ report_path = Path(REPORT_PATH) report_path.mkdir(exist_ok=True, parents=True) path = ( self._get_record_s3_path(Build.PACKAGE_RELEASE) + self._CACHE_BUILD_REPORT_PREFIX ) if file_prefix: path += "_" + file_prefix reports_files = self.s3.download_files( bucket=S3_BUILDS_BUCKET, s3_path=path, file_suffix=".json", local_directory=report_path, ) return reports_files def upload_build_report(self, build_result: BuildResult) -> str: result_json_path = build_result.write_json(Path(TEMP_PATH)) s3_path = ( self._get_record_s3_path(Build.PACKAGE_RELEASE) + result_json_path.name ) return self.s3.upload_file( bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path ) # def await_jobs(self, jobs_with_params: Dict[str, Dict[str, Any]]) -> List[str]: # if not jobs_with_params: # return [] # print(f"Start awaiting jobs [{list(jobs_with_params)}]") # poll_interval_sec = 180 # start_at = int(time.time()) # TIMEOUT = 3000 # expired_sec = 0 # done_jobs = [] # type: List[str] # while expired_sec < TIMEOUT and jobs_with_params: # time.sleep(poll_interval_sec) # self.update() # pending_finished: List[str] = [] # for job_name in jobs_with_params: # num_batches = jobs_with_params[job_name]["num_batches"] # for batch in jobs_with_params[job_name]["batches"]: # if self.is_pending(job_name, batch, num_batches): # continue # print( # f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore" # ) # pending_finished.append(job_name) # if pending_finished: # # restart timer # start_at = int(time.time()) # expired_sec = 0 # # remove finished jobs from awaiting list # for job in pending_finished: # del jobs_with_params[job] # done_jobs.append(job) # else: # expired_sec = int(time.time()) - start_at # print(f" ...awaiting continues... time left [{TIMEOUT - expired_sec}]") # if done_jobs: # print( # f"Awaiting OK. Left jobs: [{list(jobs_with_params)}], finished jobs: [{done_jobs}]" # ) # else: # print("Awaiting FAILED. No job has finished.") # return done_jobs def get_check_name(check_name: str, batch: int, num_batches: int) -> str: res = check_name if num_batches > 1: res = f"{check_name} [{batch+1}/{num_batches}]" return res def normalize_check_name(check_name: str) -> str: res = check_name.lower() for r in ((" ", "_"), ("(", "_"), (")", "_"), (",", "_"), ("/", "_")): res = res.replace(*r) return res def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace: # FIXME: consider switching to sub_parser for configure, pre, run, post actions parser.add_argument( "--configure", action="store_true", help="Action that configures ci run. Calculates digests, checks job to be executed, generates json output", ) parser.add_argument( "--update-gh-statuses", action="store_true", help="Action that recreate success GH statuses for jobs that finished successfully in past and will be skipped this time", ) parser.add_argument( "--pre", action="store_true", help="Action that executes prerequesetes for the job provided in --job-name", ) parser.add_argument( "--run", action="store_true", help="Action that executes run action for specified --job-name. run_command must be configured for a given job name.", ) parser.add_argument( "--post", action="store_true", help="Action that executes post actions for the job provided in --job-name", ) parser.add_argument( "--mark-success", action="store_true", help="Action that marks job provided in --job-name (with batch provided in --batch) as successful", ) parser.add_argument( "--job-name", default="", type=str, help="Job name as in config", ) parser.add_argument( "--run-command", default="", type=str, help="A run command to run in --run action. Will override run_command from a job config if any", ) parser.add_argument( "--batch", default=-1, type=int, help="Current batch number (required for --mark-success), -1 or omit for single-batch job", ) parser.add_argument( "--infile", default="", type=str, help="Input json file or json string with ci run config", ) parser.add_argument( "--outfile", default="", type=str, required=False, help="output file to write json result to, if not set - stdout", ) parser.add_argument( "--pretty", action="store_true", default=False, help="makes json output pretty formatted", ) parser.add_argument( "--skip-docker", action="store_true", default=False, help="skip fetching docker data from dockerhub, used in --configure action (for debugging)", ) parser.add_argument( "--docker-digest-or-latest", action="store_true", default=False, help="temporary hack to fallback to latest if image with digest as a tag is not on docker hub", ) parser.add_argument( "--skip-jobs", action="store_true", default=False, help="skip fetching data about job runs, used in --configure action (for debugging and nigthly ci)", ) parser.add_argument( "--rebuild-all-docker", action="store_true", default=False, help="will create run config for rebuilding all dockers, used in --configure action (for nightly docker job)", ) # FIXME: remove, not used parser.add_argument( "--rebuild-all-binaries", action="store_true", default=False, help="[DEPRECATED. to be removed, once no wf use it] will create run config without skipping build jobs in any case, used in --configure action (for release branches)", ) parser.add_argument( "--commit-message", default="", help="debug option to test commit message processing", ) return parser.parse_args() # FIXME: rewrite the docker job as regular reusable_test job and move interaction with docker hub inside job script # that way run config will be more clean, workflow more generic and less api calls to dockerhub def check_missing_images_on_dockerhub( image_name_tag: Dict[str, str], arch: Optional[str] = None ) -> Dict[str, str]: """ Checks missing images on dockerhub. Works concurrently for all given images. Docker must be logged in. """ def run_docker_command( image: str, image_digest: str, arch: Optional[str] = None ) -> Dict: """ aux command for fetching single docker manifest """ command = [ "docker", "manifest", "inspect", f"{image}:{image_digest}" if not arch else f"{image}:{image_digest}-{arch}", ] process = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False, ) return { "image": image, "image_digest": image_digest, "arch": arch, "stdout": process.stdout, "stderr": process.stderr, "return_code": process.returncode, } result: Dict[str, str] = {} with concurrent.futures.ThreadPoolExecutor() as executor: futures = [ executor.submit(run_docker_command, image, tag, arch) for image, tag in image_name_tag.items() ] responses = [ future.result() for future in concurrent.futures.as_completed(futures) ] for resp in responses: name, stdout, stderr, digest, arch = ( resp["image"], resp["stdout"], resp["stderr"], resp["image_digest"], resp["arch"], ) if stderr: if stderr.startswith("no such manifest"): result[name] = digest else: print(f"Error: Unknown error: {stderr}, {name}, {arch}") elif stdout: if "mediaType" in stdout: pass else: print(f"Error: Unknown response: {stdout}") assert False, "FIXME" else: print(f"Error: No response for {name}, {digest}, {arch}") assert False, "FIXME" return result def _pre_action(s3, indata, pr_info): CommitStatusData.cleanup() JobReport.cleanup() BuildResult.cleanup() ci_cache = CiCache(s3, indata["jobs_data"]["digests"]) # for release/master branches reports must be from the same branches report_prefix = pr_info.head_ref if pr_info.number == 0 else "" reports_files = ci_cache.download_build_reports(file_prefix=report_prefix) print(f"Pre action done. Report files [{reports_files}] have been downloaded") def _mark_success_action( s3: S3Helper, indata: Dict[str, Any], pr_info: PRInfo, job: str, batch: int, ) -> None: ci_cache = CiCache(s3, indata["jobs_data"]["digests"]) job_config = CI_CONFIG.get_job_config(job) num_batches = job_config.num_batches # if batch is not provided - set to 0 batch = 0 if batch == -1 else batch assert ( 0 <= batch < num_batches ), f"--batch must be provided and in range [0, {num_batches}) for {job}" # FIXME: find generic design for propagating and handling job status (e.g. stop using statuses in GH api) # now job ca be build job w/o status data, any other job that exit with 0 with or w/o status data if CI_CONFIG.is_build_job(job): # there is no status for build jobs # create dummy success to mark it as done # FIXME: consider creating commit status for build jobs too, to treat everything the same way CommitStatusData("success", "dummy description", "dummy_url").dump_status() job_status = None if CommitStatusData.exist(): # normal scenario job_status = CommitStatusData.load_status() else: # apparently exit after rerun-helper check # do nothing, exit without failure print(f"ERROR: no status file for job [{job}]") if job_config.run_always or job_config.run_by_label: print(f"Job [{job}] runs always or by label in CI - do not cache") else: if pr_info.is_master(): pass # delete method is disabled for ci_cache. need it? # pending enabled for master branch jobs only # ci_cache.delete_pending(job, batch, num_batches, release_branch=True) if job_status and job_status.is_ok(): ci_cache.push_successful( job, batch, num_batches, job_status, pr_info.is_release_branch() ) print(f"Job [{job}] is ok") elif job_status: print(f"Job [{job}] is not ok, status [{job_status.status}]") def _print_results(result: Any, outfile: Optional[str], pretty: bool = False) -> None: if outfile: with open(outfile, "w") as f: if isinstance(result, str): print(result, file=f) elif isinstance(result, dict): print(json.dumps(result, indent=2 if pretty else None), file=f) else: raise AssertionError(f"Unexpected type for 'res': {type(result)}") else: if isinstance(result, str): print(result) elif isinstance(result, dict): print(json.dumps(result, indent=2 if pretty else None)) else: raise AssertionError(f"Unexpected type for 'res': {type(result)}") def _check_and_update_for_early_style_check(jobs_data: dict, docker_data: dict) -> None: """ This is temporary hack to start style check before docker build if possible FIXME: need better solution to do style check as soon as possible and as fast as possible w/o dependency on docker job """ jobs_to_do = jobs_data.get("jobs_to_do", []) docker_to_build = docker_data.get("missing_multi", []) if ( JobNames.STYLE_CHECK in jobs_to_do and docker_to_build and "clickhouse/style-test" not in docker_to_build ): index = jobs_to_do.index(JobNames.STYLE_CHECK) jobs_to_do[index] = "Style check early" def _update_config_for_docs_only(jobs_data: dict) -> None: DOCS_CHECK_JOBS = [JobNames.DOCS_CHECK, JobNames.STYLE_CHECK] print(f"NOTE: Will keep only docs related jobs: [{DOCS_CHECK_JOBS}]") jobs_to_do = jobs_data.get("jobs_to_do", []) jobs_data["jobs_to_do"] = [job for job in jobs_to_do if job in DOCS_CHECK_JOBS] jobs_data["jobs_to_wait"] = { job: params for job, params in jobs_data["jobs_to_wait"].items() if job in DOCS_CHECK_JOBS } def _configure_docker_jobs( rebuild_all_dockers: bool, docker_digest_or_latest: bool = False ) -> Dict: print("::group::Docker images check") # generate docker jobs data docker_digester = DockerDigester() imagename_digest_dict = ( docker_digester.get_all_digests() ) # 'image name - digest' mapping images_info = docker_images_helper.get_images_info() # a. check missing images if not rebuild_all_dockers: # FIXME: we need login as docker manifest inspect goes directly to one of the *.docker.com hosts instead of "registry-mirrors" : ["http://dockerhub-proxy.dockerhub-proxy-zone:5000"] # find if it's possible to use the setting of /etc/docker/daemon.json docker_images_helper.docker_login() missing_multi_dict = check_missing_images_on_dockerhub(imagename_digest_dict) missing_multi = list(missing_multi_dict) missing_amd64 = [] missing_aarch64 = [] if not docker_digest_or_latest: # look for missing arm and amd images only among missing multiarch manifests @missing_multi_dict # to avoid extra dockerhub api calls missing_amd64 = list( check_missing_images_on_dockerhub(missing_multi_dict, "amd64") ) # FIXME: WA until full arm support: skip not supported arm images missing_aarch64 = list( check_missing_images_on_dockerhub( { im: digest for im, digest in missing_multi_dict.items() if not images_info[im]["only_amd64"] }, "aarch64", ) ) # FIXME: temporary hack, remove after transition to docker digest as tag else: if missing_multi: print( f"WARNING: Missing images {list(missing_multi)} - fallback to latest tag" ) for image in missing_multi: imagename_digest_dict[image] = "latest" else: # add all images to missing missing_multi = list(imagename_digest_dict) missing_amd64 = missing_multi # FIXME: WA until full arm support: skip not supported arm images missing_aarch64 = [ name for name in imagename_digest_dict if not images_info[name]["only_amd64"] ] print("::endgroup::") return { "images": imagename_digest_dict, "missing_aarch64": missing_aarch64, "missing_amd64": missing_amd64, "missing_multi": missing_multi, } def _configure_jobs( job_digester: JobDigester, s3: S3Helper, pr_info: PRInfo, commit_tokens: List[str], ci_cache_disabled: bool, ) -> Dict: ## a. digest each item from the config job_digester = JobDigester() jobs_params: Dict[str, Dict] = {} jobs_to_do: List[str] = [] jobs_to_skip: List[str] = [] digests: Dict[str, str] = {} print("::group::Job Digests") for job in CI_CONFIG.job_generator(): digest = job_digester.get_job_digest(CI_CONFIG.get_digest_config(job)) digests[job] = digest print(f" job [{job.rjust(50)}] has digest [{digest}]") print("::endgroup::") ## b. check what we need to run ci_cache = None if not ci_cache_disabled: ci_cache = CiCache(s3, digests) jobs_to_wait: Dict[str, Dict[str, Any]] = {} for job in digests: digest = digests[job] job_config = CI_CONFIG.get_job_config(job) num_batches: int = job_config.num_batches batches_to_do: List[int] = [] for batch in range(num_batches): # type: ignore if job_config.pr_only and pr_info.is_release_branch(): continue if job_config.run_by_label: # this job controlled by label, add to todo if its label is set in pr if job_config.run_by_label in pr_info.labels: batches_to_do.append(batch) elif job_config.run_always: # always add to todo batches_to_do.append(batch) elif not ci_cache: batches_to_do.append(batch) elif not ci_cache.is_successful( job, batch, num_batches, release_branch=pr_info.is_release_branch() and job_config.required_on_release_branch, ): # ci cache is enabled and job is not in the cache - add batches_to_do.append(batch) # check if it's pending in the cache if ci_cache.is_pending(job, batch, num_batches, release_branch=False): if job in jobs_to_wait: jobs_to_wait[job]["batches"].append(batch) else: jobs_to_wait[job] = { "batches": [batch], "num_batches": num_batches, } if batches_to_do: jobs_to_do.append(job) jobs_params[job] = { "batches": batches_to_do, "num_batches": num_batches, } else: jobs_to_skip.append(job) ## c. check CI controlling labels and commit messages if pr_info.labels: jobs_requested_by_label = [] # type: List[str] ci_controlling_labels = [] # type: List[str] for label in pr_info.labels: label_config = CI_CONFIG.get_label_config(label) if label_config: jobs_requested_by_label += label_config.run_jobs ci_controlling_labels += [label] if ci_controlling_labels: print(f"NOTE: CI controlling labels are set: [{ci_controlling_labels}]") print( f" : following jobs will be executed: [{jobs_requested_by_label}]" ) # so far there is only "do not test" label in the config that runs only Style check. # check later if we need to filter out requested jobs using ci cache. right now we do it: jobs_to_do = [job for job in jobs_requested_by_label if job in jobs_to_do] if commit_tokens: jobs_to_do_requested = [] # type: List[str] # handle ci set tokens ci_controlling_tokens = [ token for token in commit_tokens if token in CI_CONFIG.label_configs ] for token_ in ci_controlling_tokens: label_config = CI_CONFIG.get_label_config(token_) assert label_config, f"Unknonwn token [{token_}]" print( f"NOTE: CI controlling token: [{ci_controlling_tokens}], add jobs: [{label_config.run_jobs}]" ) jobs_to_do_requested += label_config.run_jobs # handle specific job requests requested_jobs = [ token[len("job_") :] for token in commit_tokens if token.startswith("job_") ] if requested_jobs: assert any( len(x) > 1 for x in requested_jobs ), f"Invalid job names requested [{requested_jobs}]" for job in requested_jobs: job_with_parents = CI_CONFIG.get_job_with_parents(job) print( f"NOTE: CI controlling token: [#job_{job}], add jobs: [{job_with_parents}]" ) # always add requested job itself, even if it could be skipped jobs_to_do_requested.append(job_with_parents[0]) for parent in job_with_parents[1:]: if parent in jobs_to_do and parent not in jobs_to_do_requested: jobs_to_do_requested.append(parent) if jobs_to_do_requested: print( f"NOTE: Only specific job(s) were requested by commit message tokens: [{jobs_to_do_requested}]" ) jobs_to_do = list( set(job for job in jobs_to_do_requested if job in jobs_to_do) ) return { "digests": digests, "jobs_to_do": jobs_to_do, "jobs_to_skip": jobs_to_skip, "jobs_to_wait": jobs_to_wait, "jobs_params": { job: params for job, params in jobs_params.items() if job in jobs_to_do }, } def _update_gh_statuses_action(indata: Dict, s3: S3Helper) -> None: if indata["ci_flags"][Labels.NO_CI_CACHE]: print("CI cache is disabled - skip restoring commit statuses from CI cache") return job_digests = indata["jobs_data"]["digests"] ci_cache = CiCache(s3, job_digests).update().fetch_records_data() # create GH status pr_info = PRInfo() commit = get_commit(Github(get_best_robot_token(), per_page=100), pr_info.sha) def _run_create_status(job: str, batch: int, num_batches: int) -> None: job_status = ci_cache.get_successful(job, batch, num_batches) if not job_status: return print(f"Going to re-create GH status for job [{job}] sha [{pr_info.sha}]") assert job_status.status == "success", "BUG!" commit.create_status( state=job_status.status, target_url=job_status.report_url, description=format_description( f"Reused from [{job_status.pr_num}-{job_status.sha[0:8]}]: " f"{job_status.description}" ), context=get_check_name(job, batch=batch, num_batches=num_batches), ) with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for job in job_digests: if CI_CONFIG.is_build_job(job): # no GH status for build jobs continue num_batches = CI_CONFIG.get_job_config(job).num_batches for batch in range(num_batches): future = executor.submit(_run_create_status, job, batch, num_batches) futures.append(future) done, _ = concurrent.futures.wait(futures) for future in done: try: _ = future.result() except Exception as e: raise e print("Going to update overall CI report") set_status_comment(commit, pr_info) print("... CI report update - done") def _fetch_commit_tokens(message: str) -> List[str]: pattern = r"#[\w-]+" matches = [match[1:] for match in re.findall(pattern, message)] res = [match for match in matches if match in Labels or match.startswith("job_")] return res def _upload_build_artifacts( pr_info: PRInfo, build_name: str, ci_cache: CiCache, job_report: JobReport, s3: S3Helper, s3_destination: str, ) -> str: # There are ugly artifacts for the performance test. FIXME: s3_performance_path = "/".join( ( get_release_or_pr(pr_info, get_version_from_repo())[1], pr_info.sha, CI_CONFIG.normalize_string(build_name), "performance.tar.zst", ) ) performance_urls = [] assert job_report.build_dir_for_upload, "Must be set for build job" performance_path = Path(job_report.build_dir_for_upload) / "performance.tar.zst" if performance_path.exists(): performance_urls.append( s3.upload_build_file_to_s3(performance_path, s3_performance_path) ) print( "Uploaded performance.tar.zst to %s, now delete to avoid duplication", performance_urls[0], ) performance_path.unlink() build_urls = ( s3.upload_build_directory_to_s3( Path(job_report.build_dir_for_upload), s3_destination, keep_dirs_in_s3_path=False, upload_symlinks=False, ) + performance_urls ) print("::notice ::Build URLs: {}".format("\n".join(build_urls))) log_path = Path(job_report.additional_files[0]) log_url = "" if log_path.exists(): log_url = s3.upload_build_file_to_s3( log_path, s3_destination + "/" + log_path.name ) print(f"::notice ::Log URL: {log_url}") # generate and upload build report build_result = BuildResult( build_name, log_url, build_urls, job_report.version, job_report.status, int(job_report.duration), GITHUB_JOB_API_URL(), head_ref=pr_info.head_ref, pr_number=pr_info.number, ) report_url = ci_cache.upload_build_report(build_result) print(f"Report file has been uploaded to [{report_url}]") # Upload head master binaries static_bin_name = CI_CONFIG.build_config[build_name].static_binary_name if pr_info.is_master() and static_bin_name: # Full binary with debug info: s3_path_full = "/".join((pr_info.base_ref, static_bin_name, "clickhouse-full")) binary_full = Path(job_report.build_dir_for_upload) / "clickhouse" url_full = s3.upload_build_file_to_s3(binary_full, s3_path_full) print(f"::notice ::Binary static URL (with debug info): {url_full}") # Stripped binary without debug info: s3_path_compact = "/".join((pr_info.base_ref, static_bin_name, "clickhouse")) binary_compact = Path(job_report.build_dir_for_upload) / "clickhouse-stripped" url_compact = s3.upload_build_file_to_s3(binary_compact, s3_path_compact) print(f"::notice ::Binary static URL (compact): {url_compact}") return log_url def _upload_build_profile_data( pr_info: PRInfo, build_name: str, job_report: JobReport, git_runner: GitRunner, ch_helper: ClickHouseHelper, ) -> None: ci_logs_credentials = CiLogsCredentials(Path("/dev/null")) if ci_logs_credentials.host: instance_type = get_instance_type() instance_id = get_instance_id() query = f"""INSERT INTO build_time_trace ( pull_request_number, commit_sha, check_start_time, check_name, instance_type, instance_id, file, library, time, pid, tid, ph, ts, dur, cat, name, detail, count, avgMs, args_name ) SELECT {pr_info.number}, '{pr_info.sha}', '{job_report.start_time}', '{build_name}', '{instance_type}', '{instance_id}', * FROM input(' file String, library String, time DateTime64(6), pid UInt32, tid UInt32, ph String, ts UInt64, dur UInt64, cat String, name String, detail String, count UInt64, avgMs UInt64, args_name String') FORMAT JSONCompactEachRow""" auth = { "X-ClickHouse-User": "ci", "X-ClickHouse-Key": ci_logs_credentials.password, } url = f"https://{ci_logs_credentials.host}/" profiles_dir = Path(TEMP_PATH) / "profiles_source" profiles_dir.mkdir(parents=True, exist_ok=True) print( "Processing profile JSON files from %s", Path(REPO_COPY) / "build_docker", ) git_runner( "./utils/prepare-time-trace/prepare-time-trace.sh " f"build_docker {profiles_dir.absolute()}" ) profile_data_file = Path(TEMP_PATH) / "profile.json" with open(profile_data_file, "wb") as profile_fd: for profile_source in profiles_dir.iterdir(): if profile_source.name != "binary_sizes.txt": with open(profiles_dir / profile_source, "rb") as ps_fd: profile_fd.write(ps_fd.read()) print( "::notice ::Log Uploading profile data, path: %s, size: %s, query: %s", profile_data_file, profile_data_file.stat().st_size, query, ) ch_helper.insert_file(url, auth, query, profile_data_file) query = f"""INSERT INTO binary_sizes ( pull_request_number, commit_sha, check_start_time, check_name, instance_type, instance_id, file, size ) SELECT {pr_info.number}, '{pr_info.sha}', '{job_report.start_time}', '{build_name}', '{instance_type}', '{instance_id}', file, size FROM input('size UInt64, file String') SETTINGS format_regexp = '^\\s*(\\d+) (.+)$' FORMAT Regexp""" binary_sizes_file = profiles_dir / "binary_sizes.txt" print( "::notice ::Log Uploading binary sizes data, path: %s, size: %s, query: %s", binary_sizes_file, binary_sizes_file.stat().st_size, query, ) ch_helper.insert_file(url, auth, query, binary_sizes_file) def _run_test(job_name: str, run_command: str) -> int: assert ( run_command or CI_CONFIG.get_job_config(job_name).run_command ), "Run command must be provided as input argument or be configured in job config" if not run_command: if CI_CONFIG.get_job_config(job_name).timeout: os.environ["KILL_TIMEOUT"] = str(CI_CONFIG.get_job_config(job_name).timeout) run_command = "/".join( (os.path.dirname(__file__), CI_CONFIG.get_job_config(job_name).run_command) ) if ".py" in run_command and not run_command.startswith("python"): run_command = "python3 " + run_command print("Use run command from a job config") else: print("Use run command from the workflow") os.environ["CHECK_NAME"] = job_name print(f"Going to start run command [{run_command}]") process = subprocess.run( run_command, stdout=sys.stdout, stderr=sys.stderr, text=True, check=False, shell=True, ) if process.returncode == 0: print(f"Run action done for: [{job_name}]") exit_code = 0 else: print( f"Run action failed for: [{job_name}] with exit code [{process.returncode}]" ) exit_code = process.returncode return exit_code def _get_ext_check_name(check_name: str) -> str: run_by_hash_num = int(os.getenv("RUN_BY_HASH_NUM", "0")) run_by_hash_total = int(os.getenv("RUN_BY_HASH_TOTAL", "0")) if run_by_hash_total > 1: check_name_with_group = ( check_name + f" [{run_by_hash_num + 1}/{run_by_hash_total}]" ) else: check_name_with_group = check_name return check_name_with_group def main() -> int: logging.basicConfig(level=logging.INFO) exit_code = 0 parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) args = parse_args(parser) if args.mark_success or args.pre or args.run: assert args.infile, "Run config must be provided via --infile" assert args.job_name, "Job name must be provided via --job-name" indata: Optional[Dict[str, Any]] = None if args.infile: indata = ( json.loads(args.infile) if not os.path.isfile(args.infile) else json.load(open(args.infile)) ) assert indata and isinstance(indata, dict), "Invalid --infile json" result: Dict[str, Any] = {} s3 = S3Helper() pr_info = PRInfo() git_runner = GitRunner(set_cwd_to_git_root=True) ### CONFIGURE action: start if args.configure: # if '#no_merge_commit' is set in commit message - set git ref to PR branch head to avoid merge-commit tokens = [] ci_flags = { Labels.NO_MERGE_COMMIT: False, Labels.NO_CI_CACHE: False, } if (pr_info.number != 0 and not args.skip_jobs) or args.commit_message: message = args.commit_message or git_runner.run( f"{GIT_PREFIX} log {pr_info.sha} --format=%B -n 1" ) tokens = _fetch_commit_tokens(message) print(f"Commit message tokens: [{tokens}]") if Labels.NO_MERGE_COMMIT in tokens and CI: git_runner.run(f"{GIT_PREFIX} checkout {pr_info.sha}") git_ref = git_runner.run(f"{GIT_PREFIX} rev-parse HEAD") ci_flags[Labels.NO_MERGE_COMMIT] = True print("NOTE: Disable Merge Commit") if Labels.NO_CI_CACHE in tokens: ci_flags[Labels.NO_CI_CACHE] = True print("NOTE: Disable CI Cache") docker_data = {} git_ref = git_runner.run(f"{GIT_PREFIX} rev-parse HEAD") # let's get CH version version = get_version_from_repo(git=Git(True)).string print(f"Got CH version for this commit: [{version}]") docker_data = ( _configure_docker_jobs( args.rebuild_all_docker, args.docker_digest_or_latest ) if not args.skip_docker else {} ) job_digester = JobDigester() build_digest = job_digester.get_job_digest( CI_CONFIG.get_digest_config("package_release") ) docs_digest = job_digester.get_job_digest( CI_CONFIG.get_digest_config(JobNames.DOCS_CHECK) ) jobs_data = ( _configure_jobs( job_digester, s3, pr_info, tokens, ci_flags[Labels.NO_CI_CACHE], ) if not args.skip_jobs else {} ) # FIXME: Early style check manipulates with job names might be not robust with await feature if pr_info.number != 0 and not args.docker_digest_or_latest: # FIXME: it runs style check before docker build if possible (style-check images is not changed) # find a way to do style check always before docker build and others _check_and_update_for_early_style_check(jobs_data, docker_data) if args.skip_jobs and pr_info.has_changes_in_documentation_only(): _update_config_for_docs_only(jobs_data) # TODO: await pending jobs # wait for pending jobs to be finished, await_jobs is a long blocking call if any job has to be awaited ci_cache = CiCache(s3, jobs_data["digests"]) # awaited_jobs = ci_cache.await_jobs(jobs_data.get("jobs_to_wait", {})) # for job in awaited_jobs: # jobs_to_do = jobs_data["jobs_to_do"] # if job in jobs_to_do: # jobs_to_do.remove(job) # else: # assert False, "BUG" # set planned jobs as pending in the CI cache if on the master if pr_info.is_master(): for job in jobs_data["jobs_to_do"]: config = CI_CONFIG.get_job_config(job) if config.run_always or config.run_by_label: continue job_params = jobs_data["jobs_params"][job] ci_cache.push_pending( job, job_params["batches"], config.num_batches, release_branch=pr_info.is_release_branch(), ) # conclude results result["git_ref"] = git_ref result["version"] = version result["build"] = build_digest result["docs"] = docs_digest result["ci_flags"] = ci_flags result["jobs_data"] = jobs_data result["docker_data"] = docker_data ### CONFIGURE action: end ### PRE action: start elif args.pre: assert indata, "Run config must be provided via --infile" _pre_action(s3, indata, pr_info) ### RUN action: start elif args.run: assert indata check_name = args.job_name check_name_with_group = _get_ext_check_name(check_name) print( f"Check if rerun for name: [{check_name}], extended name [{check_name_with_group}]" ) previous_status = None if CI_CONFIG.is_build_job(check_name): # this is a build job - check if build report is present build_result = ( BuildResult.load_any(check_name, pr_info.number, pr_info.head_ref) if not indata["ci_flags"][Labels.NO_CI_CACHE] else None ) if build_result: if build_result.status == SUCCESS: previous_status = build_result.status else: # FIXME: Consider reusing failures for build jobs. # Just remove this if/else - that makes build job starting and failing immediately print( "Build report found but status is unsuccessful - will try to rerun" ) print("::group::Build Report") print(build_result.as_json()) print("::endgroup::") else: # this is a test job - check if GH commit status is present # rerun helper check # FIXME: remove rerun_helper check and rely on ci cache only commit = get_commit( Github(get_best_robot_token(), per_page=100), pr_info.sha ) rerun_helper = RerunHelper(commit, check_name_with_group) if rerun_helper.is_already_finished_by_status(): status = rerun_helper.get_finished_status() assert status previous_status = status.state print("::group::Commit Status") print(status) print("::endgroup::") # ci cache check elif not indata["ci_flags"][Labels.NO_CI_CACHE]: ci_cache = CiCache(s3, indata["jobs_data"]["digests"]).update() job_config = CI_CONFIG.get_job_config(check_name) if ci_cache.is_successful( check_name, args.batch, job_config.num_batches, job_config.required_on_release_branch, ): job_status = ci_cache.get_successful( check_name, args.batch, job_config.num_batches ) assert job_status, "BUG" commit.create_status( state=job_status.status, target_url=job_status.report_url, description=format_description( f"Reused from [{job_status.pr_num}-{job_status.sha[0:8]}]: " f"{job_status.description}" ), context=get_check_name( check_name, batch=args.batch, num_batches=job_config.num_batches, ), ) previous_status = job_status.status print("::group::Commit Status Data") print(job_status) print("::endgroup::") if previous_status: print( f"Commit status or Build Report is already present - job will be skipped with status: [{previous_status}]" ) if previous_status == SUCCESS: exit_code = 0 else: exit_code = 1 else: exit_code = _run_test(check_name, args.run_command) ### RUN action: end ### POST action: start elif args.post: job_report = JobReport.load() if JobReport.exist() else None if job_report: ch_helper = ClickHouseHelper() check_url = "" if CI_CONFIG.is_build_job(args.job_name): assert ( indata ), "--infile with config must be provided for POST action of a build type job [{args.job_name}]" build_name = args.job_name s3_path_prefix = "/".join( ( get_release_or_pr(pr_info, get_version_from_repo())[0], pr_info.sha, build_name, ) ) log_url = _upload_build_artifacts( pr_info, build_name, ci_cache=CiCache(s3, indata["jobs_data"]["digests"]), job_report=job_report, s3=s3, s3_destination=s3_path_prefix, ) _upload_build_profile_data( pr_info, build_name, job_report, git_runner, ch_helper ) check_url = log_url else: # test job additional_urls = [] s3_path_prefix = "/".join( ( get_release_or_pr(pr_info, get_version_from_repo())[0], pr_info.sha, CI_CONFIG.normalize_string( job_report.check_name or _get_ext_check_name(args.job_name) ), ) ) if job_report.build_dir_for_upload: additional_urls = s3.upload_build_directory_to_s3( Path(job_report.build_dir_for_upload), s3_path_prefix, keep_dirs_in_s3_path=False, upload_symlinks=False, ) if job_report.test_results or job_report.additional_files: check_url = upload_result_helper.upload_results( s3, pr_info.number, pr_info.sha, job_report.test_results, job_report.additional_files, job_report.check_name or args.job_name, additional_urls=additional_urls or None, ) commit = get_commit( Github(get_best_robot_token(), per_page=100), pr_info.sha ) post_commit_status( commit, job_report.status, check_url, format_description(job_report.description), job_report.check_name or args.job_name, pr_info, dump_to_file=True, ) update_mergeable_check( commit, pr_info, job_report.check_name or _get_ext_check_name(args.job_name), ) print(f"Job report url: [{check_url}]") prepared_events = prepare_tests_results_for_clickhouse( pr_info, job_report.test_results, job_report.status, job_report.duration, job_report.start_time, check_url or "", job_report.check_name or args.job_name, ) ch_helper.insert_events_into( db="default", table="checks", events=prepared_events ) else: # no job report print(f"No job report for {[args.job_name]} - do nothing") ### POST action: end ### MARK SUCCESS action: start elif args.mark_success: assert indata, "Run config must be provided via --infile" _mark_success_action(s3, indata, pr_info, args.job_name, args.batch) ### UPDATE GH STATUSES action: start elif args.update_gh_statuses: assert indata, "Run config must be provided via --infile" _update_gh_statuses_action(indata=indata, s3=s3) ### print results _print_results(result, args.outfile, args.pretty) return exit_code if __name__ == "__main__": sys.exit(main())