import dataclasses import datetime import sys from pathlib import Path from typing import Any, Dict, List, Optional, Union from praktika._environment import _Environment from praktika.cache import Cache from praktika.s3 import S3 from praktika.settings import Settings from praktika.utils import ContextManager, MetaClasses, Shell, Utils @dataclasses.dataclass class Result(MetaClasses.Serializable): """ Represents the outcome of a workflow/job/task or any operation, along with associated metadata. This class supports nesting of results to represent tasks with sub-tasks, and includes various attributes to track status, timing, files, and links. Attributes: name (str): The name of the task. status (str): The current status of the task. Should be one of the values defined in the Status class. start_time (Optional[float]): The start time of the task in Unix timestamp format. None if not started. duration (Optional[float]): The duration of the task in seconds. None if not completed. results (List[Result]): A list of sub-results representing nested tasks. files (List[str]): A list of file paths or names related to the result. links (List[str]): A list of URLs related to the result (e.g., links to reports or resources). info (str): Additional information about the result. Free-form text. Inner Class: Status: Defines possible statuses for the task, such as "success", "failure", etc. """ class Status: SKIPPED = "skipped" SUCCESS = "success" FAILED = "failure" PENDING = "pending" RUNNING = "running" ERROR = "error" name: str status: str start_time: Optional[float] = None duration: Optional[float] = None results: List["Result"] = dataclasses.field(default_factory=list) files: List[str] = dataclasses.field(default_factory=list) links: List[str] = dataclasses.field(default_factory=list) info: str = "" @staticmethod def create_from( name="", results: List["Result"] = None, stopwatch: Utils.Stopwatch = None, status="", files=None, info: Union[List[str], str] = "", with_info_from_results=True, ): if isinstance(status, bool): status = Result.Status.SUCCESS if status else Result.Status.FAILED if not results and not status: Utils.raise_with_error( f"Either .results ({results}) or .status ({status}) must be provided" ) if not name: name = _Environment.get().JOB_NAME if not name: print("ERROR: Failed to guess the .name") raise result_status = status or Result.Status.SUCCESS infos = [] if info: if isinstance(info, str): infos += [info] else: infos += info if results and not status: for result in results: if result.status not in (Result.Status.SUCCESS, Result.Status.FAILED): Utils.raise_with_error( f"Unexpected result status [{result.status}] for Result.create_from call" ) if result.status != Result.Status.SUCCESS: result_status = Result.Status.FAILED if results: for result in results: if result.info and with_info_from_results: infos.append(f"{result.name}: {result.info}") return Result( name=name, status=result_status, start_time=stopwatch.start_time if stopwatch else None, duration=stopwatch.duration if stopwatch else None, info="\n".join(infos) if infos else "", results=results or [], files=files or [], ) @staticmethod def get(): return Result.from_fs(_Environment.get().JOB_NAME) def is_completed(self): return self.status not in (Result.Status.PENDING, Result.Status.RUNNING) def is_running(self): return self.status in (Result.Status.RUNNING,) def is_ok(self): return self.status in (Result.Status.SKIPPED, Result.Status.SUCCESS) def set_status(self, status) -> "Result": self.status = status self.dump() return self def set_success(self) -> "Result": return self.set_status(Result.Status.SUCCESS) def set_results(self, results: List["Result"]) -> "Result": self.results = results self.dump() return self def set_files(self, files) -> "Result": for file in files: assert Path( file ).is_file(), f"Not valid file [{file}] from file list [{files}]" if not self.files: self.files = [] self.files += files self.dump() return self def set_info(self, info: str) -> "Result": if self.info: self.info += "\n" self.info += info self.dump() return self def set_link(self, link) -> "Result": self.links.append(link) self.dump() return self @classmethod def file_name_static(cls, name): return f"{Settings.TEMP_DIR}/result_{Utils.normalize_string(name)}.json" @classmethod def from_dict(cls, obj: Dict[str, Any]) -> "Result": sub_results = [] for result_dict in obj["results"] or []: sub_res = cls.from_dict(result_dict) sub_results.append(sub_res) obj["results"] = sub_results return Result(**obj) def update_duration(self): if not self.duration and self.start_time: self.duration = datetime.datetime.utcnow().timestamp() - self.start_time else: if not self.duration: print( f"NOTE: duration is set for job [{self.name}] Result - do not update by CI" ) else: print( f"NOTE: start_time is not set for job [{self.name}] Result - do not update duration" ) return self def set_timing(self, stopwatch: Utils.Stopwatch): self.start_time = stopwatch.start_time self.duration = stopwatch.duration return self def update_sub_result(self, result: "Result"): assert self.results, "BUG?" for i, result_ in enumerate(self.results): if result_.name == result.name: self.results[i] = result self._update_status() return self def _update_status(self): was_pending = False was_running = False if self.status == self.Status.PENDING: was_pending = True if self.status == self.Status.RUNNING: was_running = True has_pending, has_running, has_failed = False, False, False for result_ in self.results: if result_.status in (self.Status.RUNNING,): has_running = True if result_.status in (self.Status.PENDING,): has_pending = True if result_.status in (self.Status.ERROR, self.Status.FAILED): has_failed = True if has_running: self.status = self.Status.RUNNING elif has_pending: self.status = self.Status.PENDING elif has_failed: self.status = self.Status.FAILED else: self.status = self.Status.SUCCESS if (was_pending or was_running) and self.status not in ( self.Status.PENDING, self.Status.RUNNING, ): print("Pipeline finished") self.update_duration() @classmethod def generate_pending(cls, name, results=None): return Result( name=name, status=Result.Status.PENDING, start_time=None, duration=None, results=results or [], files=[], links=[], info="", ) @classmethod def generate_skipped(cls, name, cache_record: Cache.CacheRecord, results=None): return Result( name=name, status=Result.Status.SKIPPED, start_time=None, duration=None, results=results or [], files=[], links=[], info=f"from cache: sha [{cache_record.sha}], pr/branch [{cache_record.pr_number or cache_record.branch}]", ) @classmethod def create_from_command_execution( cls, name, command, with_log=False, fail_fast=True, workdir=None, command_args=None, command_kwargs=None, ): """ Executes shell commands or Python callables, optionally logging output, and handles errors. :param name: Check name :param command: Shell command (str) or Python callable, or list of them. :param workdir: Optional working directory. :param with_log: Boolean flag to log output to a file. :param fail_fast: Boolean flag to stop execution if one command fails. :param command_args: Positional arguments for the callable command. :param command_kwargs: Keyword arguments for the callable command. :return: Result object with status and optional log file. """ # Stopwatch to track execution time stop_watch_ = Utils.Stopwatch() command_args = command_args or [] command_kwargs = command_kwargs or {} # Set log file path if logging is enabled log_file = ( f"{Settings.TEMP_DIR}/{Utils.normalize_string(name)}.log" if with_log else None ) # Ensure the command is a list for consistent iteration if not isinstance(command, list): fail_fast = False command = [command] print(f"> Starting execution for [{name}]") res = True # Track success/failure status error_infos = [] for command_ in command: if callable(command_): # If command is a Python function, call it with provided arguments result = command_(*command_args, **command_kwargs) if isinstance(result, bool): res = result elif result: error_infos.append(str(result)) res = False else: # Run shell command in a specified directory with logging and verbosity with ContextManager.cd(workdir): exit_code = Shell.run(command_, verbose=True, log_file=log_file) res = exit_code == 0 # If fail_fast is enabled, stop on first failure if not res and fail_fast: print(f"Execution stopped due to failure in [{command_}]") break # Create and return the result object with status and log file (if any) return Result.create_from( name=name, status=res, stopwatch=stop_watch_, info=error_infos, files=[log_file] if log_file else None, ) def complete_job(self): self.dump() if not self.is_ok(): print("ERROR: Job Failed") print(self.to_stdout_formatted()) sys.exit(1) else: print("ok") def to_stdout_formatted(self, indent="", res=""): if self.is_ok(): return res res += f"{indent}Task [{self.name}] failed.\n" fail_info = "" sub_indent = indent + " " if not self.results: if not self.is_ok(): fail_info += f"{sub_indent}{self.name}:\n" for line in self.info.splitlines(): fail_info += f"{sub_indent}{sub_indent}{line}\n" return res + fail_info for sub_result in self.results: res = sub_result.to_stdout_formatted(sub_indent, res) return res class ResultInfo: SETUP_ENV_JOB_FAILED = ( "Failed to set up job env, it's praktika bug or misconfiguration" ) PRE_JOB_FAILED = ( "Failed to do a job pre-run step, it's praktika bug or misconfiguration" ) KILLED = "Job killed or terminated, no Result provided" NOT_FOUND_IMPOSSIBLE = ( "No Result file (bug, or job misbehaviour, must not ever happen)" ) SKIPPED_DUE_TO_PREVIOUS_FAILURE = "Skipped due to previous failure" TIMEOUT = "Timeout" GH_STATUS_ERROR = "Failed to set GH commit status" NOT_FINALIZED = ( "Job did not not provide Result: job script bug, died CI runner or praktika bug" ) S3_ERROR = "S3 call failure" class _ResultS3: @classmethod def copy_result_to_s3(cls, result, unlock=False): result.dump() env = _Environment.get() s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}" s3_path_full = f"{s3_path}/{Path(result.file_name()).name}" url = S3.copy_file_to_s3(s3_path=s3_path, local_path=result.file_name()) # if unlock: # if not cls.unlock(s3_path_full): # print(f"ERROR: File [{s3_path_full}] unlock failure") # assert False # TODO: investigate return url @classmethod def copy_result_from_s3(cls, local_path, lock=False): env = _Environment.get() file_name = Path(local_path).name s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/{file_name}" # if lock: # cls.lock(s3_path) if not S3.copy_file_from_s3(s3_path=s3_path, local_path=local_path): print(f"ERROR: failed to cp file [{s3_path}] from s3") raise @classmethod def copy_result_from_s3_with_version(cls, local_path): env = _Environment.get() file_name = Path(local_path).name local_dir = Path(local_path).parent file_name_pattern = f"{file_name}_*" for file_path in local_dir.glob(file_name_pattern): file_path.unlink() s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/" if not S3.copy_file_from_s3_matching_pattern( s3_path=s3_path, local_path=local_dir, include=file_name_pattern ): print(f"ERROR: failed to cp file [{s3_path}] from s3") raise result_files = [] for file_path in local_dir.glob(file_name_pattern): result_files.append(file_path) assert result_files, "No result files found" result_files.sort() version = int(result_files[-1].name.split("_")[-1]) Shell.check(f"cp {result_files[-1]} {local_path}", strict=True, verbose=True) return version @classmethod def copy_result_to_s3_with_version(cls, result, version): result.dump() filename = Path(result.file_name()).name file_name_versioned = f"{filename}_{str(version).zfill(3)}" env = _Environment.get() s3_path_versioned = ( f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/{file_name_versioned}" ) s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/" if version == 0: S3.clean_s3_directory(s3_path=s3_path) if not S3.put( s3_path=s3_path_versioned, local_path=result.file_name(), if_none_matched=True, ): print("Failed to put versioned Result") return False if not S3.put(s3_path=s3_path, local_path=result.file_name()): print("Failed to put non-versioned Result") return True # @classmethod # def lock(cls, s3_path, level=0): # env = _Environment.get() # s3_path_lock = s3_path + f".lock" # file_path_lock = f"{Settings.TEMP_DIR}/{Path(s3_path_lock).name}" # assert Shell.check( # f"echo '''{env.JOB_NAME}''' > {file_path_lock}", verbose=True # ), "Never" # # i = 20 # meta = S3.head_object(s3_path_lock) # while meta: # locked_by_job = meta.get("Metadata", {"job": ""}).get("job", "") # if locked_by_job: # decoded_bytes = base64.b64decode(locked_by_job) # locked_by_job = decoded_bytes.decode("utf-8") # print( # f"WARNING: Failed to acquire lock, meta [{meta}], job [{locked_by_job}] - wait" # ) # i -= 5 # if i < 0: # info = f"ERROR: lock acquire failure - unlock forcefully" # print(info) # env.add_info(info) # break # time.sleep(5) # # metadata = {"job": Utils.to_base64(env.JOB_NAME)} # S3.put( # s3_path=s3_path_lock, # local_path=file_path_lock, # metadata=metadata, # if_none_matched=True, # ) # time.sleep(1) # obj = S3.head_object(s3_path_lock) # if not obj or not obj.has_tags(tags=metadata): # print(f"WARNING: locked by another job [{obj}]") # env.add_info("S3 lock file failure") # cls.lock(s3_path, level=level + 1) # print("INFO: lock acquired") # # @classmethod # def unlock(cls, s3_path): # s3_path_lock = s3_path + ".lock" # env = _Environment.get() # obj = S3.head_object(s3_path_lock) # if not obj: # print("ERROR: lock file is removed") # assert False # investigate # elif not obj.has_tags({"job": Utils.to_base64(env.JOB_NAME)}): # print("ERROR: lock file was acquired by another job") # assert False # investigate # # if not S3.delete(s3_path_lock): # print(f"ERROR: File [{s3_path_lock}] delete failure") # print("INFO: lock released") # return True @classmethod def upload_result_files_to_s3(cls, result): if result.results: for result_ in result.results: cls.upload_result_files_to_s3(result_) for file in result.files: if not Path(file).is_file(): print(f"ERROR: Invalid file [{file}] in [{result.name}] - skip upload") result.info += f"\nWARNING: Result file [{file}] was not found" file_link = S3._upload_file_to_s3(file, upload_to_s3=False) else: is_text = False for text_file_suffix in Settings.TEXT_CONTENT_EXTENSIONS: if file.endswith(text_file_suffix): print( f"File [{file}] matches Settings.TEXT_CONTENT_EXTENSIONS [{Settings.TEXT_CONTENT_EXTENSIONS}] - add text attribute for s3 object" ) is_text = True break file_link = S3._upload_file_to_s3( file, upload_to_s3=True, text=is_text, s3_subprefix=Utils.normalize_string(result.name), ) result.links.append(file_link) if result.files: print( f"Result files [{result.files}] uploaded to s3 [{result.links[-len(result.files):]}] - clean files list" ) result.files = [] result.dump() @classmethod def update_workflow_results(cls, workflow_name, new_info="", new_sub_results=None): assert new_info or new_sub_results attempt = 1 prev_status = "" new_status = "" done = False while attempt < 10: version = cls.copy_result_from_s3_with_version( Result.file_name_static(workflow_name) ) workflow_result = Result.from_fs(workflow_name) prev_status = workflow_result.status if new_info: workflow_result.set_info(new_info) if new_sub_results: if isinstance(new_sub_results, Result): new_sub_results = [new_sub_results] for result_ in new_sub_results: workflow_result.update_sub_result(result_) new_status = workflow_result.status if cls.copy_result_to_s3_with_version(workflow_result, version=version + 1): done = True break print(f"Attempt [{attempt}] to upload workflow result failed") attempt += 1 assert done if prev_status != new_status: return new_status else: return None