mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #43295 from ClickHouse/cancel-lambda-api-url
Cancel lambda api url
This commit is contained in:
commit
f4ef20b5e4
@ -1,13 +0,0 @@
|
||||
FROM public.ecr.aws/lambda/python:3.9
|
||||
|
||||
# Install the function's dependencies using file requirements.txt
|
||||
# from your project folder.
|
||||
|
||||
COPY requirements.txt .
|
||||
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
|
||||
|
||||
# Copy function code
|
||||
COPY app.py ${LAMBDA_TASK_ROOT}
|
||||
|
||||
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
|
||||
CMD [ "app.handler" ]
|
@ -1,5 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from base64 import b64decode
|
||||
from collections import namedtuple
|
||||
from typing import Any, Dict, List
|
||||
from threading import Thread
|
||||
@ -19,26 +20,25 @@ NEED_RERUN_OR_CANCELL_WORKFLOWS = {
|
||||
"BackportPR",
|
||||
}
|
||||
|
||||
# https://docs.github.com/en/rest/reference/actions#cancel-a-workflow-run
|
||||
#
|
||||
API_URL = os.getenv("API_URL", "https://api.github.com/repos/ClickHouse/ClickHouse")
|
||||
|
||||
MAX_RETRY = 5
|
||||
|
||||
DEBUG_INFO = {} # type: Dict[str, Any]
|
||||
|
||||
|
||||
class Worker(Thread):
|
||||
def __init__(self, request_queue: Queue, ignore_exception: bool = False):
|
||||
def __init__(
|
||||
self, request_queue: Queue, token: str, ignore_exception: bool = False
|
||||
):
|
||||
Thread.__init__(self)
|
||||
self.queue = request_queue
|
||||
self.token = token
|
||||
self.ignore_exception = ignore_exception
|
||||
self.response = {} # type: Dict
|
||||
|
||||
def run(self):
|
||||
m = self.queue.get()
|
||||
try:
|
||||
self.response = _exec_get_with_retry(m)
|
||||
self.response = _exec_get_with_retry(m, self.token)
|
||||
except Exception as e:
|
||||
if not self.ignore_exception:
|
||||
raise
|
||||
@ -98,10 +98,11 @@ def get_token_from_aws():
|
||||
return get_access_token(encoded_jwt, installation_id)
|
||||
|
||||
|
||||
def _exec_get_with_retry(url):
|
||||
def _exec_get_with_retry(url: str, token: str) -> dict:
|
||||
headers = {"Authorization": f"token {token}"}
|
||||
for i in range(MAX_RETRY):
|
||||
try:
|
||||
response = requests.get(url)
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except Exception as ex:
|
||||
@ -113,23 +114,25 @@ def _exec_get_with_retry(url):
|
||||
|
||||
WorkflowDescription = namedtuple(
|
||||
"WorkflowDescription",
|
||||
["run_id", "head_sha", "status", "rerun_url", "cancel_url", "conclusion"],
|
||||
["url", "run_id", "head_sha", "status", "rerun_url", "cancel_url", "conclusion"],
|
||||
)
|
||||
|
||||
|
||||
def get_workflows_description_for_pull_request(
|
||||
pull_request_event,
|
||||
token,
|
||||
) -> List[WorkflowDescription]:
|
||||
head_repo = pull_request_event["head"]["repo"]["full_name"]
|
||||
head_branch = pull_request_event["head"]["ref"]
|
||||
print("PR", pull_request_event["number"], "has head ref", head_branch)
|
||||
|
||||
workflows_data = []
|
||||
request_url = f"{API_URL}/actions/runs?per_page=100"
|
||||
repo_url = pull_request_event["base"]["repo"]["url"]
|
||||
request_url = f"{repo_url}/actions/runs?per_page=100"
|
||||
# Get all workflows for the current branch
|
||||
for i in range(1, 11):
|
||||
workflows = _exec_get_with_retry(
|
||||
f"{request_url}&event=pull_request&branch={head_branch}&page={i}"
|
||||
f"{request_url}&event=pull_request&branch={head_branch}&page={i}", token
|
||||
)
|
||||
if not workflows["workflow_runs"]:
|
||||
break
|
||||
@ -164,6 +167,7 @@ def get_workflows_description_for_pull_request(
|
||||
):
|
||||
workflow_descriptions.append(
|
||||
WorkflowDescription(
|
||||
url=workflow["url"],
|
||||
run_id=workflow["id"],
|
||||
head_sha=workflow["head_sha"],
|
||||
status=workflow["status"],
|
||||
@ -176,19 +180,22 @@ def get_workflows_description_for_pull_request(
|
||||
return workflow_descriptions
|
||||
|
||||
|
||||
def get_workflow_description_fallback(pull_request_event) -> List[WorkflowDescription]:
|
||||
def get_workflow_description_fallback(
|
||||
pull_request_event, token
|
||||
) -> List[WorkflowDescription]:
|
||||
head_repo = pull_request_event["head"]["repo"]["full_name"]
|
||||
head_branch = pull_request_event["head"]["ref"]
|
||||
print("Get last 500 workflows from API to search related there")
|
||||
# Fallback for a case of an already deleted branch and no workflows received
|
||||
request_url = f"{API_URL}/actions/runs?per_page=100"
|
||||
repo_url = pull_request_event["base"]["repo"]["url"]
|
||||
request_url = f"{repo_url}/actions/runs?per_page=100"
|
||||
q = Queue() # type: Queue
|
||||
workers = []
|
||||
workflows_data = []
|
||||
i = 1
|
||||
for i in range(1, 6):
|
||||
q.put(f"{request_url}&page={i}")
|
||||
worker = Worker(q, True)
|
||||
worker = Worker(q, token, True)
|
||||
worker.start()
|
||||
workers.append(worker)
|
||||
|
||||
@ -220,6 +227,7 @@ def get_workflow_description_fallback(pull_request_event) -> List[WorkflowDescri
|
||||
|
||||
workflow_descriptions = [
|
||||
WorkflowDescription(
|
||||
url=wf["url"],
|
||||
run_id=wf["id"],
|
||||
head_sha=wf["head_sha"],
|
||||
status=wf["status"],
|
||||
@ -233,9 +241,10 @@ def get_workflow_description_fallback(pull_request_event) -> List[WorkflowDescri
|
||||
return workflow_descriptions
|
||||
|
||||
|
||||
def get_workflow_description(workflow_id) -> WorkflowDescription:
|
||||
workflow = _exec_get_with_retry(API_URL + f"/actions/runs/{workflow_id}")
|
||||
def get_workflow_description(workflow_url, token) -> WorkflowDescription:
|
||||
workflow = _exec_get_with_retry(workflow_url, token)
|
||||
return WorkflowDescription(
|
||||
url=workflow["url"],
|
||||
run_id=workflow["id"],
|
||||
head_sha=workflow["head_sha"],
|
||||
status=workflow["status"],
|
||||
@ -268,8 +277,11 @@ def exec_workflow_url(urls_to_cancel, token):
|
||||
|
||||
def main(event):
|
||||
token = get_token_from_aws()
|
||||
DEBUG_INFO["event_body"] = event["body"]
|
||||
event_data = json.loads(event["body"])
|
||||
DEBUG_INFO["event"] = event
|
||||
if event["isBase64Encoded"]:
|
||||
event_data = json.loads(b64decode(event["body"]))
|
||||
else:
|
||||
event_data = json.loads(event["body"])
|
||||
|
||||
print("Got event for PR", event_data["number"])
|
||||
action = event_data["action"]
|
||||
@ -279,9 +291,12 @@ def main(event):
|
||||
print("PR has labels", labels)
|
||||
if action == "closed" or "do not test" in labels:
|
||||
print("PR merged/closed or manually labeled 'do not test' will kill workflows")
|
||||
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
|
||||
workflow_descriptions = get_workflows_description_for_pull_request(
|
||||
pull_request, token
|
||||
)
|
||||
workflow_descriptions = (
|
||||
workflow_descriptions or get_workflow_description_fallback(pull_request)
|
||||
workflow_descriptions
|
||||
or get_workflow_description_fallback(pull_request, token)
|
||||
)
|
||||
urls_to_cancel = []
|
||||
for workflow_description in workflow_descriptions:
|
||||
@ -294,9 +309,12 @@ def main(event):
|
||||
exec_workflow_url(urls_to_cancel, token)
|
||||
elif action == "synchronize":
|
||||
print("PR is synchronized, going to stop old actions")
|
||||
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
|
||||
workflow_descriptions = get_workflows_description_for_pull_request(
|
||||
pull_request, token
|
||||
)
|
||||
workflow_descriptions = (
|
||||
workflow_descriptions or get_workflow_description_fallback(pull_request)
|
||||
workflow_descriptions
|
||||
or get_workflow_description_fallback(pull_request, token)
|
||||
)
|
||||
urls_to_cancel = []
|
||||
for workflow_description in workflow_descriptions:
|
||||
@ -308,11 +326,14 @@ def main(event):
|
||||
urls_to_cancel.append(workflow_description.cancel_url)
|
||||
print(f"Found {len(urls_to_cancel)} workflows to cancel")
|
||||
exec_workflow_url(urls_to_cancel, token)
|
||||
elif action == "labeled" and "can be tested" in labels:
|
||||
elif action == "labeled" and event_data["label"]["name"] == "can be tested":
|
||||
print("PR marked with can be tested label, rerun workflow")
|
||||
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
|
||||
workflow_descriptions = get_workflows_description_for_pull_request(
|
||||
pull_request, token
|
||||
)
|
||||
workflow_descriptions = (
|
||||
workflow_descriptions or get_workflow_description_fallback(pull_request)
|
||||
workflow_descriptions
|
||||
or get_workflow_description_fallback(pull_request, token)
|
||||
)
|
||||
if not workflow_descriptions:
|
||||
print("Not found any workflows")
|
||||
@ -330,7 +351,10 @@ def main(event):
|
||||
print("Cancelled")
|
||||
|
||||
for _ in range(45):
|
||||
latest_workflow_desc = get_workflow_description(most_recent_workflow.run_id)
|
||||
# If the number of retries is changed: tune the lambda limits accordingly
|
||||
latest_workflow_desc = get_workflow_description(
|
||||
most_recent_workflow.url, token
|
||||
)
|
||||
print("Checking latest workflow", latest_workflow_desc)
|
||||
if latest_workflow_desc.status in ("completed", "cancelled"):
|
||||
print("Finally latest workflow done, going to rerun")
|
||||
@ -347,6 +371,12 @@ def main(event):
|
||||
def handler(event, _):
|
||||
try:
|
||||
main(event)
|
||||
|
||||
return {
|
||||
"statusCode": 200,
|
||||
"headers": {"Content-Type": "application/json"},
|
||||
"body": '{"status": "OK"}',
|
||||
}
|
||||
finally:
|
||||
for name, value in DEBUG_INFO.items():
|
||||
print(f"Value of {name}: ", value)
|
||||
|
@ -0,0 +1 @@
|
||||
../team_keys_lambda/build_and_deploy_archive.sh
|
@ -491,6 +491,12 @@ def main(event):
|
||||
def handler(event, _):
|
||||
try:
|
||||
main(event)
|
||||
|
||||
return {
|
||||
"statusCode": 200,
|
||||
"headers": {"Content-Type": "application/json"},
|
||||
"body": '{"status": "OK"}',
|
||||
}
|
||||
except Exception:
|
||||
print("Received event: ", event)
|
||||
raise
|
||||
|
Loading…
Reference in New Issue
Block a user