import argparse import concurrent.futures import json import logging import os import random import re import subprocess import sys import time from copy import deepcopy from dataclasses import asdict, dataclass from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union import docker_images_helper import upload_result_helper from build_check import get_release_or_pr from ci_config import CI_CONFIG, Build, CILabels, CIStages, JobNames, StatusNames from ci_utils import GHActions, is_hex, normalize_string from clickhouse_helper import ( CiLogsCredentials, ClickHouseHelper, InsertException, get_instance_id, get_instance_type, prepare_tests_results_for_clickhouse, ) from commit_status_helper import ( CommitStatusData, RerunHelper, format_description, get_commit, post_commit_status, set_status_comment, update_mergeable_check, update_upstream_sync_status, ) from digest_helper import DockerDigester, JobDigester from env_helper import ( CI, GITHUB_JOB_API_URL, GITHUB_REPOSITORY, GITHUB_RUN_URL, GITHUB_UPSTREAM_REPOSITORY, REPO_COPY, REPORT_PATH, S3_BUILDS_BUCKET, TEMP_PATH, ) from get_robot_token import get_best_robot_token from git_helper import GIT_PREFIX, Git from git_helper import Runner as GitRunner from github_helper import GitHub from pr_info import PRInfo from report import ERROR, SUCCESS, BuildResult, JobReport from s3_helper import S3Helper from synchronizer_utils import SYNC_BRANCH_PREFIX from version_helper import get_version_from_repo # pylint: disable=too-many-lines @dataclass class PendingState: updated_at: float run_url: str class CiCache: """ CI cache is a bunch of records. Record is a file stored under special location on s3. The file name has following format _[]--___.ci RECORD_TYPE: SUCCESSFUL - for successfuly finished jobs PENDING - for pending jobs ATTRIBUTES: release - for jobs being executed on the release branch including master branch (not a PR branch) """ _S3_CACHE_PREFIX = "CI_cache_v1" _CACHE_BUILD_REPORT_PREFIX = "build_report" _RECORD_FILE_EXTENSION = ".ci" _LOCAL_CACHE_PATH = Path(TEMP_PATH) / "ci_cache" _ATTRIBUTE_RELEASE = "release" # divider symbol 1 _DIV1 = "--" # divider symbol 2 _DIV2 = "_" assert _DIV1 != _DIV2 class RecordType(Enum): SUCCESSFUL = "successful" PENDING = "pending" FAILED = "failed" @dataclass class Record: record_type: "CiCache.RecordType" job_name: str job_digest: str batch: int num_batches: int release_branch: bool file: str = "" def to_str_key(self): """other fields must not be included in the hash str""" return "_".join( [self.job_name, self.job_digest, str(self.batch), str(self.num_batches)] ) class JobType(Enum): DOCS = "DOCS" SRCS = "SRCS" @classmethod def is_docs_job(cls, job_name: str) -> bool: return job_name == JobNames.DOCS_CHECK @classmethod def is_srcs_job(cls, job_name: str) -> bool: return not cls.is_docs_job(job_name) @classmethod def get_type_by_name(cls, job_name: str) -> "CiCache.JobType": res = cls.SRCS if cls.is_docs_job(job_name): res = cls.DOCS elif cls.is_srcs_job(job_name): res = cls.SRCS else: assert False return res def __init__( self, s3: S3Helper, job_digests: Dict[str, str], ): self.s3 = s3 self.job_digests = job_digests self.cache_s3_paths = { job_type: f"{self._S3_CACHE_PREFIX}/{job_type.value}-{self._get_digest_for_job_type(self.job_digests, job_type)}/" for job_type in self.JobType } self.s3_record_prefixes = { record_type: record_type.value for record_type in self.RecordType } self.records: Dict["CiCache.RecordType", Dict[str, "CiCache.Record"]] = { record_type: {} for record_type in self.RecordType } self.cache_updated = False self.cache_data_fetched = True if not self._LOCAL_CACHE_PATH.exists(): self._LOCAL_CACHE_PATH.mkdir(parents=True, exist_ok=True) def _get_digest_for_job_type( self, job_digests: Dict[str, str], job_type: JobType ) -> str: if job_type == self.JobType.DOCS: res = job_digests[JobNames.DOCS_CHECK] elif job_type == self.JobType.SRCS: # any build type job has the same digest - pick up Build.PACKAGE_RELEASE or Build.PACKAGE_ASAN as a failover # Build.PACKAGE_RELEASE may not exist in the list if we have reduced CI pipeline if Build.PACKAGE_RELEASE in job_digests: res = job_digests[Build.PACKAGE_RELEASE] elif Build.PACKAGE_ASAN in job_digests: # failover, if failover does not work - fix it! res = job_digests[Build.PACKAGE_ASAN] else: assert False, "BUG, no build job in digest' list" else: assert False, "BUG, New JobType? - please update func" return res def _get_record_file_name( self, record_type: RecordType, job_name: str, batch: int, num_batches: int, release_branch: bool, ) -> str: prefix = self.s3_record_prefixes[record_type] prefix_extended = ( self._DIV2.join([prefix, self._ATTRIBUTE_RELEASE]) if release_branch else prefix ) assert self._DIV1 not in job_name, f"Invalid job name {job_name}" job_name = self._DIV2.join( [job_name, self.job_digests[job_name], str(batch), str(num_batches)] ) file_name = self._DIV1.join([prefix_extended, job_name]) file_name += self._RECORD_FILE_EXTENSION return file_name def _get_record_s3_path(self, job_name: str) -> str: return self.cache_s3_paths[self.JobType.get_type_by_name(job_name)] def _parse_record_file_name( self, record_type: RecordType, file_name: str ) -> Optional["CiCache.Record"]: # validate filename if ( not file_name.endswith(self._RECORD_FILE_EXTENSION) or not len(file_name.split(self._DIV1)) == 2 ): print("ERROR: wrong file name format") return None file_name = file_name.removesuffix(self._RECORD_FILE_EXTENSION) release_branch = False prefix_extended, job_suffix = file_name.split(self._DIV1) record_type_and_attribute = prefix_extended.split(self._DIV2) # validate filename prefix failure = False if not 0 < len(record_type_and_attribute) <= 2: print("ERROR: wrong file name prefix") failure = True if ( len(record_type_and_attribute) > 1 and record_type_and_attribute[1] != self._ATTRIBUTE_RELEASE ): print("ERROR: wrong record attribute") failure = True if record_type_and_attribute[0] != self.s3_record_prefixes[record_type]: print("ERROR: wrong record type") failure = True if failure: return None if ( len(record_type_and_attribute) > 1 and record_type_and_attribute[1] == self._ATTRIBUTE_RELEASE ): release_branch = True job_properties = job_suffix.split(self._DIV2) job_name, job_digest, batch, num_batches = ( self._DIV2.join(job_properties[:-3]), job_properties[-3], int(job_properties[-2]), int(job_properties[-1]), ) if not is_hex(job_digest): print("ERROR: wrong record job digest") return None record = self.Record( record_type, job_name, job_digest, batch, num_batches, release_branch, file="", ) return record def print_status(self): for record_type in self.RecordType: GHActions.print_in_group( f"Cache records: [{record_type}]", list(self.records[record_type]) ) return self def update(self): """ Pulls cache records from s3. Only records name w/o content. """ for record_type in self.RecordType: prefix = self.s3_record_prefixes[record_type] cache_list = self.records[record_type] for job_type in self.JobType: path = self.cache_s3_paths[job_type] records = self.s3.list_prefix(f"{path}{prefix}", S3_BUILDS_BUCKET) records = [record.split("/")[-1] for record in records] for file in records: record = self._parse_record_file_name( record_type=record_type, file_name=file ) if not record: print(f"ERROR: failed to parse cache record [{file}]") continue if ( record.job_name not in self.job_digests or self.job_digests[record.job_name] != record.job_digest ): # skip records we are not interested in continue if record.to_str_key() not in cache_list: cache_list[record.to_str_key()] = record self.cache_data_fetched = False elif ( not cache_list[record.to_str_key()].release_branch and record.release_branch ): # replace a non-release record with a release one cache_list[record.to_str_key()] = record self.cache_data_fetched = False self.cache_updated = True return self def fetch_records_data(self): """ Pulls CommitStatusData for all cached jobs from s3 """ if not self.cache_updated: self.update() if self.cache_data_fetched: # there are no records without fetched data - no need to fetch return self # clean up for file in self._LOCAL_CACHE_PATH.glob("*.ci"): file.unlink() # download all record files for job_type in self.JobType: path = self.cache_s3_paths[job_type] for record_type in self.RecordType: prefix = self.s3_record_prefixes[record_type] _ = self.s3.download_files( bucket=S3_BUILDS_BUCKET, s3_path=f"{path}{prefix}", file_suffix=self._RECORD_FILE_EXTENSION, local_directory=self._LOCAL_CACHE_PATH, ) # validate we have files for all records and save file names meanwhile for record_type in self.RecordType: record_list = self.records[record_type] for _, record in record_list.items(): record_file_name = self._get_record_file_name( record_type, record.job_name, record.batch, record.num_batches, record.release_branch, ) assert ( self._LOCAL_CACHE_PATH / record_file_name ).is_file(), f"BUG. Record file must be present: {self._LOCAL_CACHE_PATH / record_file_name}" record.file = record_file_name self.cache_data_fetched = True return self def exist( self, record_type: "CiCache.RecordType", job: str, batch: int, num_batches: int, release_branch: bool, ) -> bool: if not self.cache_updated: self.update() record_key = self.Record( record_type, job, self.job_digests[job], batch, num_batches, release_branch, ).to_str_key() res = record_key in self.records[record_type] if release_branch: return res and self.records[record_type][record_key].release_branch else: return res def push( self, record_type: "CiCache.RecordType", job: str, batches: Union[int, Sequence[int]], num_batches: int, status: Union[CommitStatusData, PendingState], release_branch: bool = False, ) -> None: """ Pushes a cache record (CommitStatusData) @release_branch adds "release" attribute to a record """ if isinstance(batches, int): batches = [batches] for batch in batches: record_file = self._LOCAL_CACHE_PATH / self._get_record_file_name( record_type, job, batch, num_batches, release_branch ) record_s3_path = self._get_record_s3_path(job) if record_type == self.RecordType.SUCCESSFUL: assert isinstance(status, CommitStatusData) status.dump_to_file(record_file) elif record_type == self.RecordType.FAILED: assert isinstance(status, CommitStatusData) status.dump_to_file(record_file) elif record_type == self.RecordType.PENDING: assert isinstance(status, PendingState) with open(record_file, "w", encoding="utf-8") as json_file: json.dump(asdict(status), json_file) else: assert False _ = self.s3.upload_file( bucket=S3_BUILDS_BUCKET, file_path=record_file, s3_path=record_s3_path + record_file.name, ) record = self.Record( record_type, job, self.job_digests[job], batch, num_batches, release_branch, file=record_file.name, ) if ( record.release_branch or record.to_str_key() not in self.records[record_type] ): self.records[record_type][record.to_str_key()] = record def get( self, record_type: "CiCache.RecordType", job: str, batch: int, num_batches: int ) -> Optional[Union[CommitStatusData, PendingState]]: """ Gets a cache record data for a job, or None if a cache miss """ if not self.cache_data_fetched: self.fetch_records_data() record_key = self.Record( record_type, job, self.job_digests[job], batch, num_batches, release_branch=False, ).to_str_key() if record_key not in self.records[record_type]: return None record_file_name = self.records[record_type][record_key].file res = CommitStatusData.load_from_file( self._LOCAL_CACHE_PATH / record_file_name ) # type: CommitStatusData return res def delete( self, record_type: "CiCache.RecordType", job: str, batch: int, num_batches: int, release_branch: bool, ) -> None: """ deletes record from the cache """ raise NotImplementedError("Let's try make cache push-and-read-only") # assert ( # record_type == self.RecordType.PENDING # ), "FIXME: delete is supported for pending records only" # record_file_name = self._get_record_file_name( # self.RecordType.PENDING, # job, # batch, # num_batches, # release_branch=release_branch, # ) # record_s3_path = self._get_record_s3_path(job) # self.s3.delete_file_from_s3(S3_BUILDS_BUCKET, record_s3_path + record_file_name) # record_key = self.Record( # record_type, # job, # self.job_digests[job], # batch, # num_batches, # release_branch=False, # ).to_str_key() # if record_key in self.records[record_type]: # del self.records[record_type][record_key] def is_successful( self, job: str, batch: int, num_batches: int, release_branch: bool ) -> bool: """ checks if a given job have already been done successfuly """ return self.exist( self.RecordType.SUCCESSFUL, job, batch, num_batches, release_branch ) def is_failed( self, job: str, batch: int, num_batches: int, release_branch: bool ) -> bool: """ checks if a given job have already been done with failure """ return self.exist( self.RecordType.FAILED, job, batch, num_batches, release_branch ) def is_pending( self, job: str, batch: int, num_batches: int, release_branch: bool ) -> bool: """ check pending record in the cache for a given job @release_branch - checks that "release" attribute is set for a record """ if self.is_successful( job, batch, num_batches, release_branch ) or self.is_failed(job, batch, num_batches, release_branch): return False return self.exist( self.RecordType.PENDING, job, batch, num_batches, release_branch ) def push_successful( self, job: str, batch: int, num_batches: int, job_status: CommitStatusData, release_branch: bool = False, ) -> None: """ Pushes a cache record (CommitStatusData) @release_branch adds "release" attribute to a record """ self.push( self.RecordType.SUCCESSFUL, job, [batch], num_batches, job_status, release_branch, ) def push_failed( self, job: str, batch: int, num_batches: int, job_status: CommitStatusData, release_branch: bool = False, ) -> None: """ Pushes a cache record of type Failed (CommitStatusData) @release_branch adds "release" attribute to a record """ self.push( self.RecordType.FAILED, job, [batch], num_batches, job_status, release_branch, ) def push_pending( self, job: str, batches: List[int], num_batches: int, release_branch: bool ) -> None: """ pushes pending record for a job to the cache """ pending_state = PendingState(time.time(), run_url=GITHUB_RUN_URL) self.push( self.RecordType.PENDING, job, batches, num_batches, pending_state, release_branch, ) def get_successful( self, job: str, batch: int, num_batches: int ) -> Optional[CommitStatusData]: """ Gets a cache record (CommitStatusData) for a job, or None if a cache miss """ res = self.get(self.RecordType.SUCCESSFUL, job, batch, num_batches) assert res is None or isinstance(res, CommitStatusData) return res def delete_pending( self, job: str, batch: int, num_batches: int, release_branch: bool ) -> None: """ deletes pending record from the cache """ self.delete(self.RecordType.PENDING, job, batch, num_batches, release_branch) def download_build_reports(self, file_prefix: str = "") -> List[str]: """ not ideal class for this method, but let it be as we store build reports in CI cache directory on s3 and CiCache knows where exactly @file_prefix allows to filter out reports by git head_ref """ report_path = Path(REPORT_PATH) report_path.mkdir(exist_ok=True, parents=True) path = ( self._get_record_s3_path(Build.PACKAGE_RELEASE) + self._CACHE_BUILD_REPORT_PREFIX ) if file_prefix: path += "_" + file_prefix reports_files = self.s3.download_files( bucket=S3_BUILDS_BUCKET, s3_path=path, file_suffix=".json", local_directory=report_path, ) return reports_files def upload_build_report(self, build_result: BuildResult) -> str: result_json_path = build_result.write_json(Path(TEMP_PATH)) s3_path = ( self._get_record_s3_path(Build.PACKAGE_RELEASE) + result_json_path.name ) return self.s3.upload_file( bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path ) def await_jobs( self, jobs_with_params: Dict[str, Dict[str, Any]], is_release_branch: bool ) -> Dict[str, List[int]]: """ await pending jobs to be finished @jobs_with_params - jobs to await. {JOB_NAME: {"batches": [BATCHES...], "num_batches": NUM_BATCHES}} returns successfully finished jobs: {JOB_NAME: [BATCHES...]} """ if not jobs_with_params: return {} poll_interval_sec = 300 # TIMEOUT * MAX_ROUNDS_TO_WAIT must be less than 6h (GH job timeout) with a room for rest RunConfig work TIMEOUT = 3000 # 50 min MAX_ROUNDS_TO_WAIT = 6 MAX_JOB_NUM_TO_WAIT = 3 await_finished: Dict[str, List[int]] = {} round_cnt = 0 while ( len(jobs_with_params) > MAX_JOB_NUM_TO_WAIT and round_cnt < MAX_ROUNDS_TO_WAIT ): round_cnt += 1 GHActions.print_in_group( f"Wait pending jobs, round [{round_cnt}/{MAX_ROUNDS_TO_WAIT}]:", list(jobs_with_params), ) # this is initial approach to wait pending jobs: # start waiting for the next TIMEOUT seconds if there are more than X(=4) jobs to wait # wait TIMEOUT seconds in rounds. Y(=5) is the max number of rounds expired_sec = 0 start_at = int(time.time()) while expired_sec < TIMEOUT and jobs_with_params: time.sleep(poll_interval_sec) self.update() jobs_with_params_copy = deepcopy(jobs_with_params) for job_name in jobs_with_params: num_batches = jobs_with_params[job_name]["num_batches"] job_config = CI_CONFIG.get_job_config(job_name) for batch in jobs_with_params[job_name]["batches"]: if self.is_pending( job_name, batch, num_batches, release_branch=is_release_branch and job_config.required_on_release_branch, ): continue print( f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore" ) # some_job_ready = True jobs_with_params_copy[job_name]["batches"].remove(batch) if not jobs_with_params_copy[job_name]["batches"]: del jobs_with_params_copy[job_name] if not self.is_successful( job_name, batch, num_batches, release_branch=is_release_branch and job_config.required_on_release_branch, ): print( f"NOTE: Job [{job_name}:{batch}] finished but no success - remove from awaiting list, do not add to ready" ) continue if job_name in await_finished: await_finished[job_name].append(batch) else: await_finished[job_name] = [batch] jobs_with_params = jobs_with_params_copy expired_sec = int(time.time()) - start_at print( f"...awaiting continues... seconds left [{TIMEOUT - expired_sec}]" ) if await_finished: GHActions.print_in_group( f"Finished jobs, round [{round_cnt}]:", [f"{job}:{batches}" for job, batches in await_finished.items()], ) GHActions.print_in_group( "Remaining jobs:", [f"{job}:{params['batches']}" for job, params in jobs_with_params.items()], ) return await_finished @dataclass class CiOptions: # job will be included in the run if any keyword from the list matches job name include_keywords: Optional[List[str]] = None # job will be excluded in the run if any keyword from the list matches job name exclude_keywords: Optional[List[str]] = None # list of specified preconfigured ci sets to run ci_sets: Optional[List[str]] = None # list of specified jobs to run ci_jobs: Optional[List[str]] = None # btaches to run for all multi-batch jobs job_batches: Optional[List[int]] = None do_not_test: bool = False no_ci_cache: bool = False no_merge_commit: bool = False def as_dict(self) -> Dict[str, Any]: return asdict(self) @staticmethod def create_from_run_config(run_config: Dict[str, Any]) -> "CiOptions": return CiOptions(**run_config["ci_options"]) @staticmethod def create_from_pr_message( debug_message: Optional[str], update_from_api: bool ) -> "CiOptions": """ Creates CiOptions instance based on tags found in PR body and/or commit message @commit_message - may be provided directly for debugging purposes, otherwise it will be retrieved from git. """ res = CiOptions() pr_info = PRInfo() if ( not pr_info.is_pr and not debug_message ): # if commit_message is provided it's test/debug scenario - do not return # CI options can be configured in PRs only # if debug_message is provided - it's a test return res message = debug_message or GitRunner(set_cwd_to_git_root=True).run( f"{GIT_PREFIX} log {pr_info.sha} --format=%B -n 1" ) pattern = r"(#|- \[x\] +