CI: Remove aws lambda packages from oss

Max K 2024-07-17 17:19:56 +02:00
parent 10fb524e73
commit 465e4ad73f
55 changed files with 194 additions and 3415 deletions

View File

@ -1,235 +0,0 @@
#!/usr/bin/env python3
"""The lambda to decrease/increase ASG desired capacity based on current queue"""
import logging
from dataclasses import dataclass
from pprint import pformat
from typing import Any, List, Literal, Optional, Tuple
import boto3 # type: ignore
from lambda_shared import (
RUNNER_TYPE_LABELS,
CHException,
ClickHouseHelper,
get_parameter_from_ssm,
)
### Update comment on the change ###
# 4 HOUR - is a balance to get the most precise values
# - Our longest possible running check is around 5h in the worst scenario
# - The long queue won't be wiped out and replaced, so the measurement is fine
# - If the data is spoiled by something, we are still fine from the billing perspective
# Changed it to 3 HOUR: on average we have 1h tasks, but p90 is around 2h.
# With 4h we have too much wasted computing time in case of issues with the DB
QUEUE_QUERY = f"""SELECT
last_status AS status,
toUInt32(count()) AS length,
labels
FROM
(
SELECT
arraySort(groupArray(status))[-1] AS last_status,
labels,
id,
html_url
FROM default.workflow_jobs
WHERE has(labels, 'self-hosted')
AND hasAny({RUNNER_TYPE_LABELS}, labels)
AND started_at > now() - INTERVAL 3 HOUR
GROUP BY ALL
HAVING last_status IN ('in_progress', 'queued')
)
GROUP BY ALL
ORDER BY labels, last_status"""
@dataclass
class Queue:
status: Literal["in_progress", "queued"]
    length: int
label: str
def get_scales(runner_type: str) -> Tuple[int, int]:
"returns the multipliers for scaling down and up ASG by types"
    # Scaling down on a lack of running jobs is quicker than scaling up on
    # the queue
    # The ASG should deflate almost instantly
    scale_down = 1
    # the style checkers produce so much noise that they scale up too quickly
    # 5 was too quick, and there were complaints that 10 was too slow.
    # I am trying 7 now.
    # 7 still looks a bit slow, so I try 6
    # Let's have it the same as the other ASGs
    #
    # All types of style-checkers should be added very quickly to not block the workflows
# UPDATE THE COMMENT ON CHANGES
scale_up = 3
if "style" in runner_type:
scale_up = 1
return scale_down, scale_up
CH_CLIENT = None # type: Optional[ClickHouseHelper]
def set_capacity(
runner_type: str, queues: List[Queue], client: Any, dry_run: bool = True
) -> None:
assert len(queues) in (1, 2)
assert all(q.label == runner_type for q in queues)
as_groups = client.describe_auto_scaling_groups(
Filters=[
{"Name": "tag-key", "Values": ["github:runner-type"]},
{"Name": "tag-value", "Values": [runner_type]},
]
)["AutoScalingGroups"]
assert len(as_groups) == 1
asg = as_groups[0]
running = 0
queued = 0
for q in queues:
if q.status == "in_progress":
            running = q.length
            continue
        if q.status == "queued":
            queued = q.length
continue
raise ValueError("Queue status is not in ['in_progress', 'queued']")
# scale_down, scale_up = get_scales(runner_type)
_, scale_up = get_scales(runner_type)
    # With lifecycle hooks some instances are actually free because some of
    # them are in 'Terminating:Wait' state
effective_capacity = max(
asg["DesiredCapacity"],
len([ins for ins in asg["Instances"] if ins["HealthStatus"] == "Healthy"]),
)
    # How many nodes are free (positive) or need to be added (negative)
capacity_reserve = effective_capacity - running - queued
stop = False
if capacity_reserve <= 0:
# This part is about scaling up
capacity_deficit = -capacity_reserve
        # It looks like we are still OK, since no queued jobs exist
stop = stop or queued == 0
# Are we already at the capacity limits
stop = stop or asg["MaxSize"] <= asg["DesiredCapacity"]
# Let's calculate a new desired capacity
# (capacity_deficit + scale_up - 1) // scale_up : will increase min by 1
# if there is any capacity_deficit
new_capacity = (
asg["DesiredCapacity"] + (capacity_deficit + scale_up - 1) // scale_up
)
new_capacity = max(new_capacity, asg["MinSize"])
new_capacity = min(new_capacity, asg["MaxSize"])
# Finally, should the capacity be even changed
stop = stop or asg["DesiredCapacity"] == new_capacity
if stop:
logging.info(
"Do not increase ASG %s capacity, current capacity=%s, effective "
"capacity=%s, maximum capacity=%s, running jobs=%s, queue size=%s",
asg["AutoScalingGroupName"],
asg["DesiredCapacity"],
effective_capacity,
asg["MaxSize"],
running,
queued,
)
return
logging.info(
"The ASG %s capacity will be increased to %s, current capacity=%s, "
"effective capacity=%s, maximum capacity=%s, running jobs=%s, queue size=%s",
asg["AutoScalingGroupName"],
new_capacity,
asg["DesiredCapacity"],
effective_capacity,
asg["MaxSize"],
running,
queued,
)
if not dry_run:
client.set_desired_capacity(
AutoScalingGroupName=asg["AutoScalingGroupName"],
DesiredCapacity=new_capacity,
)
return
# FIXME: try decreasing capacity from runners that finished their jobs and have no job assigned
# IMPORTANT: Runner init script must be of version that supports ASG decrease
# # Now we will calculate if we need to scale down
# stop = stop or asg["DesiredCapacity"] == asg["MinSize"]
# new_capacity = asg["DesiredCapacity"] - (capacity_reserve // scale_down)
# new_capacity = max(new_capacity, asg["MinSize"])
# new_capacity = min(new_capacity, asg["MaxSize"])
# stop = stop or asg["DesiredCapacity"] == new_capacity
# if stop:
# logging.info(
# "Do not decrease ASG %s capacity, current capacity=%s, effective "
# "capacity=%s, minimum capacity=%s, running jobs=%s, queue size=%s",
# asg["AutoScalingGroupName"],
# asg["DesiredCapacity"],
# effective_capacity,
# asg["MinSize"],
# running,
# queued,
# )
# return
#
# logging.info(
# "The ASG %s capacity will be decreased to %s, current capacity=%s, effective "
# "capacity=%s, minimum capacity=%s, running jobs=%s, queue size=%s",
# asg["AutoScalingGroupName"],
# new_capacity,
# asg["DesiredCapacity"],
# effective_capacity,
# asg["MinSize"],
# running,
# queued,
# )
# if not dry_run:
# client.set_desired_capacity(
# AutoScalingGroupName=asg["AutoScalingGroupName"],
# DesiredCapacity=new_capacity,
# )
def main(dry_run: bool = True) -> None:
logging.getLogger().setLevel(logging.INFO)
asg_client = boto3.client("autoscaling")
try:
global CH_CLIENT
CH_CLIENT = CH_CLIENT or ClickHouseHelper(
get_parameter_from_ssm("clickhouse-test-stat-url"), "play"
)
queues = CH_CLIENT.select_json_each_row("default", QUEUE_QUERY)
except CHException as ex:
logging.exception(
"Got an exception on insert, tryuing to update the client "
"credentials and repeat",
exc_info=ex,
)
CH_CLIENT = ClickHouseHelper(
get_parameter_from_ssm("clickhouse-test-stat-url"), "play"
)
queues = CH_CLIENT.select_json_each_row("default", QUEUE_QUERY)
logging.info("Received queue data:\n%s", pformat(queues, width=120))
for runner_type in RUNNER_TYPE_LABELS:
runner_queues = [
Queue(queue["status"], queue["length"], runner_type)
for queue in queues
if runner_type in queue["labels"]
]
runner_queues = runner_queues or [Queue("in_progress", 0, runner_type)]
set_capacity(runner_type, runner_queues, asg_client, dry_run)
def handler(event: dict, context: Any) -> None:
_ = event
_ = context
return main(False)
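
For reference, a minimal sketch (not part of the removed lambda) of the scale-up arithmetic that set_capacity above applies; the sample numbers are illustrative only.

# Illustrative only: how the ceiling division in set_capacity turns a
# capacity deficit into an increment of DesiredCapacity.
def scale_up_increment(capacity_deficit: int, scale_up: int) -> int:
    # (deficit + scale_up - 1) // scale_up == ceil(deficit / scale_up),
    # so any non-zero deficit adds at least one instance
    return (capacity_deficit + scale_up - 1) // scale_up
assert scale_up_increment(1, 3) == 1   # a deficit of 1 still adds one instance
assert scale_up_increment(10, 3) == 4  # a deficit of 10 with scale_up=3 -> +4
assert scale_up_increment(5, 1) == 5   # style-checker (scale_up=1) adds 1:1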

View File

@ -1 +0,0 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1 +0,0 @@
../lambda_shared_package/lambda_shared

View File

@ -1 +0,0 @@
../lambda_shared_package

View File

@ -1,196 +0,0 @@
#!/usr/bin/env python
import unittest
from dataclasses import dataclass
from typing import Any, List
from app import Queue, set_capacity
@dataclass
class TestCase:
name: str
min_size: int
desired_capacity: int
max_size: int
queues: List[Queue]
expected_capacity: int
class TestSetCapacity(unittest.TestCase):
class FakeClient:
def __init__(self):
self._expected_data = {} # type: dict
self._expected_capacity = -1
@property
def expected_data(self) -> dict:
"""a one-time property"""
data, self._expected_data = self._expected_data, {}
return data
@expected_data.setter
def expected_data(self, value: dict) -> None:
self._expected_data = value
@property
def expected_capacity(self) -> int:
"""a one-time property"""
capacity, self._expected_capacity = self._expected_capacity, -1
return capacity
def describe_auto_scaling_groups(self, **kwargs: Any) -> dict:
_ = kwargs
return self.expected_data
def set_desired_capacity(self, **kwargs: Any) -> None:
self._expected_capacity = kwargs["DesiredCapacity"]
def data_helper(
self, name: str, min_size: int, desired_capacity: int, max_size: int
) -> None:
self.expected_data = {
"AutoScalingGroups": [
{
"AutoScalingGroupName": name,
"DesiredCapacity": desired_capacity,
"MinSize": min_size,
"MaxSize": max_size,
"Instances": [], # necessary for ins["HealthStatus"] check
}
]
}
def setUp(self):
self.client = self.FakeClient()
def test_normal_cases(self):
test_cases = (
# Do not change capacity
TestCase("noqueue", 1, 13, 20, [Queue("in_progress", 155, "noqueue")], -1),
TestCase("reserve", 1, 13, 20, [Queue("queued", 13, "reserve")], -1),
# Increase capacity
TestCase(
"increase-always",
1,
13,
20,
[Queue("queued", 14, "increase-always")],
14,
),
TestCase("increase-1", 1, 13, 20, [Queue("queued", 23, "increase-1")], 17),
TestCase(
"style-checker", 1, 13, 20, [Queue("queued", 19, "style-checker")], 19
),
TestCase("increase-2", 1, 13, 20, [Queue("queued", 18, "increase-2")], 15),
TestCase("increase-3", 1, 13, 20, [Queue("queued", 183, "increase-3")], 20),
TestCase(
"increase-w/o reserve",
1,
13,
20,
[
Queue("in_progress", 11, "increase-w/o reserve"),
Queue("queued", 12, "increase-w/o reserve"),
],
17,
),
TestCase("lower-min", 10, 5, 20, [Queue("queued", 5, "lower-min")], 10),
# Decrease capacity
# FIXME: Tests changed for lambda that can only scale up
# TestCase("w/reserve", 1, 13, 20, [Queue("queued", 5, "w/reserve")], 5),
TestCase("w/reserve", 1, 13, 20, [Queue("queued", 5, "w/reserve")], -1),
# TestCase(
# "style-checker", 1, 13, 20, [Queue("queued", 5, "style-checker")], 5
# ),
TestCase(
"style-checker", 1, 13, 20, [Queue("queued", 5, "style-checker")], -1
),
# TestCase("w/reserve", 1, 23, 20, [Queue("queued", 17, "w/reserve")], 17),
TestCase("w/reserve", 1, 23, 20, [Queue("queued", 17, "w/reserve")], -1),
# TestCase("decrease", 1, 13, 20, [Queue("in_progress", 3, "decrease")], 3),
TestCase("decrease", 1, 13, 20, [Queue("in_progress", 3, "decrease")], -1),
# TestCase(
# "style-checker",
# 1,
# 13,
# 20,
# [Queue("in_progress", 5, "style-checker")],
# 5,
# ),
TestCase(
"style-checker",
1,
13,
20,
[Queue("in_progress", 5, "style-checker")],
-1,
),
)
for t in test_cases:
self.client.data_helper(t.name, t.min_size, t.desired_capacity, t.max_size)
set_capacity(t.name, t.queues, self.client, False)
self.assertEqual(t.expected_capacity, self.client.expected_capacity, t.name)
def test_effective_capacity(self):
"""Normal cases test increasing w/o considering
effective_capacity much lower than DesiredCapacity"""
test_cases = (
TestCase(
"desired-overwritten",
1,
20, # DesiredCapacity, overwritten by effective_capacity
50,
[
Queue("in_progress", 30, "desired-overwritten"),
Queue("queued", 60, "desired-overwritten"),
],
40,
),
)
for t in test_cases:
self.client.data_helper(t.name, t.min_size, t.desired_capacity, t.max_size)
            # we test that effective_capacity is 30 (half of 60)
data_with_instances = self.client.expected_data
data_with_instances["AutoScalingGroups"][0]["Instances"] = [
{"HealthStatus": "Healthy" if i % 2 else "Unhealthy"} for i in range(60)
]
self.client.expected_data = data_with_instances
set_capacity(t.name, t.queues, self.client, False)
self.assertEqual(t.expected_capacity, self.client.expected_capacity, t.name)
def test_exceptions(self):
test_cases = (
(
TestCase(
"different names",
1,
1,
1,
[Queue("queued", 5, "another name")],
-1,
),
AssertionError,
),
(TestCase("wrong queue len", 1, 1, 1, [], -1), AssertionError),
(
TestCase(
"wrong queue", 1, 1, 1, [Queue("wrong", 1, "wrong queue")], -1 # type: ignore
),
ValueError,
),
)
for t, error in test_cases:
with self.assertRaises(error):
self.client.data_helper(
t.name, t.min_size, t.desired_capacity, t.max_size
)
set_capacity(t.name, t.queues, self.client, False)
with self.assertRaises(AssertionError):
self.client.expected_data = {"AutoScalingGroups": [1, 2]}
set_capacity(
"wrong number of ASGs",
[Queue("queued", 1, "wrong number of ASGs")],
self.client,
)

View File

@ -12,7 +12,6 @@ import docker_images_helper
from ci_config import CI
from env_helper import REPO_COPY, S3_BUILDS_BUCKET, TEMP_PATH
from git_helper import Git
from lambda_shared_package.lambda_shared.pr import Labels
from pr_info import PRInfo
from report import FAILURE, SUCCESS, JobReport, StatusType
from stopwatch import Stopwatch
@ -108,7 +107,9 @@ def build_clickhouse(
def is_release_pr(pr_info: PRInfo) -> bool:
return Labels.RELEASE in pr_info.labels or Labels.RELEASE_LTS in pr_info.labels
return (
CI.Labels.RELEASE in pr_info.labels or CI.Labels.RELEASE_LTS in pr_info.labels
)
def get_release_or_pr(pr_info: PRInfo, version: ClickHouseVersion) -> Tuple[str, str]:

View File

@ -1,376 +0,0 @@
#!/usr/bin/env python3
import json
import time
from base64 import b64decode
from collections import namedtuple
from queue import Queue
from threading import Thread
from typing import Any, Dict, List, Optional
import requests
from lambda_shared.pr import Labels
from lambda_shared.token import get_cached_access_token
NEED_RERUN_OR_CANCEL_WORKFLOWS = {
"BackportPR",
"DocsCheck",
"MasterCI",
"PullRequestCI",
}
MAX_RETRY = 5
DEBUG_INFO = {} # type: Dict[str, Any]
class Worker(Thread):
def __init__(
self, request_queue: Queue, token: str, ignore_exception: bool = False
):
Thread.__init__(self)
self.queue = request_queue
self.token = token
self.ignore_exception = ignore_exception
self.response = {} # type: Dict
def run(self):
m = self.queue.get()
try:
self.response = _exec_get_with_retry(m, self.token)
except Exception as e:
if not self.ignore_exception:
raise
print(f"Exception occured, still continue: {e}")
self.queue.task_done()
def _exec_get_with_retry(url: str, token: str) -> dict:
headers = {"Authorization": f"token {token}"}
e = Exception()
for i in range(MAX_RETRY):
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
return response.json() # type: ignore
except Exception as ex:
print("Got exception executing request", ex)
e = ex
time.sleep(i + 1)
raise requests.HTTPError("Cannot execute GET request with retries") from e
WorkflowDescription = namedtuple(
"WorkflowDescription",
[
"url",
"run_id",
"name",
"head_sha",
"status",
"rerun_url",
"cancel_url",
"conclusion",
],
)
def get_workflows_description_for_pull_request(
pull_request_event: dict, token: str
) -> List[WorkflowDescription]:
head_repo = pull_request_event["head"]["repo"]["full_name"]
head_branch = pull_request_event["head"]["ref"]
print("PR", pull_request_event["number"], "has head ref", head_branch)
workflows_data = []
repo_url = pull_request_event["base"]["repo"]["url"]
request_url = f"{repo_url}/actions/runs?per_page=100"
# Get all workflows for the current branch
for i in range(1, 11):
workflows = _exec_get_with_retry(
f"{request_url}&event=pull_request&branch={head_branch}&page={i}", token
)
if not workflows["workflow_runs"]:
break
workflows_data += workflows["workflow_runs"]
if i == 10:
print("Too many workflows found")
if not workflows_data:
print("No workflows found by filter")
return []
print(f"Total workflows for the branch {head_branch} found: {len(workflows_data)}")
DEBUG_INFO["workflows"] = []
workflow_descriptions = []
for workflow in workflows_data:
        # Sometimes workflow["head_repository"]["full_name"] is None
if workflow["head_repository"] is None:
continue
DEBUG_INFO["workflows"].append(
{
"full_name": workflow["head_repository"]["full_name"],
"name": workflow["name"],
"branch": workflow["head_branch"],
}
)
        # unfortunately we cannot filter out workflows from forks in the API request,
        # so we do it manually
if (
workflow["head_repository"]["full_name"] == head_repo
and workflow["name"] in NEED_RERUN_OR_CANCELL_WORKFLOWS
):
workflow_descriptions.append(
WorkflowDescription(
url=workflow["url"],
run_id=workflow["id"],
name=workflow["name"],
head_sha=workflow["head_sha"],
status=workflow["status"],
rerun_url=workflow["rerun_url"],
cancel_url=workflow["cancel_url"],
conclusion=workflow["conclusion"],
)
)
return workflow_descriptions
def get_workflow_description_fallback(
pull_request_event: dict, token: str
) -> List[WorkflowDescription]:
head_repo = pull_request_event["head"]["repo"]["full_name"]
head_branch = pull_request_event["head"]["ref"]
print("Get last 500 workflows from API to search related there")
# Fallback for a case of an already deleted branch and no workflows received
repo_url = pull_request_event["base"]["repo"]["url"]
request_url = f"{repo_url}/actions/runs?per_page=100"
q = Queue() # type: Queue
workers = []
workflows_data = []
i = 1
for i in range(1, 6):
q.put(f"{request_url}&page={i}")
worker = Worker(q, token, True)
worker.start()
workers.append(worker)
for worker in workers:
worker.join()
if not worker.response:
# We ignore get errors, so response can be empty
continue
# Prefilter workflows
workflows_data += [
wf
for wf in worker.response["workflow_runs"]
if wf["head_repository"] is not None
and wf["head_repository"]["full_name"] == head_repo
and wf["head_branch"] == head_branch
and wf["name"] in NEED_RERUN_OR_CANCELL_WORKFLOWS
]
print(f"Total workflows in last 500 actions matches: {len(workflows_data)}")
DEBUG_INFO["workflows"] = [
{
"full_name": wf["head_repository"]["full_name"],
"name": wf["name"],
"branch": wf["head_branch"],
}
for wf in workflows_data
]
workflow_descriptions = [
WorkflowDescription(
url=wf["url"],
run_id=wf["id"],
name=wf["name"],
head_sha=wf["head_sha"],
status=wf["status"],
rerun_url=wf["rerun_url"],
cancel_url=wf["cancel_url"],
conclusion=wf["conclusion"],
)
for wf in workflows_data
]
return workflow_descriptions
def get_workflow_description(workflow_url: str, token: str) -> WorkflowDescription:
workflow = _exec_get_with_retry(workflow_url, token)
return WorkflowDescription(
url=workflow["url"],
run_id=workflow["id"],
name=workflow["name"],
head_sha=workflow["head_sha"],
status=workflow["status"],
rerun_url=workflow["rerun_url"],
cancel_url=workflow["cancel_url"],
conclusion=workflow["conclusion"],
)
def _exec_post_with_retry(url: str, token: str, json: Optional[Any] = None) -> Any:
headers = {"Authorization": f"token {token}"}
e = Exception()
for i in range(MAX_RETRY):
try:
response = requests.post(url, headers=headers, json=json, timeout=30)
response.raise_for_status()
return response.json()
except Exception as ex:
print("Got exception executing request", ex)
e = ex
time.sleep(i + 1)
raise requests.HTTPError("Cannot execute POST request with retry") from e
def exec_workflow_url(urls_to_post, token):
for url in urls_to_post:
print("Post for workflow workflow using url", url)
_exec_post_with_retry(url, token)
print("Workflow post finished")
def main(event):
token = get_cached_access_token()
DEBUG_INFO["event"] = event
if event["isBase64Encoded"]:
event_data = json.loads(b64decode(event["body"]))
else:
event_data = json.loads(event["body"])
print("Got event for PR", event_data["number"])
action = event_data["action"]
print("Got action", event_data["action"])
pull_request = event_data["pull_request"]
label = ""
if action == "labeled":
label = event_data["label"]["name"]
print("Added label:", label)
print("PR has labels", {label["name"] for label in pull_request["labels"]})
if action == "opened" or (
action == "labeled" and pull_request["created_at"] == pull_request["updated_at"]
):
print("Freshly opened PR, nothing to do")
return
if action == "closed" or label == Labels.DO_NOT_TEST:
print("PR merged/closed or manually labeled 'do not test', will kill workflows")
workflow_descriptions = get_workflows_description_for_pull_request(
pull_request, token
)
workflow_descriptions = (
workflow_descriptions
or get_workflow_description_fallback(pull_request, token)
)
urls_to_cancel = []
for workflow_description in workflow_descriptions:
if (
workflow_description.status != "completed"
and workflow_description.conclusion != "cancelled"
):
urls_to_cancel.append(workflow_description.cancel_url)
print(f"Found {len(urls_to_cancel)} workflows to cancel")
exec_workflow_url(urls_to_cancel, token)
return
if label == Labels.CAN_BE_TESTED:
print("PR marked with can be tested label, rerun workflow")
workflow_descriptions = get_workflows_description_for_pull_request(
pull_request, token
)
workflow_descriptions = (
workflow_descriptions
or get_workflow_description_fallback(pull_request, token)
)
if not workflow_descriptions:
print("Not found any workflows")
return
workflow_descriptions.sort(key=lambda x: x.run_id) # type: ignore
most_recent_workflow = workflow_descriptions[-1]
print("Latest workflow", most_recent_workflow)
if (
most_recent_workflow.status != "completed"
and most_recent_workflow.conclusion != "cancelled"
):
print("Latest workflow is not completed, cancelling")
exec_workflow_url([most_recent_workflow.cancel_url], token)
print("Cancelled")
for _ in range(45):
# If the number of retries is changed: tune the lambda limits accordingly
latest_workflow_desc = get_workflow_description(
most_recent_workflow.url, token
)
print("Checking latest workflow", latest_workflow_desc)
if latest_workflow_desc.status in ("completed", "cancelled"):
print("Finally latest workflow done, going to rerun")
exec_workflow_url([most_recent_workflow.rerun_url], token)
print("Rerun finished, exiting")
break
print("Still have strange status")
time.sleep(3)
return
if action == "edited":
print("PR is edited - do nothing")
# error, _ = check_pr_description(
# pull_request["body"], pull_request["base"]["repo"]["full_name"]
# )
# if error:
# print(
# f"The PR's body is wrong, is going to comment it. The error is: {error}"
# )
# post_json = {
# "body": "This is an automatic comment. The PR descriptions does not "
# f"match the [template]({pull_request['base']['repo']['html_url']}/"
# "blob/master/.github/PULL_REQUEST_TEMPLATE.md?plain=1).\n\n"
# f"Please, edit it accordingly.\n\nThe error is: {error}"
# }
# _exec_post_with_retry(pull_request["comments_url"], token, json=post_json)
return
if action == "synchronize":
print("PR is synchronized, going to stop old actions")
workflow_descriptions = get_workflows_description_for_pull_request(
pull_request, token
)
workflow_descriptions = (
workflow_descriptions
or get_workflow_description_fallback(pull_request, token)
)
urls_to_cancel = []
for workflow_description in workflow_descriptions:
if (
workflow_description.status != "completed"
and workflow_description.conclusion != "cancelled"
and workflow_description.head_sha != pull_request["head"]["sha"]
):
urls_to_cancel.append(workflow_description.cancel_url)
print(f"Found {len(urls_to_cancel)} workflows to cancel")
exec_workflow_url(urls_to_cancel, token)
return
print("Nothing to do")
def handler(event, _):
try:
main(event)
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": '{"status": "OK"}',
}
finally:
for name, value in DEBUG_INFO.items():
print(f"Value of {name}: ", value)

View File

@ -1 +0,0 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1 +0,0 @@
../lambda_shared_package/lambda_shared

View File

@ -1 +0,0 @@
../lambda_shared_package[token]

View File

@ -38,7 +38,7 @@ from env_helper import TEMP_PATH
from get_robot_token import get_best_robot_token
from git_helper import GIT_PREFIX, git_runner, is_shallow
from github_helper import GitHub, PullRequest, PullRequests, Repository
from lambda_shared_package.lambda_shared.pr import Labels
from ci_config import Labels
from ssh import SSHKey

View File

@ -32,6 +32,9 @@ class CI:
from ci_definitions import MQ_JOBS as MQ_JOBS
from ci_definitions import WorkflowStages as WorkflowStages
from ci_definitions import Runners as Runners
from ci_definitions import Labels as Labels
from ci_definitions import TRUSTED_CONTRIBUTORS as TRUSTED_CONTRIBUTORS
from ci_utils import CATEGORY_TO_LABEL as CATEGORY_TO_LABEL
# Jobs that run for doc related updates
_DOCS_CHECK_JOBS = [JobNames.DOCS_CHECK, JobNames.STYLE_CHECK]

View File

@ -7,6 +7,53 @@ from ci_utils import WithIter
from integration_test_images import IMAGES
class Labels:
PR_BUGFIX = "pr-bugfix"
PR_CRITICAL_BUGFIX = "pr-critical-bugfix"
CAN_BE_TESTED = "can be tested"
DO_NOT_TEST = "do not test"
MUST_BACKPORT = "pr-must-backport"
MUST_BACKPORT_CLOUD = "pr-must-backport-cloud"
JEPSEN_TEST = "jepsen-test"
SKIP_MERGEABLE_CHECK = "skip mergeable check"
PR_BACKPORT = "pr-backport"
PR_BACKPORTS_CREATED = "pr-backports-created"
PR_BACKPORTS_CREATED_CLOUD = "pr-backports-created-cloud"
PR_CHERRYPICK = "pr-cherrypick"
PR_CI = "pr-ci"
PR_FEATURE = "pr-feature"
PR_SYNCED_TO_CLOUD = "pr-synced-to-cloud"
PR_SYNC_UPSTREAM = "pr-sync-upstream"
RELEASE = "release"
RELEASE_LTS = "release-lts"
SUBMODULE_CHANGED = "submodule changed"
# automatic backport for critical bug fixes
AUTO_BACKPORT = {"pr-critical-bugfix"}
TRUSTED_CONTRIBUTORS = {
e.lower()
for e in [
"amosbird",
"azat", # SEMRush
"bharatnc", # Many contributions.
"cwurm", # ClickHouse, Inc
"den-crane", # Documentation contributor
"ildus", # adjust, ex-pgpro
"nvartolomei", # Seasoned contributor, CloudFlare
"taiyang-li",
"ucasFL", # Amos Bird's friend
"thomoco", # ClickHouse, Inc
"tonickkozlov", # Cloudflare
"tylerhannan", # ClickHouse, Inc
"tsolodov", # ClickHouse, Inc
"justindeguzman", # ClickHouse, Inc
"XuJia0210", # ClickHouse, Inc
]
}
class WorkflowStages(metaclass=WithIter):
"""
    Stages of GitHub Actions workflow

View File

@ -1,164 +0,0 @@
#!/usr/bin/env python3
"""
Lambda function to:
- calculate the number of running runners
- clean dead runners from GitHub
- terminate stale lost runners in EC2
"""
import argparse
import sys
from typing import Dict
import boto3 # type: ignore
from lambda_shared import RUNNER_TYPE_LABELS, RunnerDescriptions, list_runners
from lambda_shared.token import (
get_access_token_by_key_app,
get_cached_access_token,
get_key_and_app_from_aws,
)
UNIVERSAL_LABEL = "universal"
def handler(event, context):
_ = event
_ = context
main(get_cached_access_token(), True)
def group_runners_by_tag(
listed_runners: RunnerDescriptions,
) -> Dict[str, RunnerDescriptions]:
result = {} # type: Dict[str, RunnerDescriptions]
def add_to_result(tag, runner):
if tag not in result:
result[tag] = []
result[tag].append(runner)
for runner in listed_runners:
if UNIVERSAL_LABEL in runner.tags:
            # Do not process other labels if UNIVERSAL_LABEL is included
add_to_result(UNIVERSAL_LABEL, runner)
continue
for tag in runner.tags:
if tag in RUNNER_TYPE_LABELS:
add_to_result(tag, runner)
break
else:
add_to_result("unlabeled", runner)
return result
def push_metrics_to_cloudwatch(
listed_runners: RunnerDescriptions, group_name: str
) -> None:
client = boto3.client("cloudwatch")
namespace = "RunnersMetrics"
metrics_data = []
busy_runners = sum(
1 for runner in listed_runners if runner.busy and not runner.offline
)
dimensions = [{"Name": "group", "Value": group_name}]
metrics_data.append(
{
"MetricName": "BusyRunners",
"Value": busy_runners,
"Unit": "Count",
"Dimensions": dimensions,
}
)
total_active_runners = sum(1 for runner in listed_runners if not runner.offline)
metrics_data.append(
{
"MetricName": "ActiveRunners",
"Value": total_active_runners,
"Unit": "Count",
"Dimensions": dimensions,
}
)
total_runners = len(listed_runners)
metrics_data.append(
{
"MetricName": "TotalRunners",
"Value": total_runners,
"Unit": "Count",
"Dimensions": dimensions,
}
)
if total_active_runners == 0:
busy_ratio = 100.0
else:
busy_ratio = busy_runners / total_active_runners * 100
metrics_data.append(
{
"MetricName": "BusyRunnersRatio",
"Value": busy_ratio,
"Unit": "Percent",
"Dimensions": dimensions,
}
)
client.put_metric_data(Namespace=namespace, MetricData=metrics_data)
def main(
access_token: str,
push_to_cloudwatch: bool,
) -> None:
gh_runners = list_runners(access_token)
grouped_runners = group_runners_by_tag(gh_runners)
for group, group_runners in grouped_runners.items():
if push_to_cloudwatch:
print(f"Pushing metrics for group '{group}'")
push_metrics_to_cloudwatch(group_runners, group)
else:
print(group, f"({len(group_runners)})")
for runner in group_runners:
print("\t", runner)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Get list of runners and their states")
parser.add_argument(
"-p", "--private-key-path", help="Path to file with private key"
)
parser.add_argument("-k", "--private-key", help="Private key")
parser.add_argument(
"-a", "--app-id", type=int, help="GitHub application ID", required=True
)
parser.add_argument(
"--push-to-cloudwatch",
action="store_true",
help="Push metrics for active and busy runners to cloudwatch",
)
args = parser.parse_args()
if not args.private_key_path and not args.private_key:
print(
"Either --private-key-path or --private-key must be specified",
file=sys.stderr,
)
if args.private_key_path and args.private_key:
print(
"Either --private-key-path or --private-key must be specified",
file=sys.stderr,
)
if args.private_key:
private_key = args.private_key
elif args.private_key_path:
with open(args.private_key_path, "r", encoding="utf-8") as key_file:
private_key = key_file.read()
else:
print("Attempt to get key and id from AWS secret manager")
private_key, args.app_id = get_key_and_app_from_aws()
token = get_access_token_by_key_app(private_key, args.app_id)
main(token, args.push_to_cloudwatch)

View File

@ -1 +0,0 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1 +0,0 @@
../lambda_shared_package/lambda_shared

View File

@ -1,2 +0,0 @@
../lambda_shared_package
../lambda_shared_package[token]

View File

@ -3,7 +3,42 @@ import re
import subprocess
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator, List, Union, Optional
from typing import Any, Iterator, List, Union, Optional, Tuple
LABEL_CATEGORIES = {
"pr-backward-incompatible": ["Backward Incompatible Change"],
"pr-bugfix": [
"Bug Fix",
"Bug Fix (user-visible misbehavior in an official stable release)",
"Bug Fix (user-visible misbehaviour in official stable or prestable release)",
"Bug Fix (user-visible misbehavior in official stable or prestable release)",
],
"pr-critical-bugfix": ["Critical Bug Fix (crash, LOGICAL_ERROR, data loss, RBAC)"],
"pr-build": [
"Build/Testing/Packaging Improvement",
"Build Improvement",
"Build/Testing Improvement",
"Build",
"Packaging Improvement",
],
"pr-documentation": [
"Documentation (changelog entry is not required)",
"Documentation",
],
"pr-feature": ["New Feature"],
"pr-improvement": ["Improvement"],
"pr-not-for-changelog": [
"Not for changelog (changelog entry is not required)",
"Not for changelog",
],
"pr-performance": ["Performance Improvement"],
"pr-ci": ["CI Fix or Improvement (changelog entry is not required)"],
}
CATEGORY_TO_LABEL = {
c: lb for lb, categories in LABEL_CATEGORIES.items() for c in categories
}
class WithIter(type):
@ -109,3 +144,81 @@ class Utils:
@staticmethod
def clear_dmesg():
Shell.run("sudo dmesg --clear ||:")
@staticmethod
def check_pr_description(pr_body: str, repo_name: str) -> Tuple[str, str]:
"""The function checks the body to being properly formatted according to
.github/PULL_REQUEST_TEMPLATE.md, if the first returned string is not empty,
then there is an error."""
lines = list(map(lambda x: x.strip(), pr_body.split("\n") if pr_body else []))
lines = [re.sub(r"\s+", " ", line) for line in lines]
# Check if body contains "Reverts ClickHouse/ClickHouse#36337"
if [
True for line in lines if re.match(rf"\AReverts {repo_name}#[\d]+\Z", line)
]:
return "", LABEL_CATEGORIES["pr-not-for-changelog"][0]
category = ""
entry = ""
description_error = ""
i = 0
while i < len(lines):
if re.match(r"(?i)^[#>*_ ]*change\s*log\s*category", lines[i]):
i += 1
if i >= len(lines):
break
# Can have one empty line between header and the category
# itself. Filter it out.
if not lines[i]:
i += 1
if i >= len(lines):
break
category = re.sub(r"^[-*\s]*", "", lines[i])
i += 1
# Should not have more than one category. Require empty line
# after the first found category.
if i >= len(lines):
break
if lines[i]:
second_category = re.sub(r"^[-*\s]*", "", lines[i])
description_error = (
"More than one changelog category specified: "
f"'{category}', '{second_category}'"
)
return description_error, category
elif re.match(
r"(?i)^[#>*_ ]*(short\s*description|change\s*log\s*entry)", lines[i]
):
i += 1
# Can have one empty line between header and the entry itself.
# Filter it out.
if i < len(lines) and not lines[i]:
i += 1
# All following lines until empty one are the changelog entry.
entry_lines = []
while i < len(lines) and lines[i]:
entry_lines.append(lines[i])
i += 1
entry = " ".join(entry_lines)
# Don't accept changelog entries like '...'.
entry = re.sub(r"[#>*_.\- ]", "", entry)
# Don't accept changelog entries like 'Close #12345'.
entry = re.sub(r"^[\w\-\s]{0,10}#?\d{5,6}\.?$", "", entry)
else:
i += 1
if not category:
description_error = "Changelog category is empty"
# Filter out the PR categories that are not for changelog.
elif "(changelog entry is not required)" in category:
pass # to not check the rest of the conditions
elif category not in CATEGORY_TO_LABEL:
description_error, category = f"Category '{category}' is not valid", ""
elif not entry:
description_error = f"Changelog entry required for category '{category}'"
return description_error, category
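
For context, a rough usage sketch of the new Utils.check_pr_description helper (the sketch and the PR body text are assumptions, not part of the commit); any body following the PULL_REQUEST_TEMPLATE.md headers is parsed the same way.

from ci_utils import CATEGORY_TO_LABEL, Utils
body = (
    "### Changelog category (leave one):\n"
    "- Bug Fix (user-visible misbehavior in an official stable release)\n"
    "\n"
    "### Changelog entry (a user-readable short description of the changes):\n"
    "Fix a crash when reading from a Merge table.\n"
)
error, category = Utils.check_pr_description(body, "ClickHouse/ClickHouse")
# error == "" and CATEGORY_TO_LABEL[category] == "pr-bugfix"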

View File

@ -1,336 +0,0 @@
#!/usr/bin/env python3
"""
Lambda function to:
- calculate the number of running runners
- clean dead runners from GitHub
- terminate stale lost runners in EC2
"""
import argparse
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List
import boto3 # type: ignore
import requests
from botocore.exceptions import ClientError # type: ignore
from lambda_shared import (
RUNNER_TYPE_LABELS,
RunnerDescription,
RunnerDescriptions,
list_runners,
)
from lambda_shared.token import (
get_access_token_by_key_app,
get_cached_access_token,
get_key_and_app_from_aws,
)
UNIVERSAL_LABEL = "universal"
@dataclass
class LostInstance:
counter: int
seen: datetime
def set_offline(self) -> None:
now = datetime.now()
if now.timestamp() <= self.seen.timestamp() + 120:
            # the instance was seen offline again within the last 2 minutes,
            # so we increase the counter
self.counter += 1
else:
self.counter = 1
self.seen = now
@property
def recently_offline(self) -> bool:
"""Returns True if the instance has been seen less than 5 minutes ago"""
return datetime.now().timestamp() <= self.seen.timestamp() + 300
@property
def stable_offline(self) -> bool:
return self.counter >= 3
LOST_INSTANCES = {} # type: Dict["str", LostInstance]
def get_dead_runners_in_ec2(runners: RunnerDescriptions) -> RunnerDescriptions:
"""Returns instances that are offline/dead in EC2, or not found in EC2"""
ids = {
runner.name: runner
for runner in runners
        # Only names like `i-deadbead123` are valid instance IDs
if runner.name.startswith("i-") and runner.offline and not runner.busy
}
if not ids:
return []
# Delete all offline runners with wrong name
result_to_delete = [
runner
for runner in runners
if not ids.get(runner.name) and runner.offline and not runner.busy
]
client = boto3.client("ec2")
i = 0
inc = 100
print("Checking ids: ", " ".join(ids.keys()))
instances_statuses = []
while i < len(ids.keys()):
try:
instances_statuses.append(
client.describe_instance_status(
InstanceIds=list(ids.keys())[i : i + inc]
)
)
            # This is applied only if all ids exist in EC2
i += inc
except ClientError as e:
# The list of non-existent instances is in the message:
# The instance IDs 'i-069b1c256c06cf4e3, i-0f26430432b044035,
# i-0faa2ff44edbc147e, i-0eccf2514585045ec, i-0ee4ee53e0daa7d4a,
# i-07928f15acd473bad, i-0eaddda81298f9a85' do not exist
message = e.response["Error"]["Message"]
if message.startswith("The instance IDs '") and message.endswith(
"' do not exist"
):
non_existent = message[18:-14].split(", ")
for n in non_existent:
result_to_delete.append(ids.pop(n))
else:
raise
    found_instances = set()
print("Response", instances_statuses)
for instances_status in instances_statuses:
for instance_status in instances_status["InstanceStatuses"]:
if instance_status["InstanceState"]["Name"] in ("pending", "running"):
found_instances.add(instance_status["InstanceId"])
print("Found instances", found_instances)
for runner in result_to_delete:
print("Instance", runner.name, "is not alive, going to remove it")
for instance_id, runner in ids.items():
if instance_id not in found_instances:
print("Instance", instance_id, "is not found in EC2, going to remove it")
result_to_delete.append(runner)
return result_to_delete
def handler(event, context):
_ = event
_ = context
main(get_cached_access_token(), True)
def delete_runner(access_token: str, runner: RunnerDescription) -> bool:
headers = {
"Authorization": f"token {access_token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.delete(
f"https://api.github.com/orgs/ClickHouse/actions/runners/{runner.id}",
headers=headers,
timeout=30,
)
response.raise_for_status()
print(f"Response code deleting {runner.name} is {response.status_code}")
return bool(response.status_code == 204)
def get_lost_ec2_instances(runners: RunnerDescriptions) -> List[str]:
global LOST_INSTANCES
now = datetime.now()
client = boto3.client("ec2")
reservations = client.describe_instances(
Filters=[
{"Name": "tag-key", "Values": ["github:runner-type"]},
{"Name": "instance-state-name", "Values": ["pending", "running"]},
],
)["Reservations"]
# flatten the reservation into instances
instances = [
instance
for reservation in reservations
for instance in reservation["Instances"]
]
offline_runner_names = {
runner.name for runner in runners if runner.offline and not runner.busy
}
runner_names = {runner.name for runner in runners}
def offline_instance(iid: str) -> None:
if iid in LOST_INSTANCES:
LOST_INSTANCES[iid].set_offline()
return
LOST_INSTANCES[iid] = LostInstance(1, now)
for instance in instances:
        # Do not consider instances started less than 20 minutes ago as problematic
if now.timestamp() - instance["LaunchTime"].timestamp() < 1200:
continue
runner_type = [
tag["Value"]
for tag in instance["Tags"]
if tag["Key"] == "github:runner-type"
][0]
        # If the runner type has none of the necessary labels, it's fine
if not (UNIVERSAL_LABEL in runner_type or runner_type in RUNNER_TYPE_LABELS):
continue
if instance["InstanceId"] in offline_runner_names:
offline_instance(instance["InstanceId"])
continue
if (
instance["State"]["Name"] == "running"
and not instance["InstanceId"] in runner_names
):
offline_instance(instance["InstanceId"])
instance_ids = [instance["InstanceId"] for instance in instances]
# clean out long unseen instances
LOST_INSTANCES = {
instance_id: stats
for instance_id, stats in LOST_INSTANCES.items()
if stats.recently_offline and instance_id in instance_ids
}
print("The remained LOST_INSTANCES: ", LOST_INSTANCES)
return [
instance_id
for instance_id, stats in LOST_INSTANCES.items()
if stats.stable_offline
]
def continue_lifecycle_hooks(delete_offline_runners: bool) -> None:
"""The function to trigger CONTINUE for instances' lifectycle hooks"""
client = boto3.client("ec2")
reservations = client.describe_instances(
Filters=[
{"Name": "tag-key", "Values": ["github:runner-type"]},
{"Name": "instance-state-name", "Values": ["shutting-down", "terminated"]},
],
)["Reservations"]
# flatten the reservation into instances
terminated_instances = [
instance["InstanceId"]
for reservation in reservations
for instance in reservation["Instances"]
]
asg_client = boto3.client("autoscaling")
as_groups = asg_client.describe_auto_scaling_groups(
Filters=[{"Name": "tag-key", "Values": ["github:runner-type"]}]
)["AutoScalingGroups"]
for asg in as_groups:
lifecycle_hooks = [
lch
for lch in asg_client.describe_lifecycle_hooks(
AutoScalingGroupName=asg["AutoScalingGroupName"]
)["LifecycleHooks"]
if lch["LifecycleTransition"] == "autoscaling:EC2_INSTANCE_TERMINATING"
]
if not lifecycle_hooks:
continue
for instance in asg["Instances"]:
continue_instance = False
if instance["LifecycleState"] == "Terminating:Wait":
if instance["HealthStatus"] == "Unhealthy":
print(f"The instance {instance['InstanceId']} is Unhealthy")
continue_instance = True
elif (
instance["HealthStatus"] == "Healthy"
and instance["InstanceId"] in terminated_instances
):
print(
f"The instance {instance['InstanceId']} is already terminated"
)
continue_instance = True
if continue_instance:
if delete_offline_runners:
for lch in lifecycle_hooks:
print(f"Continue lifecycle hook {lch['LifecycleHookName']}")
asg_client.complete_lifecycle_action(
LifecycleHookName=lch["LifecycleHookName"],
AutoScalingGroupName=asg["AutoScalingGroupName"],
LifecycleActionResult="CONTINUE",
InstanceId=instance["InstanceId"],
)
def main(
access_token: str,
delete_offline_runners: bool,
) -> None:
gh_runners = list_runners(access_token)
dead_runners = get_dead_runners_in_ec2(gh_runners)
print("Runners in GH API to terminate: ", [runner.name for runner in dead_runners])
if delete_offline_runners and dead_runners:
print("Going to delete offline runners")
for runner in dead_runners:
print("Deleting runner", runner)
delete_runner(access_token, runner)
elif dead_runners:
print("Would delete dead runners: ", dead_runners)
lost_instances = get_lost_ec2_instances(gh_runners)
print("Instances to terminate: ", lost_instances)
if delete_offline_runners:
if lost_instances:
print("Going to terminate lost instances")
boto3.client("ec2").terminate_instances(InstanceIds=lost_instances)
continue_lifecycle_hooks(delete_offline_runners)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Get list of runners and their states")
parser.add_argument(
"-p", "--private-key-path", help="Path to file with private key"
)
parser.add_argument("-k", "--private-key", help="Private key")
parser.add_argument(
"-a", "--app-id", type=int, help="GitHub application ID", required=True
)
parser.add_argument(
"--delete-offline", action="store_true", help="Remove offline runners"
)
args = parser.parse_args()
if not args.private_key_path and not args.private_key:
print(
"Either --private-key-path or --private-key must be specified",
file=sys.stderr,
)
if args.private_key_path and args.private_key:
print(
"Either --private-key-path or --private-key must be specified",
file=sys.stderr,
)
if args.private_key:
private_key = args.private_key
elif args.private_key_path:
with open(args.private_key_path, "r", encoding="utf-8") as key_file:
private_key = key_file.read()
else:
print("Attempt to get key and id from AWS secret manager")
private_key, args.app_id = get_key_and_app_from_aws()
token = get_access_token_by_key_app(private_key, args.app_id)
main(token, args.delete_offline)

View File

@ -1 +0,0 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1 +0,0 @@
../lambda_shared_package/lambda_shared

View File

@ -1,2 +0,0 @@
../lambda_shared_package
../lambda_shared_package[token]

View File

@ -1,2 +0,0 @@
build
*.egg-info

View File

@ -1,237 +0,0 @@
"""The shared code and types for all our CI lambdas
It exists as __init__.py and lambda_shared/__init__.py to work both locally and in a venv"""
import json
import logging
import time
from collections import namedtuple
from typing import Any, Dict, Iterable, List, Optional
import boto3 # type: ignore
import requests
RUNNER_TYPE_LABELS = [
"builder",
"func-tester",
"func-tester-aarch64",
"fuzzer-unit-tester",
"limited-tester",
"stress-tester",
"style-checker",
"style-checker-aarch64",
# private runners
"private-builder",
"private-clickpipes",
"private-func-tester",
"private-fuzzer-unit-tester",
"private-stress-tester",
"private-style-checker",
]
### VENDORING
def get_parameter_from_ssm(
name: str, decrypt: bool = True, client: Optional[Any] = None
) -> str:
if not client:
client = boto3.client("ssm", region_name="us-east-1")
return client.get_parameter(Name=name, WithDecryption=decrypt)[ # type: ignore
"Parameter"
]["Value"]
class CHException(Exception):
pass
class InsertException(CHException):
pass
class ClickHouseHelper:
def __init__(
self,
url: str,
user: Optional[str] = None,
password: Optional[str] = None,
):
self.url = url
self.auth = {}
if user:
self.auth["X-ClickHouse-User"] = user
if password:
self.auth["X-ClickHouse-Key"] = password
@staticmethod
def _insert_json_str_info_impl(
url: str, auth: Dict[str, str], db: str, table: str, json_str: str
) -> None:
params = {
"database": db,
"query": f"INSERT INTO {table} FORMAT JSONEachRow",
"date_time_input_format": "best_effort",
"send_logs_level": "warning",
}
for i in range(5):
try:
response = requests.post(
url, params=params, data=json_str, headers=auth
)
except Exception as e:
error = f"Received exception while sending data to {url} on {i} attempt: {e}"
logging.warning(error)
continue
logging.info("Response content '%s'", response.content)
if response.ok:
break
error = (
"Cannot insert data into clickhouse at try "
+ str(i)
+ ": HTTP code "
+ str(response.status_code)
+ ": '"
+ str(response.text)
+ "'"
)
if response.status_code >= 500:
# A retriable error
time.sleep(1)
continue
logging.info(
"Request headers '%s', body '%s'",
response.request.headers,
response.request.body,
)
raise InsertException(error)
else:
raise InsertException(error)
def _insert_json_str_info(self, db: str, table: str, json_str: str) -> None:
self._insert_json_str_info_impl(self.url, self.auth, db, table, json_str)
def insert_event_into(
self, db: str, table: str, event: object, safe: bool = True
) -> None:
event_str = json.dumps(event)
try:
self._insert_json_str_info(db, table, event_str)
except InsertException as e:
logging.error(
"Exception happened during inserting data into clickhouse: %s", e
)
if not safe:
raise
def insert_events_into(
self, db: str, table: str, events: Iterable[object], safe: bool = True
) -> None:
jsons = []
for event in events:
jsons.append(json.dumps(event))
try:
self._insert_json_str_info(db, table, ",".join(jsons))
except InsertException as e:
logging.error(
"Exception happened during inserting data into clickhouse: %s", e
)
if not safe:
raise
def _select_and_get_json_each_row(self, db: str, query: str) -> str:
params = {
"database": db,
"query": query,
"default_format": "JSONEachRow",
}
for i in range(5):
response = None
try:
response = requests.get(self.url, params=params, headers=self.auth)
response.raise_for_status()
return response.text # type: ignore
except Exception as ex:
logging.warning("Cannot fetch data with exception %s", str(ex))
if response:
logging.warning("Reponse text %s", response.text)
time.sleep(0.1 * i)
raise CHException("Cannot fetch data from clickhouse")
def select_json_each_row(self, db: str, query: str) -> List[dict]:
text = self._select_and_get_json_each_row(db, query)
result = []
for line in text.split("\n"):
if line:
result.append(json.loads(line))
return result
### Runners
RunnerDescription = namedtuple(
"RunnerDescription", ["id", "name", "tags", "offline", "busy"]
)
RunnerDescriptions = List[RunnerDescription]
def list_runners(access_token: str) -> RunnerDescriptions:
headers = {
"Authorization": f"token {access_token}",
"Accept": "application/vnd.github.v3+json",
}
per_page = 100
response = requests.get(
f"https://api.github.com/orgs/ClickHouse/actions/runners?per_page={per_page}",
headers=headers,
)
response.raise_for_status()
data = response.json()
total_runners = data["total_count"]
print("Expected total runners", total_runners)
runners = data["runners"]
# round to 0 for 0, 1 for 1..100, but to 2 for 101..200
total_pages = (total_runners - 1) // per_page + 1
print("Total pages", total_pages)
for i in range(2, total_pages + 1):
response = requests.get(
"https://api.github.com/orgs/ClickHouse/actions/runners"
f"?page={i}&per_page={per_page}",
headers=headers,
)
response.raise_for_status()
data = response.json()
runners += data["runners"]
print("Total runners", len(runners))
result = []
for runner in runners:
tags = [tag["name"] for tag in runner["labels"]]
desc = RunnerDescription(
id=runner["id"],
name=runner["name"],
tags=tags,
offline=runner["status"] == "offline",
busy=runner["busy"],
)
result.append(desc)
return result
def cached_value_is_valid(updated_at: float, ttl: float) -> bool:
"a common function to identify if cachable value is still valid"
if updated_at == 0:
return False
if time.time() - ttl < updated_at:
return True
return False
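
A small illustration (not part of the removed package) of the page-count arithmetic that list_runners above relies on; the helper name is made up.

def total_pages(total_runners: int, per_page: int = 100) -> int:
    # ceiling division: 0 runners -> 0 pages, 1..100 -> 1 page, 101..200 -> 2 pages
    return (total_runners - 1) // per_page + 1
assert total_pages(0) == 0
assert total_pages(100) == 1
assert total_pages(101) == 2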

View File

@ -1,168 +0,0 @@
#!/usr/bin/env python
import re
from typing import Tuple
# Individual trusted contributors who are not in any trusted organization.
# Can be changed in runtime: we will append users that we learned to be in
# a trusted org, to save GitHub API calls.
TRUSTED_CONTRIBUTORS = {
e.lower()
for e in [
"amosbird",
"azat", # SEMRush
"bharatnc", # Many contributions.
"cwurm", # ClickHouse, Inc
"den-crane", # Documentation contributor
"ildus", # adjust, ex-pgpro
"nvartolomei", # Seasoned contributor, CloudFlare
"taiyang-li",
"ucasFL", # Amos Bird's friend
"thomoco", # ClickHouse, Inc
"tonickkozlov", # Cloudflare
"tylerhannan", # ClickHouse, Inc
"tsolodov", # ClickHouse, Inc
"justindeguzman", # ClickHouse, Inc
"XuJia0210", # ClickHouse, Inc
]
}
class Labels:
PR_BUGFIX = "pr-bugfix"
PR_CRITICAL_BUGFIX = "pr-critical-bugfix"
CAN_BE_TESTED = "can be tested"
DO_NOT_TEST = "do not test"
MUST_BACKPORT = "pr-must-backport"
MUST_BACKPORT_CLOUD = "pr-must-backport-cloud"
JEPSEN_TEST = "jepsen-test"
SKIP_MERGEABLE_CHECK = "skip mergeable check"
PR_BACKPORT = "pr-backport"
PR_BACKPORTS_CREATED = "pr-backports-created"
PR_BACKPORTS_CREATED_CLOUD = "pr-backports-created-cloud"
PR_CHERRYPICK = "pr-cherrypick"
PR_CI = "pr-ci"
PR_FEATURE = "pr-feature"
PR_SYNCED_TO_CLOUD = "pr-synced-to-cloud"
PR_SYNC_UPSTREAM = "pr-sync-upstream"
RELEASE = "release"
RELEASE_LTS = "release-lts"
SUBMODULE_CHANGED = "submodule changed"
# automatic backport for critical bug fixes
AUTO_BACKPORT = {"pr-critical-bugfix"}
# Descriptions are used in .github/PULL_REQUEST_TEMPLATE.md, keep comments there
# updated accordingly
# The following lists are append-only; try to avoid editing them.
# They could still be cleaned out after a decent amount of time, though.
LABEL_CATEGORIES = {
"pr-backward-incompatible": ["Backward Incompatible Change"],
"pr-bugfix": [
"Bug Fix",
"Bug Fix (user-visible misbehavior in an official stable release)",
"Bug Fix (user-visible misbehaviour in official stable or prestable release)",
"Bug Fix (user-visible misbehavior in official stable or prestable release)",
],
"pr-critical-bugfix": ["Critical Bug Fix (crash, LOGICAL_ERROR, data loss, RBAC)"],
"pr-build": [
"Build/Testing/Packaging Improvement",
"Build Improvement",
"Build/Testing Improvement",
"Build",
"Packaging Improvement",
],
"pr-documentation": [
"Documentation (changelog entry is not required)",
"Documentation",
],
"pr-feature": ["New Feature"],
"pr-improvement": ["Improvement"],
"pr-not-for-changelog": [
"Not for changelog (changelog entry is not required)",
"Not for changelog",
],
"pr-performance": ["Performance Improvement"],
"pr-ci": ["CI Fix or Improvement (changelog entry is not required)"],
}
CATEGORY_TO_LABEL = {
c: lb for lb, categories in LABEL_CATEGORIES.items() for c in categories
}
def check_pr_description(pr_body: str, repo_name: str) -> Tuple[str, str]:
"""The function checks the body to being properly formatted according to
.github/PULL_REQUEST_TEMPLATE.md, if the first returned string is not empty,
then there is an error."""
lines = list(map(lambda x: x.strip(), pr_body.split("\n") if pr_body else []))
lines = [re.sub(r"\s+", " ", line) for line in lines]
# Check if body contains "Reverts ClickHouse/ClickHouse#36337"
if [True for line in lines if re.match(rf"\AReverts {repo_name}#[\d]+\Z", line)]:
return "", LABEL_CATEGORIES["pr-not-for-changelog"][0]
category = ""
entry = ""
description_error = ""
i = 0
while i < len(lines):
if re.match(r"(?i)^[#>*_ ]*change\s*log\s*category", lines[i]):
i += 1
if i >= len(lines):
break
# Can have one empty line between header and the category
# itself. Filter it out.
if not lines[i]:
i += 1
if i >= len(lines):
break
category = re.sub(r"^[-*\s]*", "", lines[i])
i += 1
# Should not have more than one category. Require empty line
# after the first found category.
if i >= len(lines):
break
if lines[i]:
second_category = re.sub(r"^[-*\s]*", "", lines[i])
description_error = (
"More than one changelog category specified: "
f"'{category}', '{second_category}'"
)
return description_error, category
elif re.match(
r"(?i)^[#>*_ ]*(short\s*description|change\s*log\s*entry)", lines[i]
):
i += 1
# Can have one empty line between header and the entry itself.
# Filter it out.
if i < len(lines) and not lines[i]:
i += 1
# All following lines until empty one are the changelog entry.
entry_lines = []
while i < len(lines) and lines[i]:
entry_lines.append(lines[i])
i += 1
entry = " ".join(entry_lines)
# Don't accept changelog entries like '...'.
entry = re.sub(r"[#>*_.\- ]", "", entry)
# Don't accept changelog entries like 'Close #12345'.
entry = re.sub(r"^[\w\-\s]{0,10}#?\d{5,6}\.?$", "", entry)
else:
i += 1
if not category:
description_error = "Changelog category is empty"
# Filter out the PR categories that are not for changelog.
elif "(changelog entry is not required)" in category:
pass # to not check the rest of the conditions
elif category not in CATEGORY_TO_LABEL:
description_error, category = f"Category '{category}' is not valid", ""
elif not entry:
description_error = f"Changelog entry required for category '{category}'"
return description_error, category

View File

@ -1,95 +0,0 @@
"""Module to get the token for GitHub"""
from dataclasses import dataclass
import json
import time
from typing import Tuple
import boto3 # type: ignore
import jwt
import requests
from . import cached_value_is_valid
def get_key_and_app_from_aws() -> Tuple[str, int]:
secret_name = "clickhouse_github_secret_key"
session = boto3.session.Session()
client = session.client(
service_name="secretsmanager",
)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
data = json.loads(get_secret_value_response["SecretString"])
return data["clickhouse-app-key"], int(data["clickhouse-app-id"])
def get_installation_id(jwt_token: str) -> int:
headers = {
"Authorization": f"Bearer {jwt_token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get("https://api.github.com/app/installations", headers=headers)
response.raise_for_status()
data = response.json()
for installation in data:
if installation["account"]["login"] == "ClickHouse":
installation_id = installation["id"]
return installation_id # type: ignore
def get_access_token_by_jwt(jwt_token: str, installation_id: int) -> str:
headers = {
"Authorization": f"Bearer {jwt_token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.post(
f"https://api.github.com/app/installations/{installation_id}/access_tokens",
headers=headers,
)
response.raise_for_status()
data = response.json()
return data["token"] # type: ignore
def get_token_from_aws() -> str:
private_key, app_id = get_key_and_app_from_aws()
return get_access_token_by_key_app(private_key, app_id)
def get_access_token_by_key_app(private_key: str, app_id: int) -> str:
payload = {
"iat": int(time.time()) - 60,
"exp": int(time.time()) + (10 * 60),
"iss": app_id,
}
# FIXME: apparently should be switched to this so that mypy is happy
# jwt_instance = JWT()
# encoded_jwt = jwt_instance.encode(payload, private_key, algorithm="RS256")
encoded_jwt = jwt.encode(payload, private_key, algorithm="RS256") # type: ignore
installation_id = get_installation_id(encoded_jwt)
return get_access_token_by_jwt(encoded_jwt, installation_id)
@dataclass
class CachedToken:
time: float
value: str
updating: bool = False
_cached_token = CachedToken(0, "")
def get_cached_access_token() -> str:
if time.time() - 550 < _cached_token.time or _cached_token.updating:
return _cached_token.value
# Indicate that the value is updating now, so the cached value can be
# used. The first setting and close-to-ttl are not counted as update
_cached_token.updating = cached_value_is_valid(_cached_token.time, 590)
private_key, app_id = get_key_and_app_from_aws()
_cached_token.time = time.time()
_cached_token.value = get_access_token_by_key_app(private_key, app_id)
_cached_token.updating = False
return _cached_token.value

View File

@ -1,24 +0,0 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "lambda_shared"
version = "0.0.1"
dependencies = [
"requests",
"urllib3 < 2"
]
[project.optional-dependencies]
token = [
"PyJWT",
"cryptography",
]
dev = [
"boto3",
"lambda_shared[token]",
]
[tool.distutils.bdist_wheel]
universal = true

View File

@ -1,8 +0,0 @@
### This file exists for clear builds in docker ###
# without it the `build` directory wouldn't be #
# updated on the fly and will require manual clean #
[build]
build_base = /tmp/lambda_shared
[egg_info]
egg_base = /tmp/

View File

@ -15,7 +15,7 @@ from env_helper import (
GITHUB_SERVER_URL,
GITHUB_UPSTREAM_REPOSITORY,
)
from lambda_shared_package.lambda_shared.pr import Labels
from ci_config import Labels
from get_robot_token import get_best_robot_token
from github_helper import GitHub

View File

@ -25,7 +25,7 @@ from contextlib import contextmanager
from typing import Any, Final, Iterator, List, Optional, Tuple
from git_helper import Git, commit, release_branch
from lambda_shared_package.lambda_shared.pr import Labels
from ci_config import Labels
from report import SUCCESS
from version_helper import (
FILE_WITH_VERSION_PATH,

View File

@ -15,26 +15,22 @@ from commit_status_helper import (
)
from env_helper import GITHUB_REPOSITORY, GITHUB_SERVER_URL
from get_robot_token import get_best_robot_token
from lambda_shared_package.lambda_shared.pr import (
CATEGORY_TO_LABEL,
TRUSTED_CONTRIBUTORS,
Labels,
check_pr_description,
)
from ci_config import CI
from ci_utils import Utils
from pr_info import PRInfo
from report import FAILURE, PENDING, SUCCESS, StatusType
from ci_config import CI
TRUSTED_ORG_IDS = {
54801242, # clickhouse
}
OK_SKIP_LABELS = {Labels.RELEASE, Labels.PR_BACKPORT, Labels.PR_CHERRYPICK}
OK_SKIP_LABELS = {CI.Labels.RELEASE, CI.Labels.PR_BACKPORT, CI.Labels.PR_CHERRYPICK}
PR_CHECK = "PR Check"
def pr_is_by_trusted_user(pr_user_login, pr_user_orgs):
if pr_user_login.lower() in TRUSTED_CONTRIBUTORS:
if pr_user_login.lower() in CI.TRUSTED_CONTRIBUTORS:
logging.info("User '%s' is trusted", pr_user_login)
return True
@ -63,13 +59,13 @@ def should_run_ci_for_pr(pr_info: PRInfo) -> Tuple[bool, str]:
if OK_SKIP_LABELS.intersection(pr_info.labels):
return True, "Don't try new checks for release/backports/cherry-picks"
if Labels.CAN_BE_TESTED not in pr_info.labels and not pr_is_by_trusted_user(
if CI.Labels.CAN_BE_TESTED not in pr_info.labels and not pr_is_by_trusted_user(
pr_info.user_login, pr_info.user_orgs
):
logging.info(
"PRs by untrusted users need the '%s' label - "
"please contact a member of the core team",
Labels.CAN_BE_TESTED,
CI.Labels.CAN_BE_TESTED,
)
return False, "Needs 'can be tested' label"
@ -96,30 +92,32 @@ def main():
commit = get_commit(gh, pr_info.sha)
status = SUCCESS # type: StatusType
description_error, category = check_pr_description(pr_info.body, GITHUB_REPOSITORY)
description_error, category = Utils.check_pr_description(
pr_info.body, GITHUB_REPOSITORY
)
pr_labels_to_add = []
pr_labels_to_remove = []
if (
category in CATEGORY_TO_LABEL
and CATEGORY_TO_LABEL[category] not in pr_info.labels
category in CI.CATEGORY_TO_LABEL
and CI.CATEGORY_TO_LABEL[category] not in pr_info.labels
):
pr_labels_to_add.append(CATEGORY_TO_LABEL[category])
pr_labels_to_add.append(CI.CATEGORY_TO_LABEL[category])
for label in pr_info.labels:
if (
label in CATEGORY_TO_LABEL.values()
and category in CATEGORY_TO_LABEL
and label != CATEGORY_TO_LABEL[category]
label in CI.CATEGORY_TO_LABEL.values()
and category in CI.CATEGORY_TO_LABEL
and label != CI.CATEGORY_TO_LABEL[category]
):
pr_labels_to_remove.append(label)
if pr_info.has_changes_in_submodules():
pr_labels_to_add.append(Labels.SUBMODULE_CHANGED)
elif Labels.SUBMODULE_CHANGED in pr_info.labels:
pr_labels_to_remove.append(Labels.SUBMODULE_CHANGED)
pr_labels_to_add.append(CI.Labels.SUBMODULE_CHANGED)
elif CI.Labels.SUBMODULE_CHANGED in pr_info.labels:
pr_labels_to_remove.append(CI.Labels.SUBMODULE_CHANGED)
if any(label in Labels.AUTO_BACKPORT for label in pr_labels_to_add):
backport_labels = [Labels.MUST_BACKPORT, Labels.MUST_BACKPORT_CLOUD]
if any(label in CI.Labels.AUTO_BACKPORT for label in pr_labels_to_add):
backport_labels = [CI.Labels.MUST_BACKPORT, CI.Labels.MUST_BACKPORT_CLOUD]
pr_labels_to_add += [
label for label in backport_labels if label not in pr_info.labels
]
@ -164,15 +162,15 @@ def main():
# 2. Then we check if the documentation is not created to fail the Mergeable check
if (
Labels.PR_FEATURE in pr_info.labels
CI.Labels.PR_FEATURE in pr_info.labels
and not pr_info.has_changes_in_documentation()
):
print(
f"::error ::The '{Labels.PR_FEATURE}' in the labels, "
f"::error ::The '{CI.Labels.PR_FEATURE}' in the labels, "
"but there's no changed documentation"
)
status = FAILURE
description = f"expect adding docs for {Labels.PR_FEATURE}"
description = f"expect adding docs for {CI.Labels.PR_FEATURE}"
# 3. But we allow the workflow to continue
# 4. And post only a single commit status on a failure

View File

@ -1,93 +0,0 @@
#!/usr/bin/env python3
import argparse
import sys
import boto3 # type: ignore
import requests
from lambda_shared.token import get_access_token_by_key_app, get_cached_access_token
def get_runner_registration_token(access_token):
headers = {
"Authorization": f"token {access_token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.post(
"https://api.github.com/orgs/ClickHouse/actions/runners/registration-token",
headers=headers,
timeout=30,
)
response.raise_for_status()
data = response.json()
return data["token"]
def main(access_token, push_to_ssm, ssm_parameter_name):
runner_registration_token = get_runner_registration_token(access_token)
if push_to_ssm:
print("Trying to put params into ssm manager")
client = boto3.client("ssm")
client.put_parameter(
Name=ssm_parameter_name,
Value=runner_registration_token,
Type="SecureString",
Overwrite=True,
)
else:
print(
"Not push token to AWS Parameter Store, just print:",
runner_registration_token,
)
def handler(event, context):
_, _ = event, context
main(get_cached_access_token(), True, "github_runner_registration_token")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Get new token from github to add runners"
)
parser.add_argument(
"-p", "--private-key-path", help="Path to file with private key"
)
parser.add_argument("-k", "--private-key", help="Private key")
parser.add_argument(
"-a", "--app-id", type=int, help="GitHub application ID", required=True
)
parser.add_argument(
"--push-to-ssm",
action="store_true",
help="Store received token in parameter store",
)
parser.add_argument(
"--ssm-parameter-name",
default="github_runner_registration_token",
help="AWS paramater store parameter name",
)
args = parser.parse_args()
if not args.private_key_path and not args.private_key:
print(
"Either --private-key-path or --private-key must be specified",
file=sys.stderr,
)
sys.exit(1)
if args.private_key_path and args.private_key:
print(
"Only one of --private-key-path and --private-key may be specified",
file=sys.stderr,
)
sys.exit(1)
if args.private_key:
private_key = args.private_key
else:
with open(args.private_key_path, "r", encoding="utf-8") as key_file:
private_key = key_file.read()
token = get_access_token_by_key_app(private_key, args.app_id)
main(token, args.push_to_ssm, args.ssm_parameter_name)

View File

@ -1 +0,0 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1 +0,0 @@
../lambda_shared_package/lambda_shared

View File

@ -1 +0,0 @@
../lambda_shared_package[token]

View File

@ -1,323 +0,0 @@
#!/usr/bin/env python3
"""
A trivial stateless slack bot that notifies about new broken tests in ClickHouse CI.
It checks what happened to our CI during the last check_period hours (1 hour) and
notifies us in slack if necessary.
This script should be executed once each check_period hours (1 hour).
It will post duplicate messages if you run it more often; it will lose some messages
if you run it less often.
You can run it locally with no arguments, it will work in a dry-run mode.
Or you can set your own SLACK_URL_DEFAULT.
Feel free to add more checks, more details to messages, or better heuristics.
It's deployed to slack-bot-ci-lambda in the CI/CD account
See also: https://aretestsgreenyet.com/
"""
import base64
import json
import os
import random
import requests
DRY_RUN_MARK = "<no url, dry run>"
MAX_FAILURES_DEFAULT = 30
SLACK_URL_DEFAULT = DRY_RUN_MARK
FLAKY_ALERT_PROBABILITY = 0.50
REPORT_NO_FAILURES_PROBABILITY = 0.99
MAX_TESTS_TO_REPORT = 4
# Slack has a stupid limitation on message size: it splits long messages into multiple
# ones, breaking formatting
MESSAGE_LENGTH_LIMIT = 4000
# Find tests that failed in master during the last check_period * 24 hours,
# but did not fail during the last 2 weeks. Assuming these tests were broken recently.
# Counts number of failures in check_period and check_period * 24 time windows
# to distinguish rare flaky tests from completely broken tests
NEW_BROKEN_TESTS_QUERY = """
WITH
1 AS check_period,
check_period * 24 AS extended_check_period,
now() as now
SELECT
test_name,
any(report_url),
countIf((check_start_time + check_duration_ms / 1000) < now - INTERVAL check_period HOUR) AS count_prev_periods,
countIf((check_start_time + check_duration_ms / 1000) >= now - INTERVAL check_period HOUR) AS count
FROM checks
WHERE 1
AND check_start_time BETWEEN now - INTERVAL 1 WEEK AND now
AND (check_start_time + check_duration_ms / 1000) >= now - INTERVAL extended_check_period HOUR
AND pull_request_number = 0
AND test_status LIKE 'F%'
AND check_status != 'success'
AND test_name NOT IN (
SELECT test_name FROM checks WHERE 1
AND check_start_time >= now - INTERVAL 1 MONTH
AND (check_start_time + check_duration_ms / 1000) BETWEEN now - INTERVAL 2 WEEK AND now - INTERVAL extended_check_period HOUR
AND pull_request_number = 0
AND check_status != 'success'
AND test_status LIKE 'F%')
AND test_context_raw NOT LIKE '%CannotSendRequest%' and test_context_raw NOT LIKE '%Server does not respond to health check%'
GROUP BY test_name
ORDER BY (count_prev_periods + count) DESC
"""
# Returns total number of failed checks during the last 24 hours
# and previous value of that metric (check_period hours ago)
COUNT_FAILURES_QUERY = """
WITH
1 AS check_period,
'%' AS check_name_pattern,
now() as now
SELECT
countIf((check_start_time + check_duration_ms / 1000) >= now - INTERVAL 24 HOUR) AS new_val,
countIf((check_start_time + check_duration_ms / 1000) <= now - INTERVAL check_period HOUR) AS prev_val
FROM checks
WHERE 1
AND check_start_time >= now - INTERVAL 1 WEEK
AND (check_start_time + check_duration_ms / 1000) >= now - INTERVAL 24 + check_period HOUR
AND pull_request_number = 0
AND test_status LIKE 'F%'
AND check_status != 'success'
AND check_name ILIKE check_name_pattern
"""
# Returns percentage of failed checks (once per day, at noon)
FAILED_CHECKS_PERCENTAGE_QUERY = """
SELECT if(toHour(now('Europe/Amsterdam')) = 12, v, 0)
FROM
(
SELECT
countDistinctIf((commit_sha, check_name), (test_status LIKE 'F%') AND (check_status != 'success'))
/ countDistinct((commit_sha, check_name)) AS v
FROM checks
WHERE 1
AND (pull_request_number = 0)
AND (test_status != 'SKIPPED')
AND (check_start_time > (now() - toIntervalDay(1)))
)
"""
# It shows all recent failures of the specified test (helps to find when it started)
ALL_RECENT_FAILURES_QUERY = """
WITH
'{}' AS name_substr,
90 AS interval_days,
('Stateless tests (asan)', 'Stateless tests (address)', 'Stateless tests (address, actions)', 'Integration tests (asan) [1/3]', 'Stateless tests (tsan) [1/3]') AS backport_and_release_specific_checks
SELECT
toStartOfDay(check_start_time) AS d,
count(),
groupUniqArray(pull_request_number) AS prs,
any(report_url)
FROM checks
WHERE ((now() - toIntervalDay(interval_days)) <= check_start_time) AND (pull_request_number NOT IN (
SELECT pull_request_number AS prn
FROM checks
WHERE (prn != 0) AND ((now() - toIntervalDay(interval_days)) <= check_start_time) AND (check_name IN (backport_and_release_specific_checks))
)) AND (position(test_name, name_substr) > 0) AND (test_status IN ('FAIL', 'ERROR', 'FLAKY'))
GROUP BY d
ORDER BY d DESC
"""
SLACK_MESSAGE_JSON = {"type": "mrkdwn", "text": None}
def get_play_url(query):
return (
"https://play.clickhouse.com/play?user=play#"
+ base64.b64encode(query.encode()).decode()
)
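# A hedged example of the helper above: the play.clickhouse.com link is just the query
# base64-encoded into the URL fragment (the query text here is only an illustration).
example_link = get_play_url("SELECT 1")
assert example_link.startswith("https://play.clickhouse.com/play?user=play#")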
def run_clickhouse_query(query):
url = "https://play.clickhouse.com/?user=play&query=" + requests.compat.quote(query)
res = requests.get(url, timeout=30)
if res.status_code != 200:
print("Failed to execute query: ", res.status_code, res.content)
res.raise_for_status()
lines = res.text.strip().splitlines()
return [x.split("\t") for x in lines]
def split_broken_and_flaky_tests(failed_tests):
if not failed_tests:
return None
broken_tests = []
flaky_tests = []
for name, report, count_prev_str, count_str in failed_tests:
count_prev, count = int(count_prev_str), int(count_str)
if (count_prev < 2 <= count) or (count_prev == count == 1):
# It failed 2 times or more within the extended time window, so it's definitely broken.
# 2 <= count means that it was not reported as broken on previous runs
broken_tests.append([name, report])
elif 0 < count and count_prev == 0:
# It failed only once, can be a rare flaky test
flaky_tests.append([name, report])
return broken_tests, flaky_tests
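# A hedged illustration of the split above; the rows are hypothetical
# (name, report_url, count_prev, count) tuples as returned by NEW_BROKEN_TESTS_QUERY.
example_broken, example_flaky = split_broken_and_flaky_tests(
    [
        ["test_a", "https://example.com/report", "0", "3"],  # repeated failures -> broken
        ["test_b", "https://example.com/report", "0", "1"],  # a single failure -> possibly flaky
    ]
)
assert [t[0] for t in example_broken] == ["test_a"]
assert [t[0] for t in example_flaky] == ["test_b"]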
def format_failed_tests_list(failed_tests, failure_type):
if len(failed_tests) == 1:
res = f"There is a new {failure_type} test:\n"
else:
res = f"There are {len(failed_tests)} new {failure_type} tests:\n"
for name, report in failed_tests[:MAX_TESTS_TO_REPORT]:
cidb_url = get_play_url(ALL_RECENT_FAILURES_QUERY.format(name))
res += f"- *{name}* - <{report}|Report> - <{cidb_url}|CI DB> \n"
if MAX_TESTS_TO_REPORT < len(failed_tests):
res += (
f"- and {len(failed_tests) - MAX_TESTS_TO_REPORT} other "
"tests... :this-is-fine-fire:"
)
return res
def get_new_broken_tests_message(failed_tests):
if not failed_tests:
return None
broken_tests, flaky_tests = split_broken_and_flaky_tests(failed_tests)
if len(broken_tests) == 0 and len(flaky_tests) == 0:
return None
msg = ""
if len(broken_tests) > 0:
msg += format_failed_tests_list(broken_tests, "*BROKEN*")
elif random.random() > FLAKY_ALERT_PROBABILITY:
looks_like_fuzzer = [x[0].count(" ") > 2 for x in flaky_tests]
if not any(looks_like_fuzzer):
print("Will not report flaky tests to avoid noise: ", flaky_tests)
return None
if len(flaky_tests) > 0:
if len(msg) > 0:
msg += "\n"
msg += format_failed_tests_list(flaky_tests, "flaky")
return msg
def get_too_many_failures_message_impl(failures_count):
MAX_FAILURES = int(os.environ.get("MAX_FAILURES", MAX_FAILURES_DEFAULT))
curr_failures = int(failures_count[0][0])
prev_failures = int(failures_count[0][1])
if curr_failures == 0 and prev_failures != 0:
if random.random() < REPORT_NO_FAILURES_PROBABILITY:
return None
return "Wow, there are *no failures* at all... 0_o"
return_none = (
curr_failures < MAX_FAILURES
or curr_failures < prev_failures
or (curr_failures - prev_failures) / prev_failures < 0.2
)
if return_none:
return None
if prev_failures < MAX_FAILURES:
return f":alert: *CI is broken: there are {curr_failures} failures during the last 24 hours*"
return "CI is broken and it's getting worse: there are {curr_failures} failures during the last 24 hours"
def get_too_many_failures_message(failures_count):
msg = get_too_many_failures_message_impl(failures_count)
if msg:
msg += "\nSee https://aretestsgreenyet.com/"
return msg
def get_failed_checks_percentage_message(percentage):
p = float(percentage[0][0]) * 100
# Always report more than 1% of failed checks
# For <= 1%: higher percentage of failures == higher probability
if p <= random.random():
return None
msg = ":alert: " if p > 1 else "Only " if p < 0.5 else ""
msg += f"*{p:.2f}%* of all checks in master have failed yesterday"
return msg
def split_slack_message(long_message):
lines = long_message.split("\n")
messages = []
curr_msg = ""
for line in lines:
if len(curr_msg) + len(line) < MESSAGE_LENGTH_LIMIT:
curr_msg += "\n"
curr_msg += line
else:
messages.append(curr_msg)
curr_msg = line
messages.append(curr_msg)
return messages
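# A hedged sanity check of the splitter above: a message far longer than the Slack limit
# is split on line boundaries and no chunk exceeds MESSAGE_LENGTH_LIMIT.
example_parts = split_slack_message("\n".join(["line"] * 2000))
assert len(example_parts) > 1
assert all(len(part) <= MESSAGE_LENGTH_LIMIT for part in example_parts)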
def send_to_slack_impl(message):
SLACK_URL = os.environ.get("SLACK_URL", SLACK_URL_DEFAULT)
if SLACK_URL == DRY_RUN_MARK:
return
payload = SLACK_MESSAGE_JSON.copy()
payload["text"] = message
res = requests.post(SLACK_URL, json.dumps(payload), timeout=30)
if res.status_code != 200:
print("Failed to send a message to Slack: ", res.status_code, res.content)
res.raise_for_status()
def send_to_slack(message):
messages = split_slack_message(message)
for msg in messages:
send_to_slack_impl(msg)
def query_and_alert_if_needed(query, get_message_func):
query_res = run_clickhouse_query(query)
print("Got result {} for query {}", query_res, query)
msg = get_message_func(query_res)
if msg is None:
return
msg += f"\nCI DB query: <{get_play_url(query)}|link>"
print("Sending message to slack:", msg)
send_to_slack(msg)
def check_and_alert():
query_and_alert_if_needed(NEW_BROKEN_TESTS_QUERY, get_new_broken_tests_message)
query_and_alert_if_needed(COUNT_FAILURES_QUERY, get_too_many_failures_message)
query_and_alert_if_needed(
FAILED_CHECKS_PERCENTAGE_QUERY, get_failed_checks_percentage_message
)
def handler(event, context):
_, _ = event, context
try:
check_and_alert()
return {"statusCode": 200, "body": "OK"}
except Exception as e:
send_to_slack(
"I failed, please help me "
f"(see ClickHouse/ClickHouse/tests/ci/slack_bot_ci_lambda/app.py): {e}"
)
return {"statusCode": 200, "body": "FAIL"}
if __name__ == "__main__":
check_and_alert()

View File

@ -1 +0,0 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1 +0,0 @@
../lambda_shared_package

View File

@ -1,136 +0,0 @@
#!/usr/bin/env python3
import argparse
import json
from datetime import datetime
from queue import Queue
from threading import Thread
import boto3 # type: ignore
import requests
class Keys(set):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.updated_at = 0.0
def update_now(self):
self.updated_at = datetime.now().timestamp()
keys = Keys()
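# A hedged illustration of the Keys cache container defined above (the key material is hypothetical):
# a fresh container is considered stale, and update_now() marks it fresh for the hour-long cache
# window used by get_cached_members_keys below.
example_keys = Keys({"# user\nssh-ed25519 AAAA example\n"})
assert example_keys.updated_at == 0.0
example_keys.update_now()
assert example_keys.updated_at > 0.0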
class Worker(Thread):
def __init__(self, request_queue):
Thread.__init__(self)
self.queue = request_queue
self.results = set()
def run(self):
while True:
m = self.queue.get()
if m == "":
break
response = requests.get(f"https://github.com/{m}.keys", timeout=30)
self.results.add(f"# {m}\n{response.text}\n")
self.queue.task_done()
def get_org_team_members(token: str, org: str, team_slug: str) -> set:
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(
f"https://api.github.com/orgs/{org}/teams/{team_slug}/members",
headers=headers,
timeout=30,
)
response.raise_for_status()
data = response.json()
return set(m["login"] for m in data)
def get_cached_members_keys(members: set) -> Keys:
if (datetime.now().timestamp() - 3600) <= keys.updated_at:
return keys
q = Queue() # type: Queue
workers = []
for m in members:
q.put(m)
# Create workers and add to the queue
worker = Worker(q)
worker.start()
workers.append(worker)
# Workers keep working till they receive an empty string
for _ in workers:
q.put("")
# Join workers to wait till they finish
for worker in workers:
worker.join()
keys.clear()
for worker in workers:
keys.update(worker.results)
keys.update_now()
return keys
def get_token_from_aws() -> str:
# We need a separate token, since the clickhouse-ci app does not have
# access to the organization members' endpoint
secret_name = "clickhouse_robot_token"
session = boto3.session.Session()
client = session.client(
service_name="secretsmanager",
)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
data = json.loads(get_secret_value_response["SecretString"])
return data["clickhouse_robot_token"] # type: ignore
def main(token: str, org: str, team_slug: str) -> str:
members = get_org_team_members(token, org, team_slug)
keys = get_cached_members_keys(members)
return "".join(sorted(keys))
def handler(event, context):
_ = context
_ = event
if keys.updated_at < (datetime.now().timestamp() - 3600):
token = get_token_from_aws()
body = main(token, "ClickHouse", "core")
else:
body = "".join(sorted(keys))
result = {
"statusCode": 200,
"headers": {
"Content-Type": "text/html",
},
"body": body,
}
return result
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Get the public SSH keys for members of given org and team"
)
parser.add_argument("--token", required=True, help="Github PAT")
parser.add_argument(
"--organization", help="GitHub organization name", default="ClickHouse"
)
parser.add_argument("--team", help="GitHub team name", default="core")
args = parser.parse_args()
output = main(args.token, args.organization, args.team)
print(f"# Just showing off the keys:\n{output}")

View File

@ -1,76 +0,0 @@
#!/usr/bin/env bash
set -xeo pipefail
WORKDIR=$(dirname "$0")
WORKDIR=$(readlink -f "${WORKDIR}")
DIR_NAME=$(basename "$WORKDIR")
cd "$WORKDIR"
# Do not deploy the lambda to AWS
DRY_RUN=${DRY_RUN:-}
# Python runtime to install dependencies
PY_VERSION=${PY_VERSION:-3.10}
PY_EXEC="python${PY_VERSION}"
# Image to build the lambda zip package
DOCKER_IMAGE="public.ecr.aws/lambda/python:${PY_VERSION}"
# The directory the_lambda_name maps to the the-lambda-name lambda in AWS
LAMBDA_NAME=${DIR_NAME//_/-}
# The name of directory with lambda code
PACKAGE=lambda-package
# Do not rebuild and deploy the archive if it's newer than sources
if [ -e "$PACKAGE.zip" ] && [ -z "$FORCE" ]; then
REBUILD=""
for src in app.py build_and_deploy_archive.sh requirements.txt lambda_shared/*; do
if [ "$src" -nt "$PACKAGE.zip" ]; then
REBUILD=1
fi
done
[ -n "$REBUILD" ] || exit 0
fi
docker_cmd=(
docker run -i --net=host --rm --user="${UID}" -e HOME=/tmp --entrypoint=/bin/bash
--volume="${WORKDIR}/..:/ci" --workdir="/ci/${DIR_NAME}" "${DOCKER_IMAGE}"
)
rm -rf "$PACKAGE" "$PACKAGE".zip
mkdir "$PACKAGE"
cp app.py "$PACKAGE"
if [ -f requirements.txt ]; then
VENV=lambda-venv
rm -rf "$VENV"
"${docker_cmd[@]}" -ex <<EOF
'$PY_EXEC' -m venv '$VENV' &&
source '$VENV/bin/activate' &&
pip install -r requirements.txt &&
# To have consistent pyc files
find '$VENV/lib' -name '*.pyc' -delete
cp -rT '$VENV/lib/$PY_EXEC/site-packages/' '$PACKAGE'
rm -r '$PACKAGE'/{pip,pip-*,setuptools,setuptools-*}
chmod 0777 -R '$PACKAGE'
EOF
fi
# Create zip archive via python zipfile to have it cross-platform
"${docker_cmd[@]}" -ex <<EOF
cd '$PACKAGE'
find ! -type d -exec touch -t 201212121212 {} +
python <<'EOP'
import zipfile
import os
files_path = []
for root, _, files in os.walk('.'):
files_path.extend(os.path.join(root, file) for file in files)
# persistent file order
files_path.sort()
with zipfile.ZipFile('../$PACKAGE.zip', 'w') as zf:
for file in files_path:
zf.write(file)
EOP
EOF
ECHO=()
if [ -n "$DRY_RUN" ]; then
ECHO=(echo Run the following command to push the changes:)
fi
"${ECHO[@]}" aws lambda update-function-code --function-name "$LAMBDA_NAME" --zip-file fileb://"$WORKDIR/$PACKAGE".zip

View File

@ -1 +0,0 @@
../lambda_shared_package/lambda_shared

View File

@ -1 +0,0 @@
../lambda_shared_package

View File

@ -1,278 +0,0 @@
#!/usr/bin/env python3
import argparse
import json
import sys
import time
from dataclasses import dataclass
from typing import Any, Dict, List
import boto3 # type: ignore
from lambda_shared import RunnerDescriptions, cached_value_is_valid, list_runners
from lambda_shared.token import get_access_token_by_key_app, get_cached_access_token
@dataclass
class CachedInstances:
time: float
value: dict
updating: bool = False
cached_instances = CachedInstances(0, {})
def get_cached_instances() -> dict:
"""return cached instances description with updating it once per five minutes"""
if time.time() - 250 < cached_instances.time or cached_instances.updating:
return cached_instances.value
cached_instances.updating = cached_value_is_valid(cached_instances.time, 300)
ec2_client = boto3.client("ec2")
instances_response = ec2_client.describe_instances(
Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
)
cached_instances.time = time.time()
cached_instances.value = {
instance["InstanceId"]: instance
for reservation in instances_response["Reservations"]
for instance in reservation["Instances"]
}
cached_instances.updating = False
return cached_instances.value
@dataclass
class CachedRunners:
time: float
value: RunnerDescriptions
updating: bool = False
cached_runners = CachedRunners(0, [])
def get_cached_runners(access_token: str) -> RunnerDescriptions:
"""From time to time request to GH api costs up to 3 seconds, and
it's a disaster from the termination lambda perspective"""
if time.time() - 5 < cached_runners.time or cached_instances.updating:
return cached_runners.value
cached_runners.updating = cached_value_is_valid(cached_runners.time, 15)
cached_runners.value = list_runners(access_token)
cached_runners.time = time.time()
cached_runners.updating = False
return cached_runners.value
def how_many_instances_to_kill(event_data: dict) -> Dict[str, int]:
data_array = event_data["CapacityToTerminate"]
to_kill_by_zone = {} # type: Dict[str, int]
for av_zone in data_array:
zone_name = av_zone["AvailabilityZone"]
to_kill = av_zone["Capacity"]
if zone_name not in to_kill_by_zone:
to_kill_by_zone[zone_name] = 0
to_kill_by_zone[zone_name] += to_kill
return to_kill_by_zone
def get_candidates_to_be_killed(event_data: dict) -> Dict[str, List[str]]:
data_array = event_data["Instances"]
instances_by_zone = {} # type: Dict[str, List[str]]
for instance in data_array:
zone_name = instance["AvailabilityZone"]
instance_id = instance["InstanceId"] # type: str
if zone_name not in instances_by_zone:
instances_by_zone[zone_name] = []
instances_by_zone[zone_name].append(instance_id)
return instances_by_zone
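# A hedged illustration of the two helpers above on a hypothetical scale-in event fragment
# (zone, capacity and instance ID values are made up).
example_event = {
    "CapacityToTerminate": [
        {"AvailabilityZone": "us-east-1b", "Capacity": 1},
        {"AvailabilityZone": "us-east-1b", "Capacity": 2},
    ],
    "Instances": [{"AvailabilityZone": "us-east-1b", "InstanceId": "i-0123456789abcdef0"}],
}
assert how_many_instances_to_kill(example_event) == {"us-east-1b": 3}
assert get_candidates_to_be_killed(example_event) == {"us-east-1b": ["i-0123456789abcdef0"]}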
def main(access_token: str, event: dict) -> Dict[str, List[str]]:
start = time.time()
print("Got event", json.dumps(event, sort_keys=True).replace("\n", ""))
to_kill_by_zone = how_many_instances_to_kill(event)
instances_by_zone = get_candidates_to_be_killed(event)
# Getting ASG and instances' descriptions from the API
# We don't kill instances that have been alive for less than 10 minutes, since they
# might not be registered in the GH active runners yet
print(f"Check other hosts from the same ASG {event['AutoScalingGroupName']}")
asg_client = boto3.client("autoscaling")
as_groups_response = asg_client.describe_auto_scaling_groups(
AutoScalingGroupNames=[event["AutoScalingGroupName"]]
)
assert len(as_groups_response["AutoScalingGroups"]) == 1
asg = as_groups_response["AutoScalingGroups"][0]
asg_instance_ids = [instance["InstanceId"] for instance in asg["Instances"]]
instance_descriptions = get_cached_instances()
# The instances launched less than 10 minutes ago
immune_ids = [
instance["InstanceId"]
for instance in instance_descriptions.values()
if start - instance["LaunchTime"].timestamp() < 600
]
# if the ASG's instance ID is not in instance_descriptions, it's most probably
# not cached yet, so we must mark it as immune
immune_ids.extend(
iid for iid in asg_instance_ids if iid not in instance_descriptions
)
print("Time spent on the requests to AWS: ", time.time() - start)
runners = get_cached_runners(access_token)
runner_ids = set(runner.name for runner in runners)
# We used to delete potential hosts to terminate from GitHub runners pool,
# but the documentation states:
# --- Returning an instance first in the response data does not guarantee its termination
# so they will be cleaned out by ci_runners_metrics_lambda eventually
instances_to_kill = []
total_to_kill = 0
for zone, num_to_kill in to_kill_by_zone.items():
candidates = instances_by_zone[zone]
total_to_kill += num_to_kill
if num_to_kill > len(candidates):
raise RuntimeError(
f"Required to kill {num_to_kill}, but have only {len(candidates)}"
f" candidates in AV {zone}"
)
delete_for_av = [] # type: RunnerDescriptions
for candidate in candidates:
if candidate in immune_ids:
print(
f"Candidate {candidate} started less than 10 minutes ago, won't touch a child"
)
break
if candidate not in runner_ids:
print(
f"Candidate {candidate} was not in runners list, simply delete it"
)
instances_to_kill.append(candidate)
break
if len(delete_for_av) + len(instances_to_kill) == num_to_kill:
break
if candidate in instances_to_kill:
continue
for runner in runners:
if runner.name == candidate:
if not runner.busy:
print(
f"Runner {runner.name} is not busy and can be deleted from AV {zone}"
)
delete_for_av.append(runner)
else:
print(f"Runner {runner.name} is busy, not going to delete it")
break
if len(delete_for_av) < num_to_kill:
print(
f"Checked all candidates for av {zone}, get to delete "
f"{len(delete_for_av)}, but still cannot get required {num_to_kill}"
)
instances_to_kill += [runner.name for runner in delete_for_av]
if len(instances_to_kill) < total_to_kill:
for instance in asg_instance_ids:
if instance in immune_ids:
continue
for runner in runners:
if runner.name == instance and not runner.busy:
print(f"Runner {runner.name} is not busy and can be deleted")
instances_to_kill.append(runner.name)
if total_to_kill <= len(instances_to_kill):
print("Got enough instances to kill")
break
response = {"InstanceIDs": instances_to_kill}
print("Got instances to kill: ", response)
print("Time spent on the request: ", time.time() - start)
return response
def handler(event: dict, context: Any) -> Dict[str, List[str]]:
_ = context
return main(get_cached_access_token(), event)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Get list of runners and their states")
parser.add_argument(
"-p", "--private-key-path", help="Path to file with private key"
)
parser.add_argument("-k", "--private-key", help="Private key")
parser.add_argument(
"-a", "--app-id", type=int, help="GitHub application ID", required=True
)
args = parser.parse_args()
if not args.private_key_path and not args.private_key:
print(
"Either --private-key-path or --private-key must be specified",
file=sys.stderr,
)
sys.exit(1)
if args.private_key_path and args.private_key:
print(
"Only one of --private-key-path and --private-key may be specified",
file=sys.stderr,
)
sys.exit(1)
if args.private_key:
private_key = args.private_key
else:
with open(args.private_key_path, "r", encoding="utf-8") as key_file:
private_key = key_file.read()
token = get_access_token_by_key_app(private_key, args.app_id)
sample_event = {
"AutoScalingGroupARN": "arn:aws:autoscaling:us-east-1:<account-id>:autoScalingGroup:d4738357-2d40-4038-ae7e-b00ae0227003:autoScalingGroupName/my-asg",
"AutoScalingGroupName": "my-asg",
"CapacityToTerminate": [
{
"AvailabilityZone": "us-east-1b",
"Capacity": 1,
"InstanceMarketOption": "OnDemand",
},
{
"AvailabilityZone": "us-east-1c",
"Capacity": 2,
"InstanceMarketOption": "OnDemand",
},
],
"Instances": [
{
"AvailabilityZone": "us-east-1b",
"InstanceId": "i-08d0b3c1a137e02a5",
"InstanceType": "t2.nano",
"InstanceMarketOption": "OnDemand",
},
{
"AvailabilityZone": "us-east-1c",
"InstanceId": "ip-172-31-45-253.eu-west-1.compute.internal",
"InstanceType": "t2.nano",
"InstanceMarketOption": "OnDemand",
},
{
"AvailabilityZone": "us-east-1c",
"InstanceId": "ip-172-31-27-227.eu-west-1.compute.internal",
"InstanceType": "t2.nano",
"InstanceMarketOption": "OnDemand",
},
{
"AvailabilityZone": "us-east-1c",
"InstanceId": "ip-172-31-45-253.eu-west-1.compute.internal",
"InstanceType": "t2.nano",
"InstanceMarketOption": "OnDemand",
},
],
"Cause": "SCALE_IN",
}
main(token, sample_event)

View File

@ -1 +0,0 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1 +0,0 @@
../lambda_shared_package/lambda_shared

View File

@ -1 +0,0 @@
../lambda_shared_package[token]

View File

@ -1,404 +0,0 @@
#!/usr/bin/env python3
import fnmatch
import json
import time
from collections import namedtuple
from urllib.parse import quote
import requests
from lambda_shared.pr import TRUSTED_CONTRIBUTORS
from lambda_shared.token import get_cached_access_token
SUSPICIOUS_CHANGED_FILES_NUMBER = 200
SUSPICIOUS_PATTERNS = [
".github/*",
"docker/*",
"docs/tools/*",
"packages/*",
"tests/ci/*",
]
# Number of retries for API calls.
MAX_RETRY = 5
# Number of times a check can re-run as a whole.
# It is needed, because we are using AWS "spot" instances, that are terminated often
MAX_WORKFLOW_RERUN = 30
WorkflowDescription = namedtuple(
"WorkflowDescription",
[
"name",
"action",
"run_id",
"event",
"workflow_id",
"conclusion",
"status",
"api_url",
"fork_owner_login",
"fork_branch",
"rerun_url",
"jobs_url",
"attempt",
"repo_url",
"url",
],
)
# See https://api.github.com/orgs/{name}
TRUSTED_ORG_IDS = {
54801242, # clickhouse
}
# See https://api.github.com/repos/ClickHouse/ClickHouse/actions/workflows
# Use IDs so that a malicious workflow cannot be injected
TRUSTED_WORKFLOW_IDS = {
14586616, # Cancel workflows, always trusted
}
NEED_RERUN_WORKFLOWS = {
"BackportPR",
"DocsCheck",
"MasterCI",
"NightlyBuilds",
"PublishedReleaseCI",
"PullRequestCI",
"ReleaseBranchCI",
}
def is_trusted_contributor(pr_user_login, pr_user_orgs):
if pr_user_login.lower() in TRUSTED_CONTRIBUTORS:
print(f"User '{pr_user_login}' is trusted")
return True
print(f"User '{pr_user_login}' is not trusted")
for org_id in pr_user_orgs:
if org_id in TRUSTED_ORG_IDS:
print(
f"Org '{org_id}' is trusted; will mark user {pr_user_login} as trusted"
)
return True
print(f"Org '{org_id}' is not trusted")
return False
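# A hedged illustration of the check above (the login is hypothetical): membership in the
# trusted org 54801242 is enough to mark a user as trusted.
assert is_trusted_contributor("some-external-login", [54801242]) is True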
def _exec_get_with_retry(url, token):
headers = {"Authorization": f"token {token}"}
e = Exception()
for i in range(MAX_RETRY):
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
return response.json()
except Exception as ex:
print("Got exception executing request", ex)
e = ex
time.sleep(i + 1)
raise requests.HTTPError("Cannot execute GET request with retries") from e
def _exec_post_with_retry(url, token, data=None):
headers = {"Authorization": f"token {token}"}
e = Exception()
for i in range(MAX_RETRY):
try:
if data:
response = requests.post(url, headers=headers, json=data, timeout=30)
else:
response = requests.post(url, headers=headers, timeout=30)
if response.status_code == 403:
data = response.json()
if (
"message" in data
and data["message"]
== "This workflow run is not waiting for approval"
):
print("Workflow doesn't need approval")
return data
response.raise_for_status()
return response.json()
except Exception as ex:
print("Got exception executing request", ex)
e = ex
time.sleep(i + 1)
raise requests.HTTPError("Cannot execute POST request with retry") from e
def _get_pull_requests_from(repo_url, owner, branch, token):
url = f"{repo_url}/pulls?head={quote(owner)}:{quote(branch)}"
return _exec_get_with_retry(url, token)
def get_workflow_description_from_event(event):
action = event["action"]
run_id = event["workflow_run"]["id"]
event_type = event["workflow_run"]["event"]
fork_owner = event["workflow_run"]["head_repository"]["owner"]["login"]
fork_branch = event["workflow_run"]["head_branch"]
name = event["workflow_run"]["name"]
workflow_id = event["workflow_run"]["workflow_id"]
conclusion = event["workflow_run"]["conclusion"]
attempt = event["workflow_run"]["run_attempt"]
status = event["workflow_run"]["status"]
jobs_url = event["workflow_run"]["jobs_url"]
rerun_url = event["workflow_run"]["rerun_url"]
url = event["workflow_run"]["html_url"]
api_url = event["workflow_run"]["url"]
repo_url = event["repository"]["url"]
return WorkflowDescription(
name=name,
action=action,
run_id=run_id,
event=event_type,
fork_owner_login=fork_owner,
fork_branch=fork_branch,
workflow_id=workflow_id,
conclusion=conclusion,
attempt=attempt,
status=status,
jobs_url=jobs_url,
rerun_url=rerun_url,
url=url,
repo_url=repo_url,
api_url=api_url,
)
def get_pr_author_and_orgs(pull_request, token):
author = pull_request["user"]["login"]
orgs = _exec_get_with_retry(pull_request["user"]["organizations_url"], token)
return author, [org["id"] for org in orgs]
def get_changed_files_for_pull_request(pull_request, token):
url = pull_request["url"]
changed_files = set([])
for i in range(1, 31):
print("Requesting changed files page", i)
data = _exec_get_with_retry(f"{url}/files?page={i}&per_page=100", token)
print(f"Got {len(data)} changed files")
if len(data) == 0:
print("No more changed files")
break
for change in data:
# print("Adding changed file", change['filename'])
changed_files.add(change["filename"])
if len(changed_files) >= SUSPICIOUS_CHANGED_FILES_NUMBER:
print(
f"More than {len(changed_files)} changed files. "
"Will stop fetching new files."
)
break
return changed_files
def check_suspicious_changed_files(changed_files):
if len(changed_files) >= SUSPICIOUS_CHANGED_FILES_NUMBER:
print(f"Too many files changed {len(changed_files)}, need manual approve")
return True
for path in changed_files:
for pattern in SUSPICIOUS_PATTERNS:
if fnmatch.fnmatch(path, pattern):
print(
f"File {path} match suspicious pattern {pattern}, "
"will not approve automatically"
)
return True
print("No changed files match suspicious patterns, run could be approved")
return False
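# A hedged illustration of the check above (both paths are hypothetical).
assert check_suspicious_changed_files({"tests/ci/ci.py"}) is True  # matches "tests/ci/*"
assert check_suspicious_changed_files({"src/Core/Field.cpp"}) is False  # no suspicious pattern matches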
def approve_run(workflow_description: WorkflowDescription, token: str) -> None:
print("Approving run")
url = f"{workflow_description.api_url}/approve"
_exec_post_with_retry(url, token)
def label_manual_approve(pull_request, token):
url = f"{pull_request['issue_url']}/labels"
data = {"labels": ["manual approve"]}
_exec_post_with_retry(url, token, data)
def get_workflow_jobs(workflow_description, token):
jobs_url = (
workflow_description.api_url + f"/attempts/{workflow_description.attempt}/jobs"
)
jobs = []
i = 1
while True:
got_jobs = _exec_get_with_retry(jobs_url + f"?page={i}", token)
if len(got_jobs["jobs"]) == 0:
break
jobs += got_jobs["jobs"]
i += 1
return jobs
def check_need_to_rerun(workflow_description, token):
if workflow_description.attempt >= MAX_WORKFLOW_RERUN:
print(
f"Not going to rerun the workflow: attempt {workflow_description.attempt} "
f"already reached the limit of {MAX_WORKFLOW_RERUN}"
)
return False
print("Going to check jobs")
jobs = get_workflow_jobs(workflow_description, token)
print("Got jobs", len(jobs))
for job in jobs:
print(f"Job {job['name']} has a conclusion '{job['conclusion']}'")
if job["conclusion"] not in ("success", "skipped"):
print("Job", job["name"], "failed, checking steps")
for step in job["steps"]:
# the "Complete job" step is always the last one
if step["name"] == "Complete job":
print("Found Complete job step for job", job["name"])
break
else:
print(
"Checked all steps and doesn't found Complete job, going to rerun"
)
return True
return False
def rerun_workflow(workflow_description, token):
print("Going to rerun workflow")
try:
_exec_post_with_retry(f"{workflow_description.rerun_url}-failed-jobs", token)
except Exception:
_exec_post_with_retry(workflow_description.rerun_url, token)
def check_workflow_completed(
event_data: dict, workflow_description: WorkflowDescription, token: str
) -> bool:
if workflow_description.action == "completed":
attempt = 0
# The "nice and reliable" GH API sends such events from time to time, e.g.:
# action='completed', conclusion=None, status='in_progress',
# so let's try to receive the real workflow data
while workflow_description.conclusion is None and attempt < MAX_RETRY:
progressive_sleep = 3 * sum(i + 1 for i in range(attempt))
time.sleep(progressive_sleep)
event_data["workflow_run"] = _exec_get_with_retry(
workflow_description.api_url, token
)
workflow_description = get_workflow_description_from_event(event_data)
attempt += 1
if workflow_description.conclusion != "failure":
print(
"Workflow finished with status "
f"{workflow_description.conclusion}, exiting"
)
return True
print(
"Workflow",
workflow_description.url,
"completed and failed, let's check for rerun",
)
if workflow_description.name not in NEED_RERUN_WORKFLOWS:
print(
"Workflow",
workflow_description.name,
"not in list of rerunable workflows",
)
return True
if check_need_to_rerun(workflow_description, token):
rerun_workflow(workflow_description, token)
return True
return False
def main(event):
token = get_cached_access_token()
event_data = json.loads(event["body"])
print("The body received:", event["body"])
workflow_description = get_workflow_description_from_event(event_data)
print("Got workflow description", workflow_description)
if check_workflow_completed(event_data, workflow_description, token):
return
if workflow_description.action != "requested":
print("Exiting, event action is", workflow_description.action)
return
if workflow_description.workflow_id in TRUSTED_WORKFLOW_IDS:
print("Workflow in trusted list, approving run")
approve_run(workflow_description, token)
return
pull_requests = _get_pull_requests_from(
workflow_description.repo_url,
workflow_description.fork_owner_login,
workflow_description.fork_branch,
token,
)
print("Got pull requests for workflow", len(pull_requests))
if len(pull_requests) != 1:
print(f"Can't continue with non-uniq PRs: {pull_requests}")
return
pull_request = pull_requests[0]
print("Pull request for workflow number", pull_request["number"])
author, author_orgs = get_pr_author_and_orgs(pull_request, token)
if is_trusted_contributor(author, author_orgs):
print("Contributor is trusted, approving run")
approve_run(workflow_description, token)
return
labels = {label["name"] for label in pull_request["labels"]}
if "can be tested" not in labels:
print("Label 'can be tested' is required for untrusted users")
return
changed_files = get_changed_files_for_pull_request(pull_request, token)
print(f"Totally have {len(changed_files)} changed files in PR:", changed_files)
if check_suspicious_changed_files(changed_files):
print(f"Pull Request {pull_request['number']} has suspicious changes")
if "manual approve" not in labels:
print("Label the PR as needed for manuall approve")
label_manual_approve(pull_request, token)
else:
print(f"Pull Request {pull_request['number']} has no suspicious changes")
approve_run(workflow_description, token)
def handler(event, _):
try:
main(event)
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": '{"status": "OK"}',
}
except Exception:
print("Received event: ", event)
raise

View File

@ -1 +0,0 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1 +0,0 @@
../lambda_shared_package/lambda_shared

View File

@ -1 +0,0 @@
../lambda_shared_package[token]

View File

@ -1,202 +0,0 @@
#!/usr/bin/env python
"""
Lambda gets the workflow_job events, see
https://docs.github.com/en/developers/webhooks-and-events/webhooks/webhook-events-and-payloads#workflow_job
Then it either posts it as is to the play.clickhouse.com, or anonymizes the sensitive
fields for private repositories
"""
import json
import logging
from base64 import b64decode
from dataclasses import dataclass
from typing import Any, List, Optional
from lambda_shared import ClickHouseHelper, InsertException, get_parameter_from_ssm
logging.getLogger().setLevel(logging.INFO)
@dataclass
class WorkflowJob:
id: int
run_id: int
workflow_name: str
head_branch: str
run_url: str
run_attempt: int
node_id: str
head_sha: str
url: str
html_url: str
status: str
conclusion: str
started_at: str
completed_at: str
name: str
steps: int # just the number of steps, we don't keep the steps themselves
check_run_url: str
labels: List[str]
runner_id: int
runner_name: str
runner_group_id: int
runner_group_name: str
repository: str
def anonimyze_url(self, url: str) -> str:
return url.replace(self.repository, "ANONYMIZED_REPO")
def anonimyze(self):
anm = "ANONYMIZED"
self.workflow_name = anm
self.head_branch = anm
self.run_url = self.anonimyze_url(self.run_url)
self.node_id = anm
self.url = self.anonimyze_url(self.url)
self.html_url = self.anonimyze_url(self.html_url)
self.name = anm
self.check_run_url = self.anonimyze_url(self.check_run_url)
self.repository = anm
def as_dict(self) -> dict:
return self.__dict__
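# A hedged illustration of the anonymization logic above; every value here is hypothetical and
# only shows that the repository name disappears from the stored URLs and fields.
_example_job = WorkflowJob(
    id=1, run_id=2, workflow_name="CI", head_branch="main",
    run_url="https://api.github.com/repos/org/private-repo/actions/runs/2",
    run_attempt=1, node_id="N", head_sha="deadbeef",
    url="https://api.github.com/repos/org/private-repo/actions/jobs/3",
    html_url="https://github.com/org/private-repo/actions/runs/2/job/3",
    status="completed", conclusion="success",
    started_at="2024-01-01T00:00:00", completed_at="2024-01-01T00:10:00",
    name="Style check", steps=5,
    check_run_url="https://api.github.com/repos/org/private-repo/check-runs/4",
    labels=["self-hosted"], runner_id=7, runner_name="runner-7",
    runner_group_id=1, runner_group_name="Default", repository="org/private-repo",
)
_example_job.anonimyze()
assert "private-repo" not in _example_job.url and _example_job.repository == "ANONYMIZED"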
CH_CLIENT = None # type: Optional[ClickHouseHelper]
def send_event_workflow_job(workflow_job: WorkflowJob) -> None:
# # SHOW CREATE TABLE default.workflow_jobs
# CREATE TABLE default.workflow_jobs UUID 'c0351924-8ccd-47a6-9db0-e28a9eee2fdf'
# (
# `id` UInt64,
# `run_id` UInt64,
# `workflow_name` LowCardinality(String),
# `head_branch` LowCardinality(String),
# `run_url` String,
# `run_attempt` UInt16,
# `node_id` String,
# `head_sha` String,
# `url` String,
# `html_url` String,
# `status` Enum8('waiting' = 1, 'queued' = 2, 'in_progress' = 3, 'completed' = 4),
# `conclusion` LowCardinality(String),
# `started_at` DateTime,
# `completed_at` DateTime,
# `name` LowCardinality(String),
# `steps` UInt16,
# `check_run_url` String,
# `labels` Array(LowCardinality(String)),
# `runner_id` UInt64,
# `runner_name` String,
# `runner_group_id` UInt64,
# `runner_group_name` LowCardinality(String),
# `repository` LowCardinality(String),
# `updated_at` DateTime DEFAULT now()
# )
# ENGINE = ReplicatedMergeTree('/clickhouse/tables/c0351924-8ccd-47a6-9db0-e28a9eee2fdf/{shard}', '{replica}')
# PARTITION BY toStartOfMonth(started_at)
# ORDER BY (id, updated_at)
# SETTINGS index_granularity = 8192
global CH_CLIENT
CH_CLIENT = CH_CLIENT or ClickHouseHelper(
get_parameter_from_ssm("clickhouse-test-stat-url"),
get_parameter_from_ssm("clickhouse-test-stat-login"),
get_parameter_from_ssm("clickhouse-test-stat-password"),
)
try:
CH_CLIENT.insert_event_into(
"default", "workflow_jobs", workflow_job.as_dict(), False
)
except InsertException as ex:
logging.exception(
"Got an exception on insert, tryuing to update the client "
"credentials and repeat",
exc_info=ex,
)
CH_CLIENT = ClickHouseHelper(
get_parameter_from_ssm("clickhouse-test-stat-url"),
get_parameter_from_ssm("clickhouse-test-stat-login"),
get_parameter_from_ssm("clickhouse-test-stat-password"),
)
CH_CLIENT.insert_event_into(
"default", "workflow_jobs", workflow_job.as_dict(), False
)
def killed_job(wf_job: dict) -> bool:
"""a hack to identify the killed runner if "Complete job" is omit"""
if (
wf_job.get("status", "") != "completed"
or wf_job.get("conclusion", "") != "failure"
):
# The job either succeeded or is still in progress
return False
return not any(
step["name"] == "Complete job" and step["conclusion"] is not None
for step in wf_job["steps"]
)
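# A hedged illustration of killed_job on a hypothetical payload: a failed job whose
# "Complete job" step never received a conclusion counts as killed.
assert killed_job(
    {
        "status": "completed",
        "conclusion": "failure",
        "steps": [{"name": "Complete job", "conclusion": None}],
    }
)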
def handler(event: dict, context: Any) -> dict:
if event["isBase64Encoded"]:
event_data = json.loads(b64decode(event["body"]))
else:
event_data = json.loads(event["body"])
logging.info("Got the next raw event from the github hook: %s", event_data)
repo = event_data["repository"]
try:
wf_job = event_data["workflow_job"]
except KeyError:
logging.error("The event does not contain valid workflow_jobs data")
logging.error("The event data: %s", event)
logging.error("The context data: %s", context)
# without a workflow_job payload there is nothing to insert, so bail out early
return {"statusCode": 400, "body": '{"status": "workflow_job is missing"}'}
if killed_job(wf_job):
# for a killed job we record 0 steps
steps = 0
else:
# We record only finished steps
steps = sum(1 for st in wf_job["steps"] if st["conclusion"] is not None)
workflow_job = WorkflowJob(
wf_job["id"],
wf_job["run_id"],
wf_job["workflow_name"] or "", # nullable
wf_job["head_branch"],
wf_job["run_url"],
wf_job["run_attempt"],
wf_job["node_id"],
wf_job["head_sha"],
wf_job["url"],
wf_job["html_url"],
wf_job["status"],
wf_job["conclusion"] or "", # nullable
wf_job["started_at"],
wf_job["completed_at"] or "1970-01-01T00:00:00", # nullable date
wf_job["name"],
steps,
wf_job["check_run_url"],
wf_job["labels"],
wf_job["runner_id"] or 0, # nullable
wf_job["runner_name"] or "", # nullable
wf_job["runner_group_id"] or 0, # nullable
wf_job["runner_group_name"] or "", # nullable
repo["full_name"],
)
logging.info(
"Got the next event (private_repo=%s): %s", repo["private"], workflow_job
)
if repo["private"]:
workflow_job.anonimyze()
send_event_workflow_job(workflow_job)
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": '{"status": "OK"}',
}

View File

@ -1 +0,0 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1 +0,0 @@
../lambda_shared_package/lambda_shared

View File

@ -1 +0,0 @@
../lambda_shared_package