mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
218 lines
7.3 KiB
Python
218 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
|
|
"""The lambda to decrease/increase ASG desired capacity based on current queue"""
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from pprint import pformat
|
|
from typing import Any, List, Literal, Optional, Tuple
|
|
|
|
import boto3 # type: ignore
|
|
|
|
from lambda_shared import (
|
|
CHException,
|
|
ClickHouseHelper,
|
|
RUNNER_TYPE_LABELS,
|
|
get_parameter_from_ssm,
|
|
)
|
|
|
|
### Update comment on the change ###
|
|
# 4 HOUR - is a balance to get the most precise values
|
|
# - Our longest possible running check is around 5h on the worst scenario
|
|
# - The long queue won't be wiped out and replaced, so the measurmenet is fine
|
|
# - If the data is spoiled by something, we are from the bills perspective
|
|
# Changed it to 3 HOUR: in average we have 1h tasks, but p90 is around 2h.
|
|
# With 4h we have too much wasted computing time in case of issues with DB
|
|
QUEUE_QUERY = f"""SELECT
|
|
last_status AS status,
|
|
toUInt32(count()) AS length,
|
|
labels
|
|
FROM
|
|
(
|
|
SELECT
|
|
arraySort(groupArray(status))[-1] AS last_status,
|
|
labels,
|
|
id,
|
|
html_url
|
|
FROM default.workflow_jobs
|
|
WHERE has(labels, 'self-hosted')
|
|
AND hasAny({RUNNER_TYPE_LABELS}, labels)
|
|
AND started_at > now() - INTERVAL 3 HOUR
|
|
GROUP BY ALL
|
|
HAVING last_status IN ('in_progress', 'queued')
|
|
)
|
|
GROUP BY ALL
|
|
ORDER BY labels, last_status"""
|
|
|
|
|
|
@dataclass
|
|
class Queue:
|
|
status: Literal["in_progress", "queued"]
|
|
lentgh: int
|
|
label: str
|
|
|
|
|
|
def get_scales(runner_type: str) -> Tuple[int, int]:
|
|
"returns the multipliers for scaling down and up ASG by types"
|
|
# Scaling down is quicker on the lack of running jobs than scaling up on
|
|
# queue
|
|
scale_down = 2
|
|
scale_up = 5
|
|
if runner_type == "style-checker":
|
|
# the style checkers have so many noise, so it scales up too quickly
|
|
scale_down = 1
|
|
# The 5 was too quick, there are complainings regarding too slow with
|
|
# 10. I am trying 7 now.
|
|
# UPDATE THE COMMENT ON CHANGES
|
|
scale_up = 7
|
|
elif runner_type == "limited-tester":
|
|
# The limited runners should inflate and deflate faster
|
|
scale_down = 1
|
|
scale_up = 2
|
|
return scale_down, scale_up
|
|
|
|
|
|
CH_CLIENT = None # type: Optional[ClickHouseHelper]
|
|
|
|
|
|
def set_capacity(
|
|
runner_type: str, queues: List[Queue], client: Any, dry_run: bool = True
|
|
) -> None:
|
|
assert len(queues) in (1, 2)
|
|
assert all(q.label == runner_type for q in queues)
|
|
as_groups = client.describe_auto_scaling_groups(
|
|
Filters=[
|
|
{"Name": "tag-key", "Values": ["github:runner-type"]},
|
|
{"Name": "tag-value", "Values": [runner_type]},
|
|
]
|
|
)["AutoScalingGroups"]
|
|
assert len(as_groups) == 1
|
|
asg = as_groups[0]
|
|
running = 0
|
|
queued = 0
|
|
for q in queues:
|
|
if q.status == "in_progress":
|
|
running = q.lentgh
|
|
continue
|
|
if q.status == "queued":
|
|
queued = q.lentgh
|
|
continue
|
|
raise ValueError("Queue status is not in ['in_progress', 'queued']")
|
|
|
|
scale_down, scale_up = get_scales(runner_type)
|
|
# How much nodes are free (positive) or need to be added (negative)
|
|
capacity_reserve = asg["DesiredCapacity"] - running - queued
|
|
stop = False
|
|
if capacity_reserve < 0:
|
|
# This part is about scaling up
|
|
capacity_deficit = -capacity_reserve
|
|
# It looks that we are still OK, since no queued jobs exist
|
|
stop = stop or queued == 0
|
|
# Are we already at the capacity limits
|
|
stop = stop or asg["MaxSize"] <= asg["DesiredCapacity"]
|
|
# Let's calculate a new desired capacity
|
|
desired_capacity = asg["DesiredCapacity"] + (capacity_deficit // scale_up)
|
|
desired_capacity = max(desired_capacity, asg["MinSize"])
|
|
desired_capacity = min(desired_capacity, asg["MaxSize"])
|
|
# Finally, should the capacity be even changed
|
|
stop = stop or asg["DesiredCapacity"] == desired_capacity
|
|
if stop:
|
|
logging.info(
|
|
"Do not increase ASG %s capacity, current capacity=%s, "
|
|
"maximum capacity=%s, running jobs=%s, queue size=%s",
|
|
asg["AutoScalingGroupName"],
|
|
desired_capacity,
|
|
asg["MaxSize"],
|
|
running,
|
|
queued,
|
|
)
|
|
return
|
|
|
|
logging.info(
|
|
"The ASG %s capacity will be increased to %s, current capacity=%s, "
|
|
"maximum capacity=%s, running jobs=%s, queue size=%s",
|
|
asg["AutoScalingGroupName"],
|
|
desired_capacity,
|
|
asg["DesiredCapacity"],
|
|
asg["MaxSize"],
|
|
running,
|
|
queued,
|
|
)
|
|
if not dry_run:
|
|
client.set_desired_capacity(
|
|
AutoScalingGroupName=asg["AutoScalingGroupName"],
|
|
DesiredCapacity=desired_capacity,
|
|
)
|
|
return
|
|
|
|
# Now we will calculate if we need to scale down
|
|
stop = stop or asg["DesiredCapacity"] == asg["MinSize"]
|
|
desired_capacity = asg["DesiredCapacity"] - (capacity_reserve // scale_down)
|
|
desired_capacity = max(desired_capacity, asg["MinSize"])
|
|
desired_capacity = min(desired_capacity, asg["MaxSize"])
|
|
stop = stop or asg["DesiredCapacity"] == desired_capacity
|
|
if stop:
|
|
logging.info(
|
|
"Do not decrease ASG %s capacity, current capacity=%s, "
|
|
"minimum capacity=%s, running jobs=%s, queue size=%s",
|
|
asg["AutoScalingGroupName"],
|
|
desired_capacity,
|
|
asg["MinSize"],
|
|
running,
|
|
queued,
|
|
)
|
|
return
|
|
|
|
logging.info(
|
|
"The ASG %s capacity will be decreased to %s, current capacity=%s, "
|
|
"minimum capacity=%s, running jobs=%s, queue size=%s",
|
|
asg["AutoScalingGroupName"],
|
|
desired_capacity,
|
|
asg["DesiredCapacity"],
|
|
asg["MinSize"],
|
|
running,
|
|
queued,
|
|
)
|
|
if not dry_run:
|
|
client.set_desired_capacity(
|
|
AutoScalingGroupName=asg["AutoScalingGroupName"],
|
|
DesiredCapacity=desired_capacity,
|
|
)
|
|
|
|
|
|
def main(dry_run: bool = True) -> None:
|
|
logging.getLogger().setLevel(logging.INFO)
|
|
asg_client = boto3.client("autoscaling")
|
|
try:
|
|
global CH_CLIENT
|
|
CH_CLIENT = CH_CLIENT or ClickHouseHelper(
|
|
get_parameter_from_ssm("clickhouse-test-stat-url"), "play"
|
|
)
|
|
queues = CH_CLIENT.select_json_each_row("default", QUEUE_QUERY)
|
|
except CHException as ex:
|
|
logging.exception(
|
|
"Got an exception on insert, tryuing to update the client "
|
|
"credentials and repeat",
|
|
exc_info=ex,
|
|
)
|
|
CH_CLIENT = ClickHouseHelper(
|
|
get_parameter_from_ssm("clickhouse-test-stat-url"), "play"
|
|
)
|
|
queues = CH_CLIENT.select_json_each_row("default", QUEUE_QUERY)
|
|
|
|
logging.info("Received queue data:\n%s", pformat(queues, width=120))
|
|
for runner_type in RUNNER_TYPE_LABELS:
|
|
runner_queues = [
|
|
Queue(queue["status"], queue["length"], runner_type)
|
|
for queue in queues
|
|
if runner_type in queue["labels"]
|
|
]
|
|
runner_queues = runner_queues or [Queue("in_progress", 0, runner_type)]
|
|
set_capacity(runner_type, runner_queues, asg_client, dry_run)
|
|
|
|
|
|
def handler(event: dict, context: Any) -> None:
|
|
_ = event
|
|
_ = context
|
|
return main(False)
|