ClickHouse/ci/praktika/hook_cache.py

from praktika._environment import _Environment
from praktika.cache import Cache
from praktika.runtime import RunConfig
from praktika.settings import Settings
from praktika.utils import Utils


class CacheRunnerHooks:
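    """Workflow lifecycle hooks (configure/pre_run/run/post_run) that plug the
    job-digest cache into a praktika run."""
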
    @classmethod
    def configure(cls, workflow):
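        """Compute per-job digests, query the remote cache for successful runs and
        record the results in the workflow RunConfig, which is written to the GH job
        output and dumped to the filesystem for the following hooks."""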
        workflow_config = RunConfig.from_fs(workflow.name)
        docker_digests = workflow_config.digest_dockers
        cache = Cache()
        print(f"Workflow Configure, workflow [{workflow.name}]")
        assert (
            workflow.enable_cache
        ), f"Outdated YAML pipelines or a BUG. Configuration must be run only for a workflow with the cache enabled, workflow [{workflow.name}]"
        artifact_digest_map = {}
        job_digest_map = {}
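        # First pass: compute a digest for every job and assign the same digest
        # to every artifact the job provides.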
        for job in workflow.jobs:
            digest = cache.digest.calc_job_digest(
                job_config=job, docker_digests=docker_digests
            )
            if not job.digest_config:
                print(
                    f"NOTE: job [{job.name}] has no Config.digest_config - skip cache check, always run"
                )
            job_digest_map[job.name] = digest
            if job.provides:
                # assign the job digest also to the artifacts it provides
                for artifact in job.provides:
                    artifact_digest_map[artifact] = digest
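
        # Second pass: fold the digests of required artifacts into each job's final
        # digest, so that a change in an upstream artifact changes this job's cache key.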
        for job in workflow.jobs:
            digests_combined_list = []
            if job.requires:
                # include digests of required artifacts in the job digest, so that they affect the job state
                for artifact_name in job.requires:
                    if artifact_name not in [
                        artifact.name for artifact in workflow.artifacts
                    ]:
                        # a phony artifact is assumed not to affect jobs that depend on it
                        continue
                    digests_combined_list.append(artifact_digest_map[artifact_name])
            digests_combined_list.append(job_digest_map[job.name])
            final_digest = "-".join(digests_combined_list)
            workflow_config.digest_jobs[job.name] = final_digest

        assert (
            workflow_config.digest_jobs
        ), f"BUG: a workflow with the cache enabled must have job digests after configuration, wf [{workflow.name}]"
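
        # A hit means a successful run with the same digest already exists;
        # record it as a cache success for this run.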
        print("Check remote cache")
        for job_name, job_digest in workflow_config.digest_jobs.items():
            record = cache.fetch_success(job_name=job_name, job_digest=job_digest)
            if record:
                assert (
                    Utils.normalize_string(job_name)
                    not in workflow_config.cache_success
                )
                workflow_config.cache_success.append(job_name)
                workflow_config.cache_success_base64.append(Utils.to_base64(job_name))
                workflow_config.cache_jobs[job_name] = record
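
        # Artifacts produced by cache-hit jobs can be reused from the cached run.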
        print("Check artifacts to reuse")
        for job in workflow.jobs:
            if job.name in workflow_config.cache_success:
                if job.provides:
                    for artifact_name in job.provides:
                        workflow_config.cache_artifacts[artifact_name] = (
                            workflow_config.cache_jobs[job.name]
                        )

        print("Write config to GH's job output")
        with open(_Environment.get().JOB_OUTPUT_STREAM, "a", encoding="utf8") as f:
            print(
                f"DATA={workflow_config.to_json()}",
                file=f,
            )
        print(f"WorkflowRuntimeConfig: [{workflow_config.to_json(pretty=True)}]")
        print(
            "Dump WorkflowConfig to fs, the next hooks in this job might want to see it"
        )
        workflow_config.dump()

        return workflow_config

    @classmethod
    def pre_run(cls, _workflow, _job, _required_artifacts=None):
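        """Return the S3 path prefixes to fetch each required artifact from: the
        cached run's prefix for cache hits, the current run's prefix otherwise."""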
        path_prefixes = []
        if _job.name == Settings.CI_CONFIG_JOB_NAME:
            # SPECIAL handling: nothing to fetch for the config job itself
            return path_prefixes
        env = _Environment.get()
        runtime_config = RunConfig.from_fs(_workflow.name)
        required_artifacts = _required_artifacts or []
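        # For every required artifact pick the prefix to download it from:
        # the cached run for cache hits, otherwise the current run.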
        for artifact in required_artifacts:
            if artifact.name in runtime_config.cache_artifacts:
                record = runtime_config.cache_artifacts[artifact.name]
                print(f"Reuse artifact [{artifact.name}] from [{record}]")
                path_prefixes.append(
                    env.get_s3_prefix_static(
                        record.pr_number, record.branch, record.sha
                    )
                )
            else:
                path_prefixes.append(env.get_s3_prefix())
        return path_prefixes

    @classmethod
    def run(cls, workflow, job):
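        """Nothing to do for the cache while the job itself is running."""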
        pass

    @classmethod
    def post_run(cls, workflow, job):
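        """Push a success record for the job's digest so future runs can reuse it as a cache hit."""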
        if job.name == Settings.CI_CONFIG_JOB_NAME:
            return
        if job.digest_config:
            # the cache is enabled and the job is supposed to be cached (it has a digest config defined)
            workflow_runtime = RunConfig.from_fs(workflow.name)
            job_digest = workflow_runtime.digest_jobs[job.name]
            Cache.push_success_record(job.name, job_digest, workflow_runtime.sha)