ClickHouse/ci/praktika/cache.py
2024-10-24 16:48:14 +02:00

128 lines
4.4 KiB
Python

import dataclasses
import json
from pathlib import Path
from praktika import Artifact, Job, Workflow
from praktika._environment import _Environment
from praktika.digest import Digest
from praktika.s3 import S3
from praktika.settings import Settings
from praktika.utils import Utils
class Cache:
@dataclasses.dataclass
class CacheRecord:
class Type:
SUCCESS = "success"
type: str
sha: str
pr_number: int
branch: str
def dump(self, path):
with open(path, "w", encoding="utf8") as f:
json.dump(dataclasses.asdict(self), f)
@classmethod
def from_fs(cls, path):
with open(path, "r", encoding="utf8") as f:
return Cache.CacheRecord(**json.load(f))
@classmethod
def from_dict(cls, obj):
return Cache.CacheRecord(**obj)
def __init__(self):
self.digest = Digest()
self.success = {} # type Dict[str, Any]
@classmethod
def push_success_record(cls, job_name, job_digest, sha):
type_ = Cache.CacheRecord.Type.SUCCESS
record = Cache.CacheRecord(
type=type_,
sha=sha,
pr_number=_Environment.get().PR_NUMBER,
branch=_Environment.get().BRANCH,
)
assert (
Settings.CACHE_S3_PATH
), f"Setting CACHE_S3_PATH must be defined with enabled CI Cache"
record_path = f"{Settings.CACHE_S3_PATH}/v{Settings.CACHE_VERSION}/{Utils.normalize_string(job_name)}/{job_digest}"
record_file = Path(Settings.TEMP_DIR) / type_
record.dump(record_file)
S3.copy_file_to_s3(s3_path=record_path, local_path=record_file)
record_file.unlink()
def fetch_success(self, job_name, job_digest):
type_ = Cache.CacheRecord.Type.SUCCESS
assert (
Settings.CACHE_S3_PATH
), f"Setting CACHE_S3_PATH must be defined with enabled CI Cache"
record_path = f"{Settings.CACHE_S3_PATH}/v{Settings.CACHE_VERSION}/{Utils.normalize_string(job_name)}/{job_digest}/{type_}"
record_file_local_dir = (
f"{Settings.CACHE_LOCAL_PATH}/{Utils.normalize_string(job_name)}/"
)
Path(record_file_local_dir).mkdir(parents=True, exist_ok=True)
if S3.head_object(record_path):
res = S3.copy_file_from_s3(
s3_path=record_path, local_path=record_file_local_dir
)
else:
res = None
if res:
print(f"Cache record found, job [{job_name}], digest [{job_digest}]")
self.success[job_name] = True
return Cache.CacheRecord.from_fs(Path(record_file_local_dir) / type_)
return None
if __name__ == "__main__":
# test
c = Cache()
workflow = Workflow.Config(
name="TEST",
event=Workflow.Event.PULL_REQUEST,
jobs=[
Job.Config(
name="JobA",
runs_on=["some"],
command="python -m unittest ./ci/tests/example_1/test_example_produce_artifact.py",
provides=["greet"],
job_requirements=Job.Requirements(
python_requirements_txt="./ci/requirements.txt"
),
digest_config=Job.CacheDigestConfig(
# example: use glob to include files
include_paths=["./ci/tests/example_1/test_example_consume*.py"],
),
),
Job.Config(
name="JobB",
runs_on=["some"],
command="python -m unittest ./ci/tests/example_1/test_example_consume_artifact.py",
requires=["greet"],
job_requirements=Job.Requirements(
python_requirements_txt="./ci/requirements.txt"
),
digest_config=Job.CacheDigestConfig(
# example: use dir to include files recursively
include_paths=["./ci/tests/example_1"],
# example: use glob to exclude files from digest
exclude_paths=[
"./ci/tests/example_1/test_example_consume*",
"./**/*.pyc",
],
),
),
],
artifacts=[Artifact.Config(type="s3", name="greet", path="hello")],
enable_cache=True,
)
for job in workflow.jobs:
print(c.digest.calc_job_digest(job))