ClickHouse/tests/ci/ci_runners_metrics_lambda/app.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

403 lines
13 KiB
Python
Raw Normal View History

2021-09-30 09:00:45 +00:00
#!/usr/bin/env python3
"""
Lambda function to:
- calculate number of running runners
- cleaning dead runners from GitHub
- terminating stale lost runners in EC2
"""
2021-09-30 09:00:45 +00:00
import argparse
import sys
import json
import time
from collections import namedtuple
from datetime import datetime
from typing import Dict, List, Tuple
import jwt
import requests # type: ignore
import boto3 # type: ignore
from botocore.exceptions import ClientError # type: ignore
2021-12-02 16:38:18 +00:00
# Runners carrying this label are grouped under it regardless of other labels
UNIVERSAL_LABEL = "universal"

# Labels that identify the type of a runner
RUNNER_TYPE_LABELS = [
    "builder",
    "func-tester",
    "func-tester-aarch64",
    "fuzzer-unit-tester",
    "stress-tester",
    "style-checker",
    "style-checker-aarch64",
]

# One runner as used throughout this module: its id and name, the list of
# label names (`tags`) and the `offline` / `busy` flags
RunnerDescription = namedtuple("RunnerDescription", "id name tags offline busy")
RunnerDescriptions = List[RunnerDescription]
2022-01-07 09:11:46 +00:00
def _parse_missing_instance_ids(message: str) -> List[str]:
    """Extract instance IDs from an EC2 "does not exist" error message.

    Returns an empty list when the message has a different shape.
    """
    # EC2 uses the plural form for several IDs and the singular form for one:
    #   The instance IDs 'i-069b1c256c06cf4e3, i-0f26430432b044035' do not exist
    #   The instance ID 'i-069b1c256c06cf4e3' does not exist
    for prefix, suffix in (
        ("The instance IDs '", "' do not exist"),
        ("The instance ID '", "' does not exist"),
    ):
        if message.startswith(prefix) and message.endswith(suffix):
            return message[len(prefix) : -len(suffix)].split(", ")
    return []


def get_dead_runners_in_ec2(runners: RunnerDescriptions) -> RunnerDescriptions:
    """Return the offline, idle runners that should be removed from GitHub.

    A runner is a candidate when it is offline and not busy. Candidates named
    like an EC2 instance ID (``i-...``) are looked up in EC2 and returned when
    the instance does not exist or is neither ``pending`` nor ``running``.
    Candidates with any other name are returned without an EC2 lookup.
    If there is no ``i-...`` candidate at all, an empty list is returned.

    Raises:
        ClientError: when EC2 fails for any reason other than a recognised
            "instance does not exist" response about the requested IDs.
    """
    ids = {
        runner.name: runner
        for runner in runners
        # Only `i-deadbead123` are valid names for an instance ID
        if runner.offline and not runner.busy and runner.name.startswith("i-")
    }
    if not ids:
        return []

    # Offline idle runners whose name is not an instance ID
    result_to_delete = [
        runner
        for runner in runners
        if runner.name not in ids and runner.offline and not runner.busy
    ]

    client = boto3.client("ec2")
    chunk_size = 100
    print("Checking ids", ids.keys())
    instances_statuses = []
    pending_ids = list(ids)
    position = 0
    while position < len(pending_ids):
        chunk = pending_ids[position : position + chunk_size]
        try:
            instances_statuses.append(
                client.describe_instance_status(InstanceIds=chunk)
            )
        except ClientError as e:
            missing = _parse_missing_instance_ids(e.response["Error"]["Message"])
            known_missing = [name for name in missing if name in ids]
            if not known_missing:
                # Either another kind of error, or nothing we could drop to
                # make progress: retrying would loop forever
                raise
            for name in known_missing:
                result_to_delete.append(ids.pop(name))
            # Retry the same position without the non-existent instances
            pending_ids = [name for name in pending_ids if name in ids]
        else:
            # The request succeeds only if all ids of the chunk exist in EC2
            position += chunk_size

    found_instances = set()
    print("Response", instances_statuses)
    for instances_status in instances_statuses:
        for instance_status in instances_status["InstanceStatuses"]:
            if instance_status["InstanceState"]["Name"] in ("pending", "running"):
                found_instances.add(instance_status["InstanceId"])

    print("Found instances", found_instances)
    for runner in result_to_delete:
        print("Instance", runner.name, "is not alive, going to remove it")
    for instance_id, runner in ids.items():
        if instance_id not in found_instances:
            print("Instance", instance_id, "is not found in EC2, going to remove it")
            result_to_delete.append(runner)
    return result_to_delete
