mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 10:31:57 +00:00
233 lines
8.2 KiB
Python
233 lines
8.2 KiB
Python
import dataclasses
|
|
import json
|
|
from pathlib import Path
|
|
from typing import List
|
|
|
|
from praktika._environment import _Environment
|
|
from praktika.gh import GH
|
|
from praktika.parser import WorkflowConfigParser
|
|
from praktika.result import Result, ResultInfo, _ResultS3
|
|
from praktika.runtime import RunConfig
|
|
from praktika.s3 import S3
|
|
from praktika.settings import Settings
|
|
from praktika.utils import Utils
|
|
|
|
|
|
@dataclasses.dataclass
class GitCommit:
    """A commit record that is serialized to a local JSON file and mirrored to S3.

    The local file is ``commits.json`` under ``Settings.TEMP_DIR``; the S3
    location is built from ``Settings.HTML_S3_PATH`` and a PR/branch prefix.
    """

    # NOTE: `date` and `message` fields are currently disabled; only the sha is stored.
    sha: str

    @staticmethod
    def from_json(file) -> List["GitCommit"]:
        """Load commits from a JSON file.

        Args:
            file: Path of a JSON file containing a list of objects, each
                with a "sha" key.

        Returns:
            The parsed commits. If the file cannot be read or its content does
            not have the expected structure, the error is printed and an empty
            list is returned (best effort, never raises).
        """
        commits = []
        json_data = None
        try:
            with open(file, "r", encoding="utf-8") as f:
                json_data = json.load(f)
            commits = [GitCommit(sha=commit["sha"]) for commit in json_data]
        except Exception as e:
            # Deliberately broad: a broken commits file must not fail the caller.
            print(
                f"ERROR: Failed to deserialize commit's data [{json_data}], ex: [{e}]"
            )

        return commits

    @classmethod
    def update_s3_data(cls):
        """Add the current commit sha to the commits data on S3.

        Does nothing (besides printing a message) when the environment has no
        sha or when the sha is already present in the pulled data.
        """
        sha = _Environment.get().SHA
        if not sha:
            print("WARNING: Failed to retrieve commit sha")
            return
        commits = cls.pull_from_s3()
        if any(commit.sha == sha for commit in commits):
            print(
                f"INFO: Sha already present in commits data [{sha}] - skip data update"
            )
            return
        commits.append(GitCommit(sha=sha))
        cls.push_to_s3(commits)

    @classmethod
    def dump(cls, commits):
        """Write *commits* as a JSON list of dicts to the local commits file."""
        # Serialize first so that a failure here does not truncate the existing file.
        serialized = [dataclasses.asdict(commit) for commit in commits]
        with open(cls.file_name(), "w", encoding="utf-8") as f:
            json.dump(serialized, f)

    @classmethod
    def _s3_path(cls, local_path):
        """Return the S3 path for *local_path* under the current PR/branch prefix."""
        env = _Environment.get()
        prefix = cls.get_s3_prefix(pr_number=env.PR_NUMBER, branch=env.BRANCH)
        return f"{Settings.HTML_S3_PATH}/{prefix}/{local_path.name}"

    @classmethod
    def pull_from_s3(cls):
        """Copy the commits file from S3 and parse it.

        Returns:
            The commits read from the downloaded file, or an empty list when
            the copy from S3 fails (a warning is printed).
        """
        local_path = Path(cls.file_name())
        s3_path = cls._s3_path(local_path)
        if not S3.copy_file_from_s3(s3_path=s3_path, local_path=local_path):
            print(f"WARNING: failed to cp file [{s3_path}] from s3")
            return []
        return cls.from_json(local_path)

    @classmethod
    def push_to_s3(cls, commits):
        """Dump *commits* to the local file and copy that file to S3.

        A failed copy is reported with a warning only.
        """
        print(f"INFO: push commits data to s3, commits num [{len(commits)}]")
        cls.dump(commits)
        local_path = Path(cls.file_name())
        s3_path = cls._s3_path(local_path)
        if not S3.copy_file_to_s3(s3_path=s3_path, local_path=local_path, text=True):
            print(f"WARNING: failed to cp file [{local_path}] to s3")

    @classmethod
    def get_s3_prefix(cls, pr_number, branch):
        """Return the S3 prefix: the PR number if it is positive, else the branch.

        Args:
            pr_number: Pull request number; falsy or non-positive means "no PR".
            branch: Branch name used when there is no PR number.

        Raises:
            AssertionError: If both *pr_number* and *branch* are falsy.
        """
        assert pr_number or branch, "either pr_number or branch must be set"
        if pr_number and pr_number > 0:
            return f"{pr_number}"
        return f"{branch}"

    @classmethod
    def file_name(cls):
        """Return the path of the local commits file."""
        return f"{Settings.TEMP_DIR}/commits.json"
