# Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-28 10:31:57 +00:00)
# 296 lines, 11 KiB, Python
import dataclasses
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Dict
|
|
|
|
from praktika._environment import _Environment
|
|
from praktika.settings import Settings
|
|
from praktika.utils import Shell, Utils
|
|
|
|
|
|
class S3:
    """Thin wrapper around the ``aws`` CLI for S3 operations used by praktika.

    Provides copy/put/head/delete helpers, helpers that publish job result
    files to ``Settings.HTML_S3_PATH``, and a best-effort lock built on a
    ``<path>.lock`` object whose metadata carries the (base64) job name.
    S3 paths are passed as ``bucket/key`` (an ``s3://`` prefix is tolerated by
    the put/copy-to helpers).
    """

    @dataclasses.dataclass
    class Object:
        """Subset of the ``aws s3api head-object`` JSON output.

        All fields have defaults because head-object omits some keys (e.g.
        ``Expiration`` without a lifecycle rule) and may add others.
        """

        AcceptRanges: str = ""
        Expiration: str = ""
        LastModified: str = ""
        ContentLength: int = 0
        ETag: str = ""
        ContentType: str = ""
        ServerSideEncryption: str = ""
        Metadata: Dict = dataclasses.field(default_factory=dict)

        @classmethod
        def from_dict(cls, data):
            """Build an Object from head-object JSON, ignoring unknown keys."""
            known = {field.name for field in dataclasses.fields(cls)}
            return cls(**{k: v for k, v in data.items() if k in known})

        def has_tags(self, tags):
            """Return True if every ``tags`` key/value pair is present in Metadata."""
            meta = self.Metadata or {}
            for k, v in tags.items():
                if k not in meta or meta[k] != v:
                    print(f"tag [{k}={v}] does not match meta [{meta}]")
                    return False
            return True

    @classmethod
    def clean_s3_directory(cls, s3_path):
        """Recursively delete everything under ``s3_path`` (single attempt).

        Returns True if the ``aws s3 rm`` command succeeded.
        """
        # Guard against wiping a whole bucket or a top-level prefix by mistake.
        assert len(s3_path.split("/")) > 2, "check to not delete too much"
        cmd = f"aws s3 rm s3://{s3_path} --recursive"
        return cls.run_command_with_retries(cmd, retries=1)

    @staticmethod
    def _resolve_s3_file_path(s3_path, local_path):
        """Validate ``local_path`` and return the destination ``bucket/key``.

        If the last component of ``s3_path`` already equals the local file
        name, ``s3_path`` is treated as the full key. Otherwise it is treated
        as a directory and the file name is appended. A leading ``s3://`` is
        stripped.
        """
        assert Path(local_path).exists(), f"Path [{local_path}] does not exist"
        assert s3_path, f"Invalid S3 Path [{s3_path}]"
        assert Path(
            local_path
        ).is_file(), f"Path [{local_path}] is not file. Only files are supported"
        s3_path = str(s3_path).removeprefix("s3://")
        file_name = Path(local_path).name
        if s3_path.rsplit("/", 1)[-1] == file_name:
            return s3_path
        return f"{s3_path}/{file_name}"

    @staticmethod
    def _put_object_command(bucket, key, local_path, text=False, metadata=None):
        """Build an ``aws s3api put-object`` command line."""
        command = (
            f"aws s3api put-object --bucket {bucket} --key {key} --body {local_path}"
        )
        if text:
            command += " --content-type text/plain"
        if metadata:
            # --metadata is a map: pass all pairs in one flag (shorthand
            # syntax k1=v1,k2=v2); repeating the flag keeps only the last one.
            command += " --metadata " + ",".join(
                f"{k}={v}" for k, v in metadata.items()
            )
        return command

    @classmethod
    def copy_file_to_s3(cls, s3_path, local_path, text=False):
        """Upload a single local file with ``aws s3 cp`` and return its HTTP link.

        Args:
            s3_path: destination directory (or full key) as ``bucket/...``.
            local_path: path to an existing local file.
            text: if True, upload with ``Content-Type: text/plain``.

        Raises:
            AssertionError: on invalid paths or a missing/empty bucket endpoint.
            KeyError: if the bucket has no entry in S3_BUCKET_TO_HTTP_ENDPOINT.
            RuntimeError: if the upload fails after retries.
        """
        s3_full_path = cls._resolve_s3_file_path(s3_path, local_path)
        cmd = f"aws s3 cp {local_path} s3://{s3_full_path}"
        if text:
            cmd += " --content-type text/plain"
        if not cls.run_command_with_retries(cmd):
            raise RuntimeError(
                f"Failed to copy file [{local_path}] to s3 [{s3_full_path}]"
            )
        bucket = s3_full_path.split("/")[0]
        endpoint = Settings.S3_BUCKET_TO_HTTP_ENDPOINT[bucket]
        assert endpoint
        # Replace only the leading bucket name, not copies of it inside the key.
        return f"https://{s3_full_path}".replace(bucket, endpoint, 1)

    @classmethod
    def put(cls, s3_path, local_path, text=False, metadata=None):
        """Upload a single local file with ``aws s3api put-object``.

        Unlike ``copy_file_to_s3`` this supports user ``metadata`` (a dict of
        string pairs) and returns nothing.

        Raises:
            AssertionError: on invalid paths or if the upload fails after retries.
        """
        s3_full_path = cls._resolve_s3_file_path(s3_path, local_path)
        bucket, key = s3_full_path.split("/", maxsplit=1)
        command = cls._put_object_command(
            bucket, key, local_path, text=text, metadata=metadata
        )
        res = cls.run_command_with_retries(command)
        assert res, f"Failed to put [{local_path}] to s3 [{s3_full_path}]"

    @classmethod
    def run_command_with_retries(cls, command, retries=None):
        """Run an aws CLI ``command`` up to ``retries`` times until it succeeds.

        ``retries`` defaults to ``Settings.MAX_RETRIES_S3`` (read at call time).
        Retrying stops early when stderr shows an expired SSO login or a
        missing object, since repeating cannot help.

        Returns:
            True if an attempt exited with code 0, otherwise False.
        """
        if retries is None:
            retries = Settings.MAX_RETRIES_S3
        for _ in range(retries):
            ret_code, stdout, stderr = Shell.get_res_stdout_stderr(
                command, verbose=True
            )
            if "aws sso login" in stderr:
                print("ERROR: aws login expired")
                return False
            if "does not exist" in stderr:
                print("ERROR: requested file does not exist")
                return False
            if ret_code == 0:
                return True
            print(
                f"ERROR: aws command failed, stdout/stderr err: [{stderr}], out [{stdout}]"
            )
        return False

    @classmethod
    def get_link(cls, s3_path, local_path):
        """Return the HTTP link for ``local_path``'s file name under ``s3_path``."""
        s3_full_path = f"{s3_path}/{Path(local_path).name}"
        bucket = s3_path.split("/")[0]
        endpoint = Settings.S3_BUCKET_TO_HTTP_ENDPOINT[bucket]
        return f"https://{s3_full_path}".replace(bucket, endpoint, 1)

    @classmethod
    def copy_file_from_s3(cls, s3_path, local_path):
        """Download ``s3_path`` to ``local_path`` (a file path or existing directory).

        Returns True on success, False otherwise.
        """
        assert s3_path, f"Invalid S3 Path [{s3_path}]"
        if Path(local_path).is_dir():
            local_path = Path(local_path) / Path(s3_path).name
        else:
            assert Path(
                local_path
            ).parent.is_dir(), f"Parent path for [{local_path}] does not exist"
        cmd = f"aws s3 cp s3://{s3_path} {local_path}"
        return cls.run_command_with_retries(cmd)

    @classmethod
    def head_object(cls, s3_path):
        """Return an ``S3.Object`` for ``s3_path``, or None if head-object printed nothing.

        NOTE(review): an empty output presumably means the object is missing
        or the call failed; confirm against ``Shell.get_output`` semantics.
        """
        s3_path = str(s3_path).removeprefix("s3://")
        bucket, key = s3_path.split("/", maxsplit=1)
        output = Shell.get_output(
            f"aws s3api head-object --bucket {bucket} --key {key}", verbose=True
        )
        if not output:
            return None
        return cls.Object.from_dict(json.loads(output))

    @classmethod
    def delete(cls, s3_path):
        """Delete a single object; returns the result of ``Shell.check``."""
        assert s3_path, f"Invalid S3 Path [{s3_path}]"
        return Shell.check(
            f"aws s3 rm s3://{s3_path}",
            verbose=True,
        )

    # TODO: the result/lock helpers below should probably move to a separate
    # module used only inside praktika, keeping this module free of Settings,
    # _Environment, etc., so it is easy to use externally.
    @classmethod
    def copy_result_to_s3(cls, result, unlock=True):
        """Dump ``result`` and upload its file to this commit's HTML S3 prefix.

        For PRs the file is also copied to the "latest" prefix, and the
        returned URL points there. If ``unlock`` is set, the lock taken on the
        per-commit path (see ``copy_result_from_s3``) is released.
        """
        result.dump()
        env = _Environment.get()
        s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}"
        # The lock, if any, is held on the per-commit path, not the "latest" alias.
        s3_path_full = f"{s3_path}/{Path(result.file_name()).name}"
        url = cls.copy_file_to_s3(s3_path=s3_path, local_path=result.file_name())
        if env.PR_NUMBER:
            print("Duplicate Result for latest commit alias in PR")
            latest_s3_path = (
                f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix(latest=True)}"
            )
            url = cls.copy_file_to_s3(
                s3_path=latest_s3_path, local_path=result.file_name()
            )
        if unlock and not cls.unlock(s3_path_full):
            print(f"ERROR: File [{s3_path_full}] unlock failure")
            assert False  # TODO: investigate
        return url

    @classmethod
    def copy_result_from_s3(cls, local_path, lock=True):
        """Download this commit's result file named like ``local_path``.

        If ``lock`` is set, the S3 lock on the file is acquired first.

        Raises:
            RuntimeError: if the download fails.
        """
        env = _Environment.get()
        file_name = Path(local_path).name
        s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/{file_name}"
        if lock:
            cls.lock(s3_path)
        if not cls.copy_file_from_s3(s3_path=s3_path, local_path=local_path):
            print(f"ERROR: failed to cp file [{s3_path}] from s3")
            raise RuntimeError(f"Failed to copy file [{s3_path}] from s3")

    @classmethod
    def lock(cls, s3_path, level=0):
        """Acquire a best-effort lock on ``s3_path`` via a ``<s3_path>.lock`` object.

        Waits up to ~20 s (polling every 5 s) for an existing lock object to
        disappear, then takes the lock forcefully. It then uploads the lock
        object with metadata ``job=<base64 job name>`` and re-reads it to
        confirm ownership. There is no conditional write, so this read-back is
        the only detection of a concurrent writer. On mismatch it retries
        recursively, at most 3 levels deep.
        """
        assert level < 3, "Never"
        env = _Environment.get()
        s3_path_lock = s3_path + ".lock"
        file_path_lock = f"{Settings.TEMP_DIR}/{Path(s3_path_lock).name}"
        # Write the body directly rather than through a shell echo, so that
        # quotes in the job name cannot break the command.
        Path(file_path_lock).write_text(f"{env.JOB_NAME}\n")

        wait_budget_sec = 20
        meta = cls.head_object(s3_path_lock)
        while meta:
            print(f"WARNING: Failed to acquire lock, meta [{meta}] - wait")
            wait_budget_sec -= 5
            if wait_budget_sec < 0:
                info = "ERROR: lock acquire failure - unlock forcefully"
                print(info)
                env.add_info(info)
                break
            time.sleep(5)
            # Re-check: stop waiting as soon as the holder releases the lock.
            meta = cls.head_object(s3_path_lock)

        metadata = {"job": Utils.to_base64(env.JOB_NAME)}
        cls.put(
            s3_path=s3_path_lock,
            local_path=file_path_lock,
            metadata=metadata,
        )
        time.sleep(1)
        obj = cls.head_object(s3_path_lock)
        if not obj or not obj.has_tags(tags=metadata):
            print(f"WARNING: locked by another job [{obj}]")
            env.add_info("S3 lock file failure")
            cls.lock(s3_path, level=level + 1)
            return
        print("INFO: lock acquired")

    @classmethod
    def unlock(cls, s3_path):
        """Release the lock on ``s3_path`` taken by this job.

        Asserts that the lock object exists and is tagged with this job's name.
        A failed delete is logged but treated as best-effort: the method
        still returns True.
        """
        s3_path_lock = s3_path + ".lock"
        env = _Environment.get()
        obj = cls.head_object(s3_path_lock)
        if not obj:
            print("ERROR: lock file is removed")
            assert False  # investigate
        elif not obj.has_tags({"job": Utils.to_base64(env.JOB_NAME)}):
            print("ERROR: lock file was acquired by another job")
            assert False  # investigate

        if cls.delete(s3_path_lock):
            print("INFO: lock released")
        else:
            print(f"ERROR: File [{s3_path_lock}] delete failure")
        return True

    @classmethod
    def get_result_link(cls, result):
        """Return the HTTP link to ``result``'s file ("latest" alias for PRs)."""
        env = _Environment.get()
        s3_path = (
            f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix(latest=bool(env.PR_NUMBER))}"
        )
        return cls.get_link(s3_path=s3_path, local_path=result.file_name())

    @classmethod
    def clean_latest_result(cls):
        """Recursively delete the PR's "latest" result prefix (PRs only)."""
        env = _Environment.get()
        assert env.PR_NUMBER
        # NOTE(review): this mutates the object returned by _Environment.get();
        # if that object is shared, later users will see SHA == "latest".
        env.SHA = "latest"
        s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}"
        cls.clean_s3_directory(s3_path=s3_path)

    @classmethod
    def _upload_file_to_s3(
        cls, local_file_path, upload_to_s3: bool, text: bool = False, s3_subprefix=""
    ) -> str:
        """Upload a file under this commit's prefix (+ optional subprefix) and return its link.

        When ``upload_to_s3`` is False nothing is uploaded and a ``file://``
        link to the local absolute path is returned instead.
        """
        if not upload_to_s3:
            return f"file://{Path(local_file_path).absolute()}"
        env = _Environment.get()
        s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}"
        # Strings are immutable: keep the stripped value so no "//" ends up in the key.
        s3_subprefix = (s3_subprefix or "").strip("/")
        if s3_subprefix:
            s3_path += f"/{s3_subprefix}"
        return cls.copy_file_to_s3(
            s3_path=s3_path, local_path=local_file_path, text=text
        )

    @classmethod
    def upload_result_files_to_s3(cls, result):
        """Recursively upload ``result.files`` (and sub-results' files) and record links.

        Missing files are reported in ``result.info`` and linked locally. Files
        whose names end with one of Settings.TEXT_CONTENT_EXTENSIONS are
        uploaded as text/plain. Afterwards ``result.files`` is cleared and the
        result is dumped.
        """
        for sub_result in result.results or []:
            cls.upload_result_files_to_s3(sub_result)
        for file in result.files:
            if not Path(file).is_file():
                print(f"ERROR: Invalid file [{file}] in [{result.name}] - skip upload")
                result.info += f"\nWARNING: Result file [{file}] was not found"
                file_link = cls._upload_file_to_s3(file, upload_to_s3=False)
            else:
                is_text = str(file).endswith(tuple(Settings.TEXT_CONTENT_EXTENSIONS))
                if is_text:
                    print(
                        f"File [{file}] matches Settings.TEXT_CONTENT_EXTENSIONS [{Settings.TEXT_CONTENT_EXTENSIONS}] - add text attribute for s3 object"
                    )
                file_link = cls._upload_file_to_s3(
                    file,
                    upload_to_s3=True,
                    text=is_text,
                    s3_subprefix=Utils.normalize_string(result.name),
                )
            result.links.append(file_link)
        if result.files:
            print(
                f"Result files [{result.files}] uploaded to s3 [{result.links[-len(result.files):]}] - clean files list"
            )
            result.files = []
        result.dump()