2021-09-30 09:00:45 +00:00
2022-01-07 09:11:46 +00:00
def get_lost_ec2_instances(runners: RunnerDescriptions) -> List[dict]:
    """Return running EC2 runner instances that have no matching runner.

    Instances are selected by the presence of the ``github:runner-type`` tag.
    An instance is returned when all of the following hold:
    - it was launched at least 20 minutes ago;
    - its runner type contains UNIVERSAL_LABEL or is one of RUNNER_TYPE_LABELS;
    - its state is ``running``;
    - no entry of `runners` is named after its instance ID.

    The returned items are the instance dicts as given by `describe_instances`.
    """
    client = boto3.client("ec2")
    reservations = client.describe_instances(
        Filters=[{"Name": "tag-key", "Values": ["github:runner-type"]}]
    )["Reservations"]
    # Build the lookup once instead of scanning `runners` for every instance
    known_runner_names = {runner.name for runner in runners}
    lost_instances = []
    now = datetime.now().timestamp()
    for reservation in reservations:
        for instance in reservation["Instances"]:
            # Do not consider instances started less than 20 minutes ago
            # as problematic
            if now - instance["LaunchTime"].timestamp() < 1200:
                continue

            runner_type = [
                tag["Value"]
                for tag in instance["Tags"]
                if tag["Key"] == "github:runner-type"
            ][0]
            # If there are no necessary labels in the runner type, it's fine.
            # Note: the UNIVERSAL_LABEL check is a substring match on the value
            if not (
                UNIVERSAL_LABEL in runner_type or runner_type in RUNNER_TYPE_LABELS
            ):
                continue

            if (
                instance["State"]["Name"] == "running"
                and instance["InstanceId"] not in known_runner_names
            ):
                lost_instances.append(instance)

    return lost_instances
def get_key_and_app_from_aws() -> Tuple[str, int]:
    """Read the GitHub application key and ID from AWS Secrets Manager.

    The secret `clickhouse_github_secret_key` is fetched and its
    `SecretString` is parsed as JSON.

    Returns:
        A tuple of the `clickhouse-app-key` value and the `clickhouse-app-id`
        value converted to `int`.
    """
    secrets_client = boto3.session.Session().client(service_name="secretsmanager")
    secret = secrets_client.get_secret_value(SecretId="clickhouse_github_secret_key")
    content = json.loads(secret["SecretString"])
    app_key = content["clickhouse-app-key"]
    app_id = int(content["clickhouse-app-id"])
    return app_key, app_id
2021-09-30 09:00:45 +00:00
def handler(event, context):
    """Lambda entry point: run `main` with metrics and cleanup both enabled.

    `event` and `context` are accepted but not used. The credentials are
    taken from `get_key_and_app_from_aws`.
    """
    key, application_id = get_key_and_app_from_aws()
    main(
        key,
        application_id,
        push_to_cloudwatch=True,
        delete_offline_runners=True,
    )
2021-09-30 09:00:45 +00:00
2022-01-07 09:11:46 +00:00
def get_installation_id(jwt_token: str) -> int:
    """Return the ID of the application installation owned by `ClickHouse`.

    Args:
        jwt_token: the token sent as `Bearer` authorization.

    Returns:
        The `id` of the first installation whose account login is
        `ClickHouse`.

    Raises:
        requests.HTTPError: when GitHub answers with an error status.
        RuntimeError: when no installation belongs to `ClickHouse`.
    """
    headers = {
        "Authorization": f"Bearer {jwt_token}",
        "Accept": "application/vnd.github.v3+json",
    }
    response = requests.get(
        "https://api.github.com/app/installations",
        headers=headers,
        # Fail instead of hanging forever when GitHub does not respond
        timeout=30,
    )
    response.raise_for_status()
    for installation in response.json():
        if installation["account"]["login"] == "ClickHouse":
            return installation["id"]  # type: ignore

    # Previously this case ended in an UnboundLocalError
    raise RuntimeError("The application is not installed for the ClickHouse account")
