#!/usr/bin/env python3 """The lambda to decrease/increase ASG desired capacity based on current queue""" import logging from dataclasses import dataclass from pprint import pformat from typing import Any, List, Literal, Optional, Tuple import boto3 # type: ignore from lambda_shared import ( CHException, ClickHouseHelper, RUNNER_TYPE_LABELS, get_parameter_from_ssm, ) ### Update comment on the change ### # 4 HOUR - is a balance to get the most precise values # - Our longest possible running check is around 5h on the worst scenario # - The long queue won't be wiped out and replaced, so the measurmenet is fine # - If the data is spoiled by something, we are from the bills perspective # Changed it to 3 HOUR: in average we have 1h tasks, but p90 is around 2h. # With 4h we have too much wasted computing time in case of issues with DB QUEUE_QUERY = f"""SELECT last_status AS status, toUInt32(count()) AS length, labels FROM ( SELECT arraySort(groupArray(status))[-1] AS last_status, labels, id, html_url FROM default.workflow_jobs WHERE has(labels, 'self-hosted') AND hasAny({RUNNER_TYPE_LABELS}, labels) AND started_at > now() - INTERVAL 3 HOUR GROUP BY ALL HAVING last_status IN ('in_progress', 'queued') ) GROUP BY ALL ORDER BY labels, last_status""" @dataclass class Queue: status: Literal["in_progress", "queued"] lentgh: int label: str def get_scales(runner_type: str) -> Tuple[int, int]: "returns the multipliers for scaling down and up ASG by types" # Scaling down is quicker on the lack of running jobs than scaling up on # queue scale_down = 2 scale_up = 5 if runner_type == "style-checker": # the style checkers have so many noise, so it scales up too quickly scale_down = 1 # The 5 was too quick, there are complainings regarding too slow with # 10. I am trying 7 now. # UPDATE THE COMMENT ON CHANGES scale_up = 7 elif runner_type == "limited-tester": # The limited runners should inflate and deflate faster scale_down = 1 scale_up = 2 return scale_down, scale_up CH_CLIENT = None # type: Optional[ClickHouseHelper] def set_capacity( runner_type: str, queues: List[Queue], client: Any, dry_run: bool = True ) -> None: assert len(queues) in (1, 2) assert all(q.label == runner_type for q in queues) as_groups = client.describe_auto_scaling_groups( Filters=[ {"Name": "tag-key", "Values": ["github:runner-type"]}, {"Name": "tag-value", "Values": [runner_type]}, ] )["AutoScalingGroups"] assert len(as_groups) == 1 asg = as_groups[0] running = 0 queued = 0 for q in queues: if q.status == "in_progress": running = q.lentgh continue if q.status == "queued": queued = q.lentgh continue raise ValueError("Queue status is not in ['in_progress', 'queued']") scale_down, scale_up = get_scales(runner_type) # How much nodes are free (positive) or need to be added (negative) capacity_reserve = asg["DesiredCapacity"] - running - queued stop = False if capacity_reserve < 0: # This part is about scaling up capacity_deficit = -capacity_reserve # It looks that we are still OK, since no queued jobs exist stop = stop or queued == 0 # Are we already at the capacity limits stop = stop or asg["MaxSize"] <= asg["DesiredCapacity"] # Let's calculate a new desired capacity desired_capacity = asg["DesiredCapacity"] + (capacity_deficit // scale_up) desired_capacity = max(desired_capacity, asg["MinSize"]) desired_capacity = min(desired_capacity, asg["MaxSize"]) # Finally, should the capacity be even changed stop = stop or asg["DesiredCapacity"] == desired_capacity if stop: logging.info( "Do not increase ASG %s capacity, current capacity=%s, " "maximum capacity=%s, running jobs=%s, queue size=%s", asg["AutoScalingGroupName"], desired_capacity, asg["MaxSize"], running, queued, ) return logging.info( "The ASG %s capacity will be increased to %s, current capacity=%s, " "maximum capacity=%s, running jobs=%s, queue size=%s", asg["AutoScalingGroupName"], desired_capacity, asg["DesiredCapacity"], asg["MaxSize"], running, queued, ) if not dry_run: client.set_desired_capacity( AutoScalingGroupName=asg["AutoScalingGroupName"], DesiredCapacity=desired_capacity, ) return # Now we will calculate if we need to scale down stop = stop or asg["DesiredCapacity"] == asg["MinSize"] desired_capacity = asg["DesiredCapacity"] - (capacity_reserve // scale_down) desired_capacity = max(desired_capacity, asg["MinSize"]) desired_capacity = min(desired_capacity, asg["MaxSize"]) stop = stop or asg["DesiredCapacity"] == desired_capacity if stop: logging.info( "Do not decrease ASG %s capacity, current capacity=%s, " "minimum capacity=%s, running jobs=%s, queue size=%s", asg["AutoScalingGroupName"], desired_capacity, asg["MinSize"], running, queued, ) return logging.info( "The ASG %s capacity will be decreased to %s, current capacity=%s, " "minimum capacity=%s, running jobs=%s, queue size=%s", asg["AutoScalingGroupName"], desired_capacity, asg["DesiredCapacity"], asg["MinSize"], running, queued, ) if not dry_run: client.set_desired_capacity( AutoScalingGroupName=asg["AutoScalingGroupName"], DesiredCapacity=desired_capacity, ) def main(dry_run: bool = True) -> None: logging.getLogger().setLevel(logging.INFO) asg_client = boto3.client("autoscaling") try: global CH_CLIENT CH_CLIENT = CH_CLIENT or ClickHouseHelper( get_parameter_from_ssm("clickhouse-test-stat-url"), "play" ) queues = CH_CLIENT.select_json_each_row("default", QUEUE_QUERY) except CHException as ex: logging.exception( "Got an exception on insert, tryuing to update the client " "credentials and repeat", exc_info=ex, ) CH_CLIENT = ClickHouseHelper( get_parameter_from_ssm("clickhouse-test-stat-url"), "play" ) queues = CH_CLIENT.select_json_each_row("default", QUEUE_QUERY) logging.info("Received queue data:\n%s", pformat(queues, width=120)) for runner_type in RUNNER_TYPE_LABELS: runner_queues = [ Queue(queue["status"], queue["length"], runner_type) for queue in queues if runner_type in queue["labels"] ] runner_queues = runner_queues or [Queue("in_progress", 0, runner_type)] set_capacity(runner_type, runner_queues, asg_client, dry_run) def handler(event: dict, context: Any) -> None: _ = event _ = context return main(False)