Merge pull request #64734 from ClickHouse/ci_py_small_refactoring

CI: ci.py refactoring
Max K 2024-06-04 19:35:21 +00:00, committed by GitHub
commit ad7097587f
12 changed files with 1329 additions and 1367 deletions


@@ -58,7 +58,7 @@ jobs:
       env:
         GITHUB_JOB_OVERRIDDEN: ${{inputs.test_name}}${{ fromJson(inputs.data).jobs_data.jobs_params[inputs.test_name].num_batches > 1 && format('-{0}',matrix.batch) || '' }}
     strategy:
-      fail-fast: false # we always wait for entire matrix
+      fail-fast: false # we always wait for the entire matrix
       matrix:
         batch: ${{ fromJson(inputs.data).jobs_data.jobs_params[inputs.test_name].batches }}
     steps:


@@ -1,29 +0,0 @@
-### CI modificators (add a leading space to apply) ###
-## To avoid a merge commit in CI:
-#no_merge_commit
-## To discard CI cache:
-#no_ci_cache
-## To not test (only style check):
-#do_not_test
-## To run specified set of tests in CI:
-#ci_set_<SET_NAME>
-#ci_set_reduced
-#ci_set_arm
-#ci_set_integration
-#ci_set_old_analyzer
-## To run specified job in CI:
-#job_<JOB NAME>
-#job_stateless_tests_release
-#job_package_debug
-#job_integration_tests_asan
-## To run only specified batches for multi-batch job(s)
-#batch_2
-#batch_1_2_3


@@ -197,7 +197,6 @@ class CargoCache(Cache):
         logging.info("Cache for Cargo.lock md5 %s will be uploaded", self.lock_hash)
         self._force_upload_cache = True
         self.directory.mkdir(parents=True, exist_ok=True)
-        return

     def upload(self):
         self._upload(f"{self.PREFIX}/{self.archive_name}", self._force_upload_cache)

File diff suppressed because it is too large

tests/ci/ci_cache.py (new file, 818 lines)

@@ -0,0 +1,818 @@
import json
import time
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from typing import Dict, Optional, Any, Union, Sequence, List, Set
from ci_config import JobNames, Build, CI_CONFIG, JobConfig
from ci_utils import is_hex, GHActions
from commit_status_helper import CommitStatusData
from env_helper import (
TEMP_PATH,
CI_CONFIG_PATH,
S3_BUILDS_BUCKET,
GITHUB_RUN_URL,
REPORT_PATH,
)
from report import BuildResult
from s3_helper import S3Helper
from digest_helper import JobDigester
@dataclass
class PendingState:
updated_at: float
run_url: str
class CiCache:
"""
The CI cache is a set of records. A record is a file stored at a special location on S3.
The file name has the format:
<RECORD_TYPE>_[<ATTRIBUTES>]--<JOB_NAME>_<JOB_DIGEST>_<BATCH>_<NUM_BATCHES>.ci
RECORD_TYPE:
SUCCESSFUL - for successful jobs
PENDING - for pending jobs
ATTRIBUTES:
release - for jobs executed on a release branch, including master (not on a PR branch)
"""
_REQUIRED_DIGESTS = [JobNames.DOCS_CHECK, Build.PACKAGE_RELEASE]
_S3_CACHE_PREFIX = "CI_cache_v1"
_CACHE_BUILD_REPORT_PREFIX = "build_report"
_RECORD_FILE_EXTENSION = ".ci"
_LOCAL_CACHE_PATH = Path(TEMP_PATH) / "ci_cache"
_ATTRIBUTE_RELEASE = "release"
# divider symbol 1
_DIV1 = "--"
# divider symbol 2
_DIV2 = "_"
assert _DIV1 != _DIV2
class RecordType(Enum):
SUCCESSFUL = "successful"
PENDING = "pending"
FAILED = "failed"
@dataclass
class Record:
record_type: "CiCache.RecordType"
job_name: str
job_digest: str
batch: int
num_batches: int
release_branch: bool
file: str = ""
def to_str_key(self):
"""other fields must not be included in the hash str"""
return "_".join(
[self.job_name, self.job_digest, str(self.batch), str(self.num_batches)]
)
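# e.g. Record(..., "Docs check", "0123abcdef12", 0, 1, ...).to_str_key()
# returns "Docs check_0123abcdef12_0_1" (digest is hypothetical)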
class JobType(Enum):
DOCS = "DOCS"
SRCS = "SRCS"
@classmethod
def is_docs_job(cls, job_name: str) -> bool:
return job_name == JobNames.DOCS_CHECK
@classmethod
def is_srcs_job(cls, job_name: str) -> bool:
return not cls.is_docs_job(job_name)
@classmethod
def get_type_by_name(cls, job_name: str) -> "CiCache.JobType":
res = cls.SRCS
if cls.is_docs_job(job_name):
res = cls.DOCS
elif cls.is_srcs_job(job_name):
res = cls.SRCS
else:
assert False
return res
def __init__(
self,
s3: S3Helper,
job_digests: Dict[str, str],
cache_enabled: bool = True,
):
self.enabled = cache_enabled
self.jobs_to_skip = [] # type: List[str]
self.jobs_to_wait = {} # type: Dict[str, JobConfig]
self.jobs_to_do = {} # type: Dict[str, JobConfig]
self.s3 = s3
self.job_digests = job_digests
self.cache_s3_paths = {
job_type: f"{self._S3_CACHE_PREFIX}/{job_type.value}-{self._get_digest_for_job_type(self.job_digests, job_type)}/"
for job_type in self.JobType
}
self.s3_record_prefixes = {
record_type: record_type.value for record_type in self.RecordType
}
self.records: Dict["CiCache.RecordType", Dict[str, "CiCache.Record"]] = {
record_type: {} for record_type in self.RecordType
}
self.updated = False
self.cache_data_fetched = True
if not self._LOCAL_CACHE_PATH.exists():
self._LOCAL_CACHE_PATH.mkdir(parents=True, exist_ok=True)
@classmethod
def calc_digests_and_create(
cls, s3: S3Helper, job_configs: Dict[str, JobConfig], cache_enabled: bool = True
) -> "CiCache":
job_digester = JobDigester()
digests = {}
print("::group::Job Digests")
for job, job_config in job_configs.items():
digest = job_digester.get_job_digest(job_config.digest)
digests[job] = digest
print(f" job [{job.rjust(50)}] has digest [{digest}]")
for job in cls._REQUIRED_DIGESTS:
if job not in job_configs:
digest = job_digester.get_job_digest(
CI_CONFIG.get_job_config(job).digest
)
digests[job] = digest
print(
f" job [{job.rjust(50)}] required for CI Cache has digest [{digest}]"
)
print("::endgroup::")
return CiCache(s3, digests, cache_enabled=cache_enabled)
def _get_digest_for_job_type(
self, job_digests: Dict[str, str], job_type: JobType
) -> str:
if job_type == self.JobType.DOCS:
res = job_digests[JobNames.DOCS_CHECK]
elif job_type == self.JobType.SRCS:
if Build.PACKAGE_RELEASE in job_digests:
res = job_digests[Build.PACKAGE_RELEASE]
else:
assert False, "BUG, no build job in digests list"
else:
assert False, "BUG, New JobType? - please update the function"
return res
def _get_record_file_name(
self,
record_type: RecordType,
job_name: str,
batch: int,
num_batches: int,
release_branch: bool,
) -> str:
prefix = self.s3_record_prefixes[record_type]
prefix_extended = (
self._DIV2.join([prefix, self._ATTRIBUTE_RELEASE])
if release_branch
else prefix
)
assert self._DIV1 not in job_name, f"Invalid job name {job_name}"
job_name = self._DIV2.join(
[job_name, self.job_digests[job_name], str(batch), str(num_batches)]
)
file_name = self._DIV1.join([prefix_extended, job_name])
file_name += self._RECORD_FILE_EXTENSION
return file_name
def _get_record_s3_path(self, job_name: str) -> str:
return self.cache_s3_paths[self.JobType.get_type_by_name(job_name)]
def _parse_record_file_name(
self, record_type: RecordType, file_name: str
) -> Optional["CiCache.Record"]:
# validate filename
if (
not file_name.endswith(self._RECORD_FILE_EXTENSION)
or len(file_name.split(self._DIV1)) != 2
):
print("ERROR: wrong file name format")
return None
file_name = file_name.removesuffix(self._RECORD_FILE_EXTENSION)
release_branch = False
prefix_extended, job_suffix = file_name.split(self._DIV1)
record_type_and_attribute = prefix_extended.split(self._DIV2)
# validate filename prefix
failure = False
if not 0 < len(record_type_and_attribute) <= 2:
print("ERROR: wrong file name prefix")
failure = True
if (
len(record_type_and_attribute) > 1
and record_type_and_attribute[1] != self._ATTRIBUTE_RELEASE
):
print("ERROR: wrong record attribute")
failure = True
if record_type_and_attribute[0] != self.s3_record_prefixes[record_type]:
print("ERROR: wrong record type")
failure = True
if failure:
return None
if (
len(record_type_and_attribute) > 1
and record_type_and_attribute[1] == self._ATTRIBUTE_RELEASE
):
release_branch = True
job_properties = job_suffix.split(self._DIV2)
job_name, job_digest, batch, num_batches = (
self._DIV2.join(job_properties[:-3]),
job_properties[-3],
int(job_properties[-2]),
int(job_properties[-1]),
)
if not is_hex(job_digest):
print("ERROR: wrong record job digest")
return None
record = self.Record(
record_type,
job_name,
job_digest,
batch,
num_batches,
release_branch,
file="",
)
return record
def print_status(self):
print(f"Cache enabled: [{self.enabled}]")
for record_type in self.RecordType:
GHActions.print_in_group(
f"Cache records: [{record_type}]", list(self.records[record_type])
)
GHActions.print_in_group(
"Jobs to do:",
list(self.jobs_to_do.items()),
)
GHActions.print_in_group("Jobs to skip:", self.jobs_to_skip)
GHActions.print_in_group(
"Jobs to wait:",
list(self.jobs_to_wait.items()),
)
return self
@staticmethod
def dump_run_config(indata: Dict[str, Any]) -> None:
assert indata
assert CI_CONFIG_PATH
with open(CI_CONFIG_PATH, "w", encoding="utf-8") as json_file:
json.dump(indata, json_file, indent=2)
def update(self):
"""
Pulls cache record names from S3 (names only, without content).
"""
if not self.enabled:
return self
for record_type in self.RecordType:
prefix = self.s3_record_prefixes[record_type]
cache_list = self.records[record_type]
for job_type in self.JobType:
path = self.cache_s3_paths[job_type]
records = self.s3.list_prefix(f"{path}{prefix}", S3_BUILDS_BUCKET)
records = [record.split("/")[-1] for record in records]
for file in records:
record = self._parse_record_file_name(
record_type=record_type, file_name=file
)
if not record:
print(f"ERROR: failed to parse cache record [{file}]")
continue
if (
record.job_name not in self.job_digests
or self.job_digests[record.job_name] != record.job_digest
):
# skip records we are not interested in
continue
if record.to_str_key() not in cache_list:
cache_list[record.to_str_key()] = record
self.cache_data_fetched = False
elif (
not cache_list[record.to_str_key()].release_branch
and record.release_branch
):
# replace a non-release record with a release one
cache_list[record.to_str_key()] = record
self.cache_data_fetched = False
self.updated = True
return self
def fetch_records_data(self):
"""
Pulls CommitStatusData for all cached jobs from S3
"""
if not self.updated:
self.update()
if self.cache_data_fetched:
# there are no records without fetched data - no need to fetch
return self
# clean up
for file in self._LOCAL_CACHE_PATH.glob("*.ci"):
file.unlink()
# download all record files
for job_type in self.JobType:
path = self.cache_s3_paths[job_type]
for record_type in self.RecordType:
prefix = self.s3_record_prefixes[record_type]
_ = self.s3.download_files(
bucket=S3_BUILDS_BUCKET,
s3_path=f"{path}{prefix}",
file_suffix=self._RECORD_FILE_EXTENSION,
local_directory=self._LOCAL_CACHE_PATH,
)
# validate we have files for all records and save file names meanwhile
for record_type in self.RecordType:
record_list = self.records[record_type]
for _, record in record_list.items():
record_file_name = self._get_record_file_name(
record_type,
record.job_name,
record.batch,
record.num_batches,
record.release_branch,
)
assert (
self._LOCAL_CACHE_PATH / record_file_name
).is_file(), f"BUG. Record file must be present: {self._LOCAL_CACHE_PATH / record_file_name}"
record.file = record_file_name
self.cache_data_fetched = True
return self
def exist(
self,
record_type: "CiCache.RecordType",
job: str,
batch: int,
num_batches: int,
release_branch: bool,
) -> bool:
if not self.updated:
self.update()
record_key = self.Record(
record_type,
job,
self.job_digests[job],
batch,
num_batches,
release_branch,
).to_str_key()
res = record_key in self.records[record_type]
if release_branch:
return res and self.records[record_type][record_key].release_branch
else:
return res
def push(
self,
record_type: "CiCache.RecordType",
job: str,
batches: Union[int, Sequence[int]],
num_batches: int,
status: Union[CommitStatusData, PendingState],
release_branch: bool = False,
) -> None:
"""
Pushes a cache record (CommitStatusData)
@release_branch adds "release" attribute to a record
"""
if isinstance(batches, int):
batches = [batches]
for batch in batches:
record_file = self._LOCAL_CACHE_PATH / self._get_record_file_name(
record_type, job, batch, num_batches, release_branch
)
record_s3_path = self._get_record_s3_path(job)
if record_type == self.RecordType.SUCCESSFUL:
assert isinstance(status, CommitStatusData)
status.dump_to_file(record_file)
elif record_type == self.RecordType.FAILED:
assert isinstance(status, CommitStatusData)
status.dump_to_file(record_file)
elif record_type == self.RecordType.PENDING:
assert isinstance(status, PendingState)
with open(record_file, "w", encoding="utf-8") as json_file:
json.dump(asdict(status), json_file)
else:
assert False
_ = self.s3.upload_file(
bucket=S3_BUILDS_BUCKET,
file_path=record_file,
s3_path=record_s3_path + record_file.name,
)
record = self.Record(
record_type,
job,
self.job_digests[job],
batch,
num_batches,
release_branch,
file=record_file.name,
)
if (
record.release_branch
or record.to_str_key() not in self.records[record_type]
):
self.records[record_type][record.to_str_key()] = record
def get(
self, record_type: "CiCache.RecordType", job: str, batch: int, num_batches: int
) -> Optional[Union[CommitStatusData, PendingState]]:
"""
Gets cache record data for a job, or None on a cache miss
"""
if not self.cache_data_fetched:
self.fetch_records_data()
record_key = self.Record(
record_type,
job,
self.job_digests[job],
batch,
num_batches,
release_branch=False,
).to_str_key()
if record_key not in self.records[record_type]:
return None
record_file_name = self.records[record_type][record_key].file
res = CommitStatusData.load_from_file(
self._LOCAL_CACHE_PATH / record_file_name
) # type: CommitStatusData
return res
def delete(
self,
record_type: "CiCache.RecordType",
job: str,
batch: int,
num_batches: int,
release_branch: bool,
) -> None:
"""
deletes record from the cache
"""
raise NotImplementedError("Let's try to keep the cache push-and-read-only")
# assert (
# record_type == self.RecordType.PENDING
# ), "FIXME: delete is supported for pending records only"
# record_file_name = self._get_record_file_name(
# self.RecordType.PENDING,
# job,
# batch,
# num_batches,
# release_branch=release_branch,
# )
# record_s3_path = self._get_record_s3_path(job)
# self.s3.delete_file_from_s3(S3_BUILDS_BUCKET, record_s3_path + record_file_name)
# record_key = self.Record(
# record_type,
# job,
# self.job_digests[job],
# batch,
# num_batches,
# release_branch=False,
# ).to_str_key()
# if record_key in self.records[record_type]:
# del self.records[record_type][record_key]
def is_successful(
self, job: str, batch: int, num_batches: int, release_branch: bool
) -> bool:
"""
checks if a given job has already completed successfully
"""
return self.exist(
self.RecordType.SUCCESSFUL, job, batch, num_batches, release_branch
)
def is_failed(
self, job: str, batch: int, num_batches: int, release_branch: bool
) -> bool:
"""
checks if a given job has already completed with failure
"""
return self.exist(
self.RecordType.FAILED, job, batch, num_batches, release_branch
)
def is_pending(
self, job: str, batch: int, num_batches: int, release_branch: bool
) -> bool:
"""
checks for a pending record in the cache for a given job
@release_branch - checks that "release" attribute is set for a record
"""
if self.is_successful(
job, batch, num_batches, release_branch
) or self.is_failed(job, batch, num_batches, release_branch):
return False
return self.exist(
self.RecordType.PENDING, job, batch, num_batches, release_branch
)
def push_successful(
self,
job: str,
batch: int,
num_batches: int,
job_status: CommitStatusData,
release_branch: bool = False,
) -> None:
"""
Pushes a cache record (CommitStatusData)
@release_branch adds "release" attribute to a record
"""
self.push(
self.RecordType.SUCCESSFUL,
job,
[batch],
num_batches,
job_status,
release_branch,
)
def push_failed(
self,
job: str,
batch: int,
num_batches: int,
job_status: CommitStatusData,
release_branch: bool = False,
) -> None:
"""
Pushes a cache record of type Failed (CommitStatusData)
@release_branch adds "release" attribute to a record
"""
self.push(
self.RecordType.FAILED,
job,
[batch],
num_batches,
job_status,
release_branch,
)
def push_pending(
self, job: str, batches: List[int], num_batches: int, release_branch: bool
) -> None:
"""
pushes a pending record for a job to the cache
"""
pending_state = PendingState(time.time(), run_url=GITHUB_RUN_URL)
self.push(
self.RecordType.PENDING,
job,
batches,
num_batches,
pending_state,
release_branch,
)
def push_pending_all(self, release_branch: bool) -> None:
"""
pushes pending records for all jobs that are supposed to run
"""
for job, job_config in self.jobs_to_do.items():
if job_config.run_always:
continue
pending_state = PendingState(time.time(), run_url=GITHUB_RUN_URL)
assert job_config.batches
self.push(
self.RecordType.PENDING,
job,
job_config.batches,
job_config.num_batches,
pending_state,
release_branch,
)
def get_successful(
self, job: str, batch: int, num_batches: int
) -> Optional[CommitStatusData]:
"""
Gets a cache record (CommitStatusData) for a job, or None if a cache miss
"""
res = self.get(self.RecordType.SUCCESSFUL, job, batch, num_batches)
assert res is None or isinstance(res, CommitStatusData)
return res
def delete_pending(
self, job: str, batch: int, num_batches: int, release_branch: bool
) -> None:
"""
deletes a pending record from the cache
"""
self.delete(self.RecordType.PENDING, job, batch, num_batches, release_branch)
def download_build_reports(self, file_prefix: str = "") -> List[str]:
"""
not an ideal class for this method, but let it be, since build reports are
stored in the CI cache directory on S3 and CiCache knows exactly where
@file_prefix allows filtering out reports by git head_ref
"""
report_path = Path(REPORT_PATH)
report_path.mkdir(exist_ok=True, parents=True)
path = (
self._get_record_s3_path(Build.PACKAGE_RELEASE)
+ self._CACHE_BUILD_REPORT_PREFIX
)
if file_prefix:
path += "_" + file_prefix
reports_files = self.s3.download_files(
bucket=S3_BUILDS_BUCKET,
s3_path=path,
file_suffix=".json",
local_directory=report_path,
)
return reports_files
def upload_build_report(self, build_result: BuildResult) -> str:
result_json_path = build_result.write_json(Path(TEMP_PATH))
s3_path = (
self._get_record_s3_path(Build.PACKAGE_RELEASE) + result_json_path.name
)
return self.s3.upload_file(
bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path
)
def await_pending_jobs(self, is_release: bool) -> None:
"""
Awaits the pending jobs in self.jobs_to_wait until they finish or the time
budget runs out. Jobs that finished successfully are moved to self.jobs_to_skip.
"""
if not self.jobs_to_wait:
print("CI cache: no pending jobs to wait - continue")
return
poll_interval_sec = 300
# TIMEOUT * MAX_ROUNDS_TO_WAIT must be less than 6h (the GH job timeout), with room left for the rest of the RunConfig work
TIMEOUT = 3000 # 50 min
MAX_ROUNDS_TO_WAIT = 6
MAX_JOB_NUM_TO_WAIT = 3
round_cnt = 0
# FIXME: temporary experiment: for PR workflows, await build jobs only
if not is_release:
MAX_ROUNDS_TO_WAIT = 1
remove_from_wait = []
for job in self.jobs_to_wait:
if job not in Build:
remove_from_wait.append(job)
for job in remove_from_wait:
del self.jobs_to_wait[job]
while (
len(self.jobs_to_wait) > MAX_JOB_NUM_TO_WAIT
and round_cnt < MAX_ROUNDS_TO_WAIT
):
await_finished: Set[str] = set()
round_cnt += 1
GHActions.print_in_group(
f"Wait pending jobs, round [{round_cnt}/{MAX_ROUNDS_TO_WAIT}]:",
list(self.jobs_to_wait),
)
# this is an initial approach to waiting for pending jobs:
# wait in rounds of TIMEOUT seconds while more than MAX_JOB_NUM_TO_WAIT jobs
# remain pending, up to MAX_ROUNDS_TO_WAIT rounds
expired_sec = 0
start_at = int(time.time())
while expired_sec < TIMEOUT and self.jobs_to_wait:
time.sleep(poll_interval_sec)
self.update()
for job_name, job_config in self.jobs_to_wait.items():
num_batches = job_config.num_batches
job_config = CI_CONFIG.get_job_config(job_name)
assert job_config.pending_batches
assert job_config.batches
pending_batches = list(job_config.pending_batches)
for batch in pending_batches:
if self.is_pending(
job_name,
batch,
num_batches,
release_branch=is_release
and job_config.required_on_release_branch,
):
continue
if self.is_successful(
job_name,
batch,
num_batches,
release_branch=is_release
and job_config.required_on_release_branch,
):
print(
f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore"
)
job_config.batches.remove(batch)
job_config.pending_batches.remove(batch)
else:
print(
f"NOTE: Job [{job_name}:{batch}] finished with failure - do not add to ready"
)
job_config.pending_batches.remove(batch)
if not job_config.pending_batches:
await_finished.add(job_name)
for job in await_finished:
self.jobs_to_skip.append(job)
del self.jobs_to_wait[job]
expired_sec = int(time.time()) - start_at
print(
f"...awaiting continues... seconds left [{TIMEOUT - expired_sec}]"
)
if await_finished:
GHActions.print_in_group(
f"Finished jobs, round [{round_cnt}]: [{list(await_finished)}]",
list(await_finished),
)
GHActions.print_in_group(
"Remaining jobs:",
[list(self.jobs_to_wait)],
)
def apply(self, job_configs: Dict[str, JobConfig], is_release: bool) -> "CiCache":
if not self.enabled:
self.jobs_to_do = job_configs
return self
if not self.updated:
self.update()
for job, job_config in job_configs.items():
assert (
job_config.batches
), "Batches must be generated; see ci_settings.apply()"
if job_config.run_always:
self.jobs_to_do[job] = job_config
continue
ready_batches = []
for batch in job_config.batches:
if self.is_successful(
job,
batch,
job_config.num_batches,
release_branch=is_release and job_config.required_on_release_branch,
):
ready_batches.append(batch)
elif self.is_pending(
job,
batch,
job_config.num_batches,
release_branch=is_release and job_config.required_on_release_branch,
):
if job_config.pending_batches is None:
job_config.pending_batches = []
job_config.pending_batches.append(batch)
if ready_batches == job_config.batches:
self.jobs_to_skip.append(job)
else:
for batch in ready_batches:
job_config.batches.remove(batch)
self.jobs_to_do[job] = job_config
if job_config.pending_batches:
self.jobs_to_wait[job] = job_config
return self
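Taken together, the intended flow is: generate batches via CiSettings, compute digests, pull record names from S3, filter jobs against the cache, and await still-pending jobs. A minimal driver sketch (illustrative only; the real wiring lives in ci.py, which also feeds in PR info and workflow flags):

from ci_cache import CiCache
from ci_config import CI_CONFIG
from ci_settings import CiSettings
from s3_helper import S3Helper

s3 = S3Helper()
# all jobs for a regular PR workflow
job_configs = CI_CONFIG.get_workflow_jobs_with_configs(
    is_mq=False, is_docs_only=False, is_master=False
)
# CiSettings.apply() sets job_config.batches, which CiCache.apply() requires
job_configs = CiSettings().apply(job_configs, is_release=False, is_pr=True, labels=[])
ci_cache = CiCache.calc_digests_and_create(s3, job_configs)
ci_cache.update()  # fetch record names from S3
ci_cache.apply(job_configs, is_release=False)  # fills jobs_to_do/to_skip/to_wait
ci_cache.await_pending_jobs(is_release=False)  # poll S3 while batches stay pending
print(ci_cache.jobs_to_skip)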


@@ -1,6 +1,7 @@
 #!/usr/bin/env python3

 import logging
+import random
 import re
 from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
 from copy import deepcopy
@@ -8,7 +9,7 @@ from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Callable, Dict, Iterable, List, Literal, Optional, Union

-from ci_utils import WithIter
+from ci_utils import WithIter, normalize_string
 from integration_test_images import IMAGES
@@ -49,7 +50,7 @@ class CILabels(metaclass=WithIter):
     NO_CI_CACHE = "no_ci_cache"
     # to upload all binaries from build jobs
     UPLOAD_ALL_ARTIFACTS = "upload_all"
-    CI_SET_REDUCED = "ci_set_reduced"
+    CI_SET_SYNC = "ci_set_sync"
     CI_SET_ARM = "ci_set_arm"
     CI_SET_REQUIRED = "ci_set_required"
     CI_SET_NON_REQUIRED = "ci_set_non_required"
@@ -233,23 +234,25 @@ class JobConfig:
     run_command: str = ""
     # job timeout, seconds
     timeout: Optional[int] = None
-    # sets number of batches for multi-batch job
+    # sets number of batches for a multi-batch job
     num_batches: int = 1
-    # label that enables job in CI, if set digest won't be used
+    # label that enables job in CI, if set digest isn't used
     run_by_label: str = ""
     # to run always regardless of the job digest or/and label
     run_always: bool = False
-    # if the job needs to be run on the release branch, including master (e.g. building packages, docker server).
+    # if the job needs to be run on the release branch, including master (building packages, docker server).
     # NOTE: Subsequent runs on the same branch with the similar digest are still considered skip-able.
     required_on_release_branch: bool = False
     # job is for pr workflow only
     pr_only: bool = False
     # job is for release/master branches only
     release_only: bool = False
-    # job will run if it's enabled in CI option
-    run_by_ci_option: bool = False
-    # to randomly pick and run one job among jobs in the same @random_bucket. Applied in PR branches only.
+    # to randomly pick and run one job among jobs in the same @random_bucket (PR branches only).
     random_bucket: str = ""
+    # Do not set it. A list of batches to run. It will be set in runtime in accordance with ci cache and ci settings
+    batches: Optional[List[int]] = None
+    # Do not set it. A list of batches to await. It will be set in runtime in accordance with ci cache and ci settings
+    pending_batches: Optional[List[int]] = None


 builds_job_config = JobConfig(
@@ -552,9 +555,20 @@ class CIConfig:
     other_jobs_configs: TestConfigs
     label_configs: LabelConfigs

+    # Jobs that run for doc related updates
+    _DOCS_CHECK_JOBS = [JobNames.DOCS_CHECK, JobNames.STYLE_CHECK]
+
+    # Jobs that run in Merge Queue if it's enabled
+    _MQ_JOBS = [
+        JobNames.STYLE_CHECK,
+        JobNames.FAST_TEST,
+        Build.BINARY_RELEASE,
+        JobNames.UNIT_TEST,
+    ]
+
     def get_label_config(self, label_name: str) -> Optional[LabelConfig]:
         for label, config in self.label_configs.items():
-            if self.normalize_string(label_name) == self.normalize_string(label):
+            if normalize_string(label_name) == normalize_string(label):
                 return config
         return None
@ -670,21 +684,9 @@ class CIConfig:
return result return result
@staticmethod def get_job_parents(self, check_name: str) -> List[str]:
def normalize_string(input_string: str) -> str:
lowercase_string = input_string.lower()
normalized_string = (
lowercase_string.replace(" ", "_")
.replace("-", "_")
.replace("(", "")
.replace(")", "")
.replace(",", "")
)
return normalized_string
def get_job_with_parents(self, check_name: str) -> List[str]:
res = [] res = []
check_name = self.normalize_string(check_name) check_name = normalize_string(check_name)
for config in ( for config in (
self.build_config, self.build_config,
@ -693,23 +695,10 @@ class CIConfig:
self.other_jobs_configs, self.other_jobs_configs,
): ):
for job_name in config: # type: ignore for job_name in config: # type: ignore
if check_name == self.normalize_string(job_name): if check_name == normalize_string(job_name):
res.append(job_name)
if isinstance(config[job_name], TestConfig): # type: ignore if isinstance(config[job_name], TestConfig): # type: ignore
if config[job_name].required_build: # type: ignore if config[job_name].required_build: # type: ignore
res.append(config[job_name].required_build) # type: ignore res.append(config[job_name].required_build) # type: ignore
elif isinstance(config[job_name], BuildConfig): # type: ignore
pass
elif isinstance(config[job_name], BuildReportConfig): # type: ignore
pass
else:
assert (
False
), f"check commit message tags or FIXME: request for job [{check_name}] not yet supported"
break
assert (
res
), f"Error: Experimental feature... Invalid request or not supported job [{check_name}]"
return res return res
def get_digest_config(self, check_name: str) -> DigestConfig: def get_digest_config(self, check_name: str) -> DigestConfig:
@ -727,18 +716,49 @@ class CIConfig:
), f"Invalid check_name or CI_CONFIG outdated, config not found for [{check_name}]" ), f"Invalid check_name or CI_CONFIG outdated, config not found for [{check_name}]"
return res # type: ignore return res # type: ignore
def job_generator(self, branch: str) -> Iterable[str]: def get_workflow_jobs_with_configs(
self, is_mq: bool, is_docs_only: bool, is_master: bool
) -> Dict[str, JobConfig]:
""" """
traverses all check names in CI pipeline get a list of all jobs for a workflow with configs
""" """
assert branch jobs = []
if is_mq:
jobs = self._MQ_JOBS
elif is_docs_only:
jobs = self._DOCS_CHECK_JOBS
else:
for config in ( for config in (
self.other_jobs_configs, self.other_jobs_configs,
self.build_config, self.build_config,
self.builds_report_config, self.builds_report_config,
self.test_configs, self.test_configs,
): ):
yield from config # type: ignore jobs += list(config) # type:ignore
if is_master:
for job in self._MQ_JOBS:
jobs.remove(job)
randomization_bucket_jobs = {} # type: Dict[str, Dict[str, JobConfig]]
res = {} # type: Dict[str, JobConfig]
for job in jobs:
job_config = self.get_job_config(job)
if job_config.random_bucket:
if job_config.random_bucket not in randomization_bucket_jobs:
randomization_bucket_jobs[job_config.random_bucket] = {}
randomization_bucket_jobs[job_config.random_bucket][job] = job_config
continue
res[job] = job_config
# add to the result a random job from each random bucket, if any
for bucket, jobs_configs in randomization_bucket_jobs.items():
job = random.choice(list(jobs_configs))
print(f"Pick job [{job}] from randomization bucket [{bucket}]")
res[job] = jobs_configs[job]
return res
def get_builds_for_report( def get_builds_for_report(
self, report_name: str, release: bool = False, backport: bool = False self, report_name: str, release: bool = False, backport: bool = False
@ -773,6 +793,16 @@ class CIConfig:
def is_docs_job(cls, job: str) -> bool: def is_docs_job(cls, job: str) -> bool:
return job == JobNames.DOCS_CHECK return job == JobNames.DOCS_CHECK
@staticmethod
def is_required(check_name: str) -> bool:
"""Checks if a check_name is in REQUIRED_CHECKS, including batched jobs"""
_BATCH_REGEXP = re.compile(r"\s+\[[0-9/]+\]$")
if check_name in REQUIRED_CHECKS:
return True
if batch := _BATCH_REGEXP.search(check_name):
return check_name[: batch.start()] in REQUIRED_CHECKS
return False
def validate(self) -> None: def validate(self) -> None:
errors = [] errors = []
for name, build_config in self.build_config.items(): for name, build_config in self.build_config.items():
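A behavior sketch for the relocated helper (assuming "Stateless tests (asan)" is listed in REQUIRED_CHECKS):

CIConfig.is_required("Stateless tests (asan) [2/4]")   # True - the " [2/4]" batch suffix is stripped
CIConfig.is_required("Some experimental check [1/2]")  # False - base name is not in REQUIRED_CHECKS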
@@ -852,8 +882,6 @@ REQUIRED_CHECKS = [
     JobNames.STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE,
 ]

-BATCH_REGEXP = re.compile(r"\s+\[[0-9/]+\]$")
-
 CI_CONFIG = CIConfig(
     label_configs={
         CILabels.DO_NOT_TEST_LABEL: LabelConfig(run_jobs=[JobNames.STYLE_CHECK]),
@@ -878,22 +906,13 @@ CI_CONFIG = CIConfig(
             JobNames.INTEGRATION_TEST_ASAN_OLD_ANALYZER,
         ]
     ),
-    CILabels.CI_SET_REDUCED: LabelConfig(
+    CILabels.CI_SET_SYNC: LabelConfig(
         run_jobs=[
-            job
-            for job in JobNames
-            if not any(
-                nogo in job
-                for nogo in (
-                    "asan",
-                    "tsan",
-                    "msan",
-                    "ubsan",
-                    "coverage",
-                    # skip build report jobs as not all builds will be done
-                    "build check",
-                )
-            )
+            Build.PACKAGE_ASAN,
+            JobNames.STYLE_CHECK,
+            JobNames.BUILD_CHECK,
+            JobNames.UNIT_TEST_ASAN,
+            JobNames.STATEFUL_TEST_ASAN,
         ]
     ),
 },
@@ -1202,7 +1221,7 @@ CI_CONFIG = CIConfig(
     ),
     JobNames.STATELESS_TEST_AZURE_ASAN: TestConfig(
         Build.PACKAGE_ASAN,
-        job_config=JobConfig(num_batches=4, **stateless_test_common_params, release_only=True, run_by_ci_option=True),  # type: ignore
+        job_config=JobConfig(num_batches=4, **stateless_test_common_params, release_only=True),  # type: ignore
     ),
     JobNames.STATELESS_TEST_S3_TSAN: TestConfig(
         Build.PACKAGE_TSAN,
@@ -1227,10 +1246,10 @@ CI_CONFIG = CIConfig(
         Build.PACKAGE_ASAN, job_config=JobConfig(pr_only=True, random_bucket="upgrade_with_sanitizer", **upgrade_test_common_params)  # type: ignore
     ),
     JobNames.STRESS_TEST_AZURE_TSAN: TestConfig(
-        Build.PACKAGE_TSAN, job_config=JobConfig(**stress_test_common_params, release_only=True, run_by_ci_option=True)  # type: ignore
+        Build.PACKAGE_TSAN, job_config=JobConfig(**stress_test_common_params, release_only=True)  # type: ignore
     ),
     JobNames.STRESS_TEST_AZURE_MSAN: TestConfig(
-        Build.PACKAGE_MSAN, job_config=JobConfig(**stress_test_common_params, release_only=True, run_by_ci_option=True)  # type: ignore
+        Build.PACKAGE_MSAN, job_config=JobConfig(**stress_test_common_params, release_only=True)  # type: ignore
     ),
     JobNames.UPGRADE_TEST_TSAN: TestConfig(
         Build.PACKAGE_TSAN, job_config=JobConfig(pr_only=True, random_bucket="upgrade_with_sanitizer", **upgrade_test_common_params)  # type: ignore
@@ -1360,15 +1379,6 @@ CI_CONFIG = CIConfig(
 CI_CONFIG.validate()

-
-def is_required(check_name: str) -> bool:
-    """Checks if a check_name is in REQUIRED_CHECKS, including batched jobs"""
-    if check_name in REQUIRED_CHECKS:
-        return True
-    if batch := BATCH_REGEXP.search(check_name):
-        return check_name[: batch.start()] in REQUIRED_CHECKS
-    return False
-
 @dataclass
 class CheckDescription:
     name: str
@@ -1380,6 +1390,11 @@ class CheckDescription:

 CHECK_DESCRIPTIONS = [
+    CheckDescription(
+        "PR Check",
+        "Checks correctness of the PR's body",
+        lambda x: x == "PR Check",
+    ),
     CheckDescription(
         StatusNames.SYNC,
         "If it fails, ask a maintainer for help",

tests/ci/ci_settings.py (new file, 228 lines)

@@ -0,0 +1,228 @@
import re
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict, Any, Iterable
from ci_utils import normalize_string
from ci_config import CILabels, CI_CONFIG, JobConfig, JobNames
from git_helper import Runner as GitRunner, GIT_PREFIX
from pr_info import PRInfo
# pylint: disable=too-many-return-statements
@dataclass
class CiSettings:
# job will be included in the run if any keyword from the list matches job name
include_keywords: Optional[List[str]] = None
# job will be excluded from the run if any keyword from the list matches job name
exclude_keywords: Optional[List[str]] = None
# list of specified preconfigured ci sets to run
ci_sets: Optional[List[str]] = None
# list of specified jobs to run
ci_jobs: Optional[List[str]] = None
# batches to run for all multi-batch jobs
job_batches: Optional[List[int]] = None
do_not_test: bool = False
no_ci_cache: bool = False
upload_all: bool = False
no_merge_commit: bool = False
def as_dict(self) -> Dict[str, Any]:
return asdict(self)
@staticmethod
def create_from_run_config(run_config: Dict[str, Any]) -> "CiSettings":
return CiSettings(**run_config["ci_settings"])
@staticmethod
def create_from_pr_message(
debug_message: Optional[str], update_from_api: bool
) -> "CiSettings":
"""
Creates CiSettings instance based on tags found in PR body and/or commit message
@debug_message - may be provided directly for debugging purposes; otherwise the message is retrieved from git.
"""
res = CiSettings()
pr_info = PRInfo()
if (
not pr_info.is_pr and not debug_message
): # if debug_message is provided, it's a test/debug scenario - do not return early
# CI options can be configured in PRs only
# if debug_message is provided - it's a test
return res
message = debug_message or GitRunner(set_cwd_to_git_root=True).run(
f"{GIT_PREFIX} log {pr_info.sha} --format=%B -n 1"
)
# CI setting example we need to match with re:
# - [x] <!---ci_exclude_tsan|msan|ubsan|coverage--> Exclude: All with TSAN, MSAN, UBSAN, Coverage
pattern = r"(#|- \[x\] +<!---)([|\w]+)"
matches = [match[-1] for match in re.findall(pattern, message)]
print(f"CI tags from commit message: [{matches}]")
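# e.g. for a body with "- [x] <!---ci_set_arm-->" and "#job_package_debug",
# matches would be ["ci_set_arm", "job_package_debug"]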
if not debug_message: # to be skipped if debug/test
pr_info = PRInfo(
pr_event_from_api=update_from_api
) # Fetch updated PR body from GH API
matches_pr = [match[-1] for match in re.findall(pattern, pr_info.body)]
print(f"CI tags from PR body: [{matches_pr}]")
matches = list(set(matches + matches_pr))
if "do not test" in pr_info.labels:
# do_not_test could be set in GH labels
res.do_not_test = True
for match in matches:
if match.startswith("job_"):
if not res.ci_jobs:
res.ci_jobs = []
res.ci_jobs.append(match.removeprefix("job_"))
elif match.startswith("ci_set_") and match in CILabels:
if not res.ci_sets:
res.ci_sets = []
res.ci_sets.append(match)
elif match.startswith("ci_include_"):
if not res.include_keywords:
res.include_keywords = []
res.include_keywords.append(
normalize_string(match.removeprefix("ci_include_"))
)
elif match.startswith("ci_exclude_"):
if not res.exclude_keywords:
res.exclude_keywords = []
keywords = match.removeprefix("ci_exclude_").split("|")
res.exclude_keywords += [
normalize_string(keyword) for keyword in keywords
]
elif match == CILabels.NO_CI_CACHE:
res.no_ci_cache = True
print("NOTE: CI Cache will be disabled")
elif match == CILabels.UPLOAD_ALL_ARTIFACTS:
res.upload_all = True
print("NOTE: All binary artifacts will be uploaded")
elif match == CILabels.DO_NOT_TEST_LABEL:
res.do_not_test = True
elif match == CILabels.NO_MERGE_COMMIT:
res.no_merge_commit = True
print("NOTE: Merge Commit will be disabled")
elif match.startswith("batch_"):
batches = []
try:
batches = [
int(batch) for batch in match.removeprefix("batch_").split("_")
]
except Exception:
print(f"ERROR: failed to parse commit tag [{match}] - skip")
if batches:
if not res.job_batches:
res.job_batches = []
res.job_batches += batches
res.job_batches = list(set(res.job_batches))
else:
print(
f"WARNING: Invalid tag in commit message or PR body [{match}] - skip"
)
return res
def _check_if_selected(
self,
job: str,
job_config: JobConfig,
is_release: bool,
is_pr: bool,
labels: Iterable[str],
) -> bool: # type: ignore #too-many-return-statements
if self.do_not_test:
label_config = CI_CONFIG.get_label_config(CILabels.DO_NOT_TEST_LABEL)
assert label_config, f"Unknown tag [{CILabels.DO_NOT_TEST_LABEL}]"
if job in label_config.run_jobs:
print(
f"Job [{job}] present in CI set [{CILabels.DO_NOT_TEST_LABEL}] - pass"
)
return True
return False
if job_config.run_by_label:
if job_config.run_by_label in labels and is_pr:
print(
f"Job [{job}] selected by GH label [{job_config.run_by_label}] - pass"
)
return True
else:
return False
if self.exclude_keywords:
for keyword in self.exclude_keywords:
if keyword in normalize_string(job):
print(f"Job [{job}] matches Exclude keyword [{keyword}] - deny")
return False
to_deny = False
if self.include_keywords:
if job == JobNames.STYLE_CHECK:
# never exclude Style Check by include keywords
return True
for keyword in self.include_keywords:
if keyword in normalize_string(job):
print(f"Job [{job}] matches Include keyword [{keyword}] - pass")
return True
to_deny = True
if self.ci_sets:
for tag in self.ci_sets:
label_config = CI_CONFIG.get_label_config(tag)
assert label_config, f"Unknown tag [{tag}]"
if job in label_config.run_jobs:
print(f"Job [{job}] present in CI set [{tag}] - pass")
return True
to_deny = True
if self.ci_jobs:
if job in self.ci_jobs:
print(f"Job [{job}] set by CI #job_ tags [{self.ci_jobs}] - pass")
return True
to_deny = True
if job_config.release_only and not is_release:
return False
elif job_config.pr_only and not is_pr:
return False
return not to_deny
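# Precedence summary for _check_if_selected(): #do_not_test wins outright;
# run_by_label jobs are selected only by their GH label; explicit excludes deny;
# includes, #ci_set_* and #job_* tags allow (and deny everything else);
# finally the release_only/pr_only gates apply.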
def apply(
self,
job_configs: Dict[str, JobConfig],
is_release: bool,
is_pr: bool,
labels: Iterable[str],
) -> Dict[str, JobConfig]:
"""
Apply CI settings from the PR body
"""
res = {}
for job, job_config in job_configs.items():
if self._check_if_selected(
job, job_config, is_release=is_release, is_pr=is_pr, labels=labels
):
res[job] = job_config
for job in list(res):
parent_jobs = CI_CONFIG.get_job_parents(job)
for parent_job in parent_jobs:
if parent_job not in res:
print(f"Job [{job}] requires [{parent_job}] - add")
res[parent_job] = job_configs[parent_job]
for job, job_config in res.items():
batches = []
for batch in range(job_config.num_batches):
if not self.job_batches or batch in self.job_batches:
batches.append(batch)
job_config.batches = batches
return res
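For reference, the debug path exercised by test_ci_options.py parses tags straight from a message string; a sketch (assumes an environment where PRInfo() can be constructed, as in the CI unit tests):

from ci_settings import CiSettings

body = """
- [x] <!---ci_include_asan--> Include: asan jobs
- [x] <!---ci_exclude_tsan|msan--> Exclude: TSAN and MSAN
- [x] <!---no_ci_cache--> Disable CI cache
"""
settings = CiSettings.create_from_pr_message(body, update_from_api=False)
assert settings.include_keywords == ["asan"]
assert sorted(settings.exclude_keywords) == ["msan", "tsan"]
assert settings.no_ci_cache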


@@ -28,16 +28,10 @@ def is_hex(s):

 def normalize_string(string: str) -> str:
-    lowercase_string = string.lower()
-    normalized_string = (
-        lowercase_string.replace(" ", "_")
-        .replace("-", "_")
-        .replace("/", "_")
-        .replace("(", "")
-        .replace(")", "")
-        .replace(",", "")
-    )
-    return normalized_string
+    res = string.lower()
+    for r in ((" ", "_"), ("(", "_"), (")", "_"), (",", "_"), ("/", "_"), ("-", "_")):
+        res = res.replace(*r)
+    return res

 class GHActions:
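Note the behavior change: punctuation is now mapped to underscores instead of being dropped, so, illustratively:

normalize_string("Stateless tests (release, old analyzer)")
# old implementation: "stateless_tests_release_old_analyzer"
# new implementation: "stateless_tests__release__old_analyzer_"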


@@ -17,7 +17,7 @@ from github.GithubObject import NotSet
 from github.IssueComment import IssueComment
 from github.Repository import Repository

-from ci_config import CHECK_DESCRIPTIONS, CheckDescription, StatusNames, is_required
+from ci_config import CHECK_DESCRIPTIONS, CheckDescription, StatusNames, CIConfig
 from env_helper import GITHUB_REPOSITORY, GITHUB_UPSTREAM_REPOSITORY, TEMP_PATH
 from lambda_shared_package.lambda_shared.pr import Labels
 from pr_info import PRInfo
@@ -443,7 +443,7 @@ def update_mergeable_check(commit: Commit, pr_info: PRInfo, check_name: str) ->
     "check if the check_name in REQUIRED_CHECKS and then trigger update"
     not_run = (
         pr_info.labels.intersection({Labels.SKIP_MERGEABLE_CHECK, Labels.RELEASE})
-        or not is_required(check_name)
+        or not CIConfig.is_required(check_name)
         or pr_info.release_pr
         or pr_info.number == 0
     )
@@ -465,7 +465,9 @@ def trigger_mergeable_check(
     workflow_failed: bool = False,
 ) -> StatusType:
     """calculate and update StatusNames.MERGEABLE"""
-    required_checks = [status for status in statuses if is_required(status.context)]
+    required_checks = [
+        status for status in statuses if CIConfig.is_required(status.context)
+    ]

     mergeable_status = None
     for status in statuses:


@@ -2,7 +2,6 @@
 import json
 import logging
 import os
-import re
 from typing import Dict, List, Set, Union
 from urllib.parse import quote

@@ -312,12 +311,6 @@ class PRInfo:
     @property
     def is_release(self) -> bool:
-        return self.number == 0 and bool(
-            re.match(r"^2[1-9]\.[1-9][0-9]*$", self.head_ref)
-        )
-
-    @property
-    def is_release_branch(self) -> bool:
         return self.number == 0 and not self.is_merge_queue

     @property


@@ -7,7 +7,7 @@ from typing import Dict, Set
 import unittest

 from ci_config import Build, JobNames
 from s3_helper import S3Helper
-from ci import CiCache
+from ci_cache import CiCache
 from digest_helper import JOB_DIGEST_LEN
 from commit_status_helper import CommitStatusData
 from env_helper import S3_BUILDS_BUCKET, TEMP_PATH


@@ -3,8 +3,8 @@
 # type: ignore

 import unittest

-from ci import CiOptions
-from pr_info import PRInfo
+from ci_settings import CiSettings
+from ci_config import JobConfig

 _TEST_BODY_1 = """
 #### Run only:
@@ -54,6 +54,14 @@ _TEST_JOB_LIST = [
     "Fast test",
     "package_release",
     "package_asan",
+    "package_aarch64",
+    "package_release_coverage",
+    "package_debug",
+    "package_tsan",
+    "package_msan",
+    "package_ubsan",
+    "binary_release",
+    "fuzzers",
     "Docker server image",
     "Docker keeper image",
     "Install packages (amd64)",
@@ -129,10 +137,12 @@ _TEST_JOB_LIST = [
     "Bugfix validation",
 ]

+_TEST_JOB_LIST_2 = ["Style check", "Fast test", "fuzzers"]
+

 class TestCIOptions(unittest.TestCase):
     def test_pr_body_parsing(self):
-        ci_options = CiOptions.create_from_pr_message(
+        ci_options = CiSettings.create_from_pr_message(
             _TEST_BODY_1, update_from_api=False
         )
         self.assertFalse(ci_options.do_not_test)
@@ -144,7 +154,7 @@ class TestCIOptions(unittest.TestCase):
     def test_options_applied(self):
         self.maxDiff = None
-        ci_options = CiOptions.create_from_pr_message(
+        ci_options = CiSettings.create_from_pr_message(
             _TEST_BODY_2, update_from_api=False
         )
         self.assertCountEqual(
@@ -155,24 +165,33 @@ class TestCIOptions(unittest.TestCase):
             ci_options.exclude_keywords,
             ["tsan", "foobar", "aarch64", "analyzer", "s3_storage", "coverage"],
         )
-        jobs_to_do = list(_TEST_JOB_LIST)
-        jobs_to_skip = []
-        job_params = {
-            "Stateless tests (azure, asan)": {
-                "batches": list(range(3)),
-                "num_batches": 3,
-                "run_by_ci_option": True,
-            }
-        }
-        jobs_to_do, jobs_to_skip, job_params = ci_options.apply(
-            jobs_to_do, jobs_to_skip, job_params, PRInfo()
+
+        jobs_configs = {job: JobConfig() for job in _TEST_JOB_LIST}
+        jobs_configs[
+            "fuzzers"
+        ].run_by_label = (
+            "TEST_LABEL"  # check "fuzzers" appears in the result due to the label
+        )
+        jobs_configs[
+            "Integration tests (asan)"
+        ].release_only = (
+            True  # still must be included as it's set with include keywords
+        )
+        filtered_jobs = list(
+            ci_options.apply(
+                jobs_configs, is_release=False, is_pr=True, labels=["TEST_LABEL"]
+            )
         )
         self.assertCountEqual(
-            jobs_to_do,
+            filtered_jobs,
             [
                 "Style check",
+                "fuzzers",
                 "package_release",
                 "package_asan",
+                "package_debug",
+                "package_msan",
+                "package_ubsan",
                 "Stateless tests (asan)",
                 "Stateless tests (azure, asan)",
                 "Stateless tests flaky check (asan)",
@@ -187,54 +206,88 @@ class TestCIOptions(unittest.TestCase):
         )

     def test_options_applied_2(self):
+        jobs_configs = {job: JobConfig() for job in _TEST_JOB_LIST_2}
+        jobs_configs["Style check"].release_only = True
+        jobs_configs["Fast test"].pr_only = True
+        jobs_configs["fuzzers"].run_by_label = "TEST_LABEL"
+        # no settings are set
+        filtered_jobs = list(
+            CiSettings().apply(jobs_configs, is_release=False, is_pr=True, labels=[])
+        )
+        self.assertCountEqual(
+            filtered_jobs,
+            [
+                "Fast test",
+            ],
+        )
+
+        filtered_jobs = list(
+            CiSettings().apply(jobs_configs, is_release=True, is_pr=False, labels=[])
+        )
+        self.assertCountEqual(
+            filtered_jobs,
+            [
+                "Style check",
+            ],
+        )
+
+    def test_options_applied_3(self):
+        ci_settings = CiSettings()
+        ci_settings.include_keywords = ["Style"]
+        jobs_configs = {job: JobConfig() for job in _TEST_JOB_LIST_2}
+        jobs_configs["Style check"].release_only = True
+        jobs_configs["Fast test"].pr_only = True
+        # no settings are set
+        filtered_jobs = list(
+            ci_settings.apply(
+                jobs_configs, is_release=False, is_pr=True, labels=["TEST_LABEL"]
+            )
+        )
+        self.assertCountEqual(
+            filtered_jobs,
+            [
+                "Style check",
+            ],
+        )
+
+        ci_settings.include_keywords = ["Fast"]
+        filtered_jobs = list(
+            ci_settings.apply(
+                jobs_configs, is_release=True, is_pr=False, labels=["TEST_LABEL"]
+            )
+        )
+        self.assertCountEqual(
+            filtered_jobs,
+            [
+                "Style check",
+            ],
+        )
+
+    def test_options_applied_4(self):
         self.maxDiff = None
-        ci_options = CiOptions.create_from_pr_message(
+        ci_options = CiSettings.create_from_pr_message(
             _TEST_BODY_3, update_from_api=False
         )
         self.assertCountEqual(ci_options.include_keywords, ["analyzer"])
         self.assertIsNone(ci_options.exclude_keywords)
-        jobs_to_do = list(_TEST_JOB_LIST)
-        jobs_to_skip = []
-        job_params = {}
-        jobs_to_do, jobs_to_skip, job_params = ci_options.apply(
-            jobs_to_do, jobs_to_skip, job_params, PRInfo()
+
+        jobs_configs = {job: JobConfig() for job in _TEST_JOB_LIST}
+        jobs_configs[
+            "fuzzers"
+        ].run_by_label = "TEST_LABEL"  # check "fuzzers" does not appears in the result
+        jobs_configs["Integration tests (asan)"].release_only = True
+        filtered_jobs = list(
+            ci_options.apply(
+                jobs_configs, is_release=False, is_pr=True, labels=["TEST_LABEL"]
+            )
         )
         self.assertCountEqual(
-            jobs_to_do,
+            filtered_jobs,
             [
                 "Style check",
                 "Integration tests (asan, old analyzer)",
                 "package_release",
                 "Stateless tests (release, old analyzer, s3, DatabaseReplicated)",
                 "package_asan",
+                "fuzzers",
             ],
         )
-
-    def test_options_applied_3(self):
-        self.maxDiff = None
-        ci_options = CiOptions.create_from_pr_message(
-            _TEST_BODY_4, update_from_api=False
-        )
-        self.assertIsNone(ci_options.include_keywords, None)
-        self.assertIsNone(ci_options.exclude_keywords, None)
-        jobs_to_do = list(_TEST_JOB_LIST)
-        jobs_to_skip = []
-        job_params = {}
-        for job in _TEST_JOB_LIST:
-            if "Stateless" in job:
-                job_params[job] = {
-                    "batches": list(range(3)),
-                    "num_batches": 3,
-                    "run_by_ci_option": "azure" in job,
-                }
-            else:
-                job_params[job] = {"run_by_ci_option": False}
-        jobs_to_do, jobs_to_skip, job_params = ci_options.apply(
-            jobs_to_do, jobs_to_skip, job_params, PRInfo()
-        )
-        self.assertNotIn(
-            "Stateless tests (azure, asan)",
-            jobs_to_do,
-        )