mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 01:12:12 +00:00
339 lines
13 KiB
Python
339 lines
13 KiB
Python
import os
|
|
import platform
|
|
import signal
|
|
import time
|
|
import traceback
|
|
|
|
import requests
|
|
from praktika.execution.execution_settings import ExecutionSettings, ScalingType
|
|
from praktika.utils import ContextManager, Shell
|
|
|
|
|
|
class StateMachine:
    """Lifecycle driver for a self-hosted GitHub Actions runner machine.

    The machine cycles through three states:

    * INIT - the runner is configured and started asynchronously;
    * WAIT - the runner is up, waiting for a job to be assigned;
    * RUN  - a job has been assigned; wait until the runner process exits.

    Depending on ``ExecutionSettings.RUNNER_SCALING_TYPE`` the state machine
    also asks the underlying ``Machine`` to scale the fleet up (fork) or
    down (self-terminate).
    """

    class StateNames:
        """String labels of the lifecycle states handled by ``kick``."""

        INIT = "init"
        WAIT = "wait"
        RUN = "run"

    def __init__(self):
        """Create the machine wrapper and start in the INIT state."""
        self.state = self.StateNames.INIT
        self.scale_type = ExecutionSettings.RUNNER_SCALING_TYPE
        self.machine = Machine(scaling_type=self.scale_type).update_instance_info()
        # Unix timestamp (seconds) of the last state transition; used to
        # measure how long the machine has been idle in WAIT.
        self.state_updated_at = int(time.time())
        # True once this instance has requested one extra instance.
        self.forked = False

    def kick(self):
        """Perform one step of the state machine.

        INIT: configure and start the runner, switch to WAIT, then
        self-terminate (without decreasing capacity) if the runner process
        is not healthy.
        WAIT: switch to RUN and consider scaling up when a job is assigned,
        otherwise consider scaling down.
        RUN: switch back to INIT once the runner process is no longer
        running.
        """
        if self.state == self.StateNames.INIT:
            self.machine.config_actions().run_actions_async()
            print("State Machine: INIT -> WAIT")
            self.state = self.StateNames.WAIT
            self.state_updated_at = int(time.time())
            # TODO: add monitoring
            if not self.machine.is_actions_process_healthy():
                print("ERROR: GH runner process unexpectedly died")
                self.machine.self_terminate(decrease_capacity=False)
        elif self.state == self.StateNames.WAIT:
            if self.machine.check_job_assigned():
                print("State Machine: WAIT -> RUN")
                self.state = self.StateNames.RUN
                self.state_updated_at = int(time.time())
                self.check_scale_up()
            else:
                self.check_scale_down()
        elif self.state == self.StateNames.RUN:
            if not self.machine.check_job_running():
                print("State Machine: RUN -> INIT")
                self.state = self.StateNames.INIT
                self.state_updated_at = int(time.time())

    def check_scale_down(self):
        """Terminate this instance if it has been idle for too long.

        Does nothing unless the scaling type allows scaling down. In
        AUTOMATIC_SCALE_UP_DOWN mode an instance that has not forked yet is
        never scaled down. Otherwise, once the time since the last state
        change exceeds ``MAX_WAIT_TIME_BEFORE_SCALE_DOWN_SEC``, the instance
        is terminated with a capacity decrease (skipped for local
        execution).
        """
        if self.scale_type not in (
            ScalingType.AUTOMATIC_SCALE_DOWN,
            ScalingType.AUTOMATIC_SCALE_UP_DOWN,
        ):
            return
        # Compare the configured scaling type with the constant. Testing the
        # constant's truthiness alone is always true, which blocked scale
        # down in AUTOMATIC_SCALE_DOWN mode where `forked` is never set.
        if (
            self.scale_type == ScalingType.AUTOMATIC_SCALE_UP_DOWN
            and not self.forked
        ):
            print(
                "Scaling type is AUTOMATIC_SCALE_UP_DOWN and machine has not run a job - do not scale down"
            )
            return
        idle_sec = int(time.time()) - self.state_updated_at
        if idle_sec > ExecutionSettings.MAX_WAIT_TIME_BEFORE_SCALE_DOWN_SEC:
            print(
                f"No job assigned for more than MAX_WAIT_TIME_BEFORE_SCALE_DOWN_SEC [{ExecutionSettings.MAX_WAIT_TIME_BEFORE_SCALE_DOWN_SEC}] - scale down the instance"
            )
            if not ExecutionSettings.LOCAL_EXECUTION:
                self.machine.self_terminate(decrease_capacity=True)
            else:
                print("Local execution - skip scaling operation")

    def check_scale_up(self):
        """Request one extra instance the first time a job is picked up.

        Only active for AUTOMATIC_SCALE_UP_DOWN; each instance forks at most
        once.
        """
        if self.scale_type not in (ScalingType.AUTOMATIC_SCALE_UP_DOWN,):
            return
        if self.forked:
            print("This instance already forked once - do not scale up")
            return
        self.machine.self_fork()
        self.forked = True

    def run(self):
        """Remove the runner configuration once, then call ``kick`` every 5 seconds forever."""
        self.machine.unconfig_actions()
        while True:
            self.kick()
            time.sleep(5)

    def terminate(self):
        """Best-effort shutdown: unconfigure the runner, then terminate the instance.

        A failure to unconfigure the runner is reported and ignored. Unless
        running locally, the instance is asked to terminate itself through
        the autoscaling API; if the process is still alive 10 seconds later,
        an OS-level shutdown is issued as a fallback.
        """
        try:
            self.machine.unconfig_actions()
        except Exception as e:
            # Best effort only: report the cause and continue terminating.
            print(f"WARNING: failed to unconfig runner: {e}")
        if not ExecutionSettings.LOCAL_EXECUTION:
            if self.machine is not None:
                self.machine.self_terminate(decrease_capacity=False)
                # wait termination
                time.sleep(10)
            # Still running at this point, so termination did not take effect.
            print("ERROR: failed to terminate instance via aws cli - try os call")
            os.system("sudo shutdown now")
        else:
            print("NOTE: Local execution - machine won't be terminated")
