diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 2471e4f9194..dac1332adc6 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -385,6 +385,22 @@ jobs: test_name: Stateless tests (release, s3 storage) runner_type: func-tester data: ${{ needs.RunConfig.outputs.data }} + FunctionalStatelessTestS3Debug: + needs: [RunConfig, BuilderDebDebug] + if: ${{ !failure() && !cancelled() }} + uses: ./.github/workflows/reusable_test.yml + with: + test_name: Stateless tests (debug, s3 storage) + runner_type: func-tester + data: ${{ needs.RunConfig.outputs.data }} + FunctionalStatelessTestS3Tsan: + needs: [RunConfig, BuilderDebTsan] + if: ${{ !failure() && !cancelled() }} + uses: ./.github/workflows/reusable_test.yml + with: + test_name: Stateless tests (tsan, s3 storage) + runner_type: func-tester + data: ${{ needs.RunConfig.outputs.data }} FunctionalStatelessTestAarch64: needs: [RunConfig, BuilderDebAarch64] if: ${{ !failure() && !cancelled() }} @@ -493,6 +509,55 @@ jobs: test_name: Stateful tests (debug) runner_type: func-tester data: ${{ needs.RunConfig.outputs.data }} + # Parallel replicas + FunctionalStatefulTestDebugParallelReplicas: + needs: [RunConfig, BuilderDebDebug] + if: ${{ !failure() && !cancelled() }} + uses: ./.github/workflows/reusable_test.yml + with: + test_name: Stateful tests (debug, ParallelReplicas) + runner_type: func-tester + data: ${{ needs.RunConfig.outputs.data }} + FunctionalStatefulTestUBsanParallelReplicas: + needs: [RunConfig, BuilderDebUBsan] + if: ${{ !failure() && !cancelled() }} + uses: ./.github/workflows/reusable_test.yml + with: + test_name: Stateful tests (ubsan, ParallelReplicas) + runner_type: func-tester + data: ${{ needs.RunConfig.outputs.data }} + FunctionalStatefulTestMsanParallelReplicas: + needs: [RunConfig, BuilderDebMsan] + if: ${{ !failure() && !cancelled() }} + uses: ./.github/workflows/reusable_test.yml + with: + test_name: Stateful tests (msan, ParallelReplicas) + runner_type: func-tester + data: ${{ needs.RunConfig.outputs.data }} + FunctionalStatefulTestTsanParallelReplicas: + needs: [RunConfig, BuilderDebTsan] + if: ${{ !failure() && !cancelled() }} + uses: ./.github/workflows/reusable_test.yml + with: + test_name: Stateful tests (tsan, ParallelReplicas) + runner_type: func-tester + data: ${{ needs.RunConfig.outputs.data }} + FunctionalStatefulTestAsanParallelReplicas: + needs: [RunConfig, BuilderDebAsan] + if: ${{ !failure() && !cancelled() }} + uses: ./.github/workflows/reusable_test.yml + with: + test_name: Stateful tests (asan, ParallelReplicas) + runner_type: func-tester + data: ${{ needs.RunConfig.outputs.data }} + FunctionalStatefulTestReleaseParallelReplicas: + needs: [RunConfig, BuilderDebRelease] + if: ${{ !failure() && !cancelled() }} + uses: ./.github/workflows/reusable_test.yml + with: + test_name: Stateful tests (release, ParallelReplicas) + runner_type: func-tester + data: ${{ needs.RunConfig.outputs.data }} ############################################################################################## ########################### ClickBench ####################################################### ############################################################################################## @@ -700,6 +765,28 @@ jobs: runner_type: func-tester-aarch64 data: ${{ needs.RunConfig.outputs.data }} ############################################################################################## +############################ SQLLOGIC TEST ################################################### +############################################################################################## + SQLLogicTestRelease: + needs: [RunConfig, BuilderDebRelease] + if: ${{ !failure() && !cancelled() }} + uses: ./.github/workflows/reusable_test.yml + with: + test_name: Sqllogic test (release) + runner_type: func-tester + data: ${{ needs.RunConfig.outputs.data }} +############################################################################################## +##################################### SQL TEST ############################################### +############################################################################################## + SQLTest: + needs: [RunConfig, BuilderDebRelease] + if: ${{ !failure() && !cancelled() }} + uses: ./.github/workflows/reusable_test.yml + with: + test_name: SQLTest + runner_type: fuzzer-unit-tester + data: ${{ needs.RunConfig.outputs.data }} +############################################################################################## ###################################### SQLANCER FUZZERS ###################################### ############################################################################################## SQLancerTestRelease: @@ -732,6 +819,8 @@ jobs: - FunctionalStatelessTestTsan - FunctionalStatelessTestMsan - FunctionalStatelessTestUBsan + - FunctionalStatelessTestS3Debug + - FunctionalStatelessTestS3Tsan - FunctionalStatefulTestDebug - FunctionalStatefulTestRelease - FunctionalStatefulTestAarch64 @@ -739,6 +828,12 @@ jobs: - FunctionalStatefulTestTsan - FunctionalStatefulTestMsan - FunctionalStatefulTestUBsan + - FunctionalStatefulTestDebugParallelReplicas + - FunctionalStatefulTestUBsanParallelReplicas + - FunctionalStatefulTestMsanParallelReplicas + - FunctionalStatefulTestTsanParallelReplicas + - FunctionalStatefulTestAsanParallelReplicas + - FunctionalStatefulTestReleaseParallelReplicas - StressTestDebug - StressTestAsan - StressTestTsan @@ -764,6 +859,8 @@ jobs: - UnitTestsReleaseClang - SQLancerTestRelease - SQLancerTestDebug + - SQLLogicTestRelease + - SQLTest runs-on: [self-hosted, style-checker] steps: - name: Check out repository code diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 09e2d6dbb97..cf31738643b 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -1002,7 +1002,7 @@ jobs: ####################################### libFuzzer ########################################### ############################################################################################# libFuzzer: - if: ${{ !failure() && !cancelled() && contains(github.event.pull_request.labels.*.name, 'libFuzzer') }} + if: ${{ !failure() && !cancelled() }} needs: [RunConfig, StyleCheck] uses: ./.github/workflows/libfuzzer.yml with: diff --git a/.github/workflows/reusable_build.yml b/.github/workflows/reusable_build.yml index 2371579692f..6be9d30175e 100644 --- a/.github/workflows/reusable_build.yml +++ b/.github/workflows/reusable_build.yml @@ -85,6 +85,7 @@ jobs: run: | python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --post --job-name '${{inputs.build_name}}' - name: Mark as done + if: ${{ !cancelled() }} run: | python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --mark-success --job-name '${{inputs.build_name}}' - name: Clean diff --git a/.github/workflows/reusable_test.yml b/.github/workflows/reusable_test.yml index 749f64d434e..e30ef863a86 100644 --- a/.github/workflows/reusable_test.yml +++ b/.github/workflows/reusable_test.yml @@ -107,6 +107,7 @@ jobs: run: | python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --post --job-name '${{inputs.test_name}}' - name: Mark as done + if: ${{ !cancelled() }} run: | python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --mark-success --job-name '${{inputs.test_name}}' --batch ${{matrix.batch}} - name: Clean diff --git a/tests/ci/ci.py b/tests/ci/ci.py index 622b7bb005a..5c33d4e02a8 100644 --- a/tests/ci/ci.py +++ b/tests/ci/ci.py @@ -1,5 +1,8 @@ import argparse import concurrent.futures +from copy import deepcopy +from dataclasses import asdict, dataclass +from enum import Enum import json import logging import os @@ -7,16 +10,14 @@ import re import subprocess import sys import time -from dataclasses import asdict, dataclass -from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Union import docker_images_helper import upload_result_helper from build_check import get_release_or_pr -from ci_config import CI_CONFIG, Build, JobNames, Labels -from ci_utils import GHActions, is_hex +from ci_config import CI_CONFIG, Build, Labels, JobNames +from ci_utils import GHActions, is_hex, normalize_string from clickhouse_helper import ( CiLogsCredentials, ClickHouseHelper, @@ -48,7 +49,7 @@ from git_helper import GIT_PREFIX, Git from git_helper import Runner as GitRunner from github import Github from pr_info import PRInfo -from report import SUCCESS, BuildResult, JobReport +from report import ERROR, SUCCESS, BuildResult, JobReport from s3_helper import S3Helper from version_helper import get_version_from_repo @@ -88,6 +89,7 @@ class CiCache: class RecordType(Enum): SUCCESSFUL = "successful" PENDING = "pending" + FAILED = "failed" @dataclass class Record: @@ -249,6 +251,13 @@ class CiCache: ) return record + def print_status(self): + for record_type in self.RecordType: + GHActions.print_in_group( + f"Cache records: [{record_type}]", list(self.records[record_type]) + ) + return self + def update(self): """ Pulls cache records from s3. Only records name w/o content. @@ -260,9 +269,6 @@ class CiCache: path = self.cache_s3_paths[job_type] records = self.s3.list_prefix(f"{path}{prefix}", S3_BUILDS_BUCKET) records = [record.split("/")[-1] for record in records] - GHActions.print_in_group( - f"Cache records: [{record_type}] in [{job_type.value}]", records - ) for file in records: record = self._parse_record_file_name( record_type=record_type, file_name=file @@ -384,6 +390,9 @@ class CiCache: if record_type == self.RecordType.SUCCESSFUL: assert isinstance(status, CommitStatusData) status.dump_to_file(record_file) + elif record_type == self.RecordType.FAILED: + assert isinstance(status, CommitStatusData) + status.dump_to_file(record_file) elif record_type == self.RecordType.PENDING: assert isinstance(status, PendingState) with open(record_file, "w") as json_file: @@ -488,6 +497,16 @@ class CiCache: self.RecordType.SUCCESSFUL, job, batch, num_batches, release_branch ) + def is_failed( + self, job: str, batch: int, num_batches: int, release_branch: bool + ) -> bool: + """ + checks if a given job have already been done with failure + """ + return self.exist( + self.RecordType.FAILED, job, batch, num_batches, release_branch + ) + def is_pending( self, job: str, batch: int, num_batches: int, release_branch: bool ) -> bool: @@ -495,8 +514,9 @@ class CiCache: check pending record in the cache for a given job @release_branch - checks that "release" attribute is set for a record """ - if self.is_successful(job, batch, num_batches, release_branch): - # successful record is present - not pending + if self.is_successful( + job, batch, num_batches, release_branch + ) or self.is_failed(job, batch, num_batches, release_branch): return False return self.exist( @@ -524,6 +544,27 @@ class CiCache: release_branch, ) + def push_failed( + self, + job: str, + batch: int, + num_batches: int, + job_status: CommitStatusData, + release_branch: bool = False, + ) -> None: + """ + Pushes a cache record of type Failed (CommitStatusData) + @release_branch adds "release" attribute to a record + """ + self.push( + self.RecordType.FAILED, + job, + [batch], + num_batches, + job_status, + release_branch, + ) + def push_pending( self, job: str, batches: List[int], num_batches: int, release_branch: bool ) -> None: @@ -591,46 +632,87 @@ class CiCache: bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path ) - # def await_jobs(self, jobs_with_params: Dict[str, Dict[str, Any]]) -> List[str]: - # if not jobs_with_params: - # return [] - # print(f"Start awaiting jobs [{list(jobs_with_params)}]") - # poll_interval_sec = 180 - # start_at = int(time.time()) - # TIMEOUT = 3000 - # expired_sec = 0 - # done_jobs = [] # type: List[str] - # while expired_sec < TIMEOUT and jobs_with_params: - # time.sleep(poll_interval_sec) - # self.update() - # pending_finished: List[str] = [] - # for job_name in jobs_with_params: - # num_batches = jobs_with_params[job_name]["num_batches"] - # for batch in jobs_with_params[job_name]["batches"]: - # if self.is_pending(job_name, batch, num_batches): - # continue - # print( - # f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore" - # ) - # pending_finished.append(job_name) - # if pending_finished: - # # restart timer - # start_at = int(time.time()) - # expired_sec = 0 - # # remove finished jobs from awaiting list - # for job in pending_finished: - # del jobs_with_params[job] - # done_jobs.append(job) - # else: - # expired_sec = int(time.time()) - start_at - # print(f" ...awaiting continues... time left [{TIMEOUT - expired_sec}]") - # if done_jobs: - # print( - # f"Awaiting OK. Left jobs: [{list(jobs_with_params)}], finished jobs: [{done_jobs}]" - # ) - # else: - # print("Awaiting FAILED. No job has finished.") - # return done_jobs + def await_jobs( + self, jobs_with_params: Dict[str, Dict[str, Any]], is_release_branch: bool + ) -> Dict[str, List[int]]: + """ + await pending jobs to be finished + @jobs_with_params - jobs to await. {JOB_NAME: {"batches": [BATCHES...], "num_batches": NUM_BATCHES}} + returns successfully finished jobs: {JOB_NAME: [BATCHES...]} + """ + if not jobs_with_params: + return {} + poll_interval_sec = 300 + TIMEOUT = 3600 + await_finished: Dict[str, List[int]] = {} + round_cnt = 0 + while len(jobs_with_params) > 5 and round_cnt < 3: + round_cnt += 1 + GHActions.print_in_group( + f"Wait pending jobs, round [{round_cnt}]:", list(jobs_with_params) + ) + # this is initial approach to wait pending jobs: + # start waiting for the next TIMEOUT seconds if there are more than X(=5) jobs to wait + # wait TIMEOUT seconds in rounds. Y(=3) is the max number of rounds + expired_sec = 0 + start_at = int(time.time()) + while expired_sec < TIMEOUT and jobs_with_params: + time.sleep(poll_interval_sec) + self.update() + jobs_with_params_copy = deepcopy(jobs_with_params) + for job_name in jobs_with_params: + num_batches = jobs_with_params[job_name]["num_batches"] + job_config = CI_CONFIG.get_job_config(job_name) + for batch in jobs_with_params[job_name]["batches"]: + if self.is_pending( + job_name, + batch, + num_batches, + release_branch=is_release_branch + and job_config.required_on_release_branch, + ): + continue + print( + f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore" + ) + + # some_job_ready = True + jobs_with_params_copy[job_name]["batches"].remove(batch) + if not jobs_with_params_copy[job_name]["batches"]: + del jobs_with_params_copy[job_name] + + if not self.is_successful( + job_name, + batch, + num_batches, + release_branch=is_release_branch + and job_config.required_on_release_branch, + ): + print( + f"NOTE: Job [{job_name}:{batch}] finished but no success - remove from awaiting list, do not add to ready" + ) + continue + if job_name in await_finished: + await_finished[job_name].append(batch) + else: + await_finished[job_name] = [batch] + jobs_with_params = jobs_with_params_copy + expired_sec = int(time.time()) - start_at + print( + f"...awaiting continues... seconds left [{TIMEOUT - expired_sec}]" + ) + if await_finished: + GHActions.print_in_group( + "Finished jobs:", + [f"{job}:{batches}" for job, batches in await_finished.items()], + ) + else: + print("Awaiting FAILED. No job has finished successfully.") + GHActions.print_in_group( + "Remaining jobs:", + [f"{job}:{params['batches']}" for job, params in jobs_with_params.items()], + ) + return await_finished def get_check_name(check_name: str, batch: int, num_batches: int) -> str: @@ -832,7 +914,10 @@ def _pre_action(s3, indata, pr_info): ci_cache = CiCache(s3, indata["jobs_data"]["digests"]) # for release/master branches reports must be from the same branches - report_prefix = pr_info.head_ref if pr_info.number == 0 else "" + report_prefix = normalize_string(pr_info.head_ref) if pr_info.number == 0 else "" + print( + f"Use report prefix [{report_prefix}], pr_num [{pr_info.number}], head_ref [{pr_info.head_ref}]" + ) reports_files = ci_cache.download_build_reports(file_prefix=report_prefix) print(f"Pre action done. Report files [{reports_files}] have been downloaded") @@ -883,8 +968,19 @@ def _mark_success_action( job, batch, num_batches, job_status, pr_info.is_release_branch() ) print(f"Job [{job}] is ok") - elif job_status: - print(f"Job [{job}] is not ok, status [{job_status.status}]") + elif job_status and not job_status.is_ok(): + ci_cache.push_failed( + job, batch, num_batches, job_status, pr_info.is_release_branch() + ) + print(f"Job [{job}] is failed with status [{job_status.status}]") + else: + job_status = CommitStatusData( + description="dummy description", status=ERROR, report_url="dummy url" + ) + ci_cache.push_failed( + job, batch, num_batches, job_status, pr_info.is_release_branch() + ) + print(f"No CommitStatusData for [{job}], push dummy failure to ci_cache") def _print_results(result: Any, outfile: Optional[str], pretty: bool = False) -> None: @@ -992,8 +1088,8 @@ def _configure_jobs( jobs_to_do: List[str] = [] jobs_to_skip: List[str] = [] digests: Dict[str, str] = {} - print("::group::Job Digests") + print("::group::Job Digests") for job in CI_CONFIG.job_generator(): digest = job_digester.get_job_digest(CI_CONFIG.get_digest_config(job)) digests[job] = digest @@ -1003,7 +1099,8 @@ def _configure_jobs( ## b. check what we need to run ci_cache = None if not ci_cache_disabled: - ci_cache = CiCache(s3, digests) + ci_cache = CiCache(s3, digests).update() + ci_cache.print_status() jobs_to_wait: Dict[str, Dict[str, Any]] = {} @@ -1012,10 +1109,13 @@ def _configure_jobs( job_config = CI_CONFIG.get_job_config(job) num_batches: int = job_config.num_batches batches_to_do: List[int] = [] + add_to_skip = False for batch in range(num_batches): # type: ignore if job_config.pr_only and pr_info.is_release_branch(): continue + if job_config.release_only and not pr_info.is_release_branch(): + continue if job_config.run_by_label: # this job controlled by label, add to todo if its label is set in pr if job_config.run_by_label in pr_info.labels: @@ -1036,7 +1136,13 @@ def _configure_jobs( batches_to_do.append(batch) # check if it's pending in the cache - if ci_cache.is_pending(job, batch, num_batches, release_branch=False): + if ci_cache.is_pending( + job, + batch, + num_batches, + release_branch=pr_info.is_release_branch() + and job_config.required_on_release_branch, + ): if job in jobs_to_wait: jobs_to_wait[job]["batches"].append(batch) else: @@ -1044,10 +1150,12 @@ def _configure_jobs( "batches": [batch], "num_batches": num_batches, } + else: + add_to_skip = True if batches_to_do: jobs_to_do.append(job) - elif not job_config.run_by_label: + elif add_to_skip: # treat job as being skipped only if it's controlled by digest jobs_to_skip.append(job) jobs_params[job] = { @@ -1119,49 +1227,64 @@ def _configure_jobs( "digests": digests, "jobs_to_do": jobs_to_do, "jobs_to_skip": jobs_to_skip, - "jobs_to_wait": jobs_to_wait, + "jobs_to_wait": { + job: params for job, params in jobs_to_wait.items() if job in jobs_to_do + }, "jobs_params": { job: params for job, params in jobs_params.items() if job in jobs_to_do }, } +def _create_gh_status( + commit: Any, job: str, batch: int, num_batches: int, job_status: CommitStatusData +) -> None: + print(f"Going to re-create GH status for job [{job}]") + assert job_status.status == SUCCESS, "BUG!" + commit.create_status( + state=job_status.status, + target_url=job_status.report_url, + description=format_description( + f"Reused from [{job_status.pr_num}-{job_status.sha[0:8]}]: " + f"{job_status.description}" + ), + context=get_check_name(job, batch=batch, num_batches=num_batches), + ) + + def _update_gh_statuses_action(indata: Dict, s3: S3Helper) -> None: if indata["ci_flags"][Labels.NO_CI_CACHE]: print("CI cache is disabled - skip restoring commit statuses from CI cache") return job_digests = indata["jobs_data"]["digests"] - ci_cache = CiCache(s3, job_digests).update().fetch_records_data() + jobs_to_skip = indata["jobs_data"]["jobs_to_skip"] + jobs_to_do = indata["jobs_data"]["jobs_to_do"] + ci_cache = CiCache(s3, job_digests).update().fetch_records_data().print_status() # create GH status pr_info = PRInfo() commit = get_commit(Github(get_best_robot_token(), per_page=100), pr_info.sha) - def _run_create_status(job: str, batch: int, num_batches: int) -> None: + def _concurrent_create_status(job: str, batch: int, num_batches: int) -> None: job_status = ci_cache.get_successful(job, batch, num_batches) if not job_status: return - print(f"Going to re-create GH status for job [{job}] sha [{pr_info.sha}]") - assert job_status.status == SUCCESS, "BUG!" - commit.create_status( - state=job_status.status, - target_url=job_status.report_url, - description=format_description( - f"Reused from [{job_status.pr_num}-{job_status.sha[0:8]}]: " - f"{job_status.description}" - ), - context=get_check_name(job, batch=batch, num_batches=num_batches), - ) + _create_gh_status(commit, job, batch, num_batches, job_status) with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for job in job_digests: + if job not in jobs_to_skip or job not in jobs_to_do: + # no need to create status for job that are not supposed to be executed + continue if CI_CONFIG.is_build_job(job): # no GH status for build jobs continue num_batches = CI_CONFIG.get_job_config(job).num_batches for batch in range(num_batches): - future = executor.submit(_run_create_status, job, batch, num_batches) + future = executor.submit( + _concurrent_create_status, job, batch, num_batches + ) futures.append(future) done, _ = concurrent.futures.wait(futures) for future in done: @@ -1194,7 +1317,7 @@ def _upload_build_artifacts( ( get_release_or_pr(pr_info, get_version_from_repo())[1], pr_info.sha, - CI_CONFIG.normalize_string(build_name), + normalize_string(build_name), "performance.tar.zst", ) ) @@ -1509,30 +1632,51 @@ def main() -> int: if not args.skip_jobs and pr_info.has_changes_in_documentation_only(): _update_config_for_docs_only(jobs_data) - # TODO: await pending jobs - # wait for pending jobs to be finished, await_jobs is a long blocking call if any job has to be awaited - # awaited_jobs = ci_cache.await_jobs(jobs_data.get("jobs_to_wait", {})) - # for job in awaited_jobs: - # jobs_to_do = jobs_data["jobs_to_do"] - # if job in jobs_to_do: - # jobs_to_do.remove(job) - # else: - # assert False, "BUG" - - # set planned jobs as pending in the CI cache if on the master - if pr_info.is_master() and not args.skip_jobs: + if not args.skip_jobs: ci_cache = CiCache(s3, jobs_data["digests"]) - for job in jobs_data["jobs_to_do"]: - config = CI_CONFIG.get_job_config(job) - if config.run_always or config.run_by_label: - continue - job_params = jobs_data["jobs_params"][job] - ci_cache.push_pending( - job, - job_params["batches"], - config.num_batches, - release_branch=pr_info.is_release_branch(), + + if ( + pr_info.is_release_branch() + or pr_info.event.get("pull_request", {}) + .get("user", {}) + .get("login", "not_maxknv") + == "maxknv" + ): + # wait for pending jobs to be finished, await_jobs is a long blocking call + # wait pending jobs (for now only on release/master branches) + ready_jobs_batches_dict = ci_cache.await_jobs( + jobs_data.get("jobs_to_wait", {}), pr_info.is_release_branch() ) + jobs_to_do = jobs_data["jobs_to_do"] + jobs_to_skip = jobs_data["jobs_to_skip"] + jobs_params = jobs_data["jobs_params"] + for job, batches in ready_jobs_batches_dict.items(): + if job not in jobs_params: + print(f"WARNING: Job [{job}] is not in the params list") + continue + for batch in batches: + jobs_params[job]["batches"].remove(batch) + if not jobs_params[job]["batches"]: + jobs_to_do.remove(job) + jobs_to_skip.append(job) + del jobs_params[job] + + # set planned jobs as pending in the CI cache if on the master + if pr_info.is_master(): + for job in jobs_data["jobs_to_do"]: + config = CI_CONFIG.get_job_config(job) + if config.run_always or config.run_by_label: + continue + job_params = jobs_data["jobs_params"][job] + ci_cache.push_pending( + job, + job_params["batches"], + config.num_batches, + release_branch=pr_info.is_release_branch(), + ) + + if "jobs_to_wait" in jobs_data: + del jobs_data["jobs_to_wait"] # conclude results result["git_ref"] = git_ref @@ -1608,23 +1752,15 @@ def main() -> int: check_name, args.batch, job_config.num_batches ) assert job_status, "BUG" - commit.create_status( - state=job_status.status, - target_url=job_status.report_url, - description=format_description( - f"Reused from [{job_status.pr_num}-{job_status.sha[0:8]}]: " - f"{job_status.description}" - ), - context=get_check_name( - check_name, - batch=args.batch, - num_batches=job_config.num_batches, - ), + _create_gh_status( + commit, + check_name, + args.batch, + job_config.num_batches, + job_status, ) previous_status = job_status.status - print("::group::Commit Status Data") - print(job_status) - print("::endgroup::") + GHActions.print_in_group("Commit Status Data", job_status) if previous_status: print( @@ -1648,7 +1784,7 @@ def main() -> int: if CI_CONFIG.is_build_job(args.job_name): assert ( indata - ), "--infile with config must be provided for POST action of a build type job [{args.job_name}]" + ), f"--infile with config must be provided for POST action of a build type job [{args.job_name}]" build_name = args.job_name s3_path_prefix = "/".join( ( @@ -1676,7 +1812,7 @@ def main() -> int: ( get_release_or_pr(pr_info, get_version_from_repo())[0], pr_info.sha, - CI_CONFIG.normalize_string( + normalize_string( job_report.check_name or _get_ext_check_name(args.job_name) ), ) diff --git a/tests/ci/ci_config.py b/tests/ci/ci_config.py index 6036a04080c..7c8990e8d16 100644 --- a/tests/ci/ci_config.py +++ b/tests/ci/ci_config.py @@ -22,6 +22,8 @@ class Labels(metaclass=WithIter): CI_SET_ARM = "ci_set_arm" CI_SET_INTEGRATION = "ci_set_integration" + libFuzzer = "libFuzzer" + class Build(metaclass=WithIter): PACKAGE_RELEASE = "package_release" @@ -193,6 +195,8 @@ class JobConfig: required_on_release_branch: bool = False # job is for pr workflow only pr_only: bool = False + # job is for release/master branches only + release_only: bool = False @dataclass @@ -790,6 +794,7 @@ CI_CONFIG = CiConfig( name=Build.FUZZERS, compiler="clang-17", package_type="fuzzers", + job_config=JobConfig(run_by_label=Labels.libFuzzer), ), }, builds_report_config={ @@ -824,7 +829,7 @@ CI_CONFIG = CiConfig( }, other_jobs_configs={ JobNames.MARK_RELEASE_READY: TestConfig( - "", job_config=JobConfig(required_on_release_branch=True) + "", job_config=JobConfig(release_only=True) ), JobNames.DOCKER_SERVER: TestConfig( "", @@ -909,13 +914,6 @@ CI_CONFIG = CiConfig( JobNames.STATEFUL_TEST_AARCH64: TestConfig( Build.PACKAGE_AARCH64, job_config=JobConfig(**stateful_test_common_params) # type: ignore ), - # FIXME: delete? - # "Stateful tests (release, DatabaseOrdinary)": TestConfig( - # Build.PACKAGE_RELEASE, job_config=JobConfig(**stateful_test_common_params) # type: ignore - # ), - # "Stateful tests (release, DatabaseReplicated)": TestConfig( - # Build.PACKAGE_RELEASE, job_config=JobConfig(**stateful_test_common_params) # type: ignore - # ), # Stateful tests for parallel replicas JobNames.STATEFUL_TEST_PARALLEL_REPL_RELEASE: TestConfig( Build.PACKAGE_RELEASE, job_config=JobConfig(**stateful_test_common_params) # type: ignore @@ -997,16 +995,16 @@ CI_CONFIG = CiConfig( Build.PACKAGE_DEBUG, job_config=JobConfig(**stress_test_common_params) # type: ignore ), JobNames.UPGRADE_TEST_ASAN: TestConfig( - Build.PACKAGE_ASAN, job_config=JobConfig(**upgrade_test_common_params) # type: ignore + Build.PACKAGE_ASAN, job_config=JobConfig(pr_only=True, **upgrade_test_common_params) # type: ignore ), JobNames.UPGRADE_TEST_TSAN: TestConfig( - Build.PACKAGE_TSAN, job_config=JobConfig(**upgrade_test_common_params) # type: ignore + Build.PACKAGE_TSAN, job_config=JobConfig(pr_only=True, **upgrade_test_common_params) # type: ignore ), JobNames.UPGRADE_TEST_MSAN: TestConfig( - Build.PACKAGE_MSAN, job_config=JobConfig(**upgrade_test_common_params) # type: ignore + Build.PACKAGE_MSAN, job_config=JobConfig(pr_only=True, **upgrade_test_common_params) # type: ignore ), JobNames.UPGRADE_TEST_DEBUG: TestConfig( - Build.PACKAGE_DEBUG, job_config=JobConfig(**upgrade_test_common_params) # type: ignore + Build.PACKAGE_DEBUG, job_config=JobConfig(pr_only=True, **upgrade_test_common_params) # type: ignore ), JobNames.INTEGRATION_TEST_ASAN: TestConfig( Build.PACKAGE_ASAN, @@ -1033,7 +1031,7 @@ CI_CONFIG = CiConfig( job_config=JobConfig(num_batches=4, **integration_test_common_params), # type: ignore ), JobNames.INTEGRATION_TEST_FLAKY: TestConfig( - Build.PACKAGE_ASAN, job_config=JobConfig(**integration_test_common_params) # type: ignore + Build.PACKAGE_ASAN, job_config=JobConfig(pr_only=True, **integration_test_common_params) # type: ignore ), JobNames.COMPATIBILITY_TEST: TestConfig( Build.PACKAGE_RELEASE, @@ -1080,7 +1078,7 @@ CI_CONFIG = CiConfig( JobNames.STATELESS_TEST_FLAKY_ASAN: TestConfig( # replace to non-default Build.PACKAGE_ASAN, - job_config=JobConfig(**{**statless_test_common_params, "timeout": 3600}), # type: ignore + job_config=JobConfig(pr_only=True, **{**statless_test_common_params, "timeout": 3600}), # type: ignore ), JobNames.JEPSEN_KEEPER: TestConfig( Build.BINARY_RELEASE, @@ -1116,7 +1114,7 @@ CI_CONFIG = CiConfig( ), JobNames.CLCIKBENCH_TEST: TestConfig(Build.PACKAGE_RELEASE), JobNames.CLCIKBENCH_TEST_ARM: TestConfig(Build.PACKAGE_AARCH64), - JobNames.LIBFUZZER_TEST: TestConfig(Build.FUZZERS), # type: ignore + JobNames.LIBFUZZER_TEST: TestConfig(Build.FUZZERS, job_config=JobConfig(run_by_label=Labels.libFuzzer)), # type: ignore }, ) CI_CONFIG.validate() diff --git a/tests/ci/ci_utils.py b/tests/ci/ci_utils.py index 7e2a3d11725..2967ec2f309 100644 --- a/tests/ci/ci_utils.py +++ b/tests/ci/ci_utils.py @@ -1,6 +1,6 @@ from contextlib import contextmanager import os -from typing import List, Union, Iterator +from typing import Any, List, Union, Iterator from pathlib import Path @@ -27,9 +27,22 @@ def is_hex(s): return False +def normalize_string(string: str) -> str: + lowercase_string = string.lower() + normalized_string = ( + lowercase_string.replace(" ", "_") + .replace("-", "_") + .replace("/", "_") + .replace("(", "") + .replace(")", "") + .replace(",", "") + ) + return normalized_string + + class GHActions: @staticmethod - def print_in_group(group_name: str, lines: Union[str, List[str]]) -> None: + def print_in_group(group_name: str, lines: Union[Any, List[Any]]) -> None: lines = list(lines) print(f"::group::{group_name}") for line in lines: diff --git a/tests/ci/commit_status_helper.py b/tests/ci/commit_status_helper.py index 5dd2a33adaf..8a34d375d1e 100644 --- a/tests/ci/commit_status_helper.py +++ b/tests/ci/commit_status_helper.py @@ -370,6 +370,9 @@ class CommitStatusData: def is_ok(self): return self.status == SUCCESS + def is_failure(self): + return self.status == FAILURE + @staticmethod def cleanup(): STATUS_FILE_PATH.unlink(missing_ok=True) diff --git a/tests/ci/report.py b/tests/ci/report.py index ce20c7293f9..ef09e9738ee 100644 --- a/tests/ci/report.py +++ b/tests/ci/report.py @@ -23,6 +23,7 @@ from typing import ( from build_download_helper import get_gh_api from ci_config import CI_CONFIG, BuildConfig from env_helper import REPORT_PATH, TEMP_PATH +from ci_utils import normalize_string logger = logging.getLogger(__name__) @@ -550,7 +551,7 @@ class BuildResult: def write_json(self, directory: Union[Path, str] = REPORT_PATH) -> Path: path = Path(directory) / self.get_report_name( - self.build_name, self.pr_number or self.head_ref + self.build_name, self.pr_number or normalize_string(self.head_ref) ) path.write_text( json.dumps( diff --git a/tests/ci/test_ci_cache.py b/tests/ci/test_ci_cache.py index 0f8acf2656c..3cdd6c78390 100644 --- a/tests/ci/test_ci_cache.py +++ b/tests/ci/test_ci_cache.py @@ -96,16 +96,27 @@ class TestCiCache(unittest.TestCase): pr_num=PR_NUM, ) - ### add some pending statuses for two batches and on non-release branch + ### add some pending statuses for two batches, non-release branch for job in JobNames: - ci_cache.push_pending(job, [0, 1], NUM_BATCHES, release_branch=False) - ci_cache_2.push_pending(job, [0, 1], NUM_BATCHES, release_branch=False) + ci_cache.push_pending(job, [0, 1, 2], NUM_BATCHES, release_branch=False) + ci_cache_2.push_pending(job, [0, 1, 2], NUM_BATCHES, release_branch=False) ### add success status for 0 batch, non-release branch + batch = 0 for job in JobNames: - ci_cache.push_successful(job, 0, NUM_BATCHES, status, release_branch=False) + ci_cache.push_successful( + job, batch, NUM_BATCHES, status, release_branch=False + ) ci_cache_2.push_successful( - job, 0, NUM_BATCHES, status, release_branch=False + job, batch, NUM_BATCHES, status, release_branch=False + ) + + ### add failed status for 2 batch, non-release branch + batch = 2 + for job in JobNames: + ci_cache.push_failed(job, batch, NUM_BATCHES, status, release_branch=False) + ci_cache_2.push_failed( + job, batch, NUM_BATCHES, status, release_branch=False ) ### check all expected directories were created on s3 mock @@ -128,7 +139,7 @@ class TestCiCache(unittest.TestCase): ) ### check number of cache files is as expected - FILES_PER_JOB = 3 # 1 successful + 2 pending batches = 3 + FILES_PER_JOB = 5 # 1 successful + 1 failed + 3 pending batches = 5 self.assertEqual( len( s3_mock.files_on_s3_paths[ @@ -219,7 +230,7 @@ class TestCiCache(unittest.TestCase): ci_cache.push_successful(job, 0, NUM_BATCHES, status, release_branch=True) ### check number of cache files is as expected - FILES_PER_JOB = 6 # 1 successful + 1 successful_release + 2 pending batches + 2 pending batches release = 6 + FILES_PER_JOB = 8 # 1 successful + 1 failed + 1 successful_release + 3 pending batches + 2 pending batches release = 8 self.assertEqual( len( s3_mock.files_on_s3_paths[ @@ -252,6 +263,9 @@ class TestCiCache(unittest.TestCase): self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, False), True) self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, True), True) + self.assertEqual(ci_cache.is_failed(job, 2, NUM_BATCHES, False), True) + self.assertEqual(ci_cache.is_failed(job, 2, NUM_BATCHES, True), False) + status2 = ci_cache.get_successful(job, 0, NUM_BATCHES) assert status2 and status2.pr_num == PR_NUM status2 = ci_cache.get_successful(job, 1, NUM_BATCHES) @@ -273,6 +287,13 @@ class TestCiCache(unittest.TestCase): self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, False), True) self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, True), True) + self.assertEqual(ci_cache.is_failed(job, 2, NUM_BATCHES, False), True) + self.assertEqual(ci_cache.is_failed(job, 2, NUM_BATCHES, True), False) + + # is_pending() is false for failed jobs batches + self.assertEqual(ci_cache.is_pending(job, 2, NUM_BATCHES, False), False) + self.assertEqual(ci_cache.is_pending(job, 2, NUM_BATCHES, True), False) + status2 = ci_cache.get_successful(job, 0, NUM_BATCHES) assert status2 and status2.pr_num == PR_NUM status2 = ci_cache.get_successful(job, 1, NUM_BATCHES)