Fix typing and bug in cancel_and_rerun_workflow_lambda

This commit is contained in:
Mikhail f. Shiryaev 2022-11-29 13:33:45 +01:00
parent 23002bc808
commit 0f2704703f
No known key found for this signature in database
GPG Key ID: 4B02ED204C7D93F4

View File

@ -106,7 +106,7 @@ def _exec_get_with_retry(url: str, token: str) -> dict:
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
return response.json() # type: ignore
except Exception as ex:
print("Got exception executing request", ex)
time.sleep(i + 1)
@ -130,8 +130,7 @@ WorkflowDescription = namedtuple(
def get_workflows_description_for_pull_request(
pull_request_event,
token,
pull_request_event: dict, token: str
) -> List[WorkflowDescription]:
head_repo = pull_request_event["head"]["repo"]["full_name"]
head_branch = pull_request_event["head"]["ref"]
@ -193,7 +192,7 @@ def get_workflows_description_for_pull_request(
def get_workflow_description_fallback(
pull_request_event, token
pull_request_event: dict, token: str
) -> List[WorkflowDescription]:
head_repo = pull_request_event["head"]["repo"]["full_name"]
head_branch = pull_request_event["head"]["ref"]
@ -241,7 +240,7 @@ def get_workflow_description_fallback(
WorkflowDescription(
url=wf["url"],
run_id=wf["id"],
name=workflow["name"],
name=wf["name"],
head_sha=wf["head_sha"],
status=wf["status"],
rerun_url=wf["rerun_url"],
@ -254,7 +253,7 @@ def get_workflow_description_fallback(
return workflow_descriptions
def get_workflow_description(workflow_url, token) -> WorkflowDescription:
def get_workflow_description(workflow_url: str, token: str) -> WorkflowDescription:
workflow = _exec_get_with_retry(workflow_url, token)
return WorkflowDescription(
url=workflow["url"],
@ -331,7 +330,7 @@ def main(event):
workflow_descriptions
or get_workflow_description_fallback(pull_request, token)
)
workflow_descriptions.sort(key=lambda x: x.run_id)
workflow_descriptions.sort(key=lambda x: x.run_id) # type: ignore
most_recent_workflow = workflow_descriptions[-1]
if (
most_recent_workflow.status == "completed"
@ -376,7 +375,7 @@ def main(event):
print("Not found any workflows")
return
workflow_descriptions.sort(key=lambda x: x.run_id)
workflow_descriptions.sort(key=lambda x: x.run_id) # type: ignore
most_recent_workflow = workflow_descriptions[-1]
print("Latest workflow", most_recent_workflow)
if (