From 06debdc479bab58f2d1d7fd4b3764e65a8c9fa01 Mon Sep 17 00:00:00 2001 From: Max Kainov Date: Fri, 8 Nov 2024 17:48:17 +0100 Subject: [PATCH] result with versioning --- ci/docker/stateless-test/Dockerfile | 6 +- ci/jobs/build_clickhouse.py | 1 - ci/jobs/functional_stateless_tests.py | 11 +- ci/jobs/scripts/clickhouse_proc.py | 11 + .../setup_hdfs_minicluster.sh | 19 ++ ci/praktika/__main__.py | 7 + ci/praktika/_environment.py | 14 +- ci/praktika/_settings.py | 128 ---------- ci/praktika/digest.py | 38 +-- ci/praktika/hook_cache.py | 11 +- ci/praktika/hook_html.py | 71 ++---- ci/praktika/json.html | 11 +- ci/praktika/mangle.py | 36 +-- ci/praktika/native_jobs.py | 14 +- ci/praktika/result.py | 240 +++++++++++++++++- ci/praktika/runner.py | 18 +- ci/praktika/runtime.py | 6 + ci/praktika/s3.py | 172 ++----------- ci/praktika/settings.py | 156 +++++++++++- ci/praktika/utils.py | 2 - ci/praktika/validator.py | 8 +- ci/workflows/pull_request.py | 1 + 22 files changed, 551 insertions(+), 430 deletions(-) create mode 100755 ci/jobs/scripts/functional_tests/setup_hdfs_minicluster.sh delete mode 100644 ci/praktika/_settings.py diff --git a/ci/docker/stateless-test/Dockerfile b/ci/docker/stateless-test/Dockerfile index 4abd8204f1d..760fceeebbf 100644 --- a/ci/docker/stateless-test/Dockerfile +++ b/ci/docker/stateless-test/Dockerfile @@ -100,8 +100,12 @@ ENV PATH="/wd/tests:/tmp/praktika/input:$PATH" RUN curl -L --no-verbose -O 'https://archive.apache.org/dist/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz' \ && tar -xvf hadoop-3.3.1.tar.gz \ - && rm -rf hadoop-3.3.1.tar.gz + && rm -rf hadoop-3.3.1.tar.gz \ + && chmod 777 /hadoop-3.3.1 RUN npm install -g azurite@3.30.0 \ && npm install -g tslib && npm install -g node + +RUN addgroup --gid 1001 clickhouse && adduser --uid 1001 --gid 1001 --disabled-password clickhouse +USER clickhouse \ No newline at end of file diff --git a/ci/jobs/build_clickhouse.py b/ci/jobs/build_clickhouse.py index 1e6d2c648a7..3bdc23d383c 100644 --- a/ci/jobs/build_clickhouse.py +++ b/ci/jobs/build_clickhouse.py @@ -127,7 +127,6 @@ def main(): Shell.check(f"ls -l {build_dir}/programs/") res = results[-1].is_ok() - Result.create_from(results=results, stopwatch=stop_watch).complete_job() diff --git a/ci/jobs/functional_stateless_tests.py b/ci/jobs/functional_stateless_tests.py index 0481086d80a..390a6336b45 100644 --- a/ci/jobs/functional_stateless_tests.py +++ b/ci/jobs/functional_stateless_tests.py @@ -27,11 +27,12 @@ def parse_args(): default="", ) parser.add_argument("--param", help="Optional job start stage", default=None) + parser.add_argument("--test", help="Optional test name pattern", default="") return parser.parse_args() def run_stateless_test( - no_parallel: bool, no_sequiential: bool, batch_num: int, batch_total: int + no_parallel: bool, no_sequiential: bool, batch_num: int, batch_total: int, test="" ): assert not (no_parallel and no_sequiential) test_output_file = f"{Settings.OUTPUT_DIR}/test_result.txt" @@ -43,7 +44,7 @@ def run_stateless_test( --no-drop-if-fail --capture-client-stacktrace --queries /repo/tests/queries --test-runs 1 --hung-check \ {'--no-parallel' if no_parallel else ''} {'--no-sequential' if no_sequiential else ''} \ --print-time --jobs {nproc} --report-coverage --report-logs-stats {aux} \ - --queries ./tests/queries -- '' | ts '%Y-%m-%d %H:%M:%S' \ + --queries ./tests/queries -- '{test}' | ts '%Y-%m-%d %H:%M:%S' \ | tee -a \"{test_output_file}\"" if Path(test_output_file).exists(): Path(test_output_file).unlink() @@ -119,11 +120,14 @@ def main(): stop_watch_ = Utils.Stopwatch() step_name = "Start ClickHouse Server" print(step_name) + hdfs_log = "/tmp/praktika/output/hdfs_mini.log" minio_log = "/tmp/praktika/output/minio.log" + res = res and CH.start_hdfs(log_file_path=hdfs_log) res = res and CH.start_minio(log_file_path=minio_log) - logs_to_attach += [minio_log] + logs_to_attach += [minio_log, hdfs_log] time.sleep(10) Shell.check("ps -ef | grep minio", verbose=True) + Shell.check("ps -ef | grep hdfs", verbose=True) res = res and Shell.check( "aws s3 ls s3://test --endpoint-url http://localhost:11111/", verbose=True ) @@ -153,6 +157,7 @@ def main(): no_sequiential=no_sequential, batch_num=batch_num, batch_total=total_batches, + test=args.test, ) results.append(FTResultsProcessor(wd=Settings.OUTPUT_DIR).run()) results[-1].set_timing(stopwatch=stop_watch_) diff --git a/ci/jobs/scripts/clickhouse_proc.py b/ci/jobs/scripts/clickhouse_proc.py index c43283e75e0..8f9bef57083 100644 --- a/ci/jobs/scripts/clickhouse_proc.py +++ b/ci/jobs/scripts/clickhouse_proc.py @@ -44,6 +44,17 @@ class ClickHouseProc: self.minio_proc = None + def start_hdfs(self, log_file_path): + command = ["./ci/jobs/scripts/functional_tests/setup_hdfs_minicluster.sh"] + with open(log_file_path, "w") as log_file: + process = subprocess.Popen( + command, stdout=log_file, stderr=subprocess.STDOUT + ) + print( + f"Started setup_hdfs_minicluster.sh asynchronously with PID {process.pid}" + ) + return True + def start_minio(self, log_file_path): command = ["tests/docker_scripts/setup_minio.sh", "stateless", "./tests"] with open(log_file_path, "w") as log_file: diff --git a/ci/jobs/scripts/functional_tests/setup_hdfs_minicluster.sh b/ci/jobs/scripts/functional_tests/setup_hdfs_minicluster.sh new file mode 100755 index 00000000000..b810b27fe2b --- /dev/null +++ b/ci/jobs/scripts/functional_tests/setup_hdfs_minicluster.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# shellcheck disable=SC2024 + +set -e -x -a -u + +ls -lha + +cd /hadoop-3.3.1 + +export JAVA_HOME=/usr +mkdir -p target/test/data + +bin/mapred minicluster -format -nomr -nnport 12222 & + +while ! nc -z localhost 12222; do + sleep 1 +done + +lsof -i :12222 diff --git a/ci/praktika/__main__.py b/ci/praktika/__main__.py index fbb9f92909a..3dfdc26d69d 100644 --- a/ci/praktika/__main__.py +++ b/ci/praktika/__main__.py @@ -37,6 +37,12 @@ def create_parser(): type=str, default=None, ) + run_parser.add_argument( + "--test", + help="Custom parameter to pass into a job script, it's up to job script how to use it, for local test", + type=str, + default="", + ) run_parser.add_argument( "--pr", help="PR number. Optional parameter for local run. Set if you want an required artifact to be uploaded from CI run in that PR", @@ -106,6 +112,7 @@ if __name__ == "__main__": local_run=not args.ci, no_docker=args.no_docker, param=args.param, + test=args.test, pr=args.pr, branch=args.branch, sha=args.sha, diff --git a/ci/praktika/_environment.py b/ci/praktika/_environment.py index 1c6b547ddde..734a4be3176 100644 --- a/ci/praktika/_environment.py +++ b/ci/praktika/_environment.py @@ -6,7 +6,7 @@ from types import SimpleNamespace from typing import Any, Dict, List, Type from praktika import Workflow -from praktika._settings import _Settings +from praktika.settings import Settings from praktika.utils import MetaClasses, T @@ -35,7 +35,7 @@ class _Environment(MetaClasses.Serializable): @classmethod def file_name_static(cls, _name=""): - return f"{_Settings.TEMP_DIR}/{cls.name}.json" + return f"{Settings.TEMP_DIR}/{cls.name}.json" @classmethod def from_dict(cls: Type[T], obj: Dict[str, Any]) -> T: @@ -66,12 +66,12 @@ class _Environment(MetaClasses.Serializable): @staticmethod def get_needs_statuses(): - if Path(_Settings.WORKFLOW_STATUS_FILE).is_file(): - with open(_Settings.WORKFLOW_STATUS_FILE, "r", encoding="utf8") as f: + if Path(Settings.WORKFLOW_STATUS_FILE).is_file(): + with open(Settings.WORKFLOW_STATUS_FILE, "r", encoding="utf8") as f: return json.load(f) else: print( - f"ERROR: Status file [{_Settings.WORKFLOW_STATUS_FILE}] does not exist" + f"ERROR: Status file [{Settings.WORKFLOW_STATUS_FILE}] does not exist" ) raise RuntimeError() @@ -171,7 +171,7 @@ class _Environment(MetaClasses.Serializable): # TODO: find a better place for the function. This file should not import praktika.settings # as it's requires reading users config, that's why imports nested inside the function - def get_report_url(self, settings): + def get_report_url(self, settings, latest=False): import urllib path = settings.HTML_S3_PATH @@ -179,7 +179,7 @@ class _Environment(MetaClasses.Serializable): if bucket in path: path = path.replace(bucket, endpoint) break - REPORT_URL = f"https://{path}/{Path(settings.HTML_PAGE_FILE).name}?PR={self.PR_NUMBER}&sha={self.SHA}&name_0={urllib.parse.quote(self.WORKFLOW_NAME, safe='')}&name_1={urllib.parse.quote(self.JOB_NAME, safe='')}" + REPORT_URL = f"https://{path}/{Path(settings.HTML_PAGE_FILE).name}?PR={self.PR_NUMBER}&sha={'latest' if latest else self.SHA}&name_0={urllib.parse.quote(self.WORKFLOW_NAME, safe='')}&name_1={urllib.parse.quote(self.JOB_NAME, safe='')}" return REPORT_URL def is_local_run(self): diff --git a/ci/praktika/_settings.py b/ci/praktika/_settings.py deleted file mode 100644 index 17da1519e37..00000000000 --- a/ci/praktika/_settings.py +++ /dev/null @@ -1,128 +0,0 @@ -import dataclasses -from typing import Dict, Iterable, List, Optional - - -@dataclasses.dataclass -class _Settings: - ###################################### - # Pipeline generation settings # - ###################################### - MAIN_BRANCH = "main" - CI_PATH = "./ci" - WORKFLOW_PATH_PREFIX: str = "./.github/workflows" - WORKFLOWS_DIRECTORY: str = f"{CI_PATH}/workflows" - SETTINGS_DIRECTORY: str = f"{CI_PATH}/settings" - CI_CONFIG_JOB_NAME = "Config Workflow" - DOCKER_BUILD_JOB_NAME = "Docker Builds" - FINISH_WORKFLOW_JOB_NAME = "Finish Workflow" - READY_FOR_MERGE_STATUS_NAME = "Ready for Merge" - CI_CONFIG_RUNS_ON: Optional[List[str]] = None - DOCKER_BUILD_RUNS_ON: Optional[List[str]] = None - VALIDATE_FILE_PATHS: bool = True - - ###################################### - # Runtime Settings # - ###################################### - MAX_RETRIES_S3 = 3 - MAX_RETRIES_GH = 3 - - ###################################### - # S3 (artifact storage) settings # - ###################################### - S3_ARTIFACT_PATH: str = "" - - ###################################### - # CI workspace settings # - ###################################### - TEMP_DIR: str = "/tmp/praktika" - OUTPUT_DIR: str = f"{TEMP_DIR}/output" - INPUT_DIR: str = f"{TEMP_DIR}/input" - PYTHON_INTERPRETER: str = "python3" - PYTHON_PACKET_MANAGER: str = "pip3" - PYTHON_VERSION: str = "3.9" - INSTALL_PYTHON_FOR_NATIVE_JOBS: bool = False - INSTALL_PYTHON_REQS_FOR_NATIVE_JOBS: str = "./ci/requirements.txt" - ENVIRONMENT_VAR_FILE: str = f"{TEMP_DIR}/environment.json" - RUN_LOG: str = f"{TEMP_DIR}/praktika_run.log" - - SECRET_GH_APP_ID: str = "GH_APP_ID" - SECRET_GH_APP_PEM_KEY: str = "GH_APP_PEM_KEY" - - ENV_SETUP_SCRIPT: str = "/tmp/praktika_setup_env.sh" - WORKFLOW_STATUS_FILE: str = f"{TEMP_DIR}/workflow_status.json" - - ###################################### - # CI Cache settings # - ###################################### - CACHE_VERSION: int = 1 - CACHE_DIGEST_LEN: int = 20 - CACHE_S3_PATH: str = "" - CACHE_LOCAL_PATH: str = f"{TEMP_DIR}/ci_cache" - - ###################################### - # Report settings # - ###################################### - HTML_S3_PATH: str = "" - HTML_PAGE_FILE: str = "./praktika/json.html" - TEXT_CONTENT_EXTENSIONS: Iterable[str] = frozenset([".txt", ".log"]) - S3_BUCKET_TO_HTTP_ENDPOINT: Optional[Dict[str, str]] = None - - DOCKERHUB_USERNAME: str = "" - DOCKERHUB_SECRET: str = "" - DOCKER_WD: str = "/wd" - - ###################################### - # CI DB Settings # - ###################################### - SECRET_CI_DB_URL: str = "CI_DB_URL" - SECRET_CI_DB_PASSWORD: str = "CI_DB_PASSWORD" - CI_DB_DB_NAME = "" - CI_DB_TABLE_NAME = "" - CI_DB_INSERT_TIMEOUT_SEC = 5 - - DISABLE_MERGE_COMMIT = True - - -_USER_DEFINED_SETTINGS = [ - "S3_ARTIFACT_PATH", - "CACHE_S3_PATH", - "HTML_S3_PATH", - "S3_BUCKET_TO_HTTP_ENDPOINT", - "TEXT_CONTENT_EXTENSIONS", - "TEMP_DIR", - "OUTPUT_DIR", - "INPUT_DIR", - "CI_CONFIG_RUNS_ON", - "DOCKER_BUILD_RUNS_ON", - "CI_CONFIG_JOB_NAME", - "PYTHON_INTERPRETER", - "PYTHON_VERSION", - "PYTHON_PACKET_MANAGER", - "INSTALL_PYTHON_FOR_NATIVE_JOBS", - "INSTALL_PYTHON_REQS_FOR_NATIVE_JOBS", - "MAX_RETRIES_S3", - "MAX_RETRIES_GH", - "VALIDATE_FILE_PATHS", - "DOCKERHUB_USERNAME", - "DOCKERHUB_SECRET", - "READY_FOR_MERGE_STATUS_NAME", - "SECRET_CI_DB_URL", - "SECRET_CI_DB_PASSWORD", - "CI_DB_DB_NAME", - "CI_DB_TABLE_NAME", - "CI_DB_INSERT_TIMEOUT_SEC", - "SECRET_GH_APP_PEM_KEY", - "SECRET_GH_APP_ID", - "MAIN_BRANCH", - "DISABLE_MERGE_COMMIT", -] - - -class GHRunners: - ubuntu = "ubuntu-latest" - - -if __name__ == "__main__": - for setting in _USER_DEFINED_SETTINGS: - print(_Settings().__getattribute__(setting)) - # print(dataclasses.asdict(_Settings())) diff --git a/ci/praktika/digest.py b/ci/praktika/digest.py index a1f2eecf9b6..6b7e5eec07b 100644 --- a/ci/praktika/digest.py +++ b/ci/praktika/digest.py @@ -23,7 +23,7 @@ class Digest: hash_string = hash_obj.hexdigest() return hash_string - def calc_job_digest(self, job_config: Job.Config): + def calc_job_digest(self, job_config: Job.Config, docker_digests): config = job_config.digest_config if not config: return "f" * Settings.CACHE_DIGEST_LEN @@ -34,28 +34,28 @@ class Digest: print( f"calc digest for job [{job_config.name}]: hash_key [{cache_key}] - from cache" ) - return self.digest_cache[cache_key] - - included_files = Utils.traverse_paths( - job_config.digest_config.include_paths, - job_config.digest_config.exclude_paths, - sorted=True, - ) - print( - f"calc digest for job [{job_config.name}]: hash_key [{cache_key}], include [{len(included_files)}] files" - ) - - # Calculate MD5 hash - res = "" - if not included_files: - res = "f" * Settings.CACHE_DIGEST_LEN - print(f"NOTE: empty digest config [{config}] - return dummy digest") + digest = self.digest_cache[cache_key] else: + included_files = Utils.traverse_paths( + job_config.digest_config.include_paths, + job_config.digest_config.exclude_paths, + sorted=True, + ) + print( + f"calc digest for job [{job_config.name}]: hash_key [{cache_key}], include [{len(included_files)}] files" + ) + hash_md5 = hashlib.md5() for i, file_path in enumerate(included_files): hash_md5 = self._calc_file_digest(file_path, hash_md5) - digest = hash_md5.hexdigest()[: Settings.CACHE_DIGEST_LEN] - self.digest_cache[cache_key] = digest + digest = hash_md5.hexdigest()[: Settings.CACHE_DIGEST_LEN] + self.digest_cache[cache_key] = digest + + if job_config.run_in_docker: + # respect docker digest in the job digest + docker_digest = docker_digests[job_config.run_in_docker.split("+")[0]] + digest = "-".join([docker_digest, digest]) + return digest def calc_docker_digest( diff --git a/ci/praktika/hook_cache.py b/ci/praktika/hook_cache.py index 5cfedec0144..e001e936a71 100644 --- a/ci/praktika/hook_cache.py +++ b/ci/praktika/hook_cache.py @@ -1,6 +1,5 @@ from praktika._environment import _Environment from praktika.cache import Cache -from praktika.mangle import _get_workflows from praktika.runtime import RunConfig from praktika.settings import Settings from praktika.utils import Utils @@ -10,6 +9,7 @@ class CacheRunnerHooks: @classmethod def configure(cls, workflow): workflow_config = RunConfig.from_fs(workflow.name) + docker_digests = workflow_config.digest_dockers cache = Cache() print(f"Workflow Configure, workflow [{workflow.name}]") assert ( @@ -18,11 +18,13 @@ class CacheRunnerHooks: artifact_digest_map = {} job_digest_map = {} for job in workflow.jobs: + digest = cache.digest.calc_job_digest( + job_config=job, docker_digests=docker_digests + ) if not job.digest_config: print( f"NOTE: job [{job.name}] has no Config.digest_config - skip cache check, always run" ) - digest = cache.digest.calc_job_digest(job_config=job) job_digest_map[job.name] = digest if job.provides: # assign the job digest also to the artifacts it provides @@ -48,7 +50,6 @@ class CacheRunnerHooks: ), f"BUG, Workflow with enabled cache must have job digests after configuration, wf [{workflow.name}]" print("Check remote cache") - job_to_cache_record = {} for job_name, job_digest in workflow_config.digest_jobs.items(): record = cache.fetch_success(job_name=job_name, job_digest=job_digest) if record: @@ -58,7 +59,7 @@ class CacheRunnerHooks: ) workflow_config.cache_success.append(job_name) workflow_config.cache_success_base64.append(Utils.to_base64(job_name)) - job_to_cache_record[job_name] = record + workflow_config.cache_jobs[job_name] = record print("Check artifacts to reuse") for job in workflow.jobs: @@ -66,7 +67,7 @@ class CacheRunnerHooks: if job.provides: for artifact_name in job.provides: workflow_config.cache_artifacts[artifact_name] = ( - job_to_cache_record[job.name] + workflow_config.cache_jobs[job.name] ) print(f"Write config to GH's job output") diff --git a/ci/praktika/hook_html.py b/ci/praktika/hook_html.py index ca2692d1b22..e2faefb2fa9 100644 --- a/ci/praktika/hook_html.py +++ b/ci/praktika/hook_html.py @@ -6,7 +6,7 @@ from typing import List from praktika._environment import _Environment from praktika.gh import GH from praktika.parser import WorkflowConfigParser -from praktika.result import Result, ResultInfo +from praktika.result import Result, ResultInfo, _ResultS3 from praktika.runtime import RunConfig from praktika.s3 import S3 from praktika.settings import Settings @@ -119,6 +119,7 @@ class HtmlRunnerHooks: # generate pending Results for all jobs in the workflow if _workflow.enable_cache: skip_jobs = RunConfig.from_fs(_workflow.name).cache_success + job_cache_records = RunConfig.from_fs(_workflow.name).cache_jobs else: skip_jobs = [] @@ -128,21 +129,14 @@ class HtmlRunnerHooks: if job.name not in skip_jobs: result = Result.generate_pending(job.name) else: - result = Result.generate_skipped(job.name) + result = Result.generate_skipped(job.name, job_cache_records[job.name]) results.append(result) summary_result = Result.generate_pending(_workflow.name, results=results) summary_result.links.append(env.CHANGE_URL) summary_result.links.append(env.RUN_URL) summary_result.start_time = Utils.timestamp() - # clean the previous latest results in PR if any - if env.PR_NUMBER: - S3.clean_latest_result() - S3.copy_result_to_s3( - summary_result, - unlock=False, - ) - + assert _ResultS3.copy_result_to_s3_with_version(summary_result, version=0) page_url = env.get_report_url(settings=Settings) print(f"CI Status page url [{page_url}]") @@ -150,7 +144,7 @@ class HtmlRunnerHooks: name=_workflow.name, status=Result.Status.PENDING, description="", - url=env.get_report_url(settings=Settings), + url=env.get_report_url(settings=Settings, latest=True), ) res2 = GH.post_pr_comment( comment_body=f"Workflow [[{_workflow.name}]({page_url})], commit [{_Environment.get().SHA[:8]}]", @@ -167,14 +161,8 @@ class HtmlRunnerHooks: @classmethod def pre_run(cls, _workflow, _job): result = Result.from_fs(_job.name) - S3.copy_result_from_s3( - Result.file_name_static(_workflow.name), - ) - workflow_result = Result.from_fs(_workflow.name) - workflow_result.update_sub_result(result) - S3.copy_result_to_s3( - workflow_result, - unlock=True, + _ResultS3.update_workflow_results( + workflow_name=_workflow.name, new_sub_results=result ) @classmethod @@ -184,14 +172,13 @@ class HtmlRunnerHooks: @classmethod def post_run(cls, _workflow, _job, info_errors): result = Result.from_fs(_job.name) - env = _Environment.get() - S3.copy_result_from_s3( - Result.file_name_static(_workflow.name), - lock=True, - ) - workflow_result = Result.from_fs(_workflow.name) - print(f"Workflow info [{workflow_result.info}], info_errors [{info_errors}]") + _ResultS3.upload_result_files_to_s3(result) + _ResultS3.copy_result_to_s3(result) + env = _Environment.get() + + new_sub_results = [result] + new_result_info = "" env_info = env.REPORT_INFO if env_info: print( @@ -203,14 +190,8 @@ class HtmlRunnerHooks: info_str = f"{_job.name}:\n" info_str += "\n".join(info_errors) print("Update workflow results with new info") - workflow_result.set_info(info_str) + new_result_info = info_str - old_status = workflow_result.status - - S3.upload_result_files_to_s3(result) - workflow_result.update_sub_result(result) - - skipped_job_results = [] if not result.is_ok(): print( "Current job failed - find dependee jobs in the workflow and set their statuses to skipped" @@ -223,7 +204,7 @@ class HtmlRunnerHooks: print( f"NOTE: Set job [{dependee_job.name}] status to [{Result.Status.SKIPPED}] due to current failure" ) - skipped_job_results.append( + new_sub_results.append( Result( name=dependee_job.name, status=Result.Status.SKIPPED, @@ -231,20 +212,18 @@ class HtmlRunnerHooks: + f" [{_job.name}]", ) ) - for skipped_job_result in skipped_job_results: - workflow_result.update_sub_result(skipped_job_result) - S3.copy_result_to_s3( - workflow_result, - unlock=True, + updated_status = _ResultS3.update_workflow_results( + new_info=new_result_info, + new_sub_results=new_sub_results, + workflow_name=_workflow.name, ) - if workflow_result.status != old_status: - print( - f"Update GH commit status [{result.name}]: [{old_status} -> {workflow_result.status}]" - ) + + if updated_status: + print(f"Update GH commit status [{result.name}]: [{updated_status}]") GH.post_commit_status( - name=workflow_result.name, - status=GH.convert_to_gh_status(workflow_result.status), + name=_workflow.name, + status=GH.convert_to_gh_status(updated_status), description="", - url=env.get_report_url(settings=Settings), + url=env.get_report_url(settings=Settings, latest=True), ) diff --git a/ci/praktika/json.html b/ci/praktika/json.html index 4e15a67ba76..544fd6e68d4 100644 --- a/ci/praktika/json.html +++ b/ci/praktika/json.html @@ -342,7 +342,7 @@ const milliseconds = Math.floor((duration % 1) * 1000); const formattedSeconds = String(seconds); - const formattedMilliseconds = String(milliseconds).padStart(3, '0'); + const formattedMilliseconds = String(milliseconds).padStart(2, '0').slice(-2); return `${formattedSeconds}.${formattedMilliseconds}`; } @@ -600,8 +600,7 @@ td.classList.add('time-column'); td.textContent = value ? formatDuration(value) : ''; } else if (column === 'info') { - // For info and other columns, just display the value - td.textContent = value || ''; + td.textContent = value.includes('\n') ? '↵' : (value || ''); td.classList.add('info-column'); } @@ -675,7 +674,8 @@ } if (targetData) { - infoElement.style.display = 'none'; + //infoElement.style.display = 'none'; + infoElement.innerHTML = (targetData.info || '').replace(/\n/g, '
'); addStatusToStatus(targetData.status, targetData.start_time, targetData.duration) @@ -804,7 +804,8 @@ // Check if all required parameters are present to load JSON if (PR && sha && root_name) { - loadResultsJSON(PR, sha, nameParams); + const shaToLoad = (sha === 'latest') ? commitsArray[commitsArray.length - 1] : sha; + loadResultsJSON(PR, shaToLoad, nameParams); } else { document.getElementById('title').textContent = 'Error: Missing required URL parameters: PR, sha, or name_0'; } diff --git a/ci/praktika/mangle.py b/ci/praktika/mangle.py index b16d52fbbbf..f94b11adad5 100644 --- a/ci/praktika/mangle.py +++ b/ci/praktika/mangle.py @@ -1,11 +1,10 @@ import copy import importlib.util from pathlib import Path -from typing import Any, Dict from praktika import Job -from praktika._settings import _USER_DEFINED_SETTINGS, _Settings -from praktika.utils import ContextManager, Utils +from praktika.settings import Settings +from praktika.utils import Utils def _get_workflows(name=None, file=None): @@ -14,13 +13,13 @@ def _get_workflows(name=None, file=None): """ res = [] - directory = Path(_Settings.WORKFLOWS_DIRECTORY) + directory = Path(Settings.WORKFLOWS_DIRECTORY) for py_file in directory.glob("*.py"): if file and file not in str(py_file): continue module_name = py_file.name.removeprefix(".py") spec = importlib.util.spec_from_file_location( - module_name, f"{_Settings.WORKFLOWS_DIRECTORY}/{module_name}" + module_name, f"{Settings.WORKFLOWS_DIRECTORY}/{module_name}" ) assert spec foo = importlib.util.module_from_spec(spec) @@ -106,30 +105,3 @@ def _update_workflow_with_native_jobs(workflow): for job in workflow.jobs: aux_job.requires.append(job.name) workflow.jobs.append(aux_job) - - -def _get_user_settings() -> Dict[str, Any]: - """ - Gets user's settings - """ - res = {} # type: Dict[str, Any] - - directory = Path(_Settings.SETTINGS_DIRECTORY) - for py_file in directory.glob("*.py"): - module_name = py_file.name.removeprefix(".py") - spec = importlib.util.spec_from_file_location( - module_name, f"{_Settings.SETTINGS_DIRECTORY}/{module_name}" - ) - assert spec - foo = importlib.util.module_from_spec(spec) - assert spec.loader - spec.loader.exec_module(foo) - for setting in _USER_DEFINED_SETTINGS: - try: - value = getattr(foo, setting) - res[setting] = value - print(f"Apply user defined setting [{setting} = {value}]") - except Exception as e: - pass - - return res diff --git a/ci/praktika/native_jobs.py b/ci/praktika/native_jobs.py index 58af211988b..52bf6c6e204 100644 --- a/ci/praktika/native_jobs.py +++ b/ci/praktika/native_jobs.py @@ -10,9 +10,8 @@ from praktika.gh import GH from praktika.hook_cache import CacheRunnerHooks from praktika.hook_html import HtmlRunnerHooks from praktika.mangle import _get_workflows -from praktika.result import Result, ResultInfo +from praktika.result import Result, ResultInfo, _ResultS3 from praktika.runtime import RunConfig -from praktika.s3 import S3 from praktika.settings import Settings from praktika.utils import Shell, Utils @@ -225,6 +224,7 @@ def _config_workflow(workflow: Workflow.Config, job_name): cache_success=[], cache_success_base64=[], cache_artifacts={}, + cache_jobs={}, ).dump() # checks: @@ -310,9 +310,8 @@ def _finish_workflow(workflow, job_name): print(env.get_needs_statuses()) print("Check Workflow results") - S3.copy_result_from_s3( + _ResultS3.copy_result_from_s3( Result.file_name_static(workflow.name), - lock=False, ) workflow_result = Result.from_fs(workflow.name) @@ -345,7 +344,7 @@ def _finish_workflow(workflow, job_name): failed_results.append(result.name) if failed_results: - ready_for_merge_description = f"failed: {', '.join(failed_results)}" + ready_for_merge_description = f"Failed: {', '.join(failed_results)}" if not GH.post_commit_status( name=Settings.READY_FOR_MERGE_STATUS_NAME + f" [{workflow.name}]", @@ -357,10 +356,9 @@ def _finish_workflow(workflow, job_name): env.add_info(ResultInfo.GH_STATUS_ERROR) if update_final_report: - S3.copy_result_to_s3( + _ResultS3.copy_result_to_s3( workflow_result, - unlock=False, - ) # no lock - no unlock + ) Result.from_fs(job_name).set_status(Result.Status.SUCCESS) diff --git a/ci/praktika/result.py b/ci/praktika/result.py index 842deacbcbd..8164b1d1295 100644 --- a/ci/praktika/result.py +++ b/ci/praktika/result.py @@ -2,10 +2,12 @@ import dataclasses import datetime import sys from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from praktika._environment import _Environment -from praktika._settings import _Settings +from praktika.cache import Cache +from praktika.s3 import S3 +from praktika.settings import Settings from praktika.utils import ContextManager, MetaClasses, Shell, Utils @@ -55,7 +57,7 @@ class Result(MetaClasses.Serializable): stopwatch: Utils.Stopwatch = None, status="", files=None, - info="", + info: Union[List[str], str] = "", with_info_from_results=True, ): if isinstance(status, bool): @@ -149,7 +151,7 @@ class Result(MetaClasses.Serializable): @classmethod def file_name_static(cls, name): - return f"{_Settings.TEMP_DIR}/result_{Utils.normalize_string(name)}.json" + return f"{Settings.TEMP_DIR}/result_{Utils.normalize_string(name)}.json" @classmethod def from_dict(cls, obj: Dict[str, Any]) -> "Result": @@ -232,7 +234,7 @@ class Result(MetaClasses.Serializable): ) @classmethod - def generate_skipped(cls, name, results=None): + def generate_skipped(cls, name, cache_record: Cache.CacheRecord, results=None): return Result( name=name, status=Result.Status.SKIPPED, @@ -241,7 +243,7 @@ class Result(MetaClasses.Serializable): results=results or [], files=[], links=[], - info="from cache", + info=f"from cache: sha [{cache_record.sha}], pr/branch [{cache_record.pr_number or cache_record.branch}]", ) @classmethod @@ -275,7 +277,7 @@ class Result(MetaClasses.Serializable): # Set log file path if logging is enabled log_file = ( - f"{_Settings.TEMP_DIR}/{Utils.normalize_string(name)}.log" + f"{Settings.TEMP_DIR}/{Utils.normalize_string(name)}.log" if with_log else None ) @@ -321,14 +323,31 @@ class Result(MetaClasses.Serializable): self.dump() if not self.is_ok(): print("ERROR: Job Failed") - for result in self.results: - if not result.is_ok(): - print("Failed checks:") - print(" | ", result) + print(self.to_stdout_formatted()) sys.exit(1) else: print("ok") + def to_stdout_formatted(self, indent="", res=""): + if self.is_ok(): + return res + + res += f"{indent}Task [{self.name}] failed.\n" + fail_info = "" + sub_indent = indent + " " + + if not self.results: + if not self.is_ok(): + fail_info += f"{sub_indent}{self.name}:\n" + for line in self.info.splitlines(): + fail_info += f"{sub_indent}{sub_indent}{line}\n" + return res + fail_info + + for sub_result in self.results: + res = sub_result.to_stdout_formatted(sub_indent, res) + + return res + class ResultInfo: SETUP_ENV_JOB_FAILED = ( @@ -351,3 +370,202 @@ class ResultInfo: ) S3_ERROR = "S3 call failure" + + +class _ResultS3: + + @classmethod + def copy_result_to_s3(cls, result, unlock=False): + result.dump() + env = _Environment.get() + s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}" + s3_path_full = f"{s3_path}/{Path(result.file_name()).name}" + url = S3.copy_file_to_s3(s3_path=s3_path, local_path=result.file_name()) + # if unlock: + # if not cls.unlock(s3_path_full): + # print(f"ERROR: File [{s3_path_full}] unlock failure") + # assert False # TODO: investigate + return url + + @classmethod + def copy_result_from_s3(cls, local_path, lock=False): + env = _Environment.get() + file_name = Path(local_path).name + s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/{file_name}" + # if lock: + # cls.lock(s3_path) + if not S3.copy_file_from_s3(s3_path=s3_path, local_path=local_path): + print(f"ERROR: failed to cp file [{s3_path}] from s3") + raise + + @classmethod + def copy_result_from_s3_with_version(cls, local_path): + env = _Environment.get() + file_name = Path(local_path).name + local_dir = Path(local_path).parent + file_name_pattern = f"{file_name}_*" + for file_path in local_dir.glob(file_name_pattern): + file_path.unlink() + s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/" + if not S3.copy_file_from_s3_matching_pattern( + s3_path=s3_path, local_path=local_dir, include=file_name_pattern + ): + print(f"ERROR: failed to cp file [{s3_path}] from s3") + raise + result_files = [] + for file_path in local_dir.glob(file_name_pattern): + result_files.append(file_path) + assert result_files, "No result files found" + result_files.sort() + version = int(result_files[-1].name.split("_")[-1]) + Shell.check(f"cp {result_files[-1]} {local_path}", strict=True, verbose=True) + return version + + @classmethod + def copy_result_to_s3_with_version(cls, result, version): + result.dump() + filename = Path(result.file_name()).name + file_name_versioned = f"{filename}_{str(version).zfill(3)}" + env = _Environment.get() + s3_path_versioned = ( + f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/{file_name_versioned}" + ) + s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/" + if version == 0: + S3.clean_s3_directory(s3_path=s3_path) + if not S3.put( + s3_path=s3_path_versioned, + local_path=result.file_name(), + if_none_matched=True, + ): + print("Failed to put versioned Result") + return False + if not S3.put(s3_path=s3_path, local_path=result.file_name()): + print("Failed to put non-versioned Result") + return True + + # @classmethod + # def lock(cls, s3_path, level=0): + # env = _Environment.get() + # s3_path_lock = s3_path + f".lock" + # file_path_lock = f"{Settings.TEMP_DIR}/{Path(s3_path_lock).name}" + # assert Shell.check( + # f"echo '''{env.JOB_NAME}''' > {file_path_lock}", verbose=True + # ), "Never" + # + # i = 20 + # meta = S3.head_object(s3_path_lock) + # while meta: + # locked_by_job = meta.get("Metadata", {"job": ""}).get("job", "") + # if locked_by_job: + # decoded_bytes = base64.b64decode(locked_by_job) + # locked_by_job = decoded_bytes.decode("utf-8") + # print( + # f"WARNING: Failed to acquire lock, meta [{meta}], job [{locked_by_job}] - wait" + # ) + # i -= 5 + # if i < 0: + # info = f"ERROR: lock acquire failure - unlock forcefully" + # print(info) + # env.add_info(info) + # break + # time.sleep(5) + # + # metadata = {"job": Utils.to_base64(env.JOB_NAME)} + # S3.put( + # s3_path=s3_path_lock, + # local_path=file_path_lock, + # metadata=metadata, + # if_none_matched=True, + # ) + # time.sleep(1) + # obj = S3.head_object(s3_path_lock) + # if not obj or not obj.has_tags(tags=metadata): + # print(f"WARNING: locked by another job [{obj}]") + # env.add_info("S3 lock file failure") + # cls.lock(s3_path, level=level + 1) + # print("INFO: lock acquired") + # + # @classmethod + # def unlock(cls, s3_path): + # s3_path_lock = s3_path + ".lock" + # env = _Environment.get() + # obj = S3.head_object(s3_path_lock) + # if not obj: + # print("ERROR: lock file is removed") + # assert False # investigate + # elif not obj.has_tags({"job": Utils.to_base64(env.JOB_NAME)}): + # print("ERROR: lock file was acquired by another job") + # assert False # investigate + # + # if not S3.delete(s3_path_lock): + # print(f"ERROR: File [{s3_path_lock}] delete failure") + # print("INFO: lock released") + # return True + + @classmethod + def upload_result_files_to_s3(cls, result): + if result.results: + for result_ in result.results: + cls.upload_result_files_to_s3(result_) + for file in result.files: + if not Path(file).is_file(): + print(f"ERROR: Invalid file [{file}] in [{result.name}] - skip upload") + result.info += f"\nWARNING: Result file [{file}] was not found" + file_link = S3._upload_file_to_s3(file, upload_to_s3=False) + else: + is_text = False + for text_file_suffix in Settings.TEXT_CONTENT_EXTENSIONS: + if file.endswith(text_file_suffix): + print( + f"File [{file}] matches Settings.TEXT_CONTENT_EXTENSIONS [{Settings.TEXT_CONTENT_EXTENSIONS}] - add text attribute for s3 object" + ) + is_text = True + break + file_link = S3._upload_file_to_s3( + file, + upload_to_s3=True, + text=is_text, + s3_subprefix=Utils.normalize_string(result.name), + ) + result.links.append(file_link) + if result.files: + print( + f"Result files [{result.files}] uploaded to s3 [{result.links[-len(result.files):]}] - clean files list" + ) + result.files = [] + result.dump() + + @classmethod + def update_workflow_results(cls, workflow_name, new_info="", new_sub_results=None): + assert new_info or new_sub_results + + attempt = 1 + prev_status = "" + new_status = "" + done = False + while attempt < 10: + version = cls.copy_result_from_s3_with_version( + Result.file_name_static(workflow_name) + ) + workflow_result = Result.from_fs(workflow_name) + prev_status = workflow_result.status + if new_info: + workflow_result.set_info(new_info) + if new_sub_results: + if isinstance(new_sub_results, Result): + new_sub_results = [new_sub_results] + for result_ in new_sub_results: + workflow_result.update_sub_result(result_) + new_status = workflow_result.status + if cls.copy_result_to_s3_with_version(workflow_result, version=version + 1): + done = True + break + print(f"Attempt [{attempt}] to upload workflow result failed") + attempt += 1 + assert done + + if prev_status != new_status: + return new_status + else: + return None diff --git a/ci/praktika/runner.py b/ci/praktika/runner.py index 1ac8748d1c0..38112dd5684 100644 --- a/ci/praktika/runner.py +++ b/ci/praktika/runner.py @@ -52,6 +52,7 @@ class Runner: cache_success=[], cache_success_base64=[], cache_artifacts={}, + cache_jobs={}, ) for docker in workflow.dockers: workflow_config.digest_dockers[docker.name] = Digest().calc_docker_digest( @@ -123,7 +124,7 @@ class Runner: return 0 - def _run(self, workflow, job, docker="", no_docker=False, param=None): + def _run(self, workflow, job, docker="", no_docker=False, param=None, test=""): # re-set envs for local run env = _Environment.get() env.JOB_NAME = job.name @@ -162,6 +163,9 @@ class Runner: if param: print(f"Custom --param [{param}] will be passed to job's script") cmd += f" --param {param}" + if test: + print(f"Custom --test [{test}] will be passed to job's script") + cmd += f" --test {test}" print(f"--- Run command [{cmd}]") with TeePopen(cmd, timeout=job.timeout) as process: @@ -240,10 +244,6 @@ class Runner: result.set_files(files=[Settings.RUN_LOG]) result.update_duration().dump() - if result.info and result.status != Result.Status.SUCCESS: - # provide job info to workflow level - info_errors.append(result.info) - if run_exit_code == 0: providing_artifacts = [] if job.provides and workflow.artifacts: @@ -310,6 +310,7 @@ class Runner: local_run=False, no_docker=False, param=None, + test="", pr=None, sha=None, branch=None, @@ -358,7 +359,12 @@ class Runner: print(f"=== Run script [{job.name}], workflow [{workflow.name}] ===") try: run_code = self._run( - workflow, job, docker=docker, no_docker=no_docker, param=param + workflow, + job, + docker=docker, + no_docker=no_docker, + param=param, + test=test, ) res = run_code == 0 if not res: diff --git a/ci/praktika/runtime.py b/ci/praktika/runtime.py index a87b67c2c79..07c24e0498c 100644 --- a/ci/praktika/runtime.py +++ b/ci/praktika/runtime.py @@ -15,17 +15,23 @@ class RunConfig(MetaClasses.Serializable): # there are might be issue with special characters in job names if used directly in yaml syntax - create base64 encoded list to avoid this cache_success_base64: List[str] cache_artifacts: Dict[str, Cache.CacheRecord] + cache_jobs: Dict[str, Cache.CacheRecord] sha: str @classmethod def from_dict(cls, obj): cache_artifacts = obj["cache_artifacts"] + cache_jobs = obj["cache_jobs"] cache_artifacts_deserialized = {} + cache_jobs_deserialized = {} for artifact_name, cache_artifact in cache_artifacts.items(): cache_artifacts_deserialized[artifact_name] = Cache.CacheRecord.from_dict( cache_artifact ) obj["cache_artifacts"] = cache_artifacts_deserialized + for job_name, cache_jobs in cache_jobs.items(): + cache_jobs_deserialized[job_name] = Cache.CacheRecord.from_dict(cache_jobs) + obj["cache_jobs"] = cache_artifacts_deserialized return RunConfig(**obj) @classmethod diff --git a/ci/praktika/s3.py b/ci/praktika/s3.py index 04a08622dcd..82034b57b80 100644 --- a/ci/praktika/s3.py +++ b/ci/praktika/s3.py @@ -1,12 +1,11 @@ import dataclasses import json -import time from pathlib import Path from typing import Dict from praktika._environment import _Environment from praktika.settings import Settings -from praktika.utils import Shell, Utils +from praktika.utils import Shell class S3: @@ -59,16 +58,15 @@ class S3: return f"https://{s3_full_path}".replace(bucket, endpoint) @classmethod - def put(cls, s3_path, local_path, text=False, metadata=None): + def put(cls, s3_path, local_path, text=False, metadata=None, if_none_matched=False): assert Path(local_path).exists(), f"Path [{local_path}] does not exist" assert Path(s3_path), f"Invalid S3 Path [{s3_path}]" assert Path( local_path ).is_file(), f"Path [{local_path}] is not file. Only files are supported" - file_name = Path(local_path).name s3_full_path = s3_path - if not s3_full_path.endswith(file_name): - s3_full_path = f"{s3_path}/{Path(local_path).name}" + if s3_full_path.endswith("/"): + s3_full_path = f"{s3_path}{Path(local_path).name}" s3_full_path = str(s3_full_path).removeprefix("s3://") bucket, key = s3_full_path.split("/", maxsplit=1) @@ -76,6 +74,8 @@ class S3: command = ( f"aws s3api put-object --bucket {bucket} --key {key} --body {local_path}" ) + if if_none_matched: + command += f' --if-none-match "*"' if metadata: for k, v in metadata.items(): command += f" --metadata {k}={v}" @@ -84,7 +84,7 @@ class S3: if text: cmd += " --content-type text/plain" res = cls.run_command_with_retries(command) - assert res + return res @classmethod def run_command_with_retries(cls, command, retries=Settings.MAX_RETRIES_S3): @@ -101,6 +101,14 @@ class S3: elif "does not exist" in stderr: print("ERROR: requested file does not exist") break + elif "Unknown options" in stderr: + print("ERROR: Invalid AWS CLI command or CLI client version:") + print(f" | awc error: {stderr}") + break + elif "PreconditionFailed" in stderr: + print("ERROR: AWS API Call Precondition Failed") + print(f" | awc error: {stderr}") + break if ret_code != 0: print( f"ERROR: aws s3 cp failed, stdout/stderr err: [{stderr}], out [{stdout}]" @@ -108,13 +116,6 @@ class S3: res = ret_code == 0 return res - @classmethod - def get_link(cls, s3_path, local_path): - s3_full_path = f"{s3_path}/{Path(local_path).name}" - bucket = s3_path.split("/")[0] - endpoint = Settings.S3_BUCKET_TO_HTTP_ENDPOINT[bucket] - return f"https://{s3_full_path}".replace(bucket, endpoint) - @classmethod def copy_file_from_s3(cls, s3_path, local_path): assert Path(s3_path), f"Invalid S3 Path [{s3_path}]" @@ -128,6 +129,19 @@ class S3: res = cls.run_command_with_retries(cmd) return res + @classmethod + def copy_file_from_s3_matching_pattern( + cls, s3_path, local_path, include, exclude="*" + ): + assert Path(s3_path), f"Invalid S3 Path [{s3_path}]" + assert Path( + local_path + ).is_dir(), f"Path [{local_path}] does not exist or not a directory" + assert s3_path.endswith("/"), f"s3 path is invalid [{s3_path}]" + cmd = f'aws s3 cp s3://{s3_path} {local_path} --exclude "{exclude}" --include "{include}" --recursive' + res = cls.run_command_with_retries(cmd) + return res + @classmethod def head_object(cls, s3_path): s3_path = str(s3_path).removeprefix("s3://") @@ -148,103 +162,6 @@ class S3: verbose=True, ) - # TODO: apparently should be placed into separate file to be used only inside praktika - # keeping this module clean from importing Settings, Environment and etc, making it easy for use externally - @classmethod - def copy_result_to_s3(cls, result, unlock=True): - result.dump() - env = _Environment.get() - s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}" - s3_path_full = f"{s3_path}/{Path(result.file_name()).name}" - url = S3.copy_file_to_s3(s3_path=s3_path, local_path=result.file_name()) - if env.PR_NUMBER: - print("Duplicate Result for latest commit alias in PR") - s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix(latest=True)}" - url = S3.copy_file_to_s3(s3_path=s3_path, local_path=result.file_name()) - if unlock: - if not cls.unlock(s3_path_full): - print(f"ERROR: File [{s3_path_full}] unlock failure") - assert False # TODO: investigate - return url - - @classmethod - def copy_result_from_s3(cls, local_path, lock=True): - env = _Environment.get() - file_name = Path(local_path).name - s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}/{file_name}" - if lock: - cls.lock(s3_path) - if not S3.copy_file_from_s3(s3_path=s3_path, local_path=local_path): - print(f"ERROR: failed to cp file [{s3_path}] from s3") - raise - - @classmethod - def lock(cls, s3_path, level=0): - assert level < 3, "Never" - env = _Environment.get() - s3_path_lock = s3_path + f".lock" - file_path_lock = f"{Settings.TEMP_DIR}/{Path(s3_path_lock).name}" - assert Shell.check( - f"echo '''{env.JOB_NAME}''' > {file_path_lock}", verbose=True - ), "Never" - - i = 20 - meta = S3.head_object(s3_path_lock) - while meta: - print(f"WARNING: Failed to acquire lock, meta [{meta}] - wait") - i -= 5 - if i < 0: - info = f"ERROR: lock acquire failure - unlock forcefully" - print(info) - env.add_info(info) - break - time.sleep(5) - - metadata = {"job": Utils.to_base64(env.JOB_NAME)} - S3.put( - s3_path=s3_path_lock, - local_path=file_path_lock, - metadata=metadata, - ) - time.sleep(1) - obj = S3.head_object(s3_path_lock) - if not obj or not obj.has_tags(tags=metadata): - print(f"WARNING: locked by another job [{obj}]") - env.add_info("S3 lock file failure") - cls.lock(s3_path, level=level + 1) - print("INFO: lock acquired") - - @classmethod - def unlock(cls, s3_path): - s3_path_lock = s3_path + ".lock" - env = _Environment.get() - obj = S3.head_object(s3_path_lock) - if not obj: - print("ERROR: lock file is removed") - assert False # investigate - elif not obj.has_tags({"job": Utils.to_base64(env.JOB_NAME)}): - print("ERROR: lock file was acquired by another job") - assert False # investigate - - if not S3.delete(s3_path_lock): - print(f"ERROR: File [{s3_path_lock}] delete failure") - print("INFO: lock released") - return True - - @classmethod - def get_result_link(cls, result): - env = _Environment.get() - s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix(latest=True if env.PR_NUMBER else False)}" - return S3.get_link(s3_path=s3_path, local_path=result.file_name()) - - @classmethod - def clean_latest_result(cls): - env = _Environment.get() - env.SHA = "latest" - assert env.PR_NUMBER - s3_path = f"{Settings.HTML_S3_PATH}/{env.get_s3_prefix()}" - S3.clean_s3_directory(s3_path=s3_path) - @classmethod def _upload_file_to_s3( cls, local_file_path, upload_to_s3: bool, text: bool = False, s3_subprefix="" @@ -260,36 +177,3 @@ class S3: ) return html_link return f"file://{Path(local_file_path).absolute()}" - - @classmethod - def upload_result_files_to_s3(cls, result): - if result.results: - for result_ in result.results: - cls.upload_result_files_to_s3(result_) - for file in result.files: - if not Path(file).is_file(): - print(f"ERROR: Invalid file [{file}] in [{result.name}] - skip upload") - result.info += f"\nWARNING: Result file [{file}] was not found" - file_link = cls._upload_file_to_s3(file, upload_to_s3=False) - else: - is_text = False - for text_file_suffix in Settings.TEXT_CONTENT_EXTENSIONS: - if file.endswith(text_file_suffix): - print( - f"File [{file}] matches Settings.TEXT_CONTENT_EXTENSIONS [{Settings.TEXT_CONTENT_EXTENSIONS}] - add text attribute for s3 object" - ) - is_text = True - break - file_link = cls._upload_file_to_s3( - file, - upload_to_s3=True, - text=is_text, - s3_subprefix=Utils.normalize_string(result.name), - ) - result.links.append(file_link) - if result.files: - print( - f"Result files [{result.files}] uploaded to s3 [{result.links[-len(result.files):]}] - clean files list" - ) - result.files = [] - result.dump() diff --git a/ci/praktika/settings.py b/ci/praktika/settings.py index 1a4068d9398..b281a95370c 100644 --- a/ci/praktika/settings.py +++ b/ci/praktika/settings.py @@ -1,8 +1,152 @@ -from praktika._settings import _Settings -from praktika.mangle import _get_user_settings +import dataclasses +import importlib.util +from pathlib import Path +from typing import Dict, Iterable, List, Optional -Settings = _Settings() -user_settings = _get_user_settings() -for setting, value in user_settings.items(): - Settings.__setattr__(setting, value) +@dataclasses.dataclass +class _Settings: + ###################################### + # Pipeline generation settings # + ###################################### + MAIN_BRANCH = "main" + CI_PATH = "./ci" + WORKFLOW_PATH_PREFIX: str = "./.github/workflows" + WORKFLOWS_DIRECTORY: str = f"{CI_PATH}/workflows" + SETTINGS_DIRECTORY: str = f"{CI_PATH}/settings" + CI_CONFIG_JOB_NAME = "Config Workflow" + DOCKER_BUILD_JOB_NAME = "Docker Builds" + FINISH_WORKFLOW_JOB_NAME = "Finish Workflow" + READY_FOR_MERGE_STATUS_NAME = "Ready for Merge" + CI_CONFIG_RUNS_ON: Optional[List[str]] = None + DOCKER_BUILD_RUNS_ON: Optional[List[str]] = None + VALIDATE_FILE_PATHS: bool = True + + ###################################### + # Runtime Settings # + ###################################### + MAX_RETRIES_S3 = 3 + MAX_RETRIES_GH = 3 + + ###################################### + # S3 (artifact storage) settings # + ###################################### + S3_ARTIFACT_PATH: str = "" + + ###################################### + # CI workspace settings # + ###################################### + TEMP_DIR: str = "/tmp/praktika" + OUTPUT_DIR: str = f"{TEMP_DIR}/output" + INPUT_DIR: str = f"{TEMP_DIR}/input" + PYTHON_INTERPRETER: str = "python3" + PYTHON_PACKET_MANAGER: str = "pip3" + PYTHON_VERSION: str = "3.9" + INSTALL_PYTHON_FOR_NATIVE_JOBS: bool = False + INSTALL_PYTHON_REQS_FOR_NATIVE_JOBS: str = "./ci/requirements.txt" + ENVIRONMENT_VAR_FILE: str = f"{TEMP_DIR}/environment.json" + RUN_LOG: str = f"{TEMP_DIR}/praktika_run.log" + + SECRET_GH_APP_ID: str = "GH_APP_ID" + SECRET_GH_APP_PEM_KEY: str = "GH_APP_PEM_KEY" + + ENV_SETUP_SCRIPT: str = "/tmp/praktika_setup_env.sh" + WORKFLOW_STATUS_FILE: str = f"{TEMP_DIR}/workflow_status.json" + + ###################################### + # CI Cache settings # + ###################################### + CACHE_VERSION: int = 1 + CACHE_DIGEST_LEN: int = 20 + CACHE_S3_PATH: str = "" + CACHE_LOCAL_PATH: str = f"{TEMP_DIR}/ci_cache" + + ###################################### + # Report settings # + ###################################### + HTML_S3_PATH: str = "" + HTML_PAGE_FILE: str = "./praktika/json.html" + TEXT_CONTENT_EXTENSIONS: Iterable[str] = frozenset([".txt", ".log"]) + S3_BUCKET_TO_HTTP_ENDPOINT: Optional[Dict[str, str]] = None + + DOCKERHUB_USERNAME: str = "" + DOCKERHUB_SECRET: str = "" + DOCKER_WD: str = "/wd" + + ###################################### + # CI DB Settings # + ###################################### + SECRET_CI_DB_URL: str = "CI_DB_URL" + SECRET_CI_DB_PASSWORD: str = "CI_DB_PASSWORD" + CI_DB_DB_NAME = "" + CI_DB_TABLE_NAME = "" + CI_DB_INSERT_TIMEOUT_SEC = 5 + + DISABLE_MERGE_COMMIT = True + + +_USER_DEFINED_SETTINGS = [ + "S3_ARTIFACT_PATH", + "CACHE_S3_PATH", + "HTML_S3_PATH", + "S3_BUCKET_TO_HTTP_ENDPOINT", + "TEXT_CONTENT_EXTENSIONS", + "TEMP_DIR", + "OUTPUT_DIR", + "INPUT_DIR", + "CI_CONFIG_RUNS_ON", + "DOCKER_BUILD_RUNS_ON", + "CI_CONFIG_JOB_NAME", + "PYTHON_INTERPRETER", + "PYTHON_VERSION", + "PYTHON_PACKET_MANAGER", + "INSTALL_PYTHON_FOR_NATIVE_JOBS", + "INSTALL_PYTHON_REQS_FOR_NATIVE_JOBS", + "MAX_RETRIES_S3", + "MAX_RETRIES_GH", + "VALIDATE_FILE_PATHS", + "DOCKERHUB_USERNAME", + "DOCKERHUB_SECRET", + "READY_FOR_MERGE_STATUS_NAME", + "SECRET_CI_DB_URL", + "SECRET_CI_DB_PASSWORD", + "CI_DB_DB_NAME", + "CI_DB_TABLE_NAME", + "CI_DB_INSERT_TIMEOUT_SEC", + "SECRET_GH_APP_PEM_KEY", + "SECRET_GH_APP_ID", + "MAIN_BRANCH", + "DISABLE_MERGE_COMMIT", +] + + +def _get_settings() -> _Settings: + res = _Settings() + + directory = Path(_Settings.SETTINGS_DIRECTORY) + for py_file in directory.glob("*.py"): + module_name = py_file.name.removeprefix(".py") + spec = importlib.util.spec_from_file_location( + module_name, f"{_Settings.SETTINGS_DIRECTORY}/{module_name}" + ) + assert spec + foo = importlib.util.module_from_spec(spec) + assert spec.loader + spec.loader.exec_module(foo) + for setting in _USER_DEFINED_SETTINGS: + try: + value = getattr(foo, setting) + res.__setattr__(setting, value) + # print(f"- read user defined setting [{setting} = {value}]") + except Exception as e: + # print(f"Exception while read user settings: {e}") + pass + + return res + + +class GHRunners: + ubuntu = "ubuntu-latest" + + +Settings = _get_settings() diff --git a/ci/praktika/utils.py b/ci/praktika/utils.py index 62eb13b3e19..2bcc94f2559 100644 --- a/ci/praktika/utils.py +++ b/ci/praktika/utils.py @@ -17,8 +17,6 @@ from threading import Thread from types import SimpleNamespace from typing import Any, Dict, Iterator, List, Optional, Type, TypeVar, Union -from praktika._settings import _Settings - T = TypeVar("T", bound="Serializable") diff --git a/ci/praktika/validator.py b/ci/praktika/validator.py index d612881b819..0bb722903e5 100644 --- a/ci/praktika/validator.py +++ b/ci/praktika/validator.py @@ -4,10 +4,8 @@ from itertools import chain from pathlib import Path from praktika import Workflow -from praktika._settings import GHRunners from praktika.mangle import _get_workflows -from praktika.settings import Settings -from praktika.utils import ContextManager +from praktika.settings import GHRunners, Settings class Validator: @@ -168,9 +166,7 @@ class Validator: "\n echo requests==2.32.3 >> ./ci/requirements.txt" ) message += "\n echo https://clickhouse-builds.s3.amazonaws.com/packages/praktika-0.1-py3-none-any.whl >> ./ci/requirements.txt" - cls.evaluate_check( - path.is_file(), message, job.name, workflow.name - ) + cls.evaluate_check(path.is_file(), message, job.name, workflow.name) @classmethod def validate_dockers(cls, workflow: Workflow.Config): diff --git a/ci/workflows/pull_request.py b/ci/workflows/pull_request.py index 0d505ae27c4..707babb1250 100644 --- a/ci/workflows/pull_request.py +++ b/ci/workflows/pull_request.py @@ -68,6 +68,7 @@ stateless_tests_jobs = Job.Config( name=JobNames.STATELESS, runs_on=[RunnerLabels.BUILDER], command="python3 ./ci/jobs/functional_stateless_tests.py --test-options {PARAMETER}", + # many tests expect to see "/var/lib/clickhouse" in various output lines - add mount for now, consider creating this dir in docker file run_in_docker="clickhouse/stateless-test+--security-opt seccomp=unconfined+--volume=/tmp/praktika:/var/lib/clickhouse", digest_config=Job.CacheDigestConfig( include_paths=[