Merge pull request #54504 from ClickHouse/s3-artifacts

S3 artifacts
Mikhail f. Shiryaev 2023-09-15 18:21:57 +02:00 committed by GitHub
commit 4c9f8d150b
7 changed files with 263 additions and 39 deletions

View File

@@ -0,0 +1,192 @@
#!/usr/bin/env python

"""Manages artifacts similar to GitHub Actions, but stores them in S3"""

from dataclasses import dataclass
from datetime import datetime
from fnmatch import fnmatch
from os import path as op
from pathlib import Path
from shutil import copy2
from typing import List, Optional, Union

from github.Commit import Commit

from build_download_helper import download_build_with_progress
from commit_status_helper import post_commit_status
from compress_files import SUFFIX, compress_fast, decompress_fast
from env_helper import CI, RUNNER_TEMP, S3_BUILDS_BUCKET
from git_helper import SHA_REGEXP
from report import HEAD_HTML_TEMPLATE, FOOTER_HTML_TEMPLATE
from s3_helper import S3Helper

ARTIFACTS_PATH = Path(RUNNER_TEMP) / "artifacts"


@dataclass
class S3Object:
    key: str
    last_modified: str
    size: int


class ArtifactsHelper:
    INDEX = "index.html"
    RESTRICTED_SYMBOLS = r"\/':<>|*?\""

    def __init__(
        self,
        s3_helper: S3Helper,
        commit: Union[str, Commit],
        s3_prefix: str = "artifacts",
    ):
        """A helper to compress+upload and download+decompress artifacts.

        If `commit` is a github.Commit.Commit instance, the 'Artifacts' commit
        status for the given commit is updated on every upload"""
        self._commit = commit
        assert SHA_REGEXP.match(self.commit)
        self.temp_path = ARTIFACTS_PATH
        self.temp_path.mkdir(parents=True, exist_ok=True)
        self.s3_helper = s3_helper
        # The S3 prefix always ends with a trailing slash
        self._s3_prefix = op.join(s3_prefix, self.commit, "")
        self._s3_index_key = f"{self.s3_prefix}{self.INDEX}"
        self._s3_index_url = None  # type: Optional[str]

    @property
    def commit(self) -> str:
        """The commit SHA as a string"""
        if isinstance(self._commit, str):
            return self._commit
        return self._commit.sha

    @property
    def s3_prefix(self) -> str:
        """Prefix with the trailing slash"""
        return self._s3_prefix

    @property
    def s3_index_key(self) -> str:
        """The S3 key of the per-commit index.html"""
        return self._s3_index_key

    @property
    def s3_index_url(self) -> str:
        if self._s3_index_url is None:
            self._s3_index_url = self.s3_helper.get_url(
                S3_BUILDS_BUCKET, self.s3_index_key
            )
        return self._s3_index_url

    def upload(self, artifact_name: str, artifact_path: Path) -> None:
        """Creates an archive '{artifact_name}.tar{compress_files.SUFFIX}' from
        the content of `artifact_path` and uploads it to S3"""
        assert not any(s in artifact_name for s in self.RESTRICTED_SYMBOLS)
        archive_path = self.temp_path / f"{artifact_name}.tar{SUFFIX}"
        s3_artifact_key = f"{self.s3_prefix}{archive_path.name}"
        compress_fast(artifact_path, archive_path)
        self.s3_helper.upload_build_file_to_s3(archive_path, s3_artifact_key)
        self._regenerate_index()

    def download(
        self,
        artifact_name: str,
        extract_directory: Path = ARTIFACTS_PATH,
        keep_archive: bool = False,
    ) -> Path:
        """Downloads the artifact, if it exists, and extracts it into
        `extract_directory`. Returns the path to the extracted artifact; when
        nothing was uploaded for the commit, the returned path does not exist"""
        assert not any(s in artifact_name for s in self.RESTRICTED_SYMBOLS)
        assert extract_directory.is_dir()
        archive_path = self.temp_path / f"{artifact_name}.tar{SUFFIX}"
        artifact_path = extract_directory / artifact_name
        s3_artifact_key = f"{self.s3_prefix}{archive_path.name}"
        url = self.s3_helper.url_if_exists(s3_artifact_key, S3_BUILDS_BUCKET)
        if not url:
            return artifact_path
        if url.startswith("file://"):
            copy2(Path(url[7:]), archive_path)
        else:
            download_build_with_progress(url, archive_path)
        artifact_path.mkdir(parents=True, exist_ok=True)
        decompress_fast(archive_path, artifact_path)
        if not keep_archive:
            archive_path.unlink()
        return artifact_path

    def list_artifacts(self, glob: str = "") -> List[str]:
        """Returns the list of artifacts existing for the commit, optionally
        filtered by `glob`"""

        def ignore(key: str) -> bool:
            # Keep-predicate for filter(): drop the index itself and, when a
            # glob is given, keep only the matching keys
            if key == self.s3_index_key:
                return False
            if glob:
                return fnmatch(key, glob)
            return True

        results = filter(
            ignore, self.s3_helper.list_prefix(self.s3_prefix, S3_BUILDS_BUCKET)
        )
        return list(results)

    @staticmethod
    def post_commit_status(commit: Commit, url: str) -> None:
        post_commit_status(
            commit, "success", url, "Artifacts for workflow", "Artifacts"
        )

    def _regenerate_index(self) -> None:
        # Rebuild the per-commit index.html listing every uploaded artifact
        if CI:
            files = self._get_s3_objects()
        else:
            files = self._get_local_s3_objects()

        def name(uri: str) -> str:
            return Path(uri).name

        links = [
            f'<tr><td><a href="{f.key}">{name(f.key)}</a></td><td>{f.size}</td>'
            f"<td>{f.last_modified}</td></tr>"
            for f in files
        ]
        index_path = self.temp_path / self.INDEX
        title = f"Artifacts for workflow commit {self.commit}"
        index_content = (
            HEAD_HTML_TEMPLATE.format(title=title, header=title)
            + "<table><tr><th>Artifact</th><th>Size</th><th>Modified</th></tr>"
            + "\n".join(links)
            + "</table>"
            + FOOTER_HTML_TEMPLATE
        )
        index_path.write_text(index_content, encoding="utf-8")
        url = self.s3_helper.upload_build_file_to_s3(index_path, self.s3_index_key)
        if isinstance(self._commit, Commit):
            self.post_commit_status(self._commit, url)

    def _get_s3_objects(self) -> List[S3Object]:
        objects = self.s3_helper.client.list_objects_v2(
            Bucket=S3_BUILDS_BUCKET, Prefix=self.s3_prefix
        )
        files = []  # type: List[S3Object]
        if "Contents" in objects:
            files = [
                S3Object(
                    obj["Key"][len(self.s3_prefix) :],
                    obj["LastModified"].isoformat(),
                    obj["Size"],
                )
                for obj in objects["Contents"]
            ]
        return files

    def _get_local_s3_objects(self) -> List[S3Object]:
        files = [
            S3Object(
                fp.as_uri(),
                datetime.fromtimestamp(fp.stat().st_mtime).isoformat(),
                fp.stat().st_size,
            )
            for fp in self.s3_helper.local_path(S3_BUILDS_BUCKET, self.s3_prefix)
            .absolute()
            .iterdir()
        ]
        return files
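
For orientation, here is a minimal usage sketch of ArtifactsHelper; it is not part of the commit. It assumes the script runs from tests/ci (so artifacts_helper and s3_helper are importable) with RUNNER_TEMP and S3 credentials configured; the SHA, artifact name, and report directory are invented for illustration.

from pathlib import Path

from artifacts_helper import ArtifactsHelper
from s3_helper import S3Helper

s3_helper = S3Helper()
# A plain 40-hex SHA is accepted; passing a github.Commit.Commit instead would
# additionally post the "Artifacts" commit status on every upload
helper = ArtifactsHelper(s3_helper, "0123456789abcdef0123456789abcdef01234567")

# Compress ./reports/unit_tests into unit_tests_report.tar.zst, upload it under
# artifacts/<sha>/ and regenerate the per-commit index.html
helper.upload("unit_tests_report", Path("./reports/unit_tests"))

# Possibly in a later job: download and extract the same artifact
extracted = helper.download("unit_tests_report")
if extracted.exists():
    print("artifact extracted to", extracted)
else:
    print("nothing was uploaded for this commit")

print("matching artifacts:", helper.list_artifacts("unit_*"))
print("index:", helper.s3_index_url)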

View File

@@ -7,10 +7,11 @@ from typing import Optional
PIGZ = Path("/usr/bin/pigz")
SUFFIX = ".zst"
def compress_file_fast(path: Path, archive_path: Path) -> None:
if archive_path.suffix == ".zst":
if archive_path.suffix == SUFFIX:
subprocess.check_call(f"zstd < {path} > {archive_path}", shell=True)
elif PIGZ.exists():
subprocess.check_call(f"pigz < {path} > {archive_path}", shell=True)
@@ -22,7 +23,7 @@ def compress_fast(
path: Path, archive_path: Path, exclude: Optional[Path] = None
) -> None:
program_part = ""
if archive_path.suffix == ".zst":
if archive_path.suffix == SUFFIX:
logging.info("zstd will be used for compression")
program_part = "--use-compress-program='zstd --threads=0'"
elif PIGZ.exists():
@@ -39,6 +40,7 @@ def compress_fast(
else:
exclude_part = f"--exclude {exclude}"
archive_path.parent.mkdir(parents=True, exist_ok=True)
fname = path.name
cmd = (
@@ -50,7 +52,7 @@ def compress_fast(
def decompress_fast(archive_path: Path, result_path: Optional[Path] = None) -> None:
program_part = ""
if archive_path.suffix == ".zst":
if archive_path.suffix == SUFFIX:
logging.info(
"zstd will be used for decompression ('%s' -> '%s')",
archive_path,
@@ -75,6 +77,7 @@ def decompress_fast(archive_path: Path, result_path: Optional[Path] = None) -> None:
if result_path is None:
subprocess.check_call(f"tar {program_part} -xf {archive_path}", shell=True)
else:
result_path.mkdir(parents=True, exist_ok=True)
subprocess.check_call(
f"tar {program_part} -xf {archive_path} -C {result_path}",
shell=True,
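
As a side note on the two helpers touched above, the sketch below (not part of the commit) round-trips a directory through compress_fast and decompress_fast; it assumes zstd (or pigz) is installed, it runs from tests/ci, and the paths are invented.

from pathlib import Path

from compress_files import SUFFIX, compress_fast, decompress_fast

src = Path("./build_artifacts")  # hypothetical directory to pack
archive = Path(f"/tmp/build_artifacts.tar{SUFFIX}")  # SUFFIX is ".zst"

# compress_fast creates the parent directory of `archive` itself
compress_fast(src, archive)

# decompress_fast now also creates the target directory when it is missing,
# which is exactly what the added result_path.mkdir(...) call provides
decompress_fast(archive, Path("/tmp/unpacked"))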

View File

@@ -149,7 +149,7 @@ def gen_versions(
pr_commit_version = str(pr_info.number) + "-" + pr_info.sha
# The order is important: the PR number is used as the cache during the build
versions = [str(pr_info.number), pr_commit_version]
result_version = pr_commit_version
result_version = pr_commit_version # type: Union[str, List[str]]
if pr_info.number == 0 and pr_info.base_ref == "master":
# First get the latest for cache
versions.insert(0, "latest")

View File

@@ -272,16 +272,17 @@ def main():
if test_result != "OK":
status = "failure"
enriched_images = changed_images.copy()
try:
# changed_images now contains all the images that are changed in this PR. Let's find the latest tag for the images that are not changed.
enrich_images(changed_images)
enrich_images(enriched_images)
except CHException as ex:
logging.warning("Couldn't get proper tags for not changed images: %s", ex)
with open(
os.path.join(args.path, "changed_images.json"), "w", encoding="utf-8"
) as ci:
json.dump(changed_images, ci)
json.dump(enriched_images, ci)
pr_info = PRInfo()
s3_helper = S3Helper()
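
The reason for enriching a copy rather than changed_images itself, sketched below with plain dicts (the image names and tags are invented, and a single assignment stands in for enrich_images): presumably the original mapping keeps meaning exactly "images rebuilt in this PR", while changed_images.json carries the enriched view.

changed_images = {"clickhouse/binary-builder": "54504-deadbeef"}  # rebuilt in this PR

enriched_images = changed_images.copy()
# enrich_images() would add the latest known tag for every image that was NOT
# rebuilt; mutating only the copy leaves changed_images untouched
enriched_images["clickhouse/stateless-test"] = "latest"

assert "clickhouse/stateless-test" not in changed_images  # the original is intact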

View File

@@ -14,7 +14,7 @@ RELEASE_BRANCH_REGEXP = r"\A\d+[.]\d+\Z"
TAG_REGEXP = (
r"\Av\d{2}[.][1-9]\d*[.][1-9]\d*[.][1-9]\d*-(testing|prestable|stable|lts)\Z"
)
SHA_REGEXP = r"\A([0-9]|[a-f]){40}\Z"
SHA_REGEXP = re.compile(r"\A([0-9]|[a-f]){40}\Z")
CWD = p.dirname(p.realpath(__file__))
TWEAK = 1
@@ -34,8 +34,7 @@ def removesuffix(string: str, suffix: str) -> str:
def commit(name: str) -> str:
r = re.compile(SHA_REGEXP)
if not r.match(name):
if not SHA_REGEXP.match(name):
raise argparse.ArgumentTypeError(
"commit hash should contain exactly 40 hex characters"
)
@@ -52,8 +51,11 @@ def release_branch(name: str) -> str:
class Runner:
"""lightweight check_output wrapper with stripping last NEW_LINE"""
def __init__(self, cwd: str = CWD):
def __init__(self, cwd: str = CWD, set_cwd_to_git_root: bool = False):
self._cwd = cwd
# Resolving the repo's root is delayed until the first use, so it does not happen at import time
self._git_root = None # type: Optional[str]
self._set_cwd_to_git_root = set_cwd_to_git_root
def run(self, cmd: str, cwd: Optional[str] = None, **kwargs: Any) -> str:
if cwd is None:
@@ -68,6 +70,12 @@ class Runner:
@property
def cwd(self) -> str:
if self._set_cwd_to_git_root:
if self._git_root is None:
self._git_root = p.realpath(
p.join(self._cwd, self.run("git rev-parse --show-cdup", self._cwd))
)
return self._git_root
return self._cwd
@cwd.setter
@@ -81,11 +89,7 @@ class Runner:
return self.run(*args, **kwargs)
git_runner = Runner()
# Set cwd to abs path of git root
git_runner.cwd = p.relpath(
p.join(git_runner.cwd, git_runner.run("git rev-parse --show-cdup"))
)
git_runner = Runner(set_cwd_to_git_root=True)
def is_shallow() -> bool:
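
A short sketch (not part of the commit) of how the reworked Runner behaves; it assumes a git checkout with tests/ci on the import path.

from git_helper import Runner

# The module-level git_runner is now built with set_cwd_to_git_root=True, so
# `git rev-parse --show-cdup` is executed lazily, not at import time
runner = Runner(set_cwd_to_git_root=True)

# The first access of .cwd resolves the repository root once ...
print(runner.cwd)

# ... and subsequent commands run from that root
print(runner("git rev-parse HEAD"))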

View File

@@ -94,7 +94,7 @@ class PRInfo:
self.event = github_event
self.changed_files = set() # type: Set[str]
self.body = ""
self.diff_urls = []
self.diff_urls = [] # type: List[str]
# release_pr and merged_pr are used for docker images additional cache
self.release_pr = 0
self.merged_pr = 0
@@ -104,7 +104,7 @@ class PRInfo:
# workflow completed event, used for PRs only
if "action" in github_event and github_event["action"] == "completed":
self.sha = github_event["workflow_run"]["head_sha"]
self.sha = github_event["workflow_run"]["head_sha"] # type: str
prs_for_sha = get_gh_api(
f"https://api.github.com/repos/{GITHUB_REPOSITORY}/commits/{self.sha}"
"/pulls",
@@ -114,7 +114,7 @@ class PRInfo:
github_event["pull_request"] = prs_for_sha[0]
if "pull_request" in github_event: # pull request and other similar events
self.number = github_event["pull_request"]["number"]
self.number = github_event["pull_request"]["number"] # type: int
if pr_event_from_api:
try:
response = get_gh_api(
@@ -144,20 +144,24 @@ class PRInfo:
self.pr_html_url = f"{repo_prefix}/pull/{self.number}"
# master or backport/xx.x/xxxxx - where the PR will be merged
self.base_ref = github_event["pull_request"]["base"]["ref"]
self.base_ref = github_event["pull_request"]["base"]["ref"] # type: str
# ClickHouse/ClickHouse
self.base_name = github_event["pull_request"]["base"]["repo"]["full_name"]
self.base_name = github_event["pull_request"]["base"]["repo"][
"full_name"
] # type: str
# any_branch-name - the name of the working branch
self.head_ref = github_event["pull_request"]["head"]["ref"]
self.head_ref = github_event["pull_request"]["head"]["ref"] # type: str
# UserName/ClickHouse or ClickHouse/ClickHouse
self.head_name = github_event["pull_request"]["head"]["repo"]["full_name"]
self.head_name = github_event["pull_request"]["head"]["repo"][
"full_name"
] # type: str
self.body = github_event["pull_request"]["body"]
self.labels = {
label["name"] for label in github_event["pull_request"]["labels"]
} # type: Set[str]
self.user_login = github_event["pull_request"]["user"]["login"]
self.user_orgs = set([])
self.user_login = github_event["pull_request"]["user"]["login"] # type: str
self.user_orgs = set() # type: Set[str]
if need_orgs:
user_orgs_response = get_gh_api(
github_event["pull_request"]["user"]["organizations_url"],
@@ -170,7 +174,7 @@ class PRInfo:
self.diff_urls.append(github_event["pull_request"]["diff_url"])
elif "commits" in github_event:
# `head_commit` always comes with `commits`
commit_message = github_event["head_commit"]["message"]
commit_message = github_event["head_commit"]["message"] # type: str
if commit_message.startswith("Merge pull request #"):
merged_pr = commit_message.split(maxsplit=4)[3]
try:
@@ -234,7 +238,9 @@ class PRInfo:
else:
print("event.json does not match pull_request or push:")
print(json.dumps(github_event, sort_keys=True, indent=4))
self.sha = os.getenv("GITHUB_SHA")
self.sha = os.getenv(
"GITHUB_SHA", "0000000000000000000000000000000000000000"
)
self.number = 0
self.labels = set()
repo_prefix = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}"

View File

@@ -50,7 +50,6 @@ class S3Helper:
self.session = boto3.session.Session(region_name="us-east-1")
self.client = self.session.client("s3", endpoint_url=S3_URL, config=config)
self.host = S3_URL
self.download_host = S3_DOWNLOAD
def _upload_file_to_s3(
self, bucket_name: str, file_path: Path, s3_path: str
@@ -113,10 +112,7 @@ class S3Helper:
logging.info("File is too large, do not provide content type")
self.client.upload_file(file_path, bucket_name, s3_path, ExtraArgs=metadata)
# last two replacements are specifics of AWS urls:
# https://jamesd3142.wordpress.com/2018/02/28/amazon-s3-and-the-plus-symbol/
url = f"{self.download_host}/{bucket_name}/{s3_path}"
url = url.replace("+", "%2B").replace(" ", "%20")
url = self.s3_url(bucket_name, s3_path)
logging.info("Upload %s to %s. Meta: %s", file_path, url, metadata)
return url
@@ -183,7 +179,7 @@ class S3Helper:
t = time.time()
except Exception as ex:
logging.critical("Failed to upload file, expcetion %s", ex)
return f"{self.download_host}/{bucket_name}/{s3_path}"
return self.s3_url(bucket_name, s3_path)
p = Pool(self.max_pool_size)
@@ -296,21 +292,43 @@ class S3Helper:
return result
def exists(self, key: str, bucket: str = S3_BUILDS_BUCKET) -> bool:
def url_if_exists(self, key: str, bucket: str = S3_BUILDS_BUCKET) -> str:
if not CI:
local_path = self.local_path(bucket, key)
if local_path.exists():
return local_path.as_uri()
return ""
try:
self.client.head_object(Bucket=bucket, Key=key)
return True
return self.s3_url(bucket, key)
except Exception:
return False
return ""
@staticmethod
def get_url(bucket: str, key: str) -> str:
if CI:
return S3Helper.s3_url(bucket, key)
return S3Helper.local_path(bucket, key).as_uri()
@staticmethod
def s3_url(bucket: str, key: str) -> str:
url = f"{S3_DOWNLOAD}/{bucket}/{key}"
# last two replacements are specifics of AWS urls:
# https://jamesd3142.wordpress.com/2018/02/28/amazon-s3-and-the-plus-symbol/
url = url.replace("+", "%2B").replace(" ", "%20")
return url
@staticmethod
def local_path(bucket: str, key: str) -> Path:
return (Path(RUNNER_TEMP) / "s3" / bucket / key).absolute()
@staticmethod
def copy_file_to_local(bucket_name: str, file_path: Path, s3_path: str) -> str:
local_path = (
Path(RUNNER_TEMP) / "s3" / os.path.join(bucket_name, s3_path)
).absolute()
local_path = S3Helper.local_path(bucket_name, s3_path)
local_dir = local_path.parent
local_dir.mkdir(parents=True, exist_ok=True)
shutil.copy(file_path, local_path)
logging.info("Copied %s to %s", file_path, local_path)
return f"file://{local_path}"
return local_path.as_uri()
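
To tie the S3Helper changes together, here is a minimal sketch (not part of the commit) of how the new helpers are meant to be used; it assumes the tests/ci environment and uses an invented artifact key.

from env_helper import S3_BUILDS_BUCKET
from s3_helper import S3Helper

s3_helper = S3Helper()
key = "artifacts/0123456789abcdef0123456789abcdef01234567/unit_tests_report.tar.zst"

# get_url() picks the flavour for the current mode: an https:// download URL
# on CI, a file:// URI pointing into RUNNER_TEMP/s3/ for local runs
print(S3Helper.get_url(S3_BUILDS_BUCKET, key))

# url_if_exists() returns "" when the object is missing, so callers such as
# ArtifactsHelper.download() can treat the empty string as "nothing to fetch"
url = s3_helper.url_if_exists(key, S3_BUILDS_BUCKET)
if not url:
    print("artifact is not uploaded yet")
elif url.startswith("file://"):
    print("local mirror:", S3Helper.local_path(S3_BUILDS_BUCKET, key))
else:
    print("download from:", url)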