|
|
|
|
# def _get_pr_commits(pr_number):
|
|
# res = []
|
|
# if not pr_number:
|
|
# return res
|
|
# output = Shell.get_output(f"gh pr view {pr_number} --json commits")
|
|
# if output:
|
|
# res = GitCommit.from_json(output)
|
|
# return res
|
|
|
|
|
|
class HtmlRunnerHooks:
    """Workflow runner hooks that publish job results to S3 and to GitHub."""

    @classmethod
    def configure(cls, _workflow):
        """Publish the initial workflow result and announce it on GitHub.

        Generates a pending (or skipped, for cached jobs) Result per job,
        uploads the summary result to S3 as version 0, posts a pending commit
        status and a PR comment linking to the report page, and for PR runs
        updates the commits data on S3.

        Args:
            _workflow: Workflow config; this method reads its ``name``,
                ``jobs`` and ``enable_cache`` attributes.

        Raises:
            Whatever ``Utils.raise_with_error`` raises, when the initial
            result cannot be uploaded or when neither the commit status nor
            the PR comment could be posted.
        """
        # generate pending Results for all jobs in the workflow
        if _workflow.enable_cache:
            # Read the run config once; it provides both values needed below.
            run_config = RunConfig.from_fs(_workflow.name)
            skip_jobs = run_config.cache_success
            job_cache_records = run_config.cache_jobs
        else:
            skip_jobs = []
            job_cache_records = {}

        env = _Environment.get()
        results = []
        for job in _workflow.jobs:
            if job.name not in skip_jobs:
                result = Result.generate_pending(job.name)
                # Preemptively add the general job log to the result directory to ensure
                # the post-job handler can upload it, even if the job is terminated unexpectedly
                result.set_files([Settings.RUN_LOG])
            else:
                result = Result.generate_skipped(job.name, job_cache_records[job.name])
            results.append(result)
        summary_result = Result.generate_pending(_workflow.name, results=results)
        summary_result.links.append(env.CHANGE_URL)
        summary_result.links.append(env.RUN_URL)
        summary_result.start_time = Utils.timestamp()

        # The upload must not live inside an `assert`: assertions are removed
        # under `python -O`, which would silently skip the upload itself.
        if not _ResultS3.copy_result_to_s3_with_version(summary_result, version=0):
            Utils.raise_with_error(
                "Failed to upload initial workflow result to S3, cannot proceed"
            )
        page_url = env.get_report_url(settings=Settings, latest=True)
        print(f"CI Status page url [{page_url}]")

        res1 = GH.post_commit_status(
            name=_workflow.name,
            status=Result.Status.PENDING,
            description="",
            url=page_url,
        )
        res2 = GH.post_pr_comment(
            comment_body=f"Workflow [[{_workflow.name}]({page_url})], commit [{_Environment.get().SHA[:8]}]",
            or_update_comment_with_substring="Workflow [",
        )
        # One of the two notifications is enough; fail only if both are missing.
        if not (res1 or res2):
            Utils.raise_with_error(
                "Failed to set both GH commit status and PR comment with Workflow Status, cannot proceed"
            )
        if env.PR_NUMBER:
            # TODO: enable for branch, add commit number limiting
            GitCommit.update_s3_data()

    @classmethod
    def pre_run(cls, _workflow, _job):
        """Push the job's current on-disk Result into the workflow results."""
        result = Result.from_fs(_job.name)
        _ResultS3.update_workflow_results(
            workflow_name=_workflow.name, new_sub_results=result
        )

    @classmethod
    def run(cls, _workflow, _job):
        """No-op hook."""
        pass

    @classmethod
    def post_run(cls, _workflow, _job, info_errors):
        """Upload the finished job's result and refresh the workflow status.

        Uploads the job result and its files to S3, attaches info/error lines
        to the workflow result, marks jobs that need a failed job as skipped
        (unless they have ``run_unless_cancelled`` set), and posts the updated
        commit status to GitHub when the workflow status update returns one.

        Args:
            _workflow: Workflow config of the running workflow.
            _job: Job config of the job that has just finished.
            info_errors: Info/error lines collected by the caller for this
                job; not modified by this method.
        """
        result = Result.from_fs(_job.name)
        _ResultS3.upload_result_files_to_s3(result)
        _ResultS3.copy_result_to_s3(result)

        env = _Environment.get()

        new_sub_results = [result]
        new_result_info = ""
        env_info = env.REPORT_INFO
        if env_info:
            print(
                f"WARNING: some info lines are set in Environment - append to report [{env_info}]"
            )
            # Build a new list instead of `+=`, which would extend the caller's list in place.
            info_errors = [*(info_errors or []), *env_info]
        if info_errors:
            info_lines = [f" | {error}" for error in info_errors]
            info_str = f"{_job.name}:\n"
            info_str += "\n".join(info_lines)
            print("Update workflow results with new info")
            new_result_info = info_str

        if not result.is_ok():
            print(
                "Current job failed - find dependee jobs in the workflow and set their statuses to skipped"
            )
            workflow_config_parsed = WorkflowConfigParser(_workflow).parse()
            for dependee_job in workflow_config_parsed.workflow_yaml_config.jobs:
                if _job.name in dependee_job.needs:
                    if _workflow.get_job(dependee_job.name).run_unless_cancelled:
                        continue
                    print(
                        f"NOTE: Set job [{dependee_job.name}] status to [{Result.Status.SKIPPED}] due to current failure"
                    )
                    new_sub_results.append(
                        Result(
                            name=dependee_job.name,
                            status=Result.Status.SKIPPED,
                            info=ResultInfo.SKIPPED_DUE_TO_PREVIOUS_FAILURE
                            + f" [{_job.name}]",
                        )
                    )

        updated_status = _ResultS3.update_workflow_results(
            new_info=new_result_info,
            new_sub_results=new_sub_results,
            workflow_name=_workflow.name,
        )

        if updated_status:
            print(f"Update GH commit status [{result.name}]: [{updated_status}]")
            GH.post_commit_status(
                name=_workflow.name,
                status=GH.convert_to_gh_status(updated_status),
                description="",
                url=env.get_report_url(settings=Settings, latest=True),
            )
|