Merge pull request #59555 from ClickHouse/await_on_ci_jobs

CI: ci_cache, enable await
Max K 2024-02-08 20:04:06 +01:00 committed by GitHub
commit 3c1c7bb824
10 changed files with 409 additions and 138 deletions

View File

@@ -385,6 +385,22 @@ jobs:
test_name: Stateless tests (release, s3 storage)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatelessTestS3Debug:
needs: [RunConfig, BuilderDebDebug]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateless tests (debug, s3 storage)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatelessTestS3Tsan:
needs: [RunConfig, BuilderDebTsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateless tests (tsan, s3 storage)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatelessTestAarch64:
needs: [RunConfig, BuilderDebAarch64]
if: ${{ !failure() && !cancelled() }}
@@ -493,6 +509,55 @@ jobs:
test_name: Stateful tests (debug)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
# Parallel replicas
FunctionalStatefulTestDebugParallelReplicas:
needs: [RunConfig, BuilderDebDebug]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (debug, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatefulTestUBsanParallelReplicas:
needs: [RunConfig, BuilderDebUBsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (ubsan, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatefulTestMsanParallelReplicas:
needs: [RunConfig, BuilderDebMsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (msan, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatefulTestTsanParallelReplicas:
needs: [RunConfig, BuilderDebTsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (tsan, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatefulTestAsanParallelReplicas:
needs: [RunConfig, BuilderDebAsan]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (asan, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
FunctionalStatefulTestReleaseParallelReplicas:
needs: [RunConfig, BuilderDebRelease]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Stateful tests (release, ParallelReplicas)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
##############################################################################################
########################### ClickBench #######################################################
##############################################################################################
@@ -700,6 +765,28 @@ jobs:
runner_type: func-tester-aarch64
data: ${{ needs.RunConfig.outputs.data }}
##############################################################################################
############################ SQLLOGIC TEST ###################################################
##############################################################################################
SQLLogicTestRelease:
needs: [RunConfig, BuilderDebRelease]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Sqllogic test (release)
runner_type: func-tester
data: ${{ needs.RunConfig.outputs.data }}
##############################################################################################
##################################### SQL TEST ###############################################
##############################################################################################
SQLTest:
needs: [RunConfig, BuilderDebRelease]
if: ${{ !failure() && !cancelled() }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: SQLTest
runner_type: fuzzer-unit-tester
data: ${{ needs.RunConfig.outputs.data }}
##############################################################################################
###################################### SQLANCER FUZZERS ######################################
##############################################################################################
SQLancerTestRelease:
@@ -732,6 +819,8 @@ jobs:
- FunctionalStatelessTestTsan
- FunctionalStatelessTestMsan
- FunctionalStatelessTestUBsan
- FunctionalStatelessTestS3Debug
- FunctionalStatelessTestS3Tsan
- FunctionalStatefulTestDebug
- FunctionalStatefulTestRelease
- FunctionalStatefulTestAarch64
@@ -739,6 +828,12 @@ jobs:
- FunctionalStatefulTestTsan
- FunctionalStatefulTestMsan
- FunctionalStatefulTestUBsan
- FunctionalStatefulTestDebugParallelReplicas
- FunctionalStatefulTestUBsanParallelReplicas
- FunctionalStatefulTestMsanParallelReplicas
- FunctionalStatefulTestTsanParallelReplicas
- FunctionalStatefulTestAsanParallelReplicas
- FunctionalStatefulTestReleaseParallelReplicas
- StressTestDebug
- StressTestAsan
- StressTestTsan
@@ -764,6 +859,8 @@ jobs:
- UnitTestsReleaseClang
- SQLancerTestRelease
- SQLancerTestDebug
- SQLLogicTestRelease
- SQLTest
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code

View File

@@ -1002,7 +1002,7 @@ jobs:
####################################### libFuzzer ###########################################
#############################################################################################
libFuzzer:
-if: ${{ !failure() && !cancelled() && contains(github.event.pull_request.labels.*.name, 'libFuzzer') }}
+if: ${{ !failure() && !cancelled() }}
needs: [RunConfig, StyleCheck]
uses: ./.github/workflows/libfuzzer.yml
with:

View File

@@ -85,6 +85,7 @@ jobs:
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --post --job-name '${{inputs.build_name}}'
- name: Mark as done
if: ${{ !cancelled() }}
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --mark-success --job-name '${{inputs.build_name}}'
- name: Clean

View File

@@ -107,6 +107,7 @@ jobs:
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --post --job-name '${{inputs.test_name}}'
- name: Mark as done
if: ${{ !cancelled() }}
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --infile ${{ toJson(inputs.data) }} --mark-success --job-name '${{inputs.test_name}}' --batch ${{matrix.batch}}
- name: Clean

View File

@@ -1,5 +1,8 @@
import argparse
import concurrent.futures
+from copy import deepcopy
+from dataclasses import asdict, dataclass
+from enum import Enum
import json
import logging
import os
@@ -7,16 +10,14 @@ import re
import subprocess
import sys
import time
-from dataclasses import asdict, dataclass
-from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union
import docker_images_helper
import upload_result_helper
from build_check import get_release_or_pr
-from ci_config import CI_CONFIG, Build, JobNames, Labels
+from ci_config import CI_CONFIG, Build, Labels, JobNames
-from ci_utils import GHActions, is_hex
+from ci_utils import GHActions, is_hex, normalize_string
from clickhouse_helper import (
CiLogsCredentials,
ClickHouseHelper,
@@ -48,7 +49,7 @@ from git_helper import GIT_PREFIX, Git
from git_helper import Runner as GitRunner
from github import Github
from pr_info import PRInfo
-from report import SUCCESS, BuildResult, JobReport
+from report import ERROR, SUCCESS, BuildResult, JobReport
from s3_helper import S3Helper
from version_helper import get_version_from_repo
@@ -88,6 +89,7 @@ class CiCache:
class RecordType(Enum):
SUCCESSFUL = "successful"
PENDING = "pending"
FAILED = "failed"
@dataclass
class Record:
@@ -249,6 +251,13 @@ class CiCache:
)
return record
def print_status(self):
for record_type in self.RecordType:
GHActions.print_in_group(
f"Cache records: [{record_type}]", list(self.records[record_type])
)
return self
def update(self):
"""
Pulls cache records from s3. Only records name w/o content.
@@ -260,9 +269,6 @@ class CiCache:
path = self.cache_s3_paths[job_type]
records = self.s3.list_prefix(f"{path}{prefix}", S3_BUILDS_BUCKET)
records = [record.split("/")[-1] for record in records]
-GHActions.print_in_group(
-    f"Cache records: [{record_type}] in [{job_type.value}]", records
-)
for file in records:
record = self._parse_record_file_name(
record_type=record_type, file_name=file
@@ -384,6 +390,9 @@ class CiCache:
if record_type == self.RecordType.SUCCESSFUL:
assert isinstance(status, CommitStatusData)
status.dump_to_file(record_file)
elif record_type == self.RecordType.FAILED:
assert isinstance(status, CommitStatusData)
status.dump_to_file(record_file)
elif record_type == self.RecordType.PENDING:
assert isinstance(status, PendingState)
with open(record_file, "w") as json_file:
@@ -488,6 +497,16 @@ class CiCache:
self.RecordType.SUCCESSFUL, job, batch, num_batches, release_branch
)
def is_failed(
self, job: str, batch: int, num_batches: int, release_branch: bool
) -> bool:
"""
checks if a given job have already been done with failure
"""
return self.exist(
self.RecordType.FAILED, job, batch, num_batches, release_branch
)
def is_pending(
self, job: str, batch: int, num_batches: int, release_branch: bool
) -> bool:
@@ -495,8 +514,9 @@ class CiCache:
check pending record in the cache for a given job
@release_branch - checks that "release" attribute is set for a record
"""
-if self.is_successful(job, batch, num_batches, release_branch):
-    # successful record is present - not pending
+if self.is_successful(
+    job, batch, num_batches, release_branch
+) or self.is_failed(job, batch, num_batches, release_branch):
return False
return self.exist(
@@ -524,6 +544,27 @@ class CiCache:
release_branch,
)
def push_failed(
self,
job: str,
batch: int,
num_batches: int,
job_status: CommitStatusData,
release_branch: bool = False,
) -> None:
"""
Pushes a cache record of type Failed (CommitStatusData)
@release_branch adds "release" attribute to a record
"""
self.push(
self.RecordType.FAILED,
job,
[batch],
num_batches,
job_status,
release_branch,
)
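[A usage sketch, not part of the commit's code: how the new FAILED record type changes the cache semantics, assuming an existing ci_cache instance, a prepared CommitStatusData in job_status, and a hypothetical job name. A batch counts as pending only until a terminal record (successful or failed) is pushed for it.]

# minimal sketch (hypothetical job name; ci_cache and job_status assumed to exist)
job, batch, num_batches = "Stateless tests (release)", 0, 2
ci_cache.push_pending(job, [batch], num_batches, release_branch=False)
assert ci_cache.is_pending(job, batch, num_batches, release_branch=False)
# the job finished without success: store its status under the FAILED record type
ci_cache.push_failed(job, batch, num_batches, job_status, release_branch=False)
assert ci_cache.is_failed(job, batch, num_batches, release_branch=False)
# failed is terminal, so the batch no longer counts as pending
assert not ci_cache.is_pending(job, batch, num_batches, release_branch=False)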
def push_pending(
self, job: str, batches: List[int], num_batches: int, release_branch: bool
) -> None:
@@ -591,46 +632,87 @@ class CiCache:
bucket=S3_BUILDS_BUCKET, file_path=result_json_path, s3_path=s3_path
)
-# def await_jobs(self, jobs_with_params: Dict[str, Dict[str, Any]]) -> List[str]:
-#     if not jobs_with_params:
-#         return []
-#     print(f"Start awaiting jobs [{list(jobs_with_params)}]")
-#     poll_interval_sec = 180
-#     start_at = int(time.time())
-#     TIMEOUT = 3000
-#     expired_sec = 0
-#     done_jobs = []  # type: List[str]
-#     while expired_sec < TIMEOUT and jobs_with_params:
-#         time.sleep(poll_interval_sec)
-#         self.update()
-#         pending_finished: List[str] = []
-#         for job_name in jobs_with_params:
-#             num_batches = jobs_with_params[job_name]["num_batches"]
-#             for batch in jobs_with_params[job_name]["batches"]:
-#                 if self.is_pending(job_name, batch, num_batches):
-#                     continue
-#                 print(
-#                     f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore"
-#                 )
-#                 pending_finished.append(job_name)
-#         if pending_finished:
-#             # restart timer
-#             start_at = int(time.time())
-#             expired_sec = 0
-#             # remove finished jobs from awaiting list
-#             for job in pending_finished:
-#                 del jobs_with_params[job]
-#                 done_jobs.append(job)
-#         else:
-#             expired_sec = int(time.time()) - start_at
-#             print(f" ...awaiting continues... time left [{TIMEOUT - expired_sec}]")
-#     if done_jobs:
-#         print(
-#             f"Awaiting OK. Left jobs: [{list(jobs_with_params)}], finished jobs: [{done_jobs}]"
-#         )
-#     else:
-#         print("Awaiting FAILED. No job has finished.")
-#     return done_jobs
+def await_jobs(
+    self, jobs_with_params: Dict[str, Dict[str, Any]], is_release_branch: bool
+) -> Dict[str, List[int]]:
+    """
+    await pending jobs to be finished
+    @jobs_with_params - jobs to await. {JOB_NAME: {"batches": [BATCHES...], "num_batches": NUM_BATCHES}}
+    returns successfully finished jobs: {JOB_NAME: [BATCHES...]}
+    """
+    if not jobs_with_params:
+        return {}
+    poll_interval_sec = 300
+    TIMEOUT = 3600
+    await_finished: Dict[str, List[int]] = {}
+    round_cnt = 0
+    while len(jobs_with_params) > 5 and round_cnt < 3:
+        round_cnt += 1
+        GHActions.print_in_group(
+            f"Wait pending jobs, round [{round_cnt}]:", list(jobs_with_params)
+        )
+        # this is initial approach to wait pending jobs:
+        # start waiting for the next TIMEOUT seconds if there are more than X(=5) jobs to wait
+        # wait TIMEOUT seconds in rounds. Y(=3) is the max number of rounds
+        expired_sec = 0
+        start_at = int(time.time())
+        while expired_sec < TIMEOUT and jobs_with_params:
+            time.sleep(poll_interval_sec)
+            self.update()
+            jobs_with_params_copy = deepcopy(jobs_with_params)
+            for job_name in jobs_with_params:
+                num_batches = jobs_with_params[job_name]["num_batches"]
+                job_config = CI_CONFIG.get_job_config(job_name)
+                for batch in jobs_with_params[job_name]["batches"]:
+                    if self.is_pending(
+                        job_name,
+                        batch,
+                        num_batches,
+                        release_branch=is_release_branch
+                        and job_config.required_on_release_branch,
+                    ):
+                        continue
+                    print(
+                        f"Job [{job_name}_[{batch}/{num_batches}]] is not pending anymore"
+                    )
+                    # some_job_ready = True
+                    jobs_with_params_copy[job_name]["batches"].remove(batch)
+                    if not jobs_with_params_copy[job_name]["batches"]:
+                        del jobs_with_params_copy[job_name]
+                    if not self.is_successful(
+                        job_name,
+                        batch,
+                        num_batches,
+                        release_branch=is_release_branch
+                        and job_config.required_on_release_branch,
+                    ):
+                        print(
+                            f"NOTE: Job [{job_name}:{batch}] finished but no success - remove from awaiting list, do not add to ready"
+                        )
+                        continue
+                    if job_name in await_finished:
+                        await_finished[job_name].append(batch)
+                    else:
+                        await_finished[job_name] = [batch]
+            jobs_with_params = jobs_with_params_copy
+            expired_sec = int(time.time()) - start_at
+            print(
+                f"...awaiting continues... seconds left [{TIMEOUT - expired_sec}]"
+            )
+    if await_finished:
+        GHActions.print_in_group(
+            "Finished jobs:",
+            [f"{job}:{batches}" for job, batches in await_finished.items()],
+        )
+    else:
+        print("Awaiting FAILED. No job has finished successfully.")
+    GHActions.print_in_group(
+        "Remaining jobs:",
+        [f"{job}:{params['batches']}" for job, params in jobs_with_params.items()],
+    )
+    return await_finished
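[A usage sketch of the await_jobs contract, not part of the commit; job names below are hypothetical. Note that with the initial heuristic above, polling only starts while more than five jobs are awaited, for at most three rounds of TIMEOUT seconds.]

# hypothetical input: {JOB_NAME: {"batches": [...], "num_batches": N}}
jobs_to_wait = {
    "Stateless tests (release)": {"batches": [0, 1], "num_batches": 2},
    # ... assume more than 5 awaited jobs in total, otherwise the loop exits at once
}
ready = ci_cache.await_jobs(jobs_to_wait, is_release_branch=True)
# e.g. ready == {"Stateless tests (release)": [0]} if batch 0 got a SUCCESSFUL
# record while polling; unfinished or failed batches are never returned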
def get_check_name(check_name: str, batch: int, num_batches: int) -> str:
@@ -832,7 +914,10 @@ def _pre_action(s3, indata, pr_info):
ci_cache = CiCache(s3, indata["jobs_data"]["digests"])
# for release/master branches reports must be from the same branches
-report_prefix = pr_info.head_ref if pr_info.number == 0 else ""
+report_prefix = normalize_string(pr_info.head_ref) if pr_info.number == 0 else ""
print(
f"Use report prefix [{report_prefix}], pr_num [{pr_info.number}], head_ref [{pr_info.head_ref}]"
)
reports_files = ci_cache.download_build_reports(file_prefix=report_prefix)
print(f"Pre action done. Report files [{reports_files}] have been downloaded")
@@ -883,8 +968,19 @@ def _mark_success_action(
job, batch, num_batches, job_status, pr_info.is_release_branch()
)
print(f"Job [{job}] is ok")
-elif job_status:
-    print(f"Job [{job}] is not ok, status [{job_status.status}]")
+elif job_status and not job_status.is_ok():
+    ci_cache.push_failed(
+        job, batch, num_batches, job_status, pr_info.is_release_branch()
+    )
+    print(f"Job [{job}] is failed with status [{job_status.status}]")
+else:
+    job_status = CommitStatusData(
+        description="dummy description", status=ERROR, report_url="dummy url"
+    )
+    ci_cache.push_failed(
+        job, batch, num_batches, job_status, pr_info.is_release_branch()
+    )
+    print(f"No CommitStatusData for [{job}], push dummy failure to ci_cache")
def _print_results(result: Any, outfile: Optional[str], pretty: bool = False) -> None:
@@ -992,8 +1088,8 @@ def _configure_jobs(
jobs_to_do: List[str] = []
jobs_to_skip: List[str] = []
digests: Dict[str, str] = {}
print("::group::Job Digests")
for job in CI_CONFIG.job_generator():
digest = job_digester.get_job_digest(CI_CONFIG.get_digest_config(job))
digests[job] = digest
@@ -1003,7 +1099,8 @@ def _configure_jobs(
## b. check what we need to run
ci_cache = None
if not ci_cache_disabled:
-    ci_cache = CiCache(s3, digests)
+    ci_cache = CiCache(s3, digests).update()
+    ci_cache.print_status()
jobs_to_wait: Dict[str, Dict[str, Any]] = {}
@@ -1012,10 +1109,13 @@ def _configure_jobs(
job_config = CI_CONFIG.get_job_config(job)
num_batches: int = job_config.num_batches
batches_to_do: List[int] = []
add_to_skip = False
for batch in range(num_batches):  # type: ignore
if job_config.pr_only and pr_info.is_release_branch():
continue
if job_config.release_only and not pr_info.is_release_branch():
continue
if job_config.run_by_label:
# this job controlled by label, add to todo if its label is set in pr
if job_config.run_by_label in pr_info.labels:
@@ -1036,7 +1136,13 @@ def _configure_jobs(
batches_to_do.append(batch)
# check if it's pending in the cache
-if ci_cache.is_pending(job, batch, num_batches, release_branch=False):
+if ci_cache.is_pending(
+    job,
+    batch,
+    num_batches,
+    release_branch=pr_info.is_release_branch()
+    and job_config.required_on_release_branch,
+):
if job in jobs_to_wait:
jobs_to_wait[job]["batches"].append(batch)
else:
@@ -1044,10 +1150,12 @@ def _configure_jobs(
"batches": [batch],
"num_batches": num_batches,
}
else:
add_to_skip = True
if batches_to_do:
jobs_to_do.append(job)
-elif not job_config.run_by_label:
+elif add_to_skip:
# treat job as being skipped only if it's controlled by digest
jobs_to_skip.append(job)
jobs_params[job] = {
@@ -1119,29 +1227,19 @@ def _configure_jobs(
"digests": digests,
"jobs_to_do": jobs_to_do,
"jobs_to_skip": jobs_to_skip,
-"jobs_to_wait": jobs_to_wait,
+"jobs_to_wait": {
+    job: params for job, params in jobs_to_wait.items() if job in jobs_to_do
+},
"jobs_params": {
job: params for job, params in jobs_params.items() if job in jobs_to_do
},
}
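[For orientation, not part of the diff; the keys are from the code above, the values are hypothetical: the structure _configure_jobs returns after this change, with jobs_to_wait now filtered down to jobs that are also in jobs_to_do.]

# hypothetical result shape of _configure_jobs()
{
    "digests": {"Stateless tests (release)": "affa4b6d"},  # job -> config digest
    "jobs_to_do": ["Stateless tests (release)"],
    "jobs_to_skip": ["Compatibility check (amd64)"],  # fully covered by cache records
    "jobs_to_wait": {
        "Stateless tests (release)": {"batches": [1], "num_batches": 2}
    },
    "jobs_params": {
        "Stateless tests (release)": {"batches": [0, 1], "num_batches": 2}
    },
}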
-def _update_gh_statuses_action(indata: Dict, s3: S3Helper) -> None:
-    if indata["ci_flags"][Labels.NO_CI_CACHE]:
-        print("CI cache is disabled - skip restoring commit statuses from CI cache")
-        return
-    job_digests = indata["jobs_data"]["digests"]
-    ci_cache = CiCache(s3, job_digests).update().fetch_records_data()
-    # create GH status
-    pr_info = PRInfo()
-    commit = get_commit(Github(get_best_robot_token(), per_page=100), pr_info.sha)
-    def _run_create_status(job: str, batch: int, num_batches: int) -> None:
-        job_status = ci_cache.get_successful(job, batch, num_batches)
-        if not job_status:
-            return
-        print(f"Going to re-create GH status for job [{job}] sha [{pr_info.sha}]")
+def _create_gh_status(
+    commit: Any, job: str, batch: int, num_batches: int, job_status: CommitStatusData
+) -> None:
+    print(f"Going to re-create GH status for job [{job}]")
assert job_status.status == SUCCESS, "BUG!"
commit.create_status(
state=job_status.status,
@@ -1153,15 +1251,40 @@ def _update_gh_statuses_action(indata: Dict, s3: S3Helper) -> None:
context=get_check_name(job, batch=batch, num_batches=num_batches),
)
def _update_gh_statuses_action(indata: Dict, s3: S3Helper) -> None:
if indata["ci_flags"][Labels.NO_CI_CACHE]:
print("CI cache is disabled - skip restoring commit statuses from CI cache")
return
job_digests = indata["jobs_data"]["digests"]
jobs_to_skip = indata["jobs_data"]["jobs_to_skip"]
jobs_to_do = indata["jobs_data"]["jobs_to_do"]
ci_cache = CiCache(s3, job_digests).update().fetch_records_data().print_status()
# create GH status
pr_info = PRInfo()
commit = get_commit(Github(get_best_robot_token(), per_page=100), pr_info.sha)
def _concurrent_create_status(job: str, batch: int, num_batches: int) -> None:
job_status = ci_cache.get_successful(job, batch, num_batches)
if not job_status:
return
_create_gh_status(commit, job, batch, num_batches, job_status)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for job in job_digests:
if job not in jobs_to_skip or job not in jobs_to_do:
# no need to create status for job that are not supposed to be executed
continue
if CI_CONFIG.is_build_job(job):
# no GH status for build jobs
continue
num_batches = CI_CONFIG.get_job_config(job).num_batches
for batch in range(num_batches):
-future = executor.submit(_run_create_status, job, batch, num_batches)
+future = executor.submit(
+    _concurrent_create_status, job, batch, num_batches
+)
futures.append(future)
done, _ = concurrent.futures.wait(futures)
for future in done:
@@ -1194,7 +1317,7 @@ def _upload_build_artifacts(
(
get_release_or_pr(pr_info, get_version_from_repo())[1],
pr_info.sha,
-CI_CONFIG.normalize_string(build_name),
+normalize_string(build_name),
"performance.tar.zst",
)
)
@@ -1509,19 +1632,37 @@ def main() -> int:
if not args.skip_jobs and pr_info.has_changes_in_documentation_only():
_update_config_for_docs_only(jobs_data)
-# TODO: await pending jobs
-# wait for pending jobs to be finished, await_jobs is a long blocking call if any job has to be awaited
-# awaited_jobs = ci_cache.await_jobs(jobs_data.get("jobs_to_wait", {}))
-# for job in awaited_jobs:
-#     jobs_to_do = jobs_data["jobs_to_do"]
-#     if job in jobs_to_do:
-#         jobs_to_do.remove(job)
-#     else:
-#         assert False, "BUG"
+if not args.skip_jobs:
+    ci_cache = CiCache(s3, jobs_data["digests"])
+    if (
+        pr_info.is_release_branch()
+        or pr_info.event.get("pull_request", {})
+        .get("user", {})
+        .get("login", "not_maxknv")
+        == "maxknv"
+    ):
+        # wait for pending jobs to be finished, await_jobs is a long blocking call
+        # wait pending jobs (for now only on release/master branches)
+        ready_jobs_batches_dict = ci_cache.await_jobs(
+            jobs_data.get("jobs_to_wait", {}), pr_info.is_release_branch()
+        )
+        jobs_to_do = jobs_data["jobs_to_do"]
+        jobs_to_skip = jobs_data["jobs_to_skip"]
+        jobs_params = jobs_data["jobs_params"]
+        for job, batches in ready_jobs_batches_dict.items():
+            if job not in jobs_params:
+                print(f"WARNING: Job [{job}] is not in the params list")
+                continue
+            for batch in batches:
+                jobs_params[job]["batches"].remove(batch)
+            if not jobs_params[job]["batches"]:
+                jobs_to_do.remove(job)
+                jobs_to_skip.append(job)
+                del jobs_params[job]
# set planned jobs as pending in the CI cache if on the master
-if pr_info.is_master() and not args.skip_jobs:
-    ci_cache = CiCache(s3, jobs_data["digests"])
+if pr_info.is_master():
for job in jobs_data["jobs_to_do"]:
config = CI_CONFIG.get_job_config(job)
if config.run_always or config.run_by_label:
@@ -1534,6 +1675,9 @@ def main() -> int:
release_branch=pr_info.is_release_branch(),
)
if "jobs_to_wait" in jobs_data:
del jobs_data["jobs_to_wait"]
# conclude results
result["git_ref"] = git_ref
result["version"] = version
@@ -1608,23 +1752,15 @@ def main() -> int:
check_name, args.batch, job_config.num_batches
)
assert job_status, "BUG"
-commit.create_status(
-    state=job_status.status,
-    target_url=job_status.report_url,
-    description=format_description(
-        f"Reused from [{job_status.pr_num}-{job_status.sha[0:8]}]: "
-        f"{job_status.description}"
-    ),
-    context=get_check_name(
-        check_name,
-        batch=args.batch,
-        num_batches=job_config.num_batches,
-    ),
-)
+_create_gh_status(
+    commit,
+    check_name,
+    args.batch,
+    job_config.num_batches,
+    job_status,
+)
previous_status = job_status.status
-print("::group::Commit Status Data")
-print(job_status)
-print("::endgroup::")
+GHActions.print_in_group("Commit Status Data", job_status)
if previous_status:
print(
@@ -1648,7 +1784,7 @@ def main() -> int:
if CI_CONFIG.is_build_job(args.job_name):
assert (
indata
-), "--infile with config must be provided for POST action of a build type job [{args.job_name}]"
+), f"--infile with config must be provided for POST action of a build type job [{args.job_name}]"
build_name = args.job_name
s3_path_prefix = "/".join(
(
@@ -1676,7 +1812,7 @@ def main() -> int:
(
get_release_or_pr(pr_info, get_version_from_repo())[0],
pr_info.sha,
-CI_CONFIG.normalize_string(
+normalize_string(
job_report.check_name or _get_ext_check_name(args.job_name)
),
)

View File

@@ -22,6 +22,8 @@ class Labels(metaclass=WithIter):
CI_SET_ARM = "ci_set_arm"
CI_SET_INTEGRATION = "ci_set_integration"
libFuzzer = "libFuzzer"
class Build(metaclass=WithIter):
PACKAGE_RELEASE = "package_release"
@@ -193,6 +195,8 @@ class JobConfig:
required_on_release_branch: bool = False
# job is for pr workflow only
pr_only: bool = False
# job is for release/master branches only
release_only: bool = False
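[A hypothetical illustration, not from the diff, of how the two branch-gating flags behave in _configure_jobs: pr_only jobs are dropped on release/master branches, release_only jobs are dropped on PRs.]

# hypothetical configs illustrating the gating in _configure_jobs:
JobConfig(pr_only=True)       # skipped when pr_info.is_release_branch() is True
JobConfig(release_only=True)  # skipped when pr_info.is_release_branch() is False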
@dataclass
@@ -790,6 +794,7 @@ CI_CONFIG = CiConfig(
name=Build.FUZZERS,
compiler="clang-17",
package_type="fuzzers",
job_config=JobConfig(run_by_label=Labels.libFuzzer),
),
},
builds_report_config={
@@ -824,7 +829,7 @@
},
other_jobs_configs={
JobNames.MARK_RELEASE_READY: TestConfig(
-    "", job_config=JobConfig(required_on_release_branch=True)
+    "", job_config=JobConfig(release_only=True)
),
JobNames.DOCKER_SERVER: TestConfig(
"",
@@ -909,13 +914,6 @@
JobNames.STATEFUL_TEST_AARCH64: TestConfig(
Build.PACKAGE_AARCH64, job_config=JobConfig(**stateful_test_common_params)  # type: ignore
),
-# FIXME: delete?
-# "Stateful tests (release, DatabaseOrdinary)": TestConfig(
-#     Build.PACKAGE_RELEASE, job_config=JobConfig(**stateful_test_common_params)  # type: ignore
-# ),
-# "Stateful tests (release, DatabaseReplicated)": TestConfig(
-#     Build.PACKAGE_RELEASE, job_config=JobConfig(**stateful_test_common_params)  # type: ignore
-# ),
# Stateful tests for parallel replicas
JobNames.STATEFUL_TEST_PARALLEL_REPL_RELEASE: TestConfig(
Build.PACKAGE_RELEASE, job_config=JobConfig(**stateful_test_common_params)  # type: ignore
@@ -997,16 +995,16 @@
Build.PACKAGE_DEBUG, job_config=JobConfig(**stress_test_common_params)  # type: ignore
),
JobNames.UPGRADE_TEST_ASAN: TestConfig(
-Build.PACKAGE_ASAN, job_config=JobConfig(**upgrade_test_common_params)  # type: ignore
+Build.PACKAGE_ASAN, job_config=JobConfig(pr_only=True, **upgrade_test_common_params)  # type: ignore
),
JobNames.UPGRADE_TEST_TSAN: TestConfig(
-Build.PACKAGE_TSAN, job_config=JobConfig(**upgrade_test_common_params)  # type: ignore
+Build.PACKAGE_TSAN, job_config=JobConfig(pr_only=True, **upgrade_test_common_params)  # type: ignore
),
JobNames.UPGRADE_TEST_MSAN: TestConfig(
-Build.PACKAGE_MSAN, job_config=JobConfig(**upgrade_test_common_params)  # type: ignore
+Build.PACKAGE_MSAN, job_config=JobConfig(pr_only=True, **upgrade_test_common_params)  # type: ignore
),
JobNames.UPGRADE_TEST_DEBUG: TestConfig(
-Build.PACKAGE_DEBUG, job_config=JobConfig(**upgrade_test_common_params)  # type: ignore
+Build.PACKAGE_DEBUG, job_config=JobConfig(pr_only=True, **upgrade_test_common_params)  # type: ignore
),
JobNames.INTEGRATION_TEST_ASAN: TestConfig(
Build.PACKAGE_ASAN,
@@ -1033,7 +1031,7 @@
job_config=JobConfig(num_batches=4, **integration_test_common_params),  # type: ignore
),
JobNames.INTEGRATION_TEST_FLAKY: TestConfig(
-Build.PACKAGE_ASAN, job_config=JobConfig(**integration_test_common_params)  # type: ignore
+Build.PACKAGE_ASAN, job_config=JobConfig(pr_only=True, **integration_test_common_params)  # type: ignore
),
JobNames.COMPATIBILITY_TEST: TestConfig(
Build.PACKAGE_RELEASE,
@@ -1080,7 +1078,7 @@
JobNames.STATELESS_TEST_FLAKY_ASAN: TestConfig(
# replace to non-default
Build.PACKAGE_ASAN,
-job_config=JobConfig(**{**statless_test_common_params, "timeout": 3600}),  # type: ignore
+job_config=JobConfig(pr_only=True, **{**statless_test_common_params, "timeout": 3600}),  # type: ignore
),
JobNames.JEPSEN_KEEPER: TestConfig(
Build.BINARY_RELEASE,
@@ -1116,7 +1114,7 @@
),
JobNames.CLCIKBENCH_TEST: TestConfig(Build.PACKAGE_RELEASE),
JobNames.CLCIKBENCH_TEST_ARM: TestConfig(Build.PACKAGE_AARCH64),
-JobNames.LIBFUZZER_TEST: TestConfig(Build.FUZZERS),  # type: ignore
+JobNames.LIBFUZZER_TEST: TestConfig(Build.FUZZERS, job_config=JobConfig(run_by_label=Labels.libFuzzer)),  # type: ignore
},
)
CI_CONFIG.validate()

View File

@@ -1,6 +1,6 @@
from contextlib import contextmanager
import os
-from typing import List, Union, Iterator
+from typing import Any, List, Union, Iterator
from pathlib import Path
@@ -27,9 +27,22 @@ def is_hex(s):
return False
def normalize_string(string: str) -> str:
lowercase_string = string.lower()
normalized_string = (
lowercase_string.replace(" ", "_")
.replace("-", "_")
.replace("/", "_")
.replace("(", "")
.replace(")", "")
.replace(",", "")
)
return normalized_string
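[A usage sketch, not part of the commit: normalize_string turns check names and branch refs into the lower_snake form used for cache and report file prefixes. Outputs below are derived from the replacement rules above; the second input is a hypothetical ref.]

normalize_string("Stateless tests (release, s3 storage)")
# -> "stateless_tests_release_s3_storage"
normalize_string("refs/heads/Feature-Branch")  # hypothetical ref
# -> "refs_heads_feature_branch"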
class GHActions:
@staticmethod
-def print_in_group(group_name: str, lines: Union[str, List[str]]) -> None:
+def print_in_group(group_name: str, lines: Union[Any, List[Any]]) -> None:
lines = list(lines)
print(f"::group::{group_name}")
for line in lines:

View File

@@ -370,6 +370,9 @@ class CommitStatusData:
def is_ok(self):
return self.status == SUCCESS
def is_failure(self):
return self.status == FAILURE
@staticmethod
def cleanup():
STATUS_FILE_PATH.unlink(missing_ok=True)

View File

@@ -23,6 +23,7 @@ from typing import (
from build_download_helper import get_gh_api
from ci_config import CI_CONFIG, BuildConfig
from env_helper import REPORT_PATH, TEMP_PATH
from ci_utils import normalize_string
logger = logging.getLogger(__name__)
@@ -550,7 +551,7 @@ class BuildResult:
def write_json(self, directory: Union[Path, str] = REPORT_PATH) -> Path:
path = Path(directory) / self.get_report_name(
-self.build_name, self.pr_number or self.head_ref
+self.build_name, self.pr_number or normalize_string(self.head_ref)
)
path.write_text(
json.dumps(

View File

@@ -96,16 +96,27 @@ class TestCiCache(unittest.TestCase):
pr_num=PR_NUM,
)
-### add some pending statuses for two batches and on non-release branch
+### add some pending statuses for two batches, non-release branch
for job in JobNames:
-    ci_cache.push_pending(job, [0, 1], NUM_BATCHES, release_branch=False)
-    ci_cache_2.push_pending(job, [0, 1], NUM_BATCHES, release_branch=False)
+    ci_cache.push_pending(job, [0, 1, 2], NUM_BATCHES, release_branch=False)
+    ci_cache_2.push_pending(job, [0, 1, 2], NUM_BATCHES, release_branch=False)
### add success status for 0 batch, non-release branch
+batch = 0
for job in JobNames:
-    ci_cache.push_successful(job, 0, NUM_BATCHES, status, release_branch=False)
+    ci_cache.push_successful(
+        job, batch, NUM_BATCHES, status, release_branch=False
+    )
    ci_cache_2.push_successful(
-        job, 0, NUM_BATCHES, status, release_branch=False
+        job, batch, NUM_BATCHES, status, release_branch=False
+    )
+### add failed status for 2 batch, non-release branch
+batch = 2
+for job in JobNames:
+    ci_cache.push_failed(job, batch, NUM_BATCHES, status, release_branch=False)
+    ci_cache_2.push_failed(
+        job, batch, NUM_BATCHES, status, release_branch=False
)
### check all expected directories were created on s3 mock
@@ -128,7 +139,7 @@
)
### check number of cache files is as expected
-FILES_PER_JOB = 3  # 1 successful + 2 pending batches = 3
+FILES_PER_JOB = 5  # 1 successful + 1 failed + 3 pending batches = 5
self.assertEqual(
len(
s3_mock.files_on_s3_paths[
@@ -219,7 +230,7 @@
ci_cache.push_successful(job, 0, NUM_BATCHES, status, release_branch=True)
### check number of cache files is as expected
-FILES_PER_JOB = 6  # 1 successful + 1 successful_release + 2 pending batches + 2 pending batches release = 6
+FILES_PER_JOB = 8  # 1 successful + 1 failed + 1 successful_release + 3 pending batches + 2 pending batches release = 8
self.assertEqual(
len(
s3_mock.files_on_s3_paths[
@@ -252,6 +263,9 @@
self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, False), True)
self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, True), True)
self.assertEqual(ci_cache.is_failed(job, 2, NUM_BATCHES, False), True)
self.assertEqual(ci_cache.is_failed(job, 2, NUM_BATCHES, True), False)
status2 = ci_cache.get_successful(job, 0, NUM_BATCHES)
assert status2 and status2.pr_num == PR_NUM
status2 = ci_cache.get_successful(job, 1, NUM_BATCHES)
@@ -273,6 +287,13 @@
self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, False), True)
self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, True), True)
self.assertEqual(ci_cache.is_failed(job, 2, NUM_BATCHES, False), True)
self.assertEqual(ci_cache.is_failed(job, 2, NUM_BATCHES, True), False)
# is_pending() is false for failed jobs batches
self.assertEqual(ci_cache.is_pending(job, 2, NUM_BATCHES, False), False)
self.assertEqual(ci_cache.is_pending(job, 2, NUM_BATCHES, True), False)
status2 = ci_cache.get_successful(job, 0, NUM_BATCHES)
assert status2 and status2.pr_num == PR_NUM
status2 = ci_cache.get_successful(job, 1, NUM_BATCHES)