mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
572 lines
20 KiB
Python
572 lines
20 KiB
Python
import dataclasses
|
|
import datetime
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Union
|
|
|
|
from praktika._environment import _Environment
|
|
from praktika.cache import Cache
|
|
from praktika.s3 import S3
|
|
from praktika.settings import Settings
|
|
from praktika.utils import ContextManager, MetaClasses, Shell, Utils
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Result(MetaClasses.Serializable):
|
|
"""
|
|
Represents the outcome of a workflow/job/task or any operation, along with associated metadata.
|
|
|
|
This class supports nesting of results to represent tasks with sub-tasks, and includes
|
|
various attributes to track status, timing, files, and links.
|
|
|
|
Attributes:
|
|
name (str): The name of the task.
|
|
status (str): The current status of the task. Should be one of the values defined in the Status class.
|
|
start_time (Optional[float]): The start time of the task in Unix timestamp format. None if not started.
|
|
duration (Optional[float]): The duration of the task in seconds. None if not completed.
|
|
results (List[Result]): A list of sub-results representing nested tasks.
|
|
files (List[str]): A list of file paths or names related to the result.
|
|
links (List[str]): A list of URLs related to the result (e.g., links to reports or resources).
|
|
info (str): Additional information about the result. Free-form text.
|
|
|
|
Inner Class:
|
|
Status: Defines possible statuses for the task, such as "success", "failure", etc.
|
|
"""
|
|
|
|
class Status:
|
|
SKIPPED = "skipped"
|
|
SUCCESS = "success"
|
|
FAILED = "failure"
|
|
PENDING = "pending"
|
|
RUNNING = "running"
|
|
ERROR = "error"
|
|
|
|
name: str
|
|
status: str
|
|
start_time: Optional[float] = None
|
|
duration: Optional[float] = None
|
|
results: List["Result"] = dataclasses.field(default_factory=list)
|
|
files: List[str] = dataclasses.field(default_factory=list)
|
|
links: List[str] = dataclasses.field(default_factory=list)
|
|
info: str = ""
|
|
|
|
@staticmethod
|
|
def create_from(
|
|
name="",
|
|
results: List["Result"] = None,
|
|
stopwatch: Utils.Stopwatch = None,
|
|
status="",
|
|
files=None,
|
|
info: Union[List[str], str] = "",
|
|
with_info_from_results=True,
|
|
):
|
|
if isinstance(status, bool):
|
|
status = Result.Status.SUCCESS if status else Result.Status.FAILED
|
|
if not results and not status:
|
|
Utils.raise_with_error(
|
|
f"Either .results ({results}) or .status ({status}) must be provided"
|
|
)
|
|
if not name:
|
|
name = _Environment.get().JOB_NAME
|
|
if not name:
|
|
print("ERROR: Failed to guess the .name")
|
|
raise
|
|
result_status = status or Result.Status.SUCCESS
|
|
infos = []
|
|
if info:
|
|
if isinstance(info, str):
|
|
infos += [info]
|
|
else:
|
|
infos += info
|
|
if results and not status:
|
|
for result in results:
|
|
if result.status not in (Result.Status.SUCCESS, Result.Status.FAILED):
|
|
Utils.raise_with_error(
|
|
f"Unexpected result status [{result.status}] for Result.create_from call"
|
|
)
|
|
if result.status != Result.Status.SUCCESS:
|
|
result_status = Result.Status.FAILED
|
|
if results:
|
|
for result in results:
|
|
if result.info and with_info_from_results:
|
|
infos.append(f"{result.name}: {result.info}")
|
|
return Result(
|
|
name=name,
|
|
status=result_status,
|
|
start_time=stopwatch.start_time if stopwatch else None,
|
|
duration=stopwatch.duration if stopwatch else None,
|
|
info="\n".join(infos) if infos else "",
|
|
results=results or [],
|
|
files=files or [],
|
|
)
|
|
|
|
@staticmethod
|
|
def get():
|
|
return Result.from_fs(_Environment.get().JOB_NAME)
|
|
|
|
def is_completed(self):
|
|
return self.status not in (Result.Status.PENDING, Result.Status.RUNNING)
|
|
|
|
def is_running(self):
|
|
return self.status in (Result.Status.RUNNING,)
|
|
|
|
def is_ok(self):
|
|
return self.status in (Result.Status.SKIPPED, Result.Status.SUCCESS)
|
|
|
|
def set_status(self, status) -> "Result":
|
|
self.status = status
|
|
self.dump()
|
|
return self
|
|
|
|
def set_success(self) -> "Result":
|
|
return self.set_status(Result.Status.SUCCESS)
|
|
|
|
def set_results(self, results: List["Result"]) -> "Result":
|
|
self.results = results
|
|
self.dump()
|
|
return self
|
|
|
|
def set_files(self, files) -> "Result":
|
|
for file in files:
|
|
assert Path(
|
|
file
|
|
).is_file(), f"Not valid file [{file}] from file list [{files}]"
|
|
if not self.files:
|
|
self.files = []
|
|
self.files += files
|
|
self.dump()
|
|
return self
|
|
|
|
def set_info(self, info: str) -> "Result":
|
|
if self.info:
|
|
self.info += "\n"
|
|
self.info += info
|
|
self.dump()
|
|
return self
|
|
|
|
def set_link(self, link) -> "Result":
|
|
self.links.append(link)
|
|
self.dump()
|
|
return self
|
|
|
|
@classmethod
|
|
def file_name_static(cls, name):
|
|
return f"{Settings.TEMP_DIR}/result_{Utils.normalize_string(name)}.json"
|
|
|
|
@classmethod
|
|
def from_dict(cls, obj: Dict[str, Any]) -> "Result":
|
|
sub_results = []
|
|
for result_dict in obj["results"] or []:
|
|
sub_res = cls.from_dict(result_dict)
|
|
sub_results.append(sub_res)
|
|
obj["results"] = sub_results
|
|
return Result(**obj)
|
|
|
|
def update_duration(self):
|
|
if not self.duration and self.start_time:
|
|
self.duration = datetime.datetime.utcnow().timestamp() - self.start_time
|
|
else:
|
|
if not self.duration:
|
|
print(
|
|
f"NOTE: duration is set for job [{self.name}] Result - do not update by CI"
|
|
)
|
|
else:
|
|
print(
|
|
f"NOTE: start_time is not set for job [{self.name}] Result - do not update duration"
|
|
)
|
|
return self
|
|
|
|
def set_timing(self, stopwatch: Utils.Stopwatch):
|
|
self.start_time = stopwatch.start_time
|
|
self.duration = stopwatch.duration
|
|
return self
|
|
|
|
def update_sub_result(self, result: "Result"):
|
|
assert self.results, "BUG?"
|
|
for i, result_ in enumerate(self.results):
|
|
if result_.name == result.name:
|
|
self.results[i] = result
|
|
self._update_status()
|
|
return self
|
|
|
|
def _update_status(self):
|
|
was_pending = False
|
|
was_running = False
|
|
if self.status == self.Status.PENDING:
|
|
was_pending = True
|
|
if self.status == self.Status.RUNNING:
|
|
was_running = True
|
|
|
|
has_pending, has_running, has_failed = False, False, False
|
|
for result_ in self.results:
|
|
if result_.status in (self.Status.RUNNING,):
|
|
has_running = True
|
|
if result_.status in (self.Status.PENDING,):
|
|
has_pending = True
|
|
if result_.status in (self.Status.ERROR, self.Status.FAILED):
|
|
has_failed = True
|
|
if has_running:
|
|
self.status = self.Status.RUNNING
|
|
elif has_pending:
|
|
self.status = self.Status.PENDING
|
|
elif has_failed:
|
|
self.status = self.Status.FAILED
|
|
else:
|
|
self.status = self.Status.SUCCESS
|
|
if (was_pending or was_running) and self.status not in (
|
|
self.Status.PENDING,
|
|
self.Status.RUNNING,
|
|
):
|
|
print("Pipeline finished")
|
|
self.update_duration()
|
|
|
|
@classmethod
|
|
def generate_pending(cls, name, results=None):
|
|
return Result(
|
|
name=name,
|
|
status=Result.Status.PENDING,
|
|
start_time=None,
|
|
duration=None,
|
|
results=results or [],
|
|
files=[],
|
|
links=[],
|
|
info="",
|
|
)
|
|
|
|
@classmethod
|
|
def generate_skipped(cls, name, cache_record: Cache.CacheRecord, results=None):
|
|
return Result(
|
|
name=name,
|
|
status=Result.Status.SKIPPED,
|
|
start_time=None,
|
|
duration=None,
|
|
results=results or [],
|
|
files=[],
|
|
links=[],
|
|
info=f"from cache: sha [{cache_record.sha}], pr/branch [{cache_record.pr_number or cache_record.branch}]",
|
|
)
|
|
|
|
@classmethod
|
|
def create_from_command_execution(
|
|
cls,
|
|
name,
|
|
command,
|
|
with_log=False,
|
|
fail_fast=True,
|
|
workdir=None,
|
|
command_args=None,
|
|
command_kwargs=None,
|
|
):
|
|
"""
|
|
Executes shell commands or Python callables, optionally logging output, and handles errors.
|
|
|
|
:param name: Check name
|
|
:param command: Shell command (str) or Python callable, or list of them.
|
|
:param workdir: Optional working directory.
|
|
:param with_log: Boolean flag to log output to a file.
|
|
:param fail_fast: Boolean flag to stop execution if one command fails.
|
|
:param command_args: Positional arguments for the callable command.
|
|
:param command_kwargs: Keyword arguments for the callable command.
|
|
:return: Result object with status and optional log file.
|
|
"""
|
|
|
|
# Stopwatch to track execution time
|
|
stop_watch_ = Utils.Stopwatch()
|
|
command_args = command_args or []
|
|
command_kwargs = command_kwargs or {}
|
|
|
|
# Set log file path if logging is enabled
|
|
log_file = (
|
|
f"{Settings.TEMP_DIR}/{Utils.normalize_string(name)}.log"
|
|
if with_log
|
|
else None
|
|
)
|
|
|
|
# Ensure the command is a list for consistent iteration
|
|
if not isinstance(command, list):
|
|
fail_fast = False
|
|
command = [command]
|
|
|
|
print(f"> Starting execution for [{name}]")
|
|
res = True # Track success/failure status
|
|
error_infos = []
|
|
for command_ in command:
|
|
if callable(command_):
|
|
# If command is a Python function, call it with provided arguments
|
|
result = command_(*command_args, **command_kwargs)
|
|
if isinstance(result, bool):
|
|
res = result
|
|
elif result:
|
|
error_infos.append(str(result))
|
|
res = False
|
|
else:
|
|
# Run shell command in a specified directory with logging and verbosity
|
|
with ContextManager.cd(workdir):
|
|
exit_code = Shell.run(command_, verbose=True, log_file=log_file)
|
|
res = exit_code == 0
|
|
|
|
# If fail_fast is enabled, stop on first failure
|
|
if not res and fail_fast:
|
|
print(f"Execution stopped due to failure in [{command_}]")
|
|
break
|
|
|
|
# Create and return the result object with status and log file (if any)
|
|
return Result.create_from(
|
|
name=name,
|
|
status=res,
|
|
stopwatch=stop_watch_,
|
|
info=error_infos,
|
|
files=[log_file] if log_file else None,
|
|
)
|
|
|
|
def complete_job(self):
|
|
self.dump()
|
|
if not self.is_ok():
|
|
print("ERROR: Job Failed")
|
|
print(self.to_stdout_formatted())
|
|
sys.exit(1)
|
|
else:
|
|
print("ok")
|
|
|
|
def to_stdout_formatted(self, indent="", res=""):
|
|
if self.is_ok():
|
|
return res
|
|
|
|
res += f"{indent}Task [{self.name}] failed.\n"
|
|
fail_info = ""
|
|
sub_indent = indent + " "
|
|
|
|
if not self.results:
|
|
if not self.is_ok():
|
|
fail_info += f"{sub_indent}{self.name}:\n"
|
|
for line in self.info.splitlines():
|
|
fail_info += f"{sub_indent}{sub_indent}{line}\n"
|
|
return res + fail_info
|
|
|
|
for sub_result in self.results:
|
|
res = sub_result.to_stdout_formatted(sub_indent, res)
|
|
|
|
return res
|
|
|
|
|
|
class ResultInfo:
|
|
SETUP_ENV_JOB_FAILED = (
|
|
"Failed to set up job env, it's praktika bug or misconfiguration"
|
|
)
|
|
PRE_JOB_FAILED = (
|
|
"Failed to do a job pre-run step, it's praktika bug or misconfiguration"
|
|
)
|
|
KILLED = "Job killed or terminated, no Result provided"
|
|
NOT_FOUND_IMPOSSIBLE = (
|
|
"No Result file (bug, or job misbehaviour, must not ever happen)"
|
|
)
|
|
SKIPPED_DUE_TO_PREVIOUS_FAILURE = "Skipped due to previous failure"
|
|
TIMEOUT = "Timeout"
|
|
|
|
GH_STATUS_ERROR = "Failed to set GH commit status"
|
|
|
|
NOT_FINALIZED = (
|
|
"Job did not not provide Result: job script bug, died CI runner or praktika bug"
|
|
)
|
|
|
|
S3_ERROR = "S3 call failure"
|
|
|
|
|
|
class _ResultS3:
|
|
|
|
@classmethod
|
|
def copy_result_to_s3(cls, result, unlock=False):
|
|
result.dump()
|
|
env = _Environment.get()
|
|
s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}"
|
|
s3_path_full = f"{s3_path}/{Path(result.file_name()).name}"
|
|
url = S3.copy_file_to_s3(s3_path=s3_path, local_path=result.file_name())
|
|
# if unlock:
|
|
# if not cls.unlock(s3_path_full):
|
|
# print(f"ERROR: File [{s3_path_full}] unlock failure")
|
|
# assert False # TODO: investigate
|
|
return url
|
|
|
|
@classmethod
|
|
def copy_result_from_s3(cls, local_path, lock=False):
|
|
env = _Environment.get()
|
|
file_name = Path(local_path).name
|
|
s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/{file_name}"
|
|
# if lock:
|
|
# cls.lock(s3_path)
|
|
if not S3.copy_file_from_s3(s3_path=s3_path, local_path=local_path):
|
|
print(f"ERROR: failed to cp file [{s3_path}] from s3")
|
|
raise
|
|
|
|
@classmethod
|
|
def copy_result_from_s3_with_version(cls, local_path):
|
|
env = _Environment.get()
|
|
file_name = Path(local_path).name
|
|
local_dir = Path(local_path).parent
|
|
file_name_pattern = f"{file_name}_*"
|
|
for file_path in local_dir.glob(file_name_pattern):
|
|
file_path.unlink()
|
|
s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/"
|
|
if not S3.copy_file_from_s3_matching_pattern(
|
|
s3_path=s3_path, local_path=local_dir, include=file_name_pattern
|
|
):
|
|
print(f"ERROR: failed to cp file [{s3_path}] from s3")
|
|
raise
|
|
result_files = []
|
|
for file_path in local_dir.glob(file_name_pattern):
|
|
result_files.append(file_path)
|
|
assert result_files, "No result files found"
|
|
result_files.sort()
|
|
version = int(result_files[-1].name.split("_")[-1])
|
|
Shell.check(f"cp {result_files[-1]} {local_path}", strict=True, verbose=True)
|
|
return version
|
|
|
|
@classmethod
|
|
def copy_result_to_s3_with_version(cls, result, version):
|
|
result.dump()
|
|
filename = Path(result.file_name()).name
|
|
file_name_versioned = f"{filename}_{str(version).zfill(3)}"
|
|
env = _Environment.get()
|
|
s3_path_versioned = (
|
|
f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/{file_name_versioned}"
|
|
)
|
|
s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/"
|
|
if version == 0:
|
|
S3.clean_s3_directory(s3_path=s3_path)
|
|
if not S3.put(
|
|
s3_path=s3_path_versioned,
|
|
local_path=result.file_name(),
|
|
if_none_matched=True,
|
|
):
|
|
print("Failed to put versioned Result")
|
|
return False
|
|
if not S3.put(s3_path=s3_path, local_path=result.file_name()):
|
|
print("Failed to put non-versioned Result")
|
|
return True
|
|
|
|
# @classmethod
|
|
# def lock(cls, s3_path, level=0):
|
|
# env = _Environment.get()
|
|
# s3_path_lock = s3_path + f".lock"
|
|
# file_path_lock = f"{Settings.TEMP_DIR}/{Path(s3_path_lock).name}"
|
|
# assert Shell.check(
|
|
# f"echo '''{env.JOB_NAME}''' > {file_path_lock}", verbose=True
|
|
# ), "Never"
|
|
#
|
|
# i = 20
|
|
# meta = S3.head_object(s3_path_lock)
|
|
# while meta:
|
|
# locked_by_job = meta.get("Metadata", {"job": ""}).get("job", "")
|
|
# if locked_by_job:
|
|
# decoded_bytes = base64.b64decode(locked_by_job)
|
|
# locked_by_job = decoded_bytes.decode("utf-8")
|
|
# print(
|
|
# f"WARNING: Failed to acquire lock, meta [{meta}], job [{locked_by_job}] - wait"
|
|
# )
|
|
# i -= 5
|
|
# if i < 0:
|
|
# info = f"ERROR: lock acquire failure - unlock forcefully"
|
|
# print(info)
|
|
# env.add_info(info)
|
|
# break
|
|
# time.sleep(5)
|
|
#
|
|
# metadata = {"job": Utils.to_base64(env.JOB_NAME)}
|
|
# S3.put(
|
|
# s3_path=s3_path_lock,
|
|
# local_path=file_path_lock,
|
|
# metadata=metadata,
|
|
# if_none_matched=True,
|
|
# )
|
|
# time.sleep(1)
|
|
# obj = S3.head_object(s3_path_lock)
|
|
# if not obj or not obj.has_tags(tags=metadata):
|
|
# print(f"WARNING: locked by another job [{obj}]")
|
|
# env.add_info("S3 lock file failure")
|
|
# cls.lock(s3_path, level=level + 1)
|
|
# print("INFO: lock acquired")
|
|
#
|
|
# @classmethod
|
|
# def unlock(cls, s3_path):
|
|
# s3_path_lock = s3_path + ".lock"
|
|
# env = _Environment.get()
|
|
# obj = S3.head_object(s3_path_lock)
|
|
# if not obj:
|
|
# print("ERROR: lock file is removed")
|
|
# assert False # investigate
|
|
# elif not obj.has_tags({"job": Utils.to_base64(env.JOB_NAME)}):
|
|
# print("ERROR: lock file was acquired by another job")
|
|
# assert False # investigate
|
|
#
|
|
# if not S3.delete(s3_path_lock):
|
|
# print(f"ERROR: File [{s3_path_lock}] delete failure")
|
|
# print("INFO: lock released")
|
|
# return True
|
|
|
|
@classmethod
|
|
def upload_result_files_to_s3(cls, result):
|
|
if result.results:
|
|
for result_ in result.results:
|
|
cls.upload_result_files_to_s3(result_)
|
|
for file in result.files:
|
|
if not Path(file).is_file():
|
|
print(f"ERROR: Invalid file [{file}] in [{result.name}] - skip upload")
|
|
result.info += f"\nWARNING: Result file [{file}] was not found"
|
|
file_link = S3._upload_file_to_s3(file, upload_to_s3=False)
|
|
else:
|
|
is_text = False
|
|
for text_file_suffix in Settings.TEXT_CONTENT_EXTENSIONS:
|
|
if file.endswith(text_file_suffix):
|
|
print(
|
|
f"File [{file}] matches Settings.TEXT_CONTENT_EXTENSIONS [{Settings.TEXT_CONTENT_EXTENSIONS}] - add text attribute for s3 object"
|
|
)
|
|
is_text = True
|
|
break
|
|
file_link = S3._upload_file_to_s3(
|
|
file,
|
|
upload_to_s3=True,
|
|
text=is_text,
|
|
s3_subprefix=Utils.normalize_string(result.name),
|
|
)
|
|
result.links.append(file_link)
|
|
if result.files:
|
|
print(
|
|
f"Result files [{result.files}] uploaded to s3 [{result.links[-len(result.files):]}] - clean files list"
|
|
)
|
|
result.files = []
|
|
result.dump()
|
|
|
|
@classmethod
|
|
def update_workflow_results(cls, workflow_name, new_info="", new_sub_results=None):
|
|
assert new_info or new_sub_results
|
|
|
|
attempt = 1
|
|
prev_status = ""
|
|
new_status = ""
|
|
done = False
|
|
while attempt < 10:
|
|
version = cls.copy_result_from_s3_with_version(
|
|
Result.file_name_static(workflow_name)
|
|
)
|
|
workflow_result = Result.from_fs(workflow_name)
|
|
prev_status = workflow_result.status
|
|
if new_info:
|
|
workflow_result.set_info(new_info)
|
|
if new_sub_results:
|
|
if isinstance(new_sub_results, Result):
|
|
new_sub_results = [new_sub_results]
|
|
for result_ in new_sub_results:
|
|
workflow_result.update_sub_result(result_)
|
|
new_status = workflow_result.status
|
|
if cls.copy_result_to_s3_with_version(workflow_result, version=version + 1):
|
|
done = True
|
|
break
|
|
print(f"Attempt [{attempt}] to upload workflow result failed")
|
|
attempt += 1
|
|
assert done
|
|
|
|
if prev_status != new_status:
|
|
return new_status
|
|
else:
|
|
return None
|