2022-01-07 09:11:46 +00:00
2021-09-30 09:00:45 +00:00
def get_access_token(jwt_token: str, installation_id: int) -> str:
    """Request an access token for the given application installation.

    Args:
        jwt_token: the token sent as `Bearer` authorization.
        installation_id: the installation to create the token for.

    Returns:
        The `token` field of the GitHub response.

    Raises:
        requests.HTTPError: when GitHub answers with an error status.
    """
    headers = {
        "Authorization": f"Bearer {jwt_token}",
        "Accept": "application/vnd.github.v3+json",
    }
    response = requests.post(
        f"https://api.github.com/app/installations/{installation_id}/access_tokens",
        headers=headers,
        # Fail instead of hanging forever when GitHub does not respond
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()
    return data["token"]  # type: ignore
2021-09-30 09:00:45 +00:00
def list_runners(access_token: str) -> RunnerDescriptions:
    """Fetch all runners of the ClickHouse organization from GitHub.

    The first page provides `total_count`, which determines how many further
    pages of 100 runners are requested.

    Args:
        access_token: the token sent as `token` authorization.

    Returns:
        One `RunnerDescription` per runner. `tags` holds the label names and
        `offline` is True when the runner status is `offline`.

    Raises:
        requests.HTTPError: when GitHub answers with an error status.
    """
    headers = {
        "Authorization": f"token {access_token}",
        "Accept": "application/vnd.github.v3+json",
    }
    per_page = 100
    # Fail instead of hanging forever when GitHub does not respond
    timeout = 30
    response = requests.get(
        f"https://api.github.com/orgs/ClickHouse/actions/runners?per_page={per_page}",
        headers=headers,
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()
    total_runners = data["total_count"]
    print("Expected total runners", total_runners)
    runners = data["runners"]

    # round to 0 for 0, 1 for 1..100, but to 2 for 101..200
    total_pages = (total_runners - 1) // per_page + 1

    print("Total pages", total_pages)
    # The first page is already fetched above
    for i in range(2, total_pages + 1):
        response = requests.get(
            "https://api.github.com/orgs/ClickHouse/actions/runners"
            f"?page={i}&per_page={per_page}",
            headers=headers,
            timeout=timeout,
        )
        response.raise_for_status()
        data = response.json()
        runners += data["runners"]

    print("Total runners", len(runners))
    return [
        RunnerDescription(
            id=runner["id"],
            name=runner["name"],
            tags=[tag["name"] for tag in runner["labels"]],
            offline=runner["status"] == "offline",
            busy=runner["busy"],
        )
        for runner in runners
    ]
2022-01-07 09:11:46 +00:00
def group_runners_by_tag(
    listed_runners: RunnerDescriptions,
) -> Dict[str, RunnerDescriptions]:
    """Split runners into groups keyed by their type label.

    Each runner lands in exactly one group:
    - UNIVERSAL_LABEL if that label is among its tags, whatever else it has;
    - otherwise the first of its tags found in RUNNER_TYPE_LABELS;
    - otherwise `unlabeled`.
    """
    groups = {}  # type: Dict[str, RunnerDescriptions]
    for runner in listed_runners:
        if UNIVERSAL_LABEL in runner.tags:
            # Other labels are not considered once UNIVERSAL_LABEL is present
            group = UNIVERSAL_LABEL
        else:
            group = next(
                (tag for tag in runner.tags if tag in RUNNER_TYPE_LABELS),
                "unlabeled",
            )
        groups.setdefault(group, []).append(runner)
    return groups
def push_metrics_to_cloudwatch(
    listed_runners: RunnerDescriptions, namespace: str
) -> None:
    """Send runner counters for `listed_runners` to CloudWatch.

    Four metrics are put into `namespace` in a single call:
    - `BusyRunners`: runners that are busy and not offline;
    - `ActiveRunners`: runners that are not offline;
    - `TotalRunners`: all given runners;
    - `BusyRunnersRatio`: busy / active in percent, or 100.0 when there are
      no active runners.
    """
    client = boto3.client("cloudwatch")

    active_runners = [runner for runner in listed_runners if not runner.offline]
    active_count = len(active_runners)
    busy_count = sum(1 for runner in active_runners if runner.busy)
    # Without active runners the ratio is reported as fully busy
    ratio = busy_count / active_count * 100 if active_count != 0 else 100.0

    metrics = [
        {"MetricName": "BusyRunners", "Value": busy_count, "Unit": "Count"},
        {"MetricName": "ActiveRunners", "Value": active_count, "Unit": "Count"},
        {"MetricName": "TotalRunners", "Value": len(listed_runners), "Unit": "Count"},
        {"MetricName": "BusyRunnersRatio", "Value": ratio, "Unit": "Percent"},
    ]
    client.put_metric_data(Namespace=namespace, MetricData=metrics)
2021-09-30 09:00:45 +00:00
2022-01-07 09:11:46 +00:00
def delete_runner(access_token: str, runner: RunnerDescription) -> bool:
    """Delete `runner` from the ClickHouse organization in GitHub.

    Args:
        access_token: the token sent as `token` authorization.
        runner: the runner to delete; its `id` addresses the runner and its
            `name` is used for the log line.

    Returns:
        True when GitHub answers with status code 204.

    Raises:
        requests.HTTPError: when GitHub answers with an error status.
    """
    headers = {
        "Authorization": f"token {access_token}",
        "Accept": "application/vnd.github.v3+json",
    }
    response = requests.delete(
        f"https://api.github.com/orgs/ClickHouse/actions/runners/{runner.id}",
        headers=headers,
        # Fail instead of hanging forever when GitHub does not respond
        timeout=30,
    )
    response.raise_for_status()
    print(f"Response code deleting {runner.name} is {response.status_code}")
    return response.status_code == 204
2021-11-02 19:29:58 +00:00
2022-01-07 09:11:46 +00:00
def main(
    github_secret_key: str,
    github_app_id: int,
    push_to_cloudwatch: bool,
    delete_offline_runners: bool,
) -> None:
    """Report the state of GitHub runners and optionally clean them up.

    Args:
        github_secret_key: key used to sign the RS256 JWT.
        github_app_id: value of the JWT `iss` claim.
        push_to_cloudwatch: push metrics of every runner group to CloudWatch
            under `RunnersMetrics/<group>`; otherwise print the groups.
        delete_offline_runners: delete dead runners from GitHub and terminate
            lost EC2 instances.
    """
    # The token is backdated by a minute and expires in ten minutes
    issued_at = int(time.time()) - 60
    expires_at = int(time.time()) + (10 * 60)
    claims = {"iat": issued_at, "exp": expires_at, "iss": github_app_id}
    encoded_jwt = jwt.encode(claims, github_secret_key, algorithm="RS256")

    token = get_access_token(encoded_jwt, get_installation_id(encoded_jwt))
    all_runners = list_runners(token)

    for group, members in group_runners_by_tag(all_runners).items():
        if not push_to_cloudwatch:
            print(group, f"({len(members)})")
            for member in members:
                print("\t", member)
            continue
        print(f"Pushing metrics for group '{group}'")
        push_metrics_to_cloudwatch(members, "RunnersMetrics/" + group)

    if not delete_offline_runners:
        return

    print("Going to delete offline runners")
    for dead_runner in get_dead_runners_in_ec2(all_runners):
        print("Deleting runner", dead_runner)
        delete_runner(token, dead_runner)

    # Termination is destructive, so it is gated by the same flag as deletion
    lost = get_lost_ec2_instances(all_runners)
    if not lost:
        return
    print("Going to terminate lost runners")
    instance_ids = [instance["InstanceId"] for instance in lost]
    print("Terminating runners:", instance_ids)
    boto3.client("ec2").terminate_instances(InstanceIds=instance_ids)
2022-01-07 09:11:46 +00:00
2021-09-30 09:00:45 +00:00
# Command-line entry point: resolve the private key and application ID, then
# run `main` with the requested actions.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Get list of runners and their states")
    parser.add_argument(
        "-p", "--private-key-path", help="Path to file with private key"
    )
    parser.add_argument("-k", "--private-key", help="Private key")
    parser.add_argument(
        "-a", "--app-id", type=int, help="GitHub application ID", required=True
    )
    parser.add_argument(
        "--push-to-cloudwatch",
        action="store_true",
        help="Push metrics for active and busy runners to cloudwatch",
    )
    parser.add_argument(
        "--delete-offline", action="store_true", help="Remove offline runners"
    )

    args = parser.parse_args()

    # The script keeps going in both cases below, so the messages describe
    # what actually happens instead of claiming an argument is mandatory
    if args.private_key_path and args.private_key:
        print(
            "Both --private-key-path and --private-key are specified, "
            "--private-key is used",
            file=sys.stderr,
        )

    if args.private_key:
        private_key = args.private_key
    elif args.private_key_path:
        with open(args.private_key_path, "r", encoding="utf-8") as key_file:
            private_key = key_file.read()
    else:
        print(
            "Neither --private-key-path nor --private-key is specified",
            file=sys.stderr,
        )
        print("Attempt to get key and id from AWS secret manager")
        # The application ID from the secret replaces the --app-id value
        private_key, args.app_id = get_key_and_app_from_aws()

    main(private_key, args.app_id, args.push_to_cloudwatch, args.delete_offline)