ClickHouse/ci/praktika/s3.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

180 lines
6.4 KiB
Python
Raw Normal View History

2024-10-01 19:19:35 +00:00
import dataclasses
import json
from pathlib import Path
from typing import Dict
from praktika._environment import _Environment
from praktika.settings import Settings
from praktika.utils import Shell
2024-10-01 19:19:35 +00:00
class S3:
    """Thin wrapper around the AWS CLI for the S3 operations used by praktika.

    Every method shells out to ``aws`` through ``praktika.utils.Shell``.
    ``put`` and ``head_object`` accept paths with or without a leading
    ``s3://``; the remaining methods expect ``<bucket>/<key>`` and add the
    scheme themselves.
    """

    @dataclasses.dataclass
    class Object:
        """Fields of an ``aws s3api head-object`` response kept by this module."""

        AcceptRanges: str
        Expiration: str
        LastModified: str
        ContentLength: int
        ETag: str
        ContentType: str
        ServerSideEncryption: str
        Metadata: Dict

        def has_tags(self, tags):
            """Return True if every ``key: value`` pair in ``tags`` is in ``Metadata``.

            Prints the first mismatching pair and returns False otherwise.
            """
            meta = self.Metadata
            for k, v in tags.items():
                if k not in meta or meta[k] != v:
                    print(f"tag [{k}={v}] does not match meta [{meta}]")
                    return False
            return True

    @classmethod
    def clean_s3_directory(cls, s3_path):
        """Recursively remove everything under ``s3_path`` (single attempt)."""
        # Safety guard: require at least bucket/dir/subdir so that a short path
        # cannot wipe a whole bucket or a top-level prefix.
        assert len(s3_path.split("/")) > 2, "check to not delete too much"
        cmd = f"aws s3 rm s3://{s3_path} --recursive"
        cls.run_command_with_retries(cmd, retries=1)
        return

    @classmethod
    def copy_file_to_s3(cls, s3_path, local_path, text=False):
        """Upload a local file with ``aws s3 cp`` and return its HTTPS URL.

        Args:
            s3_path: ``<bucket>/<prefix>`` or a full ``<bucket>/<key>``. If it
                does not already end with the local file name, the file name
                is appended.
            local_path: existing regular file to upload.
            text: when True, upload with ``--content-type text/plain``.

        Returns:
            The object URL, built from the HTTP endpoint configured for the
            bucket in ``Settings.S3_BUCKET_TO_HTTP_ENDPOINT``.

        Raises:
            AssertionError: on invalid arguments or a missing endpoint.
            RuntimeError: if the upload command did not succeed.
            KeyError: if the bucket has no configured endpoint.
        """
        assert Path(local_path).exists(), f"Path [{local_path}] does not exist"
        # Path(...) objects are always truthy, so check the value itself.
        assert s3_path, f"Invalid S3 Path [{s3_path}]"
        assert Path(
            local_path
        ).is_file(), f"Path [{local_path}] is not file. Only files are supported"
        file_name = Path(local_path).name
        s3_full_path = s3_path
        if not s3_full_path.endswith(file_name):
            s3_full_path = f"{s3_path}/{file_name}"
        cmd = f"aws s3 cp {local_path} s3://{s3_full_path}"
        if text:
            cmd += " --content-type text/plain"
        res = cls.run_command_with_retries(cmd)
        if not res:
            raise RuntimeError(
                f"Failed to upload [{local_path}] to [s3://{s3_full_path}]"
            )
        bucket = s3_path.split("/")[0]
        endpoint = Settings.S3_BUCKET_TO_HTTP_ENDPOINT[bucket]
        assert endpoint
        # Swap only the leading bucket component for the endpoint; a blanket
        # str.replace would also rewrite the bucket name if it occurs in the key.
        return f"https://{endpoint}{s3_full_path[len(bucket):]}"

    @classmethod
    def put(cls, s3_path, local_path, text=False, metadata=None, if_none_matched=False):
        """Upload a local file with ``aws s3api put-object``.

        Args:
            s3_path: ``[s3://]<bucket>/<key>``. If it ends with ``/``, the
                local file name is appended to form the key.
            local_path: existing regular file to upload.
            text: when True, upload with ``--content-type text/plain``.
            metadata: optional dict passed as the object's user metadata.
            if_none_matched: when True, add ``--if-none-match "*"``.

        Returns:
            True if the command succeeded, False otherwise.
        """
        assert Path(local_path).exists(), f"Path [{local_path}] does not exist"
        # Path(...) objects are always truthy, so check the value itself.
        assert s3_path, f"Invalid S3 Path [{s3_path}]"
        assert Path(
            local_path
        ).is_file(), f"Path [{local_path}] is not file. Only files are supported"
        s3_full_path = str(s3_path)
        if s3_full_path.endswith("/"):
            s3_full_path = f"{s3_full_path}{Path(local_path).name}"
        s3_full_path = s3_full_path.removeprefix("s3://")
        bucket, key = s3_full_path.split("/", maxsplit=1)
        command = (
            f"aws s3api put-object --bucket {bucket} --key {key} --body {local_path}"
        )
        if if_none_matched:
            command += ' --if-none-match "*"'
        if metadata:
            # --metadata takes one map argument; pass all pairs in a single
            # shorthand value (k1=v1,k2=v2) instead of repeating the flag.
            pairs = ",".join(f"{k}={v}" for k, v in metadata.items())
            command += f" --metadata {pairs}"
        if text:
            # Must be appended to the command that is actually executed.
            command += " --content-type text/plain"
        res = cls.run_command_with_retries(command)
        return res

    @classmethod
    def run_command_with_retries(cls, command, retries=Settings.MAX_RETRIES_S3):
        """Run ``command`` until it exits with 0 or ``retries`` attempts are used.

        Attempts stop early, without further retries, when stderr contains one
        of the markers checked below (expired login, missing file, unknown CLI
        options, failed precondition).

        Returns:
            True if the last executed attempt exited with code 0 and hit no
            such marker, False otherwise.
        """
        i = 0
        res = False
        while not res and i < retries:
            i += 1
            ret_code, stdout, stderr = Shell.get_res_stdout_stderr(
                command, verbose=True
            )
            if "aws sso login" in stderr:
                print("ERROR: aws login expired")
                break
            elif "does not exist" in stderr:
                print("ERROR: requested file does not exist")
                break
            elif "Unknown options" in stderr:
                print("ERROR: Invalid AWS CLI command or CLI client version:")
                print(f" | aws error: {stderr}")
                break
            elif "PreconditionFailed" in stderr:
                print("ERROR: AWS API Call Precondition Failed")
                print(f" | aws error: {stderr}")
                break
            if ret_code != 0:
                # This helper runs arbitrary aws commands, not only `s3 cp`.
                print(
                    f"ERROR: aws command failed, stdout/stderr err: [{stderr}], out [{stdout}]"
                )
            res = ret_code == 0
        return res

    @classmethod
    def copy_file_from_s3(cls, s3_path, local_path):
        """Download ``s3_path`` to ``local_path`` with ``aws s3 cp``.

        If ``local_path`` is an existing directory, the file is stored inside
        it under the object's base name; otherwise its parent directory must
        exist.

        Returns:
            True if the command succeeded, False otherwise.
        """
        # Path(...) objects are always truthy, so check the value itself.
        assert s3_path, f"Invalid S3 Path [{s3_path}]"
        if Path(local_path).is_dir():
            local_path = Path(local_path) / Path(s3_path).name
        else:
            assert Path(
                local_path
            ).parent.is_dir(), f"Parent path for [{local_path}] does not exist"
        cmd = f"aws s3 cp s3://{s3_path} {local_path}"
        res = cls.run_command_with_retries(cmd)
        return res

    @classmethod
    def copy_file_from_s3_matching_pattern(
        cls, s3_path, local_path, include, exclude="*"
    ):
        """Recursively download objects under ``s3_path`` filtered by patterns.

        Args:
            s3_path: ``<bucket>/<prefix>/``; must end with ``/``.
            local_path: existing destination directory.
            include: pattern passed to ``--include``.
            exclude: pattern passed to ``--exclude`` (everything by default).

        Returns:
            True if the command succeeded, False otherwise.
        """
        # Path(...) objects are always truthy, so check the value itself.
        assert s3_path, f"Invalid S3 Path [{s3_path}]"
        assert Path(
            local_path
        ).is_dir(), f"Path [{local_path}] does not exist or not a directory"
        assert s3_path.endswith("/"), f"s3 path is invalid [{s3_path}]"
        cmd = f'aws s3 cp s3://{s3_path} {local_path} --exclude "{exclude}" --include "{include}" --recursive'
        res = cls.run_command_with_retries(cmd)
        return res

    @classmethod
    def head_object(cls, s3_path):
        """Return the ``Object`` description of ``[s3://]<bucket>/<key>``.

        Returns:
            An ``S3.Object`` built from the ``aws s3api head-object`` output,
            or None if the command produced no output.
        """
        s3_path = str(s3_path).removeprefix("s3://")
        bucket, key = s3_path.split("/", maxsplit=1)
        output = Shell.get_output(
            f"aws s3api head-object --bucket {bucket} --key {key}", verbose=True
        )
        if not output:
            return None
        response = json.loads(output)
        # Keep only the declared fields: extra response fields would make the
        # dataclass constructor raise TypeError, and so would absent ones.
        # Absent fields become None.
        known = {
            field.name: response.get(field.name)
            for field in dataclasses.fields(cls.Object)
        }
        if known["Metadata"] is None:
            # has_tags() does membership tests on Metadata.
            known["Metadata"] = {}
        return cls.Object(**known)

    @classmethod
    def delete(cls, s3_path):
        """Remove the single object ``s3_path``; return ``Shell.check``'s result."""
        # Path(...) objects are always truthy, so check the value itself.
        assert s3_path, f"Invalid S3 Path [{s3_path}]"
        return Shell.check(
            f"aws s3 rm s3://{s3_path}",
            verbose=True,
        )

    @classmethod
    def _upload_file_to_s3(
        cls, local_file_path, upload_to_s3: bool, text: bool = False, s3_subprefix=""
    ) -> str:
        """Upload a report file and return a link to it.

        Args:
            local_file_path: file to publish.
            upload_to_s3: when False, nothing is uploaded and a ``file://`` URL
                of the local file is returned.
            text: forwarded to ``copy_file_to_s3``.
            s3_subprefix: optional extra path component appended after the
                environment's S3 prefix; surrounding slashes are ignored.

        Returns:
            The HTTPS link of the uploaded object, or a ``file://`` URL.
        """
        if upload_to_s3:
            env = _Environment.get()
            s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}"
            # str methods return a new string; the result must be kept.
            s3_subprefix = s3_subprefix.strip("/") if s3_subprefix else ""
            if s3_subprefix:
                s3_path += f"/{s3_subprefix}"
            html_link = S3.copy_file_to_s3(
                s3_path=s3_path, local_path=local_file_path, text=text
            )
            return html_link
        return f"file://{Path(local_file_path).absolute()}"