mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
125 lines
5.2 KiB
Python
125 lines
5.2 KiB
Python
from praktika._environment import _Environment
|
|
from praktika.cache import Cache
|
|
from praktika.mangle import _get_workflows
|
|
from praktika.runtime import RunConfig
|
|
from praktika.settings import Settings
|
|
from praktika.utils import Utils
|
|
|
|
|
|
class CacheRunnerHooks:
|
|
@classmethod
|
|
def configure(cls, _workflow):
|
|
workflow_config = RunConfig.from_fs(_workflow.name)
|
|
cache = Cache()
|
|
assert _Environment.get().WORKFLOW_NAME
|
|
workflow = _get_workflows(name=_Environment.get().WORKFLOW_NAME)[0]
|
|
print(f"Workflow Configure, workflow [{workflow.name}]")
|
|
assert (
|
|
workflow.enable_cache
|
|
), f"Outdated yaml pipelines or BUG. Configuration must be run only for workflow with enabled cache, workflow [{workflow.name}]"
|
|
artifact_digest_map = {}
|
|
job_digest_map = {}
|
|
for job in workflow.jobs:
|
|
if not job.digest_config:
|
|
print(
|
|
f"NOTE: job [{job.name}] has no Config.digest_config - skip cache check, always run"
|
|
)
|
|
digest = cache.digest.calc_job_digest(job_config=job)
|
|
job_digest_map[job.name] = digest
|
|
if job.provides:
|
|
# assign the job digest also to the artifacts it provides
|
|
for artifact in job.provides:
|
|
artifact_digest_map[artifact] = digest
|
|
for job in workflow.jobs:
|
|
digests_combined_list = []
|
|
if job.requires:
|
|
# include digest of required artifact to the job digest, so that they affect job state
|
|
for artifact_name in job.requires:
|
|
if artifact_name not in [
|
|
artifact.name for artifact in workflow.artifacts
|
|
]:
|
|
# phony artifact assumed to be not affecting jobs that depend on it
|
|
continue
|
|
digests_combined_list.append(artifact_digest_map[artifact_name])
|
|
digests_combined_list.append(job_digest_map[job.name])
|
|
final_digest = "-".join(digests_combined_list)
|
|
workflow_config.digest_jobs[job.name] = final_digest
|
|
|
|
assert (
|
|
workflow_config.digest_jobs
|
|
), f"BUG, Workflow with enabled cache must have job digests after configuration, wf [{workflow.name}]"
|
|
|
|
print("Check remote cache")
|
|
job_to_cache_record = {}
|
|
for job_name, job_digest in workflow_config.digest_jobs.items():
|
|
record = cache.fetch_success(job_name=job_name, job_digest=job_digest)
|
|
if record:
|
|
assert (
|
|
Utils.normalize_string(job_name)
|
|
not in workflow_config.cache_success
|
|
)
|
|
workflow_config.cache_success.append(job_name)
|
|
workflow_config.cache_success_base64.append(Utils.to_base64(job_name))
|
|
job_to_cache_record[job_name] = record
|
|
|
|
print("Check artifacts to reuse")
|
|
for job in workflow.jobs:
|
|
if job.name in workflow_config.cache_success:
|
|
if job.provides:
|
|
for artifact_name in job.provides:
|
|
workflow_config.cache_artifacts[artifact_name] = (
|
|
job_to_cache_record[job.name]
|
|
)
|
|
|
|
print(f"Write config to GH's job output")
|
|
with open(_Environment.get().JOB_OUTPUT_STREAM, "a", encoding="utf8") as f:
|
|
print(
|
|
f"DATA={workflow_config.to_json()}",
|
|
file=f,
|
|
)
|
|
print(f"WorkflowRuntimeConfig: [{workflow_config.to_json(pretty=True)}]")
|
|
print(
|
|
"Dump WorkflowConfig to fs, the next hooks in this job might want to see it"
|
|
)
|
|
workflow_config.dump()
|
|
|
|
return workflow_config
|
|
|
|
@classmethod
|
|
def pre_run(cls, _workflow, _job, _required_artifacts=None):
|
|
path_prefixes = []
|
|
if _job.name == Settings.CI_CONFIG_JOB_NAME:
|
|
# SPECIAL handling
|
|
return path_prefixes
|
|
env = _Environment.get()
|
|
runtime_config = RunConfig.from_fs(_workflow.name)
|
|
required_artifacts = []
|
|
if _required_artifacts:
|
|
required_artifacts = _required_artifacts
|
|
for artifact in required_artifacts:
|
|
if artifact.name in runtime_config.cache_artifacts:
|
|
record = runtime_config.cache_artifacts[artifact.name]
|
|
print(f"Reuse artifact [{artifact.name}] from [{record}]")
|
|
path_prefixes.append(
|
|
env.get_s3_prefix_static(
|
|
record.pr_number, record.branch, record.sha
|
|
)
|
|
)
|
|
else:
|
|
path_prefixes.append(env.get_s3_prefix())
|
|
return path_prefixes
|
|
|
|
@classmethod
|
|
def run(cls, workflow, job):
|
|
pass
|
|
|
|
@classmethod
|
|
def post_run(cls, workflow, job):
|
|
if job.name == Settings.CI_CONFIG_JOB_NAME:
|
|
return
|
|
if job.digest_config:
|
|
# cache is enabled, and it's a job that supposed to be cached (has defined digest config)
|
|
workflow_runtime = RunConfig.from_fs(workflow.name)
|
|
job_digest = workflow_runtime.digest_jobs[job.name]
|
|
Cache.push_success_record(job.name, job_digest, workflow_runtime.sha)
|