Merge pull request #36269 from ClickHouse/fix-cancel-lambda
Fix cancel-lambda for closed PRs
Commit 722af5a2f5
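What the diff below changes (an editorial reading of the hunks, not part of the commit message): the cancel/rerun lambda's WorkflowDescription gains a conclusion field, runs that are already cancelled are no longer re-cancelled, a threaded fallback scans the most recent workflow runs when the PR branch has already been deleted, and a dedicated branch handles the "synchronize" action; the team-keys lambda gets small typing fixes (# type: ignore, # type: Queue) and its Dockerfile copies app.py after installing requirements so code changes do not invalidate the dependency layer.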
@@ -2,6 +2,8 @@
 from collections import namedtuple
 from typing import Any, Dict, List
+from threading import Thread
+from queue import Queue
 import json
 import time
@@ -25,6 +27,24 @@ MAX_RETRY = 5
 DEBUG_INFO = {}  # type: Dict[str, Any]
 
 
+class Worker(Thread):
+    def __init__(self, request_queue: Queue, ignore_exception: bool = False):
+        Thread.__init__(self)
+        self.queue = request_queue
+        self.ignore_exception = ignore_exception
+        self.response = {}  # type: Dict
+
+    def run(self):
+        m = self.queue.get()
+        try:
+            self.response = _exec_get_with_retry(m)
+        except Exception as e:
+            if not self.ignore_exception:
+                raise
+            print(f"Exception occured, still continue: {e}")
+        self.queue.task_done()
+
+
 def get_installation_id(jwt_token):
     headers = {
         "Authorization": f"Bearer {jwt_token}",
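For context on how the new Worker threads are meant to be driven (the fallback function later in this diff does exactly this): each worker takes a single URL off a shared queue, fetches it with the module's _exec_get_with_retry, and keeps the parsed JSON in self.response. A minimal, hypothetical usage sketch, not part of the diff:

    q = Queue()  # type: Queue
    workers = []
    for page in range(1, 4):
        # Hypothetical URL; the lambda builds the real one from API_URL.
        q.put(f"https://api.github.com/repos/example/repo/actions/runs?per_page=100&page={page}")
        worker = Worker(q, True)  # ignore_exception=True: log failures and keep going
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()
        if not worker.response:
            continue  # the request failed and was ignored
        print(len(worker.response.get("workflow_runs", [])))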
@@ -88,32 +108,37 @@ def _exec_get_with_retry(url):
 WorkflowDescription = namedtuple(
-    "WorkflowDescription", ["run_id", "status", "rerun_url", "cancel_url"]
+    "WorkflowDescription", ["run_id", "status", "rerun_url", "cancel_url", "conclusion"]
 )
 
 
-def get_workflows_description_for_pull_request(pull_request_event):
+def get_workflows_description_for_pull_request(
+    pull_request_event,
+) -> List[WorkflowDescription]:
     head_repo = pull_request_event["head"]["repo"]["full_name"]
     head_branch = pull_request_event["head"]["ref"]
     head_sha = pull_request_event["head"]["sha"]
     print("PR", pull_request_event["number"], "has head ref", head_branch)
-    workflows_data = []
-    workflows = _exec_get_with_retry(
-        API_URL + f"/actions/runs?branch={head_branch}&event=pull_request&page=1"
-    )
-    workflows_data += workflows["workflow_runs"]
-    i = 2
-    while len(workflows["workflow_runs"]) > 0:
-        workflows = _exec_get_with_retry(
-            API_URL + f"/actions/runs?branch={head_branch}&event=pull_request&page={i}"
-        )
-        workflows_data += workflows["workflow_runs"]
-        i += 1
-        if i > 30:
-            print("Too many workflows found")
-            break
-
-    DEBUG_INFO["workflows"] = []  # type: List[Dict[str, str]]
+
+    workflows_data = []
+    request_url = f"{API_URL}/actions/runs?per_page=100"
+    # Get all workflows for the current branch
+    for i in range(1, 11):
+        workflows = _exec_get_with_retry(
+            f"{request_url}&event=pull_request&branch={head_branch}&page={i}"
+        )
+        if not workflows["workflow_runs"]:
+            break
+        workflows_data += workflows["workflow_runs"]
+        if i == 10:
+            print("Too many workflows found")
+
+    if not workflows_data:
+        print("No workflows found by filter")
+        return []
+
+    print(f"Total workflows for the branch {head_branch} found: {len(workflows_data)}")
+
+    DEBUG_INFO["workflows"] = []
     workflow_descriptions = []
     for workflow in workflows_data:
         # Some time workflow["head_repository"]["full_name"] is None
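The rewritten lookup above bounds pagination: at most ten pages of 100 runs are requested for the PR branch, stopping early on the first empty page, where the old loop paged until empty with a cap of 30 pages. The same pattern in isolation (hypothetical get_json callable standing in for _exec_get_with_retry):

    def fetch_runs(base_url, get_json):
        # At most 10 pages of 100 runs each; stop on the first empty page.
        runs = []
        for page in range(1, 11):
            batch = get_json(f"{base_url}&page={page}")["workflow_runs"]
            if not batch:
                break
            runs += batch
            if page == 10:
                print("Too many workflows found")
        return runs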
@@ -123,13 +148,13 @@ def get_workflows_description_for_pull_request(pull_request_event):
             {
                 "full_name": workflow["head_repository"]["full_name"],
                 "name": workflow["name"],
                 "branch": workflow["head_branch"],
             }
         )
         # unfortunately we cannot filter workflows from forks in request to API
         # so doing it manually
         if (
-            workflow["head_sha"] == head_sha
-            and workflow["head_repository"]["full_name"] == head_repo
+            workflow["head_repository"]["full_name"] == head_repo
             and workflow["name"] in NEED_RERUN_OR_CANCELL_WORKFLOWS
         ):
             workflow_descriptions.append(
@@ -138,19 +163,85 @@ def get_workflows_description_for_pull_request(pull_request_event):
                     status=workflow["status"],
                     rerun_url=workflow["rerun_url"],
                     cancel_url=workflow["cancel_url"],
+                    conclusion=workflow["conclusion"],
                 )
             )
 
     return workflow_descriptions
 
 
-def get_workflow_description(workflow_id):
+def get_workflow_description_fallback(event_data) -> List[WorkflowDescription]:
+    pull_request_event = event_data["pull_request"]
+    head_repo = pull_request_event["head"]["repo"]["full_name"]
+    head_branch = pull_request_event["head"]["ref"]
+    head_sha = pull_request_event["head"]["sha"]
+    print("Get last 500 workflows from API to search related there")
+    # Fallback for a case of an already deleted branch and no workflows received
+    request_url = f"{API_URL}/actions/runs?per_page=100"
+    q = Queue()  # type: Queue
+    workers = []
+    workflows_data = []
+    i = 1
+    for i in range(1, 6):
+        q.put(f"{request_url}&page={i}")
+        worker = Worker(q, True)
+        worker.start()
+        workers.append(worker)
+
+    for worker in workers:
+        worker.join()
+        if not worker.response:
+            # We ignore get errors, so response can be empty
+            continue
+        # Prefilter workflows
+        workflows_data += [
+            wf
+            for wf in worker.response["workflow_runs"]
+            if wf["head_repository"] is not None
+            and wf["head_repository"]["full_name"] == head_repo
+            and wf["head_branch"] == head_branch
+            and wf["name"] in NEED_RERUN_OR_CANCELL_WORKFLOWS
+        ]
+
+    print(f"Total workflows in last 500 actions matches: {len(workflows_data)}")
+
+    DEBUG_INFO["workflows"] = [
+        {
+            "full_name": wf["head_repository"]["full_name"],
+            "name": wf["name"],
+            "branch": wf["head_branch"],
+        }
+        for wf in workflows_data
+    ]
+    if event_data["action"] == "synchronize":
+        print(f"Leave only workflows with SHA but {head_sha} for updated PR")
+        # Cancel all events with SHA different than current
+        workflows_data = list(
+            filter(lambda x: x["head_sha"] != head_sha, workflows_data)
+        )
+
+    workflow_descriptions = [
+        WorkflowDescription(
+            run_id=wf["id"],
+            status=wf["status"],
+            rerun_url=wf["rerun_url"],
+            cancel_url=wf["cancel_url"],
+            conclusion=wf["conclusion"],
+        )
+        for wf in workflows_data
+    ]
+
+    return workflow_descriptions
+
+
+def get_workflow_description(workflow_id) -> WorkflowDescription:
     workflow = _exec_get_with_retry(API_URL + f"/actions/runs/{workflow_id}")
     return WorkflowDescription(
         run_id=workflow["id"],
         status=workflow["status"],
         rerun_url=workflow["rerun_url"],
         cancel_url=workflow["cancel_url"],
+        conclusion=workflow["conclusion"],
     )
@@ -189,15 +280,39 @@ def main(event):
     if action == "closed" or "do not test" in labels:
         print("PR merged/closed or manually labeled 'do not test' will kill workflows")
         workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
+        workflow_descriptions = (
+            workflow_descriptions or get_workflow_description_fallback(event_data)
+        )
         urls_to_cancel = []
         for workflow_description in workflow_descriptions:
-            if workflow_description.status != "completed":
+            if (
+                workflow_description.status != "completed"
+                and workflow_description.conclusion != "cancelled"
+            ):
                 urls_to_cancel.append(workflow_description.cancel_url)
         print(f"Found {len(urls_to_cancel)} workflows to cancel")
         exec_workflow_url(urls_to_cancel, token)
+    elif action == "synchronize":
+        print("PR is synchronized, going to stop old actions")
+        workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
+        workflow_descriptions = (
+            workflow_descriptions or get_workflow_description_fallback(event_data)
+        )
+        urls_to_cancel = []
+        for workflow_description in workflow_descriptions:
+            if (
+                workflow_description.status != "completed"
+                and workflow_description.conclusion != "cancelled"
+            ):
+                urls_to_cancel.append(workflow_description.cancel_url)
+        print(f"Found {len(urls_to_cancel)} workflows to cancel")
+        exec_workflow_url(urls_to_cancel, token)
     elif action == "labeled" and "can be tested" in labels:
         print("PR marked with can be tested label, rerun workflow")
         workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
+        workflow_descriptions = (
+            workflow_descriptions or get_workflow_description_fallback(event_data)
+        )
         if not workflow_descriptions:
             print("Not found any workflows")
             return
@@ -205,7 +320,10 @@ def main(event):
         sorted_workflows = list(sorted(workflow_descriptions, key=lambda x: x.run_id))
         most_recent_workflow = sorted_workflows[-1]
         print("Latest workflow", most_recent_workflow)
-        if most_recent_workflow.status != "completed":
+        if (
+            most_recent_workflow.status != "completed"
+            and most_recent_workflow.conclusion != "cancelled"
+        ):
             print("Latest workflow is not completed, cancelling")
             exec_workflow_url([most_recent_workflow.cancel_url], token)
             print("Cancelled")
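The condition repeated in every branch above is the core of the fix for closed PRs: a run is asked to cancel only if it has not completed and its conclusion is not already "cancelled", so the lambda stops re-issuing cancel requests for runs that are already on their way out. Distilled into a hypothetical helper (not present in the diff):

    def should_cancel(workflow):
        # Skip runs that already finished or were already cancelled.
        return (
            workflow.status != "completed"
            and workflow.conclusion != "cancelled"
        )

    urls_to_cancel = [
        wf.cancel_url for wf in workflow_descriptions if should_cancel(wf)
    ]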
@ -1,12 +1,13 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import requests
|
||||
import argparse
|
||||
import json
|
||||
|
||||
from threading import Thread
|
||||
from queue import Queue
|
||||
|
||||
import requests # type: ignore
|
||||
|
||||
|
||||
def get_org_team_members(token: str, org: str, team_slug: str) -> tuple:
|
||||
headers = {
|
||||
@@ -37,7 +38,7 @@ def get_members_keys(members: tuple) -> str:
             self.results.append(f"# {m}\n{response.text}")
             self.queue.task_done()
 
-    q = Queue()
+    q = Queue()  # type: Queue
     workers = []
     for m in members:
         q.put(m)
@@ -61,7 +62,7 @@ def get_members_keys(members: tuple) -> str:
 
 
 def get_token_from_aws() -> str:
-    import boto3
+    import boto3  # type: ignore
 
     secret_name = "clickhouse_robot_token"
     session = boto3.session.Session()
@@ -81,6 +82,8 @@ def main(token: str, org: str, team_slug: str) -> str:
 
 
 def handler(event, context):
+    _ = context
+    _ = event
     token = get_token_from_aws()
     result = {
         "statusCode": 200,
@@ -1,13 +1,13 @@
 FROM public.ecr.aws/lambda/python:3.9
 
-# Copy function code
-COPY app.py ${LAMBDA_TASK_ROOT}
-
 # Install the function's dependencies using file requirements.txt
 # from your project folder.
 
 COPY requirements.txt .
 RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
 
+# Copy function code
+COPY app.py ${LAMBDA_TASK_ROOT}
+
 # Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
 CMD [ "app.handler" ]
@@ -394,7 +394,7 @@ def rerun_workflow(workflow_description, token):
 def main(event):
     token = get_token_from_aws()
     event_data = json.loads(event["body"])
-    print("The body received:", event_data)
+    print("The body received:", event["body"])
     workflow_description = get_workflow_description_from_event(event_data)
 
     print("Got workflow description", workflow_description)