mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 19:02:04 +00:00
503 lines
16 KiB
Python
503 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from collections import namedtuple
|
|
import fnmatch
|
|
import json
|
|
import time
|
|
|
|
import jwt
|
|
import requests # type: ignore
|
|
import boto3 # type: ignore
|
|
|
|
SUSPICIOUS_CHANGED_FILES_NUMBER = 200
|
|
|
|
SUSPICIOUS_PATTERNS = [
|
|
"tests/ci/*",
|
|
"docs/tools/*",
|
|
".github/*",
|
|
"utils/release/*",
|
|
"docker/*",
|
|
"release",
|
|
]
|
|
|
|
# Number of retries for API calls.
|
|
MAX_RETRY = 5
|
|
|
|
# Number of times a check can re-run as a whole.
|
|
# It is needed, because we are using AWS "spot" instances, that are terminated often
|
|
MAX_WORKFLOW_RERUN = 30
|
|
|
|
WorkflowDescription = namedtuple(
|
|
"WorkflowDescription",
|
|
[
|
|
"name",
|
|
"action",
|
|
"run_id",
|
|
"event",
|
|
"workflow_id",
|
|
"conclusion",
|
|
"status",
|
|
"api_url",
|
|
"fork_owner_login",
|
|
"fork_branch",
|
|
"rerun_url",
|
|
"jobs_url",
|
|
"attempt",
|
|
"repo_url",
|
|
"url",
|
|
],
|
|
)
|
|
|
|
# See https://api.github.com/orgs/{name}
|
|
TRUSTED_ORG_IDS = {
|
|
54801242, # clickhouse
|
|
}
|
|
|
|
# See https://api.github.com/repos/ClickHouse/ClickHouse/actions/workflows
|
|
# Use ID to not inject a malicious workflow
|
|
TRUSTED_WORKFLOW_IDS = {
|
|
14586616, # Cancel workflows, always trusted
|
|
}
|
|
|
|
NEED_RERUN_WORKFLOWS = {
|
|
"BackportPR",
|
|
"DocsCheck",
|
|
"DocsReleaseChecks",
|
|
"MasterCI",
|
|
"PullRequestCI",
|
|
"ReleaseBranchCI",
|
|
}
|
|
|
|
# Individual trusted contirbutors who are not in any trusted organization.
|
|
# Can be changed in runtime: we will append users that we learned to be in
|
|
# a trusted org, to save GitHub API calls.
|
|
TRUSTED_CONTRIBUTORS = {
|
|
e.lower()
|
|
for e in [
|
|
"achimbab",
|
|
"adevyatova ", # DOCSUP
|
|
"Algunenano", # Raúl Marín, Tinybird
|
|
"amosbird",
|
|
"AnaUvarova", # DOCSUP
|
|
"anauvarova", # technical writer, Yandex
|
|
"annvsh", # technical writer, Yandex
|
|
"atereh", # DOCSUP
|
|
"azat",
|
|
"bharatnc", # Newbie, but already with many contributions.
|
|
"bobrik", # Seasoned contributor, CloudFlare
|
|
"BohuTANG",
|
|
"codyrobert", # Flickerbox engineer
|
|
"cwurm", # Employee
|
|
"damozhaeva", # DOCSUP
|
|
"den-crane",
|
|
"flickerbox-tom", # Flickerbox
|
|
"gyuton", # DOCSUP
|
|
"hagen1778", # Roman Khavronenko, seasoned contributor
|
|
"hczhcz",
|
|
"hexiaoting", # Seasoned contributor
|
|
"ildus", # adjust, ex-pgpro
|
|
"javisantana", # a Spanish ClickHouse enthusiast, ex-Carto
|
|
"ka1bi4", # DOCSUP
|
|
"kirillikoff", # DOCSUP
|
|
"kreuzerkrieg",
|
|
"lehasm", # DOCSUP
|
|
"michon470", # DOCSUP
|
|
"nikvas0",
|
|
"nvartolomei",
|
|
"olgarev", # DOCSUP
|
|
"otrazhenia", # Yandex docs contractor
|
|
"pdv-ru", # DOCSUP
|
|
"podshumok", # cmake expert from QRator Labs
|
|
"s-mx", # Maxim Sabyanin, former employee, present contributor
|
|
"sevirov", # technical writer, Yandex
|
|
"spongedu", # Seasoned contributor
|
|
"taiyang-li",
|
|
"ucasFL", # Amos Bird's friend
|
|
"vdimir", # Employee
|
|
"vzakaznikov",
|
|
"YiuRULE",
|
|
"zlobober", # Developer of YT
|
|
"ilejn", # Arenadata, responsible for Kerberized Kafka
|
|
"thomoco", # ClickHouse
|
|
"BoloniniD", # Seasoned contributor, HSE
|
|
"tonickkozlov", # Cloudflare
|
|
"tylerhannan", # ClickHouse Employee
|
|
]
|
|
}
|
|
|
|
|
|
def get_installation_id(jwt_token):
|
|
headers = {
|
|
"Authorization": f"Bearer {jwt_token}",
|
|
"Accept": "application/vnd.github.v3+json",
|
|
}
|
|
response = requests.get("https://api.github.com/app/installations", headers=headers)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
for installation in data:
|
|
if installation["account"]["login"] == "ClickHouse":
|
|
installation_id = installation["id"]
|
|
return installation_id
|
|
|
|
|
|
def get_access_token(jwt_token, installation_id):
|
|
headers = {
|
|
"Authorization": f"Bearer {jwt_token}",
|
|
"Accept": "application/vnd.github.v3+json",
|
|
}
|
|
response = requests.post(
|
|
f"https://api.github.com/app/installations/{installation_id}/access_tokens",
|
|
headers=headers,
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data["token"]
|
|
|
|
|
|
def get_key_and_app_from_aws():
|
|
secret_name = "clickhouse_github_secret_key"
|
|
session = boto3.session.Session()
|
|
client = session.client(
|
|
service_name="secretsmanager",
|
|
)
|
|
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
|
|
data = json.loads(get_secret_value_response["SecretString"])
|
|
return data["clickhouse-app-key"], int(data["clickhouse-app-id"])
|
|
|
|
|
|
def is_trusted_contributor(pr_user_login, pr_user_orgs):
|
|
if pr_user_login.lower() in TRUSTED_CONTRIBUTORS:
|
|
print(f"User '{pr_user_login}' is trusted")
|
|
return True
|
|
|
|
print(f"User '{pr_user_login}' is not trusted")
|
|
|
|
for org_id in pr_user_orgs:
|
|
if org_id in TRUSTED_ORG_IDS:
|
|
print(
|
|
f"Org '{org_id}' is trusted; will mark user {pr_user_login} as trusted"
|
|
)
|
|
return True
|
|
print(f"Org '{org_id}' is not trusted")
|
|
|
|
return False
|
|
|
|
|
|
def _exec_get_with_retry(url, token):
|
|
headers = {"Authorization": f"token {token}"}
|
|
for i in range(MAX_RETRY):
|
|
try:
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as ex:
|
|
print("Got exception executing request", ex)
|
|
time.sleep(i + 1)
|
|
|
|
raise Exception("Cannot execute GET request with retries")
|
|
|
|
|
|
def _exec_post_with_retry(url, token, data=None):
|
|
headers = {"Authorization": f"token {token}"}
|
|
for i in range(MAX_RETRY):
|
|
try:
|
|
if data:
|
|
response = requests.post(url, headers=headers, json=data)
|
|
else:
|
|
response = requests.post(url, headers=headers)
|
|
if response.status_code == 403:
|
|
data = response.json()
|
|
if (
|
|
"message" in data
|
|
and data["message"]
|
|
== "This workflow run is not waiting for approval"
|
|
):
|
|
print("Workflow doesn't need approval")
|
|
return data
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as ex:
|
|
print("Got exception executing request", ex)
|
|
time.sleep(i + 1)
|
|
|
|
raise Exception("Cannot execute POST request with retry")
|
|
|
|
|
|
def _get_pull_requests_from(repo_url, owner, branch, token):
|
|
url = f"{repo_url}/pulls?head={owner}:{branch}"
|
|
return _exec_get_with_retry(url, token)
|
|
|
|
|
|
def get_workflow_description_from_event(event):
|
|
action = event["action"]
|
|
run_id = event["workflow_run"]["id"]
|
|
event_type = event["workflow_run"]["event"]
|
|
fork_owner = event["workflow_run"]["head_repository"]["owner"]["login"]
|
|
fork_branch = event["workflow_run"]["head_branch"]
|
|
name = event["workflow_run"]["name"]
|
|
workflow_id = event["workflow_run"]["workflow_id"]
|
|
conclusion = event["workflow_run"]["conclusion"]
|
|
attempt = event["workflow_run"]["run_attempt"]
|
|
status = event["workflow_run"]["status"]
|
|
jobs_url = event["workflow_run"]["jobs_url"]
|
|
rerun_url = event["workflow_run"]["rerun_url"]
|
|
url = event["workflow_run"]["html_url"]
|
|
api_url = event["workflow_run"]["url"]
|
|
repo_url = event["repository"]["url"]
|
|
return WorkflowDescription(
|
|
name=name,
|
|
action=action,
|
|
run_id=run_id,
|
|
event=event_type,
|
|
fork_owner_login=fork_owner,
|
|
fork_branch=fork_branch,
|
|
workflow_id=workflow_id,
|
|
conclusion=conclusion,
|
|
attempt=attempt,
|
|
status=status,
|
|
jobs_url=jobs_url,
|
|
rerun_url=rerun_url,
|
|
url=url,
|
|
repo_url=repo_url,
|
|
api_url=api_url,
|
|
)
|
|
|
|
|
|
def get_pr_author_and_orgs(pull_request, token):
|
|
author = pull_request["user"]["login"]
|
|
orgs = _exec_get_with_retry(pull_request["user"]["organizations_url"], token)
|
|
return author, [org["id"] for org in orgs]
|
|
|
|
|
|
def get_changed_files_for_pull_request(pull_request, token):
|
|
url = pull_request["url"]
|
|
|
|
changed_files = set([])
|
|
for i in range(1, 31):
|
|
print("Requesting changed files page", i)
|
|
data = _exec_get_with_retry(f"{url}/files?page={i}&per_page=100", token)
|
|
print(f"Got {len(data)} changed files")
|
|
if len(data) == 0:
|
|
print("No more changed files")
|
|
break
|
|
|
|
for change in data:
|
|
# print("Adding changed file", change['filename'])
|
|
changed_files.add(change["filename"])
|
|
|
|
if len(changed_files) >= SUSPICIOUS_CHANGED_FILES_NUMBER:
|
|
print(
|
|
f"More than {len(changed_files)} changed files. "
|
|
"Will stop fetching new files."
|
|
)
|
|
break
|
|
|
|
return changed_files
|
|
|
|
|
|
def check_suspicious_changed_files(changed_files):
|
|
if len(changed_files) >= SUSPICIOUS_CHANGED_FILES_NUMBER:
|
|
print(f"Too many files changed {len(changed_files)}, need manual approve")
|
|
return True
|
|
|
|
for path in changed_files:
|
|
for pattern in SUSPICIOUS_PATTERNS:
|
|
if fnmatch.fnmatch(path, pattern):
|
|
print(
|
|
f"File {path} match suspicious pattern {pattern}, "
|
|
"will not approve automatically"
|
|
)
|
|
return True
|
|
|
|
print("No changed files match suspicious patterns, run will be approved")
|
|
return False
|
|
|
|
|
|
def approve_run(workflow_description: WorkflowDescription, token: str) -> None:
|
|
url = f"{workflow_description.api_url}/approve"
|
|
_exec_post_with_retry(url, token)
|
|
|
|
|
|
def label_manual_approve(pull_request, token):
|
|
url = f"{pull_request['url']}/labels"
|
|
data = {"labels": "manual approve"}
|
|
|
|
_exec_post_with_retry(url, token, data)
|
|
|
|
|
|
def get_token_from_aws():
|
|
private_key, app_id = get_key_and_app_from_aws()
|
|
payload = {
|
|
"iat": int(time.time()) - 60,
|
|
"exp": int(time.time()) + (10 * 60),
|
|
"iss": app_id,
|
|
}
|
|
|
|
encoded_jwt = jwt.encode(payload, private_key, algorithm="RS256")
|
|
installation_id = get_installation_id(encoded_jwt)
|
|
return get_access_token(encoded_jwt, installation_id)
|
|
|
|
|
|
def get_workflow_jobs(workflow_description, token):
|
|
jobs_url = (
|
|
workflow_description.api_url + f"/attempts/{workflow_description.attempt}/jobs"
|
|
)
|
|
jobs = []
|
|
i = 1
|
|
while True:
|
|
got_jobs = _exec_get_with_retry(jobs_url + f"?page={i}", token)
|
|
if len(got_jobs["jobs"]) == 0:
|
|
break
|
|
|
|
jobs += got_jobs["jobs"]
|
|
i += 1
|
|
|
|
return jobs
|
|
|
|
|
|
def check_need_to_rerun(workflow_description, token):
|
|
if workflow_description.attempt >= MAX_WORKFLOW_RERUN:
|
|
print(
|
|
"Not going to rerun workflow because it's already tried more than two times"
|
|
)
|
|
return False
|
|
print("Going to check jobs")
|
|
|
|
jobs = get_workflow_jobs(workflow_description, token)
|
|
print("Got jobs", len(jobs))
|
|
for job in jobs:
|
|
if job["conclusion"] not in ("success", "skipped"):
|
|
print("Job", job["name"], "failed, checking steps")
|
|
for step in job["steps"]:
|
|
# always the last job
|
|
if step["name"] == "Complete job":
|
|
print("Found Complete job step for job", job["name"])
|
|
break
|
|
else:
|
|
print(
|
|
"Checked all steps and doesn't found Complete job, going to rerun"
|
|
)
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def rerun_workflow(workflow_description, token):
|
|
print("Going to rerun workflow")
|
|
try:
|
|
_exec_post_with_retry(f"{workflow_description.rerun_url}-failed-jobs", token)
|
|
except Exception:
|
|
_exec_post_with_retry(workflow_description.rerun_url, token)
|
|
|
|
|
|
def check_workflow_completed(
|
|
event_data: dict, workflow_description: WorkflowDescription, token: str
|
|
) -> bool:
|
|
if workflow_description.action == "completed":
|
|
attempt = 0
|
|
# Nice and reliable GH API sends from time to time such events, e.g:
|
|
# action='completed', conclusion=None, status='in_progress',
|
|
# So let's try receiving a real workflow data
|
|
while workflow_description.conclusion is None and attempt < MAX_RETRY:
|
|
progressive_sleep = 3 * sum(i + 1 for i in range(attempt))
|
|
time.sleep(progressive_sleep)
|
|
event_data["workflow_run"] = _exec_get_with_retry(
|
|
workflow_description.api_url, token
|
|
)
|
|
workflow_description = get_workflow_description_from_event(event_data)
|
|
attempt += 1
|
|
|
|
if workflow_description.conclusion != "failure":
|
|
print(
|
|
"Workflow finished with status "
|
|
f"{workflow_description.conclusion}, exiting"
|
|
)
|
|
return True
|
|
|
|
print(
|
|
"Workflow",
|
|
workflow_description.url,
|
|
"completed and failed, let's check for rerun",
|
|
)
|
|
|
|
if workflow_description.name not in NEED_RERUN_WORKFLOWS:
|
|
print(
|
|
"Workflow",
|
|
workflow_description.name,
|
|
"not in list of rerunable workflows",
|
|
)
|
|
return True
|
|
|
|
if check_need_to_rerun(workflow_description, token):
|
|
rerun_workflow(workflow_description, token)
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def main(event):
|
|
token = get_token_from_aws()
|
|
event_data = json.loads(event["body"])
|
|
print("The body received:", event["body"])
|
|
workflow_description = get_workflow_description_from_event(event_data)
|
|
|
|
print("Got workflow description", workflow_description)
|
|
if check_workflow_completed(event_data, workflow_description, token):
|
|
return
|
|
|
|
if workflow_description.action != "requested":
|
|
print("Exiting, event action is", workflow_description.action)
|
|
return
|
|
|
|
if workflow_description.workflow_id in TRUSTED_WORKFLOW_IDS:
|
|
print("Workflow in trusted list, approving run")
|
|
approve_run(workflow_description, token)
|
|
return
|
|
|
|
pull_requests = _get_pull_requests_from(
|
|
workflow_description.repo_url,
|
|
workflow_description.fork_owner_login,
|
|
workflow_description.fork_branch,
|
|
token,
|
|
)
|
|
|
|
print("Got pull requests for workflow", len(pull_requests))
|
|
if len(pull_requests) != 1:
|
|
print(f"Can't continue with non-uniq PRs: {pull_requests}")
|
|
return
|
|
|
|
pull_request = pull_requests[0]
|
|
print("Pull request for workflow number", pull_request["number"])
|
|
|
|
author, author_orgs = get_pr_author_and_orgs(pull_request, token)
|
|
if is_trusted_contributor(author, author_orgs):
|
|
print("Contributor is trusted, approving run")
|
|
approve_run(workflow_description, token)
|
|
return
|
|
|
|
changed_files = get_changed_files_for_pull_request(pull_request, token)
|
|
print(f"Totally have {len(changed_files)} changed files in PR:", changed_files)
|
|
if check_suspicious_changed_files(changed_files):
|
|
print(
|
|
f"Pull Request {pull_request['number']} has suspicious changes, "
|
|
"label it for manuall approve"
|
|
)
|
|
label_manual_approve(pull_request, token)
|
|
else:
|
|
print(f"Pull Request {pull_request['number']} has no suspicious changes")
|
|
approve_run(workflow_description, token)
|
|
|
|
|
|
def handler(event, _):
|
|
try:
|
|
main(event)
|
|
|
|
return {
|
|
"statusCode": 200,
|
|
"headers": {"Content-Type": "application/json"},
|
|
"body": '{"status": "OK"}',
|
|
}
|
|
except Exception:
|
|
print("Received event: ", event)
|
|
raise
|