|
|
|
|
|
|
class Machine:
    """Wrapper around the host that runs a self-hosted GitHub Actions runner.

    It collects instance metadata, installs, configures and starts the
    runner, and talks to the AWS autoscaling CLI to terminate this instance
    or to request an additional one.
    """

    @staticmethod
    def get_latest_gh_actions_release():
        """Return the latest actions/runner release version without the "v" prefix.

        Returns None when the GitHub API answers with a non-200 status.
        Exceptions raised by ``requests.get`` are not handled here.
        """
        url = "https://api.github.com/repos/actions/runner/releases/latest"
        response = requests.get(url, timeout=5)
        if response.status_code != 200:
            print(f"Failed to get the latest release: {response.status_code}")
            return None
        return response.json()["tag_name"].removeprefix("v")

    def __init__(self, scaling_type):
        """Detect OS and architecture and initialise empty instance info.

        Asserts that the OS is Linux, that the architecture is x86_64 or
        aarch64, and that ``scaling_type`` is a member of ``ScalingType``.
        """
        self.os_name = platform.system().lower()
        assert self.os_name == "linux", f"Unsupported OS [{self.os_name}]"
        if platform.machine() == "x86_64":
            self.arch = "x64"
        elif "aarch64" in platform.machine().lower():
            self.arch = "arm64"
        else:
            assert False, f"Unsupported arch [{platform.machine()}]"
        # Filled in by update_instance_info().
        self.instance_id = None
        self.asg_name = None
        self.runner_api_endpoint = None
        self.runner_type = None
        self.labels = []
        # Handle of the runner process started by run_actions_async().
        self.proc = None
        assert scaling_type in ScalingType
        self.scaling_type = scaling_type

    def install_gh_actions_runner(self):
        """Download and unpack the latest runner into GH_ACTIONS_DIRECTORY.

        The directory is removed and recreated first. After unpacking, the
        runner's dependency installer is run with sudo and the directory is
        chown-ed to the ``ubuntu`` user.
        """
        gh_actions_version = self.get_latest_gh_actions_release()
        assert self.os_name and gh_actions_version and self.arch
        Shell.check(
            f"rm -rf {ExecutionSettings.GH_ACTIONS_DIRECTORY}",
            strict=True,
            verbose=True,
        )
        Shell.check(
            f"mkdir {ExecutionSettings.GH_ACTIONS_DIRECTORY}", strict=True, verbose=True
        )
        with ContextManager.cd(ExecutionSettings.GH_ACTIONS_DIRECTORY):
            Shell.check(
                f"curl -O -L https://github.com/actions/runner/releases/download/v{gh_actions_version}/actions-runner-{self.os_name}-{self.arch}-{gh_actions_version}.tar.gz",
                strict=True,
                verbose=True,
            )
            Shell.check("tar xzf *tar.gz", strict=True, verbose=True)
            Shell.check("rm -f *tar.gz", strict=True, verbose=True)
            Shell.check("sudo ./bin/installdependencies.sh", strict=True, verbose=True)
            Shell.check(
                f"chown -R ubuntu:ubuntu {ExecutionSettings.GH_ACTIONS_DIRECTORY}",
                strict=True,
                verbose=True,
            )

    def _get_gh_token_from_ssm(self):
        """Read the ``github_runner_registration_token`` parameter from AWS SSM."""
        return Shell.get_output_or_raise(
            "/usr/local/bin/aws ssm get-parameter --name github_runner_registration_token --with-decryption --output text --query Parameter.Value"
        )

    def update_instance_info(self):
        """Populate instance id, ASG name, runner type, API endpoint and labels.

        Asserts that the instance id is available, that the ASG name is
        known whenever scaling is enabled for a non-local run, and that the
        ``MY_ORG`` environment variable is set. Returns ``self``.
        """
        self.instance_id = Shell.get_output_or_raise("ec2metadata --instance-id")
        assert self.instance_id
        self.asg_name = Shell.get_output(
            f"aws ec2 describe-instances --instance-id {self.instance_id} --query \"Reservations[].Instances[].Tags[?Key=='aws:autoscaling:groupName'].Value\" --output text"
        )
        # The runner type is taken from the ASG name; the lookup of a
        # dedicated `github:runner-type` instance tag is not used.
        self.runner_type = self.asg_name
        if (
            self.scaling_type != ScalingType.DISABLED
            and not ExecutionSettings.LOCAL_EXECUTION
        ):
            assert (
                self.asg_name and self.runner_type
            ), f"Failed to retrieve ASG name, which is required for scaling_type [{self.scaling_type}]"
        org = os.getenv("MY_ORG", "")
        assert (
            org
        ), "MY_ORG env variable myst be set to use init script for runner machine"
        self.runner_api_endpoint = f"https://github.com/{org}"

        self.labels = ["self-hosted", self.runner_type]
        return self

    @classmethod
    def check_job_assigned(cls):
        """Check the runner log for a "Running job:" record.

        Finds the ``Runner.Listener`` process, locates the ``Runner*log``
        file it has open under ``_diag`` and greps it. Returns False when
        the pid or the log file is empty, otherwise the result of the grep
        check.
        """
        runner_pid = Shell.get_output_or_raise("pgrep Runner.Listener")
        if not runner_pid:
            print("check_job_assigned: No runner pid")
            return False
        log_file = Shell.get_output_or_raise(
            f"lsof -p {runner_pid} | grep -o {ExecutionSettings.GH_ACTIONS_DIRECTORY}/_diag/Runner.*log"
        )
        if not log_file:
            print("check_job_assigned: No log file")
            return False
        return Shell.check(f"grep -q 'Terminal] .* Running job:' {log_file}")

    def check_job_running(self):
        """Return True while the runner process is still running.

        When the process has exited, its exit code is printed, the stored
        handle is dropped and False is returned. Also returns False when no
        process was started.
        """
        if self.proc is None:
            print("WARNING: No job started")
            return False
        exit_code = self.proc.poll()
        if exit_code is None:
            return True
        print(f"Job runner finished with exit code [{exit_code}]")
        self.proc = None
        return False

    def config_actions(self):
        """Register this machine as an ephemeral runner, with up to 10 attempts.

        The registration token is fetched for every attempt so that a retry
        uses a fresh token rather than the one that has just failed.
        Raises AssertionError if required data is missing or if all attempts
        fail. Returns ``self``.
        """
        if not self.instance_id:
            self.update_instance_info()
        max_attempts = 10
        res = 1
        for attempt in range(1, max_attempts + 1):
            token = self._get_gh_token_from_ssm()
            assert (
                token
                and self.instance_id
                and self.runner_api_endpoint
                and self.labels
            )
            command = (
                f"sudo -u ubuntu {ExecutionSettings.GH_ACTIONS_DIRECTORY}/config.sh --token {token} "
                f"--url {self.runner_api_endpoint} --ephemeral --unattended --replace "
                f"--runnergroup Default --labels {','.join(self.labels)} --work wd --name {self.instance_id}"
            )
            res = Shell.run(command)
            if res == 0:
                break
            print(
                f"ERROR: failed to configure GH actions runner after [{attempt}] attempts, exit code [{res}], retry after 10s"
            )
            time.sleep(10)
        if res == 0:
            print("GH action runner has been configured")
        else:
            assert False, "GH actions runner configuration failed"
        return self

    def unconfig_actions(self):
        """Remove the runner registration via ``config.sh remove``. Returns ``self``."""
        token = self._get_gh_token_from_ssm()
        command = f"sudo -u ubuntu {ExecutionSettings.GH_ACTIONS_DIRECTORY}/config.sh remove --token {token}"
        Shell.check(command, strict=True)
        return self

    def run_actions_async(self):
        """Start the runner's ``run.sh`` in the background and keep its handle. Returns ``self``."""
        command = f"sudo -u ubuntu {ExecutionSettings.GH_ACTIONS_DIRECTORY}/run.sh"
        self.proc = Shell.run_async(command)
        assert self.proc is not None
        return self

    def is_actions_process_healthy(self):
        """Return True only while the runner process is alive.

        A missing or finished process, or any error while inspecting it,
        is reported and yields False.
        """
        if self.proc is None:
            # Explicit guard instead of relying on a caught AttributeError.
            print("GH Action process exception: runner process has not been started")
            return False
        try:
            if self.proc.poll() is None:
                return True

            # The process has exited: collect its stderr for the report.
            _, stderr = self.proc.communicate()

            if self.proc.returncode != 0:
                # Handle failure
                print(
                    f"GH Action process failed with return code {self.proc.returncode}"
                )
                print(f"Error output: {stderr}")
            else:
                print("GH Action process is not running")
            return False
        except Exception as e:
            print(f"GH Action process exception: {e}")
            return False

    def self_terminate(self, decrease_capacity):
        """Ask the autoscaling group to terminate this instance.

        ``decrease_capacity`` selects whether the group's desired capacity
        is decremented along with the termination. The current call stack
        is printed so that the origin of the termination is visible in logs.
        """
        print(
            f"WARNING: Self terminate is called, decrease_capacity [{decrease_capacity}]"
        )
        traceback.print_stack()
        if not self.instance_id:
            self.update_instance_info()
        assert self.instance_id
        command = f"aws autoscaling terminate-instance-in-auto-scaling-group --instance-id {self.instance_id}"
        if decrease_capacity:
            command += " --should-decrement-desired-capacity"
        else:
            command += " --no-should-decrement-desired-capacity"
        Shell.check(
            command=command,
            verbose=True,
        )

    def self_fork(self):
        """Increase the desired capacity of this instance's ASG by one.

        Prints an error and returns without scaling when the current
        capacity cannot be determined or when the update command fails.
        """
        raw_capacity = Shell.get_output(
            f"aws autoscaling describe-auto-scaling-groups --auto-scaling-group-name {self.asg_name} "
            '--query "AutoScalingGroups[0].DesiredCapacity" --output text'
        )
        try:
            current_capacity = int(raw_capacity)
        except (TypeError, ValueError):
            # Empty or non-numeric CLI output: fall into the error branch
            # below rather than raising.
            current_capacity = 0
        if not current_capacity:
            print("ERROR: failed to get current capacity - cannot scale up")
            return
        desired_capacity = current_capacity + 1
        command = f"aws autoscaling set-desired-capacity --auto-scaling-group-name {self.asg_name} --desired-capacity {desired_capacity}"
        print(f"Increase capacity [{current_capacity} -> {desired_capacity}]")
        res = Shell.check(
            command=command,
            verbose=True,
        )
        if not res:
            print("ERROR: failed to increase capacity - cannot scale up")
|
|
|
|
|
|
def handle_signal(signum, _frame):
    """Signal handler: report the signal and abort by raising RuntimeError.

    Raising turns the signal into an ordinary exception that the caller's
    ``except Exception`` handler can process.
    """
    failure = RuntimeError(f"killed by signal {signum}")
    print(f"FATAL: Received signal {signum}")
    raise failure
|
|
|
|
|
|
def run():
    """Install signal handlers and run the state machine until it fails.

    On any exception the error is printed and, after a 10 second pause, the
    state machine (if it was created) is asked to terminate the instance;
    the exception is then re-raised.
    """
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, handle_signal)

    state_machine = None
    try:
        state_machine = StateMachine()
        state_machine.run()
    except Exception as err:
        print(f"FATAL: Exception [{err}] - terminate instance")
        time.sleep(10)
        if state_machine:
            state_machine.terminate()
        raise err
|
|
|
|
|
|
# Script entry point: start the runner state machine only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    run()
|