import argparse
import concurrent.futures
from copy import deepcopy
from dataclasses import asdict, dataclass
from enum import Enum
import json
import logging
import os
import random
import re
import subprocess
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Set, Union

import docker_images_helper
import upload_result_helper
from build_check import get_release_or_pr
from ci_config import CI_CONFIG, Build, Labels, JobNames
from ci_utils import GHActions, is_hex, normalize_string
from clickhouse_helper import (
    CiLogsCredentials,
    ClickHouseHelper,
    get_instance_id,
    get_instance_type,
    prepare_tests_results_for_clickhouse,
)
from commit_status_helper import (
    CommitStatusData,
    RerunHelper,
    format_description,
    get_commit,
    post_commit_status,
    set_status_comment,
    update_mergeable_check,
)
from digest_helper import DockerDigester, JobDigester
from env_helper import (
    CI,
    GITHUB_JOB_API_URL,
    GITHUB_RUN_URL,
    REPO_COPY,
    REPORT_PATH,
    S3_BUILDS_BUCKET,
    TEMP_PATH,
)
from get_robot_token import get_best_robot_token
from git_helper import GIT_PREFIX, Git
from git_helper import Runner as GitRunner
from github import Github
from pr_info import PRInfo
from report import ERROR, SUCCESS, BuildResult, JobReport
from s3_helper import S3Helper
from version_helper import get_version_from_repo


@dataclass
class PendingState:
    updated_at: float
    run_url: str


class CiCache:
    """
    CI cache is a bunch of records. A record is a file stored at a special location on S3.
    The file name has the following format:

        <RECORD_TYPE>_[<ATTRIBUTES>]--<JOB_NAME>_<JOB_DIGEST>_<BATCH>_<NUM_BATCHES>.ci

    RECORD_TYPE:
        SUCCESSFUL - for successfully finished jobs
        PENDING - for pending jobs
        FAILED - for jobs that finished with a failure

    ATTRIBUTES:
        release - for jobs executed on a release branch, including the master branch (not a PR branch)
    """

    _S3_CACHE_PREFIX = "CI_cache_v1"
    _CACHE_BUILD_REPORT_PREFIX = "build_report"
    _RECORD_FILE_EXTENSION = ".ci"
    _LOCAL_CACHE_PATH = Path(TEMP_PATH) / "ci_cache"
    _ATTRIBUTE_RELEASE = "release"
    # divider symbol 1
    _DIV1 = "--"
    # divider symbol 2
    _DIV2 = "_"
    assert _DIV1 != _DIV2

    class RecordType(Enum):
        SUCCESSFUL = "successful"
        PENDING = "pending"
        FAILED = "failed"
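    # Illustrative record file name (hypothetical job name and digest, not taken from a real run):
    # a successful run of batch 0 of 1 on a release branch would be stored as
    #   "successful_release--Unit tests (asan)_a1b2c3d4_0_1.ci"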

    @dataclass
    class Record:
        record_type: "CiCache.RecordType"
        job_name: str
        job_digest: str
        batch: int
        num_batches: int
        release_branch: bool
        file: str = ""

        def to_str_key(self):
            """other fields must not be included in the hash str"""
            return "_".join(
                [self.job_name, self.job_digest, str(self.batch), str(self.num_batches)]
            )

    class JobType(Enum):
        DOCS = "DOCS"
        SRCS = "SRCS"

        @classmethod
        def is_docs_job(cls, job_name: str) -> bool:
            return job_name == JobNames.DOCS_CHECK

        @classmethod
        def is_srcs_job(cls, job_name: str) -> bool:
            return not cls.is_docs_job(job_name)

        @classmethod
        def get_type_by_name(cls, job_name: str) -> "CiCache.JobType":
            res = cls.SRCS
            if cls.is_docs_job(job_name):
                res = cls.DOCS
            elif cls.is_srcs_job(job_name):
                res = cls.SRCS
            else:
                assert False
            return res

    def __init__(
        self,
        s3: S3Helper,
        job_digests: Dict[str, str],
    ):
        self.s3 = s3
        self.job_digests = job_digests
        self.cache_s3_paths = {
            job_type: f"{self._S3_CACHE_PREFIX}/{job_type.value}-{self.job_digests[self._get_reference_job_name(job_type)]}/"
            for job_type in self.JobType
        }
        self.s3_record_prefixes = {
            record_type: record_type.value for record_type in self.RecordType
        }
        self.records: Dict["CiCache.RecordType", Dict[str, "CiCache.Record"]] = {
            record_type: {} for record_type in self.RecordType
        }
        self.cache_updated = False
        self.cache_data_fetched = True
        if not self._LOCAL_CACHE_PATH.exists():
            self._LOCAL_CACHE_PATH.mkdir(parents=True, exist_ok=True)

    def _get_reference_job_name(self, job_type: JobType) -> str:
        res = Build.PACKAGE_RELEASE
        if job_type == self.JobType.DOCS:
            res = JobNames.DOCS_CHECK
        elif job_type == self.JobType.SRCS:
            res = Build.PACKAGE_RELEASE
        else:
            assert False
        return res

    def _get_record_file_name(
        self,
        record_type: RecordType,
        job_name: str,
        batch: int,
        num_batches: int,
        release_branch: bool,
    ) -> str:
        prefix = self.s3_record_prefixes[record_type]
        prefix_extended = (
            self._DIV2.join([prefix, self._ATTRIBUTE_RELEASE])
            if release_branch
            else prefix
        )
        assert self._DIV1 not in job_name, f"Invalid job name {job_name}"
        job_name = self._DIV2.join(
            [job_name, self.job_digests[job_name], str(batch), str(num_batches)]
        )
        file_name = self._DIV1.join([prefix_extended, job_name])
        file_name += self._RECORD_FILE_EXTENSION
        return file_name

    def _get_record_s3_path(self, job_name: str) -> str:
        return self.cache_s3_paths[self.JobType.get_type_by_name(job_name)]

    def _parse_record_file_name(
        self, record_type: RecordType, file_name: str
    ) -> Optional["CiCache.Record"]:
        # validate filename
        if (
            not file_name.endswith(self._RECORD_FILE_EXTENSION)
            or not len(file_name.split(self._DIV1)) == 2
        ):
            print("ERROR: wrong file name format")
            return None

        file_name = file_name.removesuffix(self._RECORD_FILE_EXTENSION)
        release_branch = False

        prefix_extended, job_suffix = file_name.split(self._DIV1)
        record_type_and_attribute = prefix_extended.split(self._DIV2)

        # validate filename prefix
        failure = False
        if not 0 < len(record_type_and_attribute) <= 2:
            print("ERROR: wrong file name prefix")
            failure = True
        if (
            len(record_type_and_attribute) > 1
            and record_type_and_attribute[1] != self._ATTRIBUTE_RELEASE
        ):
            print("ERROR: wrong record attribute")
            failure = True
        if record_type_and_attribute[0] != self.s3_record_prefixes[record_type]:
            print("ERROR: wrong record type")
            failure = True
        if failure:
            return None

        if (
            len(record_type_and_attribute) > 1
            and record_type_and_attribute[1] == self._ATTRIBUTE_RELEASE
        ):
            release_branch = True

        job_properties = job_suffix.split(self._DIV2)
        job_name, job_digest, batch, num_batches = (
            self._DIV2.join(job_properties[:-3]),
            job_properties[-3],
            int(job_properties[-2]),
            int(job_properties[-1]),
        )

        if not is_hex(job_digest):
            print("ERROR: wrong record job digest")
            return None

        record = self.Record(
            record_type,
            job_name,
            job_digest,
            batch,
            num_batches,
            release_branch,
            file="",
        )
        return record

    def print_status(self):
        for record_type in self.RecordType:
            GHActions.print_in_group(
                f"Cache records: [{record_type}]", list(self.records[record_type])
            )
        return self

    def update(self):
        """
        Pulls cache records from S3. Only record names are fetched, without the record contents.
        """
        for record_type in self.RecordType:
            prefix = self.s3_record_prefixes[record_type]
            cache_list = self.records[record_type]
            for job_type in self.JobType:
                path = self.cache_s3_paths[job_type]
                records = self.s3.list_prefix(f"{path}{prefix}", S3_BUILDS_BUCKET)
                records = [record.split("/")[-1] for record in records]
                for file in records:
                    record = self._parse_record_file_name(
                        record_type=record_type, file_name=file
                    )
                    if not record:
                        print(f"ERROR: failed to parse cache record [{file}]")
                        continue
                    if (
                        record.job_name not in self.job_digests
                        or self.job_digests[record.job_name] != record.job_digest
                    ):
                        # skip records we are not interested in
                        continue

                    if record.to_str_key() not in cache_list:
                        cache_list[record.to_str_key()] = record
                        self.cache_data_fetched = False
                    elif (
                        not cache_list[record.to_str_key()].release_branch
                        and record.release_branch
                    ):
                        # replace a non-release record with a release one
                        cache_list[record.to_str_key()] = record
                        self.cache_data_fetched = False

        self.cache_updated = True
        return self

    def fetch_records_data(self):
        """
        Pulls CommitStatusData for all cached jobs from S3
        """
        if not self.cache_updated:
            self.update()

        if self.cache_data_fetched:
            # there are no records w/o underlying data - no need to fetch
            return self

        # clean up
        for file in self._LOCAL_CACHE_PATH.glob("*.ci"):
            file.unlink()

        # download all record files
        for job_type in self.JobType:
            path = self.cache_s3_paths[job_type]
            for record_type in self.RecordType:
                prefix = self.s3_record_prefixes[record_type]
                _ = self.s3.download_files(
                    bucket=S3_BUILDS_BUCKET,
                    s3_path=f"{path}{prefix}",
                    file_suffix=self._RECORD_FILE_EXTENSION,
                    local_directory=self._LOCAL_CACHE_PATH,
                )

        # validate we have files for all records and save file names meanwhile
        for record_type in self.RecordType:
            record_list = self.records[record_type]
            for _, record in record_list.items():
                record_file_name = self._get_record_file_name(
                    record_type,
                    record.job_name,
                    record.batch,
                    record.num_batches,
                    record.release_branch,
                )
                assert (
                    self._LOCAL_CACHE_PATH / record_file_name
                ).is_file(), f"BUG. Record file must be present: {self._LOCAL_CACHE_PATH / record_file_name}"
                record.file = record_file_name

        self.cache_data_fetched = True
        return self

    def exist(
        self,
        record_type: "CiCache.RecordType",
        job: str,
        batch: int,
        num_batches: int,
        release_branch: bool,
    ) -> bool:
        if not self.cache_updated:
            self.update()
        record_key = self.Record(
            record_type,
            job,
            self.job_digests[job],
            batch,
            num_batches,
            release_branch,
        ).to_str_key()
        res = record_key in self.records[record_type]
        if release_branch:
            return res and self.records[record_type][record_key].release_branch
        else:
            return res

    def push(
        self,
        record_type: "CiCache.RecordType",
        job: str,
        batches: Union[int, Sequence[int]],
        num_batches: int,
        status: Union[CommitStatusData, PendingState],
        release_branch: bool = False,
    ) -> None:
        """
        Pushes a cache record (CommitStatusData)
        @release_branch adds "release" attribute to a record
        """
        if isinstance(batches, int):
            batches = [batches]
        for batch in batches:
            record_file = self._LOCAL_CACHE_PATH / self._get_record_file_name(
                record_type, job, batch, num_batches, release_branch
            )
            record_s3_path = self._get_record_s3_path(job)
            if record_type == self.RecordType.SUCCESSFUL:
                assert isinstance(status, CommitStatusData)
                status.dump_to_file(record_file)
            elif record_type == self.RecordType.FAILED:
                assert isinstance(status, CommitStatusData)
                status.dump_to_file(record_file)
            elif record_type == self.RecordType.PENDING:
                assert isinstance(status, PendingState)
                with open(record_file, "w") as json_file:
                    json.dump(asdict(status), json_file)
            else:
                assert False

            _ = self.s3.upload_file(
                bucket=S3_BUILDS_BUCKET,
                file_path=record_file,
                s3_path=record_s3_path + record_file.name,
            )
            record = self.Record(
                record_type,
                job,
                self.job_digests[job],
                batch,
                num_batches,
                release_branch,
                file=record_file.name,
            )
            if (
                record.release_branch
                or record.to_str_key() not in self.records[record_type]
            ):
                self.records[record_type][record.to_str_key()] = record

    def get(
        self, record_type: "CiCache.RecordType", job: str, batch: int, num_batches: int
    ) -> Optional[Union[CommitStatusData, PendingState]]:
        """
        Gets a cache record data for a job, or None if a cache miss
        """
        if not self.cache_data_fetched:
            self.fetch_records_data()

        record_key = self.Record(
            record_type,
            job,
            self.job_digests[job],
            batch,
            num_batches,
            release_branch=False,
        ).to_str_key()

        if record_key not in self.records[record_type]:
            return None

        record_file_name = self.records[record_type][record_key].file

        res = CommitStatusData.load_from_file(
            self._LOCAL_CACHE_PATH / record_file_name
        )  # type: CommitStatusData

        return res

    def delete(
        self,
        record_type: "CiCache.RecordType",
        job: str,
        batch: int,
        num_batches: int,
        release_branch: bool,
    ) -> None:
        """
        deletes a record from the cache
        """
        raise NotImplementedError("Let's try to keep the cache push-and-read-only")
        # assert (
        #     record_type == self.RecordType.PENDING
        # ), "FIXME: delete is supported for pending records only"
        # record_file_name = self._get_record_file_name(
        #     self.RecordType.PENDING,
        #     job,
        #     batch,
        #     num_batches,
        #     release_branch=release_branch,
        # )
        # record_s3_path = self._get_record_s3_path(job)
        # self.s3.delete_file_from_s3(S3_BUILDS_BUCKET, record_s3_path + record_file_name)

        # record_key = self.Record(
        #     record_type,
        #     job,
        #     self.job_digests[job],
        #     batch,
        #     num_batches,
        #     release_branch=False,
        # ).to_str_key()

        # if record_key in self.records[record_type]:
        #     del self.records[record_type][record_key]

    def is_successful(
        self, job: str, batch: int, num_batches: int, release_branch: bool
    ) -> bool:
        """
        checks whether a given job has already finished successfully
        """
        return self.exist(
            self.RecordType.SUCCESSFUL, job, batch, num_batches, release_branch
        )

    def is_failed(
        self, job: str, batch: int, num_batches: int, release_branch: bool
    ) -> bool:
        """
        checks whether a given job has already finished with a failure
        """
        return self.exist(
            self.RecordType.FAILED, job, batch, num_batches, release_branch
        )

    def is_pending(
        self, job: str, batch: int, num_batches: int, release_branch: bool
    ) -> bool:
        """
        checks for a pending record in the cache for a given job
        @release_branch - checks that the "release" attribute is set for a record
        """
        if self.is_successful(
            job, batch, num_batches, release_branch
        ) or self.is_failed(job, batch, num_batches, release_branch):
            return False

        return self.exist(
            self.RecordType.PENDING, job, batch, num_batches, release_branch
        )
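
    # Typical usage (illustrative only, with a hypothetical job name):
    #   cache = CiCache(s3, job_digests).update()
    #   if cache.is_successful("Style check", batch=0, num_batches=1, release_branch=False):
    #       ...  # the job can be skipped and its cached status reused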

    def push_successful(
        self,
        job: str,
        batch: int,
        num_batches: int,
        job_status: CommitStatusData,
        release_branch: bool = False,
    ) -> None:
        """
        Pushes a cache record (CommitStatusData)
        @release_branch adds "release" attribute to a record
        """
        self.push(
            self.RecordType.SUCCESSFUL,
            job,
            [batch],
            num_batches,
            job_status,
            release_branch,
        )

    def push_failed(
        self,
        job: str,
        batch: int,
        num_batches: int,
        job_status: CommitStatusData,
        release_branch: bool = False,
    ) -> None:
        """
        Pushes a cache record of type Failed (CommitStatusData)
        @release_branch adds "release" attribute to a record
        """
        self.push(
            self.RecordType.FAILED,
            job,
            [batch],
            num_batches,
            job_status,
            release_branch,
        )

    def push_pending(
        self, job: str, batches: List[int], num_batches: int, release_branch: bool
    ) -> None:
        """
        pushes a pending record for a job to the cache
        """
        pending_state = PendingState(time.time(), run_url=GITHUB_RUN_URL)
        self.push(
            self.RecordType.PENDING,
            job,
            batches,
            num_batches,
            pending_state,
            release_branch,
        )

    def get_successful(
        self, job: str, batch: int, num_batches: int
    ) -> Optional[CommitStatusData]:
        """
        Gets a cache record (CommitStatusData) for a job, or None if a cache miss
        """
        res = self.get(self.RecordType.SUCCESSFUL, job, batch, num_batches)
        assert res is None or isinstance(res, CommitStatusData)
        return res

    def delete_pending(
        self, job: str, batch: int, num_batches: int, release_branch: bool
    ) -> None:
        """
        deletes a pending record from the cache
        """
        self.delete(self.RecordType.PENDING, job, batch, num_batches, release_branch)

    def download_build_reports(self, file_prefix: str = "") -> List[str]:
        """
        not an ideal class for this method, but let it be here, since we store
        build reports in the CI cache directory on S3 and CiCache knows where exactly
        @file_prefix allows filtering reports by git head_ref
        """
        report_path = Path(REPORT_PATH)
        report_path.mkdir(exist_ok=True, parents=True)
        path = (
            self._get_record_s3_path(Build.PACKAGE_RELEASE)
            + self._CACHE_BUILD_REPORT_PREFIX
        )
        if file_prefix:
            path += "_" + file_prefix
        reports_files = self.s3.download_files(
            bucket=S3_BUILDS_BUCKET,
            s3_path=path,
            file_suffix=".json",
            local_directory=report_path,
        )
        return reports_files

    def upload_build_report(self, build_result: BuildResult) -> str:
        result_json_path = build_result.write_json(Path(TEMP_PATH))
        s3_path = (
            self._get_record_s3_path(Build.PACKAGE_RELEASE) + result_json_path.name
        )
        return self.s3.upload_file(
            bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path
        )

    def await_jobs(
        self, jobs_with_params: Dict[str, Dict[str, Any]], is_release_branch: bool
    ) -> Dict[str, List[int]]:
        """
        awaits pending jobs to be finished
        @jobs_with_params - jobs to await: {JOB_NAME: {"batches": [BATCHES...], "num_batches": NUM_BATCHES}}
        returns successfully finished jobs: {JOB_NAME: [BATCHES...]}
        """
        if not jobs_with_params:
            return {}

        poll_interval_sec = 300
        TIMEOUT = 3600
        MAX_ROUNDS_TO_WAIT = 6
        MAX_JOB_NUM_TO_WAIT = 3
        await_finished: Dict[str, List[int]] = {}
        round_cnt = 0
        while (
            len(jobs_with_params) > MAX_JOB_NUM_TO_WAIT
            and round_cnt < MAX_ROUNDS_TO_WAIT
        ):
            round_cnt += 1
            GHActions.print_in_group(
                f"Wait pending jobs, round [{round_cnt}/{MAX_ROUNDS_TO_WAIT}]:",
                list(jobs_with_params),
            )
            # this is the initial approach to waiting for pending jobs:
            # start waiting for the next TIMEOUT seconds if there are more than MAX_JOB_NUM_TO_WAIT jobs to wait for
            # wait TIMEOUT seconds in rounds; MAX_ROUNDS_TO_WAIT is the max number of rounds
            expired_sec = 0
            start_at = int(time.time())
            while expired_sec < TIMEOUT and jobs_with_params:
                time.sleep(poll_interval_sec)
                self.update()
                jobs_with_params_copy = deepcopy(jobs_with_params)
                for job_name in jobs_with_params:
                    num_batches = jobs_with_params[job_name]["num_batches"]
                    job_config = CI_CONFIG.get_job_config(job_name)
                    for batch in jobs_with_params[job_name]["batches"]:
                        if self.is_pending(
                            job_name,
                            batch,
                            num_batches,
                            release_branch=is_release_branch
                            and job_config.required_on_release_branch,
                        ):
                            continue
                        print(
                            f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore"
                        )
                        # some_job_ready = True
                        jobs_with_params_copy[job_name]["batches"].remove(batch)
                        if not jobs_with_params_copy[job_name]["batches"]:
                            del jobs_with_params_copy[job_name]

                        if not self.is_successful(
                            job_name,
                            batch,
                            num_batches,
                            release_branch=is_release_branch
                            and job_config.required_on_release_branch,
                        ):
                            print(
                                f"NOTE: Job [{job_name}:{batch}] finished but no success - remove from awaiting list, do not add to ready"
                            )
                            continue
                        if job_name in await_finished:
                            await_finished[job_name].append(batch)
                        else:
                            await_finished[job_name] = [batch]
                jobs_with_params = jobs_with_params_copy
                expired_sec = int(time.time()) - start_at
                print(
                    f"...awaiting continues... seconds left [{TIMEOUT - expired_sec}]"
                )
            if await_finished:
                GHActions.print_in_group(
                    f"Finished jobs, round [{round_cnt}]:",
                    [f"{job}: {batches}" for job, batches in await_finished.items()],
                )
        GHActions.print_in_group(
            "Remaining jobs:",
            [f"{job}: {params['batches']}" for job, params in jobs_with_params.items()],
        )
        return await_finished


def get_check_name(check_name: str, batch: int, num_batches: int) -> str:
    res = check_name
    if num_batches > 1:
        res = f"{check_name} [{batch+1}/{num_batches}]"
    return res
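

# e.g. (illustrative, hypothetical job name):
#   get_check_name("Stateless tests (release)", batch=0, num_batches=2) -> "Stateless tests (release) [1/2]"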


def normalize_check_name(check_name: str) -> str:
    res = check_name.lower()
    for r in ((" ", "_"), ("(", "_"), (")", "_"), (",", "_"), ("/", "_")):
        res = res.replace(*r)
    return res
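

# e.g. (illustrative, hypothetical job name):
#   normalize_check_name("Stateless tests (release)") -> "stateless_tests__release_"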


def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
    # FIXME: consider switching to sub_parser for configure, pre, run, post actions
    parser.add_argument(
        "--configure",
        action="store_true",
        help="Action that configures ci run. Calculates digests, checks job to be executed, generates json output",
    )
    parser.add_argument(
        "--update-gh-statuses",
        action="store_true",
        help="Action that recreates success GH statuses for jobs that finished successfully in the past and will be skipped this time",
    )
    parser.add_argument(
        "--pre",
        action="store_true",
        help="Action that executes prerequisites for the job provided in --job-name",
    )
    parser.add_argument(
        "--run",
        action="store_true",
        help="Action that executes run action for specified --job-name. run_command must be configured for a given job name.",
    )
    parser.add_argument(
        "--post",
        action="store_true",
        help="Action that executes post actions for the job provided in --job-name",
    )
    parser.add_argument(
        "--mark-success",
        action="store_true",
        help="Action that marks job provided in --job-name (with batch provided in --batch) as successful",
    )
    parser.add_argument(
        "--job-name",
        default="",
        type=str,
        help="Job name as in config",
    )
    parser.add_argument(
        "--run-command",
        default="",
        type=str,
        help="A run command to run in --run action. Will override run_command from a job config if any",
    )
    parser.add_argument(
        "--batch",
        default=-1,
        type=int,
        help="Current batch number (required for --mark-success), -1 or omit for single-batch job",
    )
    parser.add_argument(
        "--infile",
        default="",
        type=str,
        help="Input json file or json string with ci run config",
    )
    parser.add_argument(
        "--outfile",
        default="",
        type=str,
        required=False,
        help="output file to write json result to, if not set - stdout",
    )
    parser.add_argument(
        "--pretty",
        action="store_true",
        default=False,
        help="makes json output pretty formatted",
    )
    parser.add_argument(
        "--skip-docker",
        action="store_true",
        default=False,
        help="skip fetching docker data from dockerhub, used in --configure action (for debugging)",
    )
    parser.add_argument(
        "--docker-digest-or-latest",
        action="store_true",
        default=False,
        help="temporary hack to fallback to latest if image with digest as a tag is not on docker hub",
    )
    parser.add_argument(
        "--skip-jobs",
        action="store_true",
        default=False,
        help="skip fetching data about job runs, used in --configure action (for debugging and nightly ci)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        default=False,
        help="Used with --run, force the job to run, omitting the ci cache",
    )
    # FIXME: remove, not used
    parser.add_argument(
        "--rebuild-all-binaries",
        action="store_true",
        default=False,
        help="[DEPRECATED. to be removed, once no wf use it] will create run config without skipping build jobs in any case, used in --configure action (for release branches)",
    )
    parser.add_argument(
        "--commit-message",
        default="",
        help="debug option to test commit message processing",
    )
    return parser.parse_args()


# FIXME: rewrite the docker job as regular reusable_test job and move interaction with docker hub inside job script
# that way run config will be more clean, workflow more generic and less api calls to dockerhub
def check_missing_images_on_dockerhub (
image_name_tag : Dict [ str , str ] , arch : Optional [ str ] = None
) - > Dict [ str , str ] :
"""
Checks missing images on dockerhub .
Works concurrently for all given images .
Docker must be logged in .
"""
def run_docker_command (
image : str , image_digest : str , arch : Optional [ str ] = None
) - > Dict :
"""
aux command for fetching single docker manifest
"""
command = [
" docker " ,
" manifest " ,
" inspect " ,
f " { image } : { image_digest } " if not arch else f " { image } : { image_digest } - { arch } " ,
]
process = subprocess . run (
command ,
stdout = subprocess . PIPE ,
stderr = subprocess . PIPE ,
text = True ,
check = False ,
)
return {
" image " : image ,
" image_digest " : image_digest ,
" arch " : arch ,
" stdout " : process . stdout ,
" stderr " : process . stderr ,
" return_code " : process . returncode ,
}
result : Dict [ str , str ] = { }
with concurrent . futures . ThreadPoolExecutor ( ) as executor :
futures = [
executor . submit ( run_docker_command , image , tag , arch )
for image , tag in image_name_tag . items ( )
]
responses = [
future . result ( ) for future in concurrent . futures . as_completed ( futures )
]
for resp in responses :
name , stdout , stderr , digest , arch = (
resp [ " image " ] ,
resp [ " stdout " ] ,
resp [ " stderr " ] ,
resp [ " image_digest " ] ,
resp [ " arch " ] ,
)
if stderr :
if stderr . startswith ( " no such manifest " ) :
result [ name ] = digest
else :
print ( f " Error: Unknown error: { stderr } , { name } , { arch } " )
elif stdout :
if " mediaType " in stdout :
pass
else :
print ( f " Error: Unknown response: { stdout } " )
assert False , " FIXME "
else :
print ( f " Error: No response for { name } , { digest } , { arch } " )
assert False , " FIXME "
return result
2024-02-02 17:10:47 +00:00
def _pre_action ( s3 , indata , pr_info ) :
CommitStatusData . cleanup ( )
JobReport . cleanup ( )
BuildResult . cleanup ( )
ci_cache = CiCache ( s3 , indata [ " jobs_data " ] [ " digests " ] )
# for release/master branches reports must be from the same branches
2024-02-04 19:12:37 +00:00
report_prefix = normalize_string ( pr_info . head_ref ) if pr_info . number == 0 else " "
print (
f " Use report prefix [ { report_prefix } ], pr_num [ { pr_info . number } ], head_ref [ { pr_info . head_ref } ] "
)
2024-02-02 17:10:47 +00:00
reports_files = ci_cache . download_build_reports ( file_prefix = report_prefix )
print ( f " Pre action done. Report files [ { reports_files } ] have been downloaded " )
def _mark_success_action (
s3 : S3Helper ,
indata : Dict [ str , Any ] ,
pr_info : PRInfo ,
job : str ,
batch : int ,
) - > None :
ci_cache = CiCache ( s3 , indata [ " jobs_data " ] [ " digests " ] )
job_config = CI_CONFIG . get_job_config ( job )
num_batches = job_config . num_batches
# if batch is not provided - set to 0
batch = 0 if batch == - 1 else batch
assert (
0 < = batch < num_batches
) , f " --batch must be provided and in range [0, { num_batches } ) for { job } "
    # FIXME: find generic design for propagating and handling job status (e.g. stop using statuses in GH api)
    # now a job can be a build job w/o status data, or any other job that exits with 0, with or w/o status data
if CI_CONFIG . is_build_job ( job ) :
# there is no status for build jobs
# create dummy success to mark it as done
# FIXME: consider creating commit status for build jobs too, to treat everything the same way
2024-02-06 12:39:34 +00:00
CommitStatusData ( SUCCESS , " dummy description " , " dummy_url " ) . dump_status ( )
2024-02-02 17:10:47 +00:00
job_status = None
if CommitStatusData . exist ( ) :
# normal scenario
job_status = CommitStatusData . load_status ( )
else :
# apparently exit after rerun-helper check
# do nothing, exit without failure
print ( f " ERROR: no status file for job [ { job } ] " )
if job_config . run_always or job_config . run_by_label :
print ( f " Job [ { job } ] runs always or by label in CI - do not cache " )
else :
if pr_info . is_master ( ) :
pass
# delete method is disabled for ci_cache. need it?
# pending enabled for master branch jobs only
# ci_cache.delete_pending(job, batch, num_batches, release_branch=True)
if job_status and job_status . is_ok ( ) :
ci_cache . push_successful (
job , batch , num_batches , job_status , pr_info . is_release_branch ( )
)
print ( f " Job [ { job } ] is ok " )
2024-02-04 19:12:37 +00:00
elif job_status and not job_status . is_ok ( ) :
ci_cache . push_failed (
job , batch , num_batches , job_status , pr_info . is_release_branch ( )
)
print ( f " Job [ { job } ] is failed with status [ { job_status . status } ] " )
else :
job_status = CommitStatusData (
description = " dummy description " , status = ERROR , report_url = " dummy url "
)
ci_cache . push_failed (
job , batch , num_batches , job_status , pr_info . is_release_branch ( )
)
print ( f " No CommitStatusData for [ { job } ], push dummy failure to ci_cache " )
2024-02-02 17:10:47 +00:00
def _print_results ( result : Any , outfile : Optional [ str ] , pretty : bool = False ) - > None :
if outfile :
with open ( outfile , " w " ) as f :
if isinstance ( result , str ) :
print ( result , file = f )
elif isinstance ( result , dict ) :
print ( json . dumps ( result , indent = 2 if pretty else None ) , file = f )
else :
raise AssertionError ( f " Unexpected type for ' res ' : { type ( result ) } " )
else :
if isinstance ( result , str ) :
print ( result )
elif isinstance ( result , dict ) :
print ( json . dumps ( result , indent = 2 if pretty else None ) )
else :
raise AssertionError ( f " Unexpected type for ' res ' : { type ( result ) } " )
def _check_and_update_for_early_style_check ( jobs_data : dict , docker_data : dict ) - > None :
2023-12-18 08:07:22 +00:00
"""
This is temporary hack to start style check before docker build if possible
FIXME : need better solution to do style check as soon as possible and as fast as possible w / o dependency on docker job
"""
2024-02-02 17:10:47 +00:00
jobs_to_do = jobs_data . get ( " jobs_to_do " , [ ] )
docker_to_build = docker_data . get ( " missing_multi " , [ ] )
2023-12-18 08:07:22 +00:00
if (
2024-01-30 17:46:53 +00:00
JobNames . STYLE_CHECK in jobs_to_do
2023-12-18 08:07:22 +00:00
and docker_to_build
and " clickhouse/style-test " not in docker_to_build
) :
2024-01-30 17:46:53 +00:00
index = jobs_to_do . index ( JobNames . STYLE_CHECK )
2023-12-18 08:07:22 +00:00
jobs_to_do [ index ] = " Style check early "
2024-02-02 17:10:47 +00:00
def _update_config_for_docs_only ( jobs_data : dict ) - > None :
2024-01-30 17:44:52 +00:00
DOCS_CHECK_JOBS = [ JobNames . DOCS_CHECK , JobNames . STYLE_CHECK ]
2023-12-18 16:06:10 +00:00
print ( f " NOTE: Will keep only docs related jobs: [ { DOCS_CHECK_JOBS } ] " )
2024-02-02 17:10:47 +00:00
jobs_to_do = jobs_data . get ( " jobs_to_do " , [ ] )
jobs_data [ " jobs_to_do " ] = [ job for job in jobs_to_do if job in DOCS_CHECK_JOBS ]
jobs_data [ " jobs_to_wait " ] = {
job : params
for job , params in jobs_data [ " jobs_to_wait " ] . items ( )
if job in DOCS_CHECK_JOBS
}
2023-12-18 16:06:10 +00:00
2024-02-05 17:09:43 +00:00
def _configure_docker_jobs ( docker_digest_or_latest : bool ) - > Dict :
2024-01-04 15:35:09 +00:00
print ( " ::group::Docker images check " )
2023-12-18 08:07:22 +00:00
# generate docker jobs data
docker_digester = DockerDigester ( )
imagename_digest_dict = (
docker_digester . get_all_digests ( )
) # 'image name - digest' mapping
images_info = docker_images_helper . get_images_info ( )
2024-02-05 17:09:43 +00:00
# FIXME: we need login as docker manifest inspect goes directly to one of the *.docker.com hosts instead of "registry-mirrors" : ["http://dockerhub-proxy.dockerhub-proxy-zone:5000"]
# find if it's possible to use the setting of /etc/docker/daemon.json (https://github.com/docker/cli/issues/4484#issuecomment-1688095463)
docker_images_helper . docker_login ( )
missing_multi_dict = check_missing_images_on_dockerhub ( imagename_digest_dict )
missing_multi = list ( missing_multi_dict )
missing_amd64 = [ ]
missing_aarch64 = [ ]
if not docker_digest_or_latest :
# look for missing arm and amd images only among missing multiarch manifests @missing_multi_dict
# to avoid extra dockerhub api calls
missing_amd64 = list (
check_missing_images_on_dockerhub ( missing_multi_dict , " amd64 " )
)
# FIXME: WA until full arm support: skip not supported arm images
missing_aarch64 = list (
check_missing_images_on_dockerhub (
{
im : digest
for im , digest in missing_multi_dict . items ( )
if not images_info [ im ] [ " only_amd64 " ]
} ,
" aarch64 " ,
2023-12-18 08:07:22 +00:00
)
2024-02-05 17:09:43 +00:00
)
2023-12-18 08:07:22 +00:00
else :
2024-02-05 17:09:43 +00:00
if missing_multi :
assert False , f " Missing images [ { missing_multi } ], cannot proceed "
2024-01-04 15:35:09 +00:00
print ( " ::endgroup:: " )
2023-12-18 08:07:22 +00:00
return {
" images " : imagename_digest_dict ,
" missing_aarch64 " : missing_aarch64 ,
" missing_amd64 " : missing_amd64 ,
" missing_multi " : missing_multi ,
}
def _configure_jobs (
job_digester : JobDigester ,
s3 : S3Helper ,
2024-02-02 17:10:47 +00:00
pr_info : PRInfo ,
2023-12-18 08:07:22 +00:00
commit_tokens : List [ str ] ,
2024-02-02 17:10:47 +00:00
ci_cache_disabled : bool ,
2023-12-18 08:07:22 +00:00
) - > Dict :
2024-01-04 15:35:09 +00:00
## a. digest each item from the config
2023-12-18 08:07:22 +00:00
job_digester = JobDigester ( )
jobs_params : Dict [ str , Dict ] = { }
jobs_to_do : List [ str ] = [ ]
jobs_to_skip : List [ str ] = [ ]
digests : Dict [ str , str ] = { }
2024-01-04 15:35:09 +00:00
2024-02-04 19:12:37 +00:00
print ( " ::group::Job Digests " )
2023-12-18 08:07:22 +00:00
for job in CI_CONFIG . job_generator ( ) :
digest = job_digester . get_job_digest ( CI_CONFIG . get_digest_config ( job ) )
digests [ job ] = digest
print ( f " job [ { job . rjust ( 50 ) } ] has digest [ { digest } ] " )
2024-01-04 15:35:09 +00:00
print ( " ::endgroup:: " )
2024-02-02 17:10:47 +00:00
## b. check what we need to run
ci_cache = None
if not ci_cache_disabled :
2024-02-04 19:12:37 +00:00
ci_cache = CiCache ( s3 , digests ) . update ( )
ci_cache . print_status ( )
2024-02-02 17:10:47 +00:00
jobs_to_wait : Dict [ str , Dict [ str , Any ] ] = { }
2024-02-21 14:55:48 +00:00
randomization_buckets = { } # type: Dict[str, Set[str]]
2024-01-04 15:35:09 +00:00
2023-12-18 08:07:22 +00:00
for job in digests :
digest = digests [ job ]
job_config = CI_CONFIG . get_job_config ( job )
num_batches : int = job_config . num_batches
batches_to_do : List [ int ] = [ ]
2024-02-04 19:12:37 +00:00
add_to_skip = False
2023-12-18 08:07:22 +00:00
2024-02-21 14:55:48 +00:00
if job_config . pr_only and pr_info . is_release_branch ( ) :
continue
if job_config . release_only and not pr_info . is_release_branch ( ) :
continue
# fill job randomization buckets (for jobs with configured @random_bucket property))
if job_config . random_bucket :
if not job_config . random_bucket in randomization_buckets :
randomization_buckets [ job_config . random_bucket ] = set ( )
randomization_buckets [ job_config . random_bucket ] . add ( job )
2024-02-02 17:10:47 +00:00
for batch in range ( num_batches ) : # type: ignore
if job_config . run_by_label :
# this job controlled by label, add to todo if its label is set in pr
if job_config . run_by_label in pr_info . labels :
2023-12-18 08:07:22 +00:00
batches_to_do . append ( batch )
2024-02-02 17:10:47 +00:00
elif job_config . run_always :
# always add to todo
batches_to_do . append ( batch )
elif not ci_cache :
batches_to_do . append ( batch )
elif not ci_cache . is_successful (
job ,
batch ,
num_batches ,
release_branch = pr_info . is_release_branch ( )
and job_config . required_on_release_branch ,
) :
# ci cache is enabled and job is not in the cache - add
batches_to_do . append ( batch )
# check if it's pending in the cache
2024-02-04 19:12:37 +00:00
if ci_cache . is_pending (
job ,
batch ,
num_batches ,
release_branch = pr_info . is_release_branch ( )
and job_config . required_on_release_branch ,
) :
2024-02-02 17:10:47 +00:00
if job in jobs_to_wait :
jobs_to_wait [ job ] [ " batches " ] . append ( batch )
else :
jobs_to_wait [ job ] = {
" batches " : [ batch ] ,
" num_batches " : num_batches ,
}
2024-02-04 19:12:37 +00:00
else :
add_to_skip = True
2023-12-18 08:07:22 +00:00
if batches_to_do :
jobs_to_do . append ( job )
2024-02-04 19:12:37 +00:00
elif add_to_skip :
2024-01-25 19:46:57 +00:00
# treat job as being skipped only if it's controlled by digest
2024-01-04 15:35:09 +00:00
jobs_to_skip . append ( job )
2024-01-25 19:46:57 +00:00
jobs_params [ job ] = {
" batches " : batches_to_do ,
" num_batches " : num_batches ,
}
2023-12-18 08:07:22 +00:00
2024-02-21 14:55:48 +00:00
if not pr_info . is_release_branch ( ) :
# randomization bucket filtering (pick one random job from each bucket, for jobs with configured random_bucket property)
for _ , jobs in randomization_buckets . items ( ) :
jobs_to_remove_randomization = set ( )
bucket_ = list ( jobs )
random . shuffle ( bucket_ )
while len ( bucket_ ) > 1 :
random_job = bucket_ . pop ( )
if random_job in jobs_to_do :
jobs_to_remove_randomization . add ( random_job )
if jobs_to_remove_randomization :
print (
f " Following jobs will be removed due to randomization bucket: [ { jobs_to_remove_randomization } ] "
)
jobs_to_do = [
job for job in jobs_to_do if job not in jobs_to_remove_randomization
]
2024-02-02 17:10:47 +00:00
## c. check CI controlling labels and commit messages
if pr_info . labels :
2023-12-21 17:08:25 +00:00
jobs_requested_by_label = [ ] # type: List[str]
ci_controlling_labels = [ ] # type: List[str]
2024-02-02 17:10:47 +00:00
for label in pr_info . labels :
2023-12-21 17:08:25 +00:00
label_config = CI_CONFIG . get_label_config ( label )
if label_config :
jobs_requested_by_label + = label_config . run_jobs
ci_controlling_labels + = [ label ]
if ci_controlling_labels :
print ( f " NOTE: CI controlling labels are set: [ { ci_controlling_labels } ] " )
print (
f " : following jobs will be executed: [ { jobs_requested_by_label } ] "
)
2024-02-02 17:10:47 +00:00
# so far there is only "do not test" label in the config that runs only Style check.
# check later if we need to filter out requested jobs using ci cache. right now we do it:
2024-01-04 15:35:09 +00:00
jobs_to_do = [ job for job in jobs_requested_by_label if job in jobs_to_do ]
2023-12-21 17:08:25 +00:00
2023-12-18 08:07:22 +00:00
if commit_tokens :
2024-01-04 15:35:09 +00:00
jobs_to_do_requested = [ ] # type: List[str]
# handle ci set tokens
ci_controlling_tokens = [
token for token in commit_tokens if token in CI_CONFIG . label_configs
]
for token_ in ci_controlling_tokens :
label_config = CI_CONFIG . get_label_config ( token_ )
            assert label_config, f"Unknown token [{token_}]"
print (
f " NOTE: CI controlling token: [ { ci_controlling_tokens } ], add jobs: [ { label_config . run_jobs } ] "
)
jobs_to_do_requested + = label_config . run_jobs
# handle specific job requests
2023-12-18 08:07:22 +00:00
requested_jobs = [
2024-01-04 15:35:09 +00:00
token [ len ( " job_ " ) : ] for token in commit_tokens if token . startswith ( " job_ " )
2023-12-18 08:07:22 +00:00
]
if requested_jobs :
2023-12-19 19:14:47 +00:00
assert any (
len ( x ) > 1 for x in requested_jobs
) , f " Invalid job names requested [ { requested_jobs } ] "
2023-12-18 08:07:22 +00:00
for job in requested_jobs :
job_with_parents = CI_CONFIG . get_job_with_parents ( job )
2024-01-04 15:35:09 +00:00
print (
f " NOTE: CI controlling token: [#job_ { job } ], add jobs: [ { job_with_parents } ] "
)
2023-12-18 08:07:22 +00:00
# always add requested job itself, even if it could be skipped
jobs_to_do_requested . append ( job_with_parents [ 0 ] )
for parent in job_with_parents [ 1 : ] :
if parent in jobs_to_do and parent not in jobs_to_do_requested :
jobs_to_do_requested . append ( parent )
2024-01-04 15:35:09 +00:00
if jobs_to_do_requested :
2023-12-18 08:07:22 +00:00
print (
2023-12-21 17:08:25 +00:00
f " NOTE: Only specific job(s) were requested by commit message tokens: [ { jobs_to_do_requested } ] "
2023-12-18 08:07:22 +00:00
)
2024-01-04 15:35:09 +00:00
jobs_to_do = list (
2024-01-25 19:46:57 +00:00
set ( job for job in jobs_to_do_requested if job not in jobs_to_skip )
2024-01-04 15:35:09 +00:00
)
2023-12-18 08:07:22 +00:00
return {
" digests " : digests ,
" jobs_to_do " : jobs_to_do ,
" jobs_to_skip " : jobs_to_skip ,
2024-02-04 19:12:37 +00:00
" jobs_to_wait " : {
job : params for job , params in jobs_to_wait . items ( ) if job in jobs_to_do
} ,
2024-01-04 15:35:09 +00:00
" jobs_params " : {
job : params for job , params in jobs_params . items ( ) if job in jobs_to_do
} ,
2023-12-18 08:07:22 +00:00
}
2024-02-04 19:12:37 +00:00
def _create_gh_status (
commit : Any , job : str , batch : int , num_batches : int , job_status : CommitStatusData
) - > None :
print ( f " Going to re-create GH status for job [ { job } ] " )
assert job_status . status == SUCCESS , " BUG! "
commit . create_status (
state = job_status . status ,
target_url = job_status . report_url ,
description = format_description (
f " Reused from [ { job_status . pr_num } - { job_status . sha [ 0 : 8 ] } ]: "
f " { job_status . description } "
) ,
context = get_check_name ( job , batch = batch , num_batches = num_batches ) ,
)
2024-02-02 17:10:47 +00:00
def _update_gh_statuses_action ( indata : Dict , s3 : S3Helper ) - > None :
2024-01-04 15:35:09 +00:00
if indata [ " ci_flags " ] [ Labels . NO_CI_CACHE ] :
print ( " CI cache is disabled - skip restoring commit statuses from CI cache " )
return
2023-12-18 08:07:22 +00:00
job_digests = indata [ " jobs_data " ] [ " digests " ]
2024-02-04 19:12:37 +00:00
jobs_to_skip = indata [ " jobs_data " ] [ " jobs_to_skip " ]
jobs_to_do = indata [ " jobs_data " ] [ " jobs_to_do " ]
ci_cache = CiCache ( s3 , job_digests ) . update ( ) . fetch_records_data ( ) . print_status ( )
2024-02-02 17:10:47 +00:00
2023-12-18 08:07:22 +00:00
# create GH status
pr_info = PRInfo ( )
commit = get_commit ( Github ( get_best_robot_token ( ) , per_page = 100 ) , pr_info . sha )
2024-02-04 19:12:37 +00:00
def _concurrent_create_status ( job : str , batch : int , num_batches : int ) - > None :
2024-02-02 17:10:47 +00:00
job_status = ci_cache . get_successful ( job , batch , num_batches )
if not job_status :
return
2024-02-04 19:12:37 +00:00
_create_gh_status ( commit , job , batch , num_batches , job_status )
2023-12-18 08:07:22 +00:00
with concurrent . futures . ThreadPoolExecutor ( ) as executor :
futures = [ ]
for job in job_digests :
2024-02-08 19:09:28 +00:00
if job not in jobs_to_skip and job not in jobs_to_do :
2024-02-04 19:12:37 +00:00
# no need to create status for job that are not supposed to be executed
continue
2024-01-19 17:21:01 +00:00
if CI_CONFIG . is_build_job ( job ) :
2023-12-18 08:07:22 +00:00
# no GH status for build jobs
continue
2024-01-29 17:13:32 +00:00
job_config = CI_CONFIG . get_job_config ( job )
if not job_config :
# there might be a new job that does not exist on this branch - skip it
continue
for batch in range ( job_config . num_batches ) :
2024-02-04 19:12:37 +00:00
future = executor . submit (
2024-01-29 17:13:32 +00:00
_concurrent_create_status , job , batch , job_config . num_batches
2024-02-04 19:12:37 +00:00
)
2023-12-18 08:07:22 +00:00
futures . append ( future )
done , _ = concurrent . futures . wait ( futures )
for future in done :
try :
_ = future . result ( )
except Exception as e :
raise e
print ( " Going to update overall CI report " )
set_status_comment ( commit , pr_info )
print ( " ... CI report update - done " )
def _fetch_commit_tokens(message: str) -> List[str]:
    pattern = r"#[\w-]+"
    matches = [match[1:] for match in re.findall(pattern, message)]
    res = [match for match in matches if match in Labels or match.startswith("job_")]
    return res
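

# e.g. (illustrative): for a commit message containing "... #job_Fast_test ...",
# _fetch_commit_tokens returns ["job_Fast_test"]; tokens that are neither known Labels
# nor prefixed with "job_" are filtered out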
2023-12-18 08:07:22 +00:00
2024-01-04 15:35:09 +00:00
def _upload_build_artifacts (
pr_info : PRInfo ,
build_name : str ,
2024-02-02 17:10:47 +00:00
ci_cache : CiCache ,
2024-01-04 15:35:09 +00:00
job_report : JobReport ,
s3 : S3Helper ,
s3_destination : str ,
) - > str :
# There are ugly artifacts for the performance test. FIXME:
s3_performance_path = " / " . join (
(
get_release_or_pr ( pr_info , get_version_from_repo ( ) ) [ 1 ] ,
pr_info . sha ,
2024-02-04 19:12:37 +00:00
normalize_string ( build_name ) ,
2024-01-04 15:35:09 +00:00
" performance.tar.zst " ,
)
)
performance_urls = [ ]
assert job_report . build_dir_for_upload , " Must be set for build job "
performance_path = Path ( job_report . build_dir_for_upload ) / " performance.tar.zst "
if performance_path . exists ( ) :
performance_urls . append (
s3 . upload_build_file_to_s3 ( performance_path , s3_performance_path )
)
print (
" Uploaded performance.tar.zst to %s , now delete to avoid duplication " ,
performance_urls [ 0 ] ,
)
performance_path . unlink ( )
build_urls = (
s3 . upload_build_directory_to_s3 (
Path ( job_report . build_dir_for_upload ) ,
s3_destination ,
keep_dirs_in_s3_path = False ,
upload_symlinks = False ,
)
+ performance_urls
)
print ( " ::notice ::Build URLs: {} " . format ( " \n " . join ( build_urls ) ) )
log_path = Path ( job_report . additional_files [ 0 ] )
log_url = " "
if log_path . exists ( ) :
log_url = s3 . upload_build_file_to_s3 (
log_path , s3_destination + " / " + log_path . name
)
print ( f " ::notice ::Log URL: { log_url } " )
# generate and upload build report
build_result = BuildResult (
build_name ,
log_url ,
build_urls ,
job_report . version ,
job_report . status ,
int ( job_report . duration ) ,
GITHUB_JOB_API_URL ( ) ,
head_ref = pr_info . head_ref ,
pr_number = pr_info . number ,
)
2024-02-02 17:10:47 +00:00
report_url = ci_cache . upload_build_report ( build_result )
print ( f " Report file has been uploaded to [ { report_url } ] " )
2024-01-04 15:35:09 +00:00
# Upload head master binaries
static_bin_name = CI_CONFIG . build_config [ build_name ] . static_binary_name
if pr_info . is_master ( ) and static_bin_name :
# Full binary with debug info:
s3_path_full = " / " . join ( ( pr_info . base_ref , static_bin_name , " clickhouse-full " ) )
binary_full = Path ( job_report . build_dir_for_upload ) / " clickhouse "
url_full = s3 . upload_build_file_to_s3 ( binary_full , s3_path_full )
print ( f " ::notice ::Binary static URL (with debug info): { url_full } " )
# Stripped binary without debug info:
s3_path_compact = " / " . join ( ( pr_info . base_ref , static_bin_name , " clickhouse " ) )
binary_compact = Path ( job_report . build_dir_for_upload ) / " clickhouse-stripped "
url_compact = s3 . upload_build_file_to_s3 ( binary_compact , s3_path_compact )
print ( f " ::notice ::Binary static URL (compact): { url_compact } " )
return log_url
def _upload_build_profile_data (
pr_info : PRInfo ,
build_name : str ,
job_report : JobReport ,
git_runner : GitRunner ,
ch_helper : ClickHouseHelper ,
) - > None :
ci_logs_credentials = CiLogsCredentials ( Path ( " /dev/null " ) )
if ci_logs_credentials . host :
instance_type = get_instance_type ( )
instance_id = get_instance_id ( )
query = f """ INSERT INTO build_time_trace
(
pull_request_number ,
commit_sha ,
check_start_time ,
check_name ,
instance_type ,
instance_id ,
file ,
library ,
time ,
pid ,
tid ,
ph ,
ts ,
dur ,
cat ,
name ,
detail ,
count ,
avgMs ,
args_name
)
SELECT { pr_info . number } , ' {pr_info.sha} ' , ' {job_report.start_time} ' , ' {build_name} ' , ' {instance_type} ' , ' {instance_id} ' , *
FROM input ( '
file String ,
library String ,
time DateTime64 ( 6 ) ,
pid UInt32 ,
tid UInt32 ,
ph String ,
ts UInt64 ,
dur UInt64 ,
cat String ,
name String ,
detail String ,
count UInt64 ,
avgMs UInt64 ,
args_name String ' )
FORMAT JSONCompactEachRow """
auth = {
" X-ClickHouse-User " : " ci " ,
" X-ClickHouse-Key " : ci_logs_credentials . password ,
}
url = f " https:// { ci_logs_credentials . host } / "
profiles_dir = Path ( TEMP_PATH ) / " profiles_source "
profiles_dir . mkdir ( parents = True , exist_ok = True )
print (
" Processing profile JSON files from %s " ,
Path ( REPO_COPY ) / " build_docker " ,
)
git_runner (
" ./utils/prepare-time-trace/prepare-time-trace.sh "
f " build_docker { profiles_dir . absolute ( ) } "
)
profile_data_file = Path ( TEMP_PATH ) / " profile.json "
with open ( profile_data_file , " wb " ) as profile_fd :
for profile_source in profiles_dir . iterdir ( ) :
if profile_source . name != " binary_sizes.txt " :
with open ( profiles_dir / profile_source , " rb " ) as ps_fd :
profile_fd . write ( ps_fd . read ( ) )
print (
" ::notice ::Log Uploading profile data, path: %s , size: %s , query: %s " ,
profile_data_file ,
profile_data_file . stat ( ) . st_size ,
query ,
)
ch_helper . insert_file ( url , auth , query , profile_data_file )
query = f """ INSERT INTO binary_sizes
(
pull_request_number ,
commit_sha ,
check_start_time ,
check_name ,
instance_type ,
instance_id ,
file ,
size
)
SELECT { pr_info . number } , ' {pr_info.sha} ' , ' {job_report.start_time} ' , ' {build_name} ' , ' {instance_type} ' , ' {instance_id} ' , file , size
FROM input ( ' size UInt64, file String ' )
SETTINGS format_regexp = ' ^ \\ s*( \\ d+) (.+)$ '
FORMAT Regexp """
binary_sizes_file = profiles_dir / " binary_sizes.txt "
print (
" ::notice ::Log Uploading binary sizes data, path: %s , size: %s , query: %s " ,
binary_sizes_file ,
binary_sizes_file . stat ( ) . st_size ,
query ,
)
ch_helper . insert_file ( url , auth , query , binary_sizes_file )
def _run_test ( job_name : str , run_command : str ) - > int :
assert (
run_command or CI_CONFIG . get_job_config ( job_name ) . run_command
) , " Run command must be provided as input argument or be configured in job config "
if not run_command :
if CI_CONFIG . get_job_config ( job_name ) . timeout :
os . environ [ " KILL_TIMEOUT " ] = str ( CI_CONFIG . get_job_config ( job_name ) . timeout )
run_command = " / " . join (
( os . path . dirname ( __file__ ) , CI_CONFIG . get_job_config ( job_name ) . run_command )
)
if " .py " in run_command and not run_command . startswith ( " python " ) :
run_command = " python3 " + run_command
print ( " Use run command from a job config " )
else :
print ( " Use run command from the workflow " )
os . environ [ " CHECK_NAME " ] = job_name
print ( f " Going to start run command [ { run_command } ] " )
process = subprocess . run (
run_command ,
stdout = sys . stdout ,
stderr = sys . stderr ,
text = True ,
check = False ,
shell = True ,
)
if process . returncode == 0 :
print ( f " Run action done for: [ { job_name } ] " )
exit_code = 0
else :
print (
f " Run action failed for: [ { job_name } ] with exit code [ { process . returncode } ] "
)
exit_code = process . returncode
return exit_code
def _get_ext_check_name ( check_name : str ) - > str :
run_by_hash_num = int ( os . getenv ( " RUN_BY_HASH_NUM " , " 0 " ) )
run_by_hash_total = int ( os . getenv ( " RUN_BY_HASH_TOTAL " , " 0 " ) )
if run_by_hash_total > 1 :
check_name_with_group = (
check_name + f " [ { run_by_hash_num + 1 } / { run_by_hash_total } ] "
)
else :
check_name_with_group = check_name
return check_name_with_group
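# e.g. (illustrative, hypothetical job name): with RUN_BY_HASH_NUM=1 and RUN_BY_HASH_TOTAL=3
# in the environment, _get_ext_check_name("Stateless tests (release)") -> "Stateless tests (release) [2/3]"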
2023-12-18 08:07:22 +00:00
def main ( ) - > int :
2024-01-04 15:35:09 +00:00
logging . basicConfig ( level = logging . INFO )
2023-12-18 08:07:22 +00:00
exit_code = 0
parser = argparse . ArgumentParser (
formatter_class = argparse . ArgumentDefaultsHelpFormatter ,
)
args = parse_args ( parser )
2024-01-22 08:26:46 +00:00
if args . mark_success or args . pre or args . run :
2023-12-18 08:07:22 +00:00
assert args . infile , " Run config must be provided via --infile "
assert args . job_name , " Job name must be provided via --job-name "
indata : Optional [ Dict [ str , Any ] ] = None
if args . infile :
indata = (
json . loads ( args . infile )
if not os . path . isfile ( args . infile )
else json . load ( open ( args . infile ) )
)
assert indata and isinstance ( indata , dict ) , " Invalid --infile json "
result : Dict [ str , Any ] = { }
s3 = S3Helper ( )
2024-01-04 15:35:09 +00:00
pr_info = PRInfo ( )
git_runner = GitRunner ( set_cwd_to_git_root = True )
2023-12-18 08:07:22 +00:00
2024-01-04 15:35:09 +00:00
### CONFIGURE action: start
2023-12-18 08:07:22 +00:00
if args . configure :
2024-01-04 15:35:09 +00:00
# if '#no_merge_commit' is set in commit message - set git ref to PR branch head to avoid merge-commit
2023-12-18 08:07:22 +00:00
tokens = [ ]
2024-01-04 15:35:09 +00:00
ci_flags = {
Labels . NO_MERGE_COMMIT : False ,
Labels . NO_CI_CACHE : False ,
}
if ( pr_info . number != 0 and not args . skip_jobs ) or args . commit_message :
message = args . commit_message or git_runner . run (
f " { GIT_PREFIX } log { pr_info . sha } --format=%B -n 1 "
)
2023-12-18 08:07:22 +00:00
tokens = _fetch_commit_tokens ( message )
2024-01-04 15:35:09 +00:00
print ( f " Commit message tokens: [ { tokens } ] " )
if Labels . NO_MERGE_COMMIT in tokens and CI :
git_runner . run ( f " { GIT_PREFIX } checkout { pr_info . sha } " )
git_ref = git_runner . run ( f " { GIT_PREFIX } rev-parse HEAD " )
ci_flags [ Labels . NO_MERGE_COMMIT ] = True
print ( " NOTE: Disable Merge Commit " )
if Labels . NO_CI_CACHE in tokens :
ci_flags [ Labels . NO_CI_CACHE ] = True
print ( " NOTE: Disable CI Cache " )
2023-12-18 08:07:22 +00:00
2024-02-02 17:10:47 +00:00
docker_data = { }
git_ref = git_runner . run ( f " { GIT_PREFIX } rev-parse HEAD " )
2023-12-18 08:07:22 +00:00
# let's get CH version
version = get_version_from_repo ( git = Git ( True ) ) . string
print ( f " Got CH version for this commit: [ { version } ] " )
docker_data = (
2024-02-05 17:09:43 +00:00
_configure_docker_jobs ( args . docker_digest_or_latest )
2023-12-18 08:07:22 +00:00
if not args . skip_docker
else { }
)
job_digester = JobDigester ( )
build_digest = job_digester . get_job_digest (
CI_CONFIG . get_digest_config ( " package_release " )
)
docs_digest = job_digester . get_job_digest (
2024-01-30 17:44:52 +00:00
CI_CONFIG . get_digest_config ( JobNames . DOCS_CHECK )
2023-12-18 08:07:22 +00:00
)
jobs_data = (
_configure_jobs (
job_digester ,
s3 ,
2024-02-02 17:10:47 +00:00
pr_info ,
2023-12-18 08:07:22 +00:00
tokens ,
2024-01-04 15:35:09 +00:00
ci_flags [ Labels . NO_CI_CACHE ] ,
2023-12-18 08:07:22 +00:00
)
if not args . skip_jobs
else { }
)
2024-02-05 17:09:43 +00:00
# # FIXME: Early style check manipulates with job names might be not robust with await feature
# if pr_info.number != 0:
# # FIXME: it runs style check before docker build if possible (style-check images is not changed)
# # find a way to do style check always before docker build and others
# _check_and_update_for_early_style_check(jobs_data, docker_data)
if not args . skip_jobs and pr_info . has_changes_in_documentation_only ( ) :
2024-02-02 17:10:47 +00:00
_update_config_for_docs_only ( jobs_data )
2024-02-04 19:12:37 +00:00
if not args . skip_jobs :
2024-02-05 17:09:43 +00:00
ci_cache = CiCache ( s3 , jobs_data [ " digests " ] )
2024-02-04 19:12:37 +00:00
2024-02-20 21:19:30 +00:00
if pr_info . is_release_branch ( ) :
2024-02-04 19:12:37 +00:00
# wait for pending jobs to be finished, await_jobs is a long blocking call
# wait pending jobs (for now only on release/master branches)
ready_jobs_batches_dict = ci_cache . await_jobs (
jobs_data . get ( " jobs_to_wait " , { } ) , pr_info . is_release_branch ( )
2024-02-02 17:10:47 +00:00
)
2024-02-04 19:12:37 +00:00
                jobs_to_do = jobs_data["jobs_to_do"]
                jobs_to_skip = jobs_data["jobs_to_skip"]
                jobs_params = jobs_data["jobs_params"]
                for job, batches in ready_jobs_batches_dict.items():
                    if job not in jobs_params:
                        print(f"WARNING: Job [{job}] is not in the params list")
                        continue
                    for batch in batches:
                        jobs_params[job]["batches"].remove(batch)
                    if not jobs_params[job]["batches"]:
                        jobs_to_do.remove(job)
                        jobs_to_skip.append(job)
                        del jobs_params[job]

            # set planned jobs as pending in the CI cache if on the master
            if pr_info.is_master():
                for job in jobs_data["jobs_to_do"]:
                    config = CI_CONFIG.get_job_config(job)
                    if config.run_always or config.run_by_label:
                        continue
                    job_params = jobs_data["jobs_params"][job]
                    ci_cache.push_pending(
                        job,
                        job_params["batches"],
                        config.num_batches,
                        release_branch=pr_info.is_release_branch(),
                    )
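
            # jobs_to_wait has been consumed above - drop it so it does not leak into
            # the output config passed to later actions via --infile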
if " jobs_to_wait " in jobs_data :
del jobs_data [ " jobs_to_wait " ]
2024-02-02 17:10:47 +00:00
2023-12-18 08:07:22 +00:00

        # conclude results
        result["git_ref"] = git_ref
        result["version"] = version
        result["build"] = build_digest
        result["docs"] = docs_digest
        result["ci_flags"] = ci_flags
        result["jobs_data"] = jobs_data
        result["docker_data"] = docker_data
        ### CONFIGURE action: end

    ### PRE action: start
    elif args.pre:
        assert indata, "Run config must be provided via --infile"
        _pre_action(s3, indata, pr_info)

    ### RUN action: start
    elif args.run:
        assert indata, "Run config must be provided via --infile"
        check_name = args.job_name
        check_name_with_group = _get_ext_check_name(check_name)
        print(
            f"Check if rerun for name: [{check_name}], extended name [{check_name_with_group}]"
        )
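        # decide whether the job can be skipped: build jobs consult the build report,
        # test jobs consult the GH commit status and the CI cache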
        previous_status = None
        if CI_CONFIG.is_build_job(check_name):
            # this is a build job - check if a build report is present
            build_result = (
                BuildResult.load_any(check_name, pr_info.number, pr_info.head_ref)
                if not indata["ci_flags"][Labels.NO_CI_CACHE]
                else None
            )
            if build_result:
                if build_result.status == SUCCESS:
                    previous_status = build_result.status
                else:
                    # FIXME: Consider reusing failures for build jobs.
                    #   Removing this if/else makes the build job start and fail
                    #   immediately.
                    print(
                        "Build report found but status is unsuccessful - will try to rerun"
                    )
                print("::group::Build Report")
                print(build_result.as_json())
                print("::endgroup::")
        else:
            # this is a test job - check if GH commit status is present

            # rerun helper check
            # FIXME: remove rerun_helper check and rely on ci cache only
            commit = get_commit(
                Github(get_best_robot_token(), per_page=100), pr_info.sha
            )
            rerun_helper = RerunHelper(commit, check_name_with_group)
            if rerun_helper.is_already_finished_by_status():
                status = rerun_helper.get_finished_status()
                assert status
                previous_status = status.state
                print("::group::Commit Status")
                print(status)
                print("::endgroup::")

            # ci cache check
            elif not indata["ci_flags"][Labels.NO_CI_CACHE]:
                ci_cache = CiCache(s3, indata["jobs_data"]["digests"]).update()
                job_config = CI_CONFIG.get_job_config(check_name)
                if ci_cache.is_successful(
                    check_name,
                    args.batch,
                    job_config.num_batches,
                    job_config.required_on_release_branch,
                ):
                    job_status = ci_cache.get_successful(
                        check_name, args.batch, job_config.num_batches
                    )
                    assert job_status, "BUG"
                    _create_gh_status(
                        commit,
                        check_name,
                        args.batch,
                        job_config.num_batches,
                        job_status,
                    )
                    previous_status = job_status.status
                    GHActions.print_in_group("Commit Status Data", job_status)
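
        # any previously discovered status short-circuits the run unless --force is set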
        if previous_status and not args.force:
            print(
                f"Commit status or Build Report is already present - job will be skipped with status: [{previous_status}]"
            )
            if previous_status == SUCCESS:
                exit_code = 0
            else:
                exit_code = 1
        else:
            exit_code = _run_test(check_name, args.run_command)
    ### RUN action: end

    ### POST action: start
    elif args.post:
        job_report = JobReport.load() if JobReport.exist() else None
        if job_report:
            ch_helper = ClickHouseHelper()
            check_url = ""
            if CI_CONFIG.is_build_job(args.job_name):
                assert (
                    indata
                ), f"--infile with config must be provided for POST action of a build type job [{args.job_name}]"
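                # build artifacts are uploaded under <release_or_pr>/<sha>/<build_name>/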
                build_name = args.job_name
                s3_path_prefix = "/".join(
                    (
                        get_release_or_pr(pr_info, get_version_from_repo())[0],
                        pr_info.sha,
                        build_name,
                    )
                )
                log_url = _upload_build_artifacts(
                    pr_info,
                    build_name,
                    ci_cache=CiCache(s3, indata["jobs_data"]["digests"]),
                    job_report=job_report,
                    s3=s3,
                    s3_destination=s3_path_prefix,
                )
                _upload_build_profile_data(
                    pr_info, build_name, job_report, git_runner, ch_helper
                )
                check_url = log_url
            else:
                # test job
                additional_urls = []
                s3_path_prefix = "/".join(
                    (
                        get_release_or_pr(pr_info, get_version_from_repo())[0],
                        pr_info.sha,
                        normalize_string(
                            job_report.check_name or _get_ext_check_name(args.job_name)
                        ),
                    )
                )
                if job_report.build_dir_for_upload:
                    additional_urls = s3.upload_build_directory_to_s3(
                        Path(job_report.build_dir_for_upload),
                        s3_path_prefix,
                        keep_dirs_in_s3_path=False,
                        upload_symlinks=False,
                    )
                if job_report.test_results or job_report.additional_files:
                    check_url = upload_result_helper.upload_results(
                        s3,
                        pr_info.number,
                        pr_info.sha,
                        job_report.test_results,
                        job_report.additional_files,
                        job_report.check_name or _get_ext_check_name(args.job_name),
                        additional_urls=additional_urls or None,
                    )
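
            # report the outcome: GH commit status, mergeable check, and rows in the
            # ClickHouse "checks" table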
            commit = get_commit(
                Github(get_best_robot_token(), per_page=100), pr_info.sha
            )
            post_commit_status(
                commit,
                job_report.status,
                check_url,
                format_description(job_report.description),
                job_report.check_name or _get_ext_check_name(args.job_name),
                pr_info,
                dump_to_file=True,
            )
            update_mergeable_check(
                commit,
                pr_info,
                job_report.check_name or _get_ext_check_name(args.job_name),
            )
            print(f"Job report url: [{check_url}]")
            prepared_events = prepare_tests_results_for_clickhouse(
                pr_info,
                job_report.test_results,
                job_report.status,
                job_report.duration,
                job_report.start_time,
                check_url or "",
                job_report.check_name or _get_ext_check_name(args.job_name),
            )
            ch_helper.insert_events_into(
                db="default", table="checks", events=prepared_events
            )
        else:
            # no job report
            print(f"No job report for [{args.job_name}] - do nothing")
    ### POST action: end

    ### MARK SUCCESS action: start
    elif args.mark_success:
        assert indata, "Run config must be provided via --infile"
        _mark_success_action(s3, indata, pr_info, args.job_name, args.batch)

    ### UPDATE GH STATUSES action: start
    elif args.update_gh_statuses:
        assert indata, "Run config must be provided via --infile"
        _update_gh_statuses_action(indata=indata, s3=s3)
2024-01-04 15:35:09 +00:00
### print results
2024-02-02 17:10:47 +00:00
_print_results ( result , args . outfile , args . pretty )
2023-12-18 08:07:22 +00:00
return exit_code
if __name__ == " __main__ " :
sys . exit ( main ( ) )