mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 11:02:08 +00:00
Merge pull request #52277 from ClickHouse/kill-runner-with-subprocesses
Kill the runner process with all subprocesses
This commit is contained in:
commit
3a07b80f34
@ -219,3 +219,12 @@ def list_runners(access_token: str) -> RunnerDescriptions:
|
||||
result.append(desc)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def cached_value_is_valid(updated_at: float, ttl: float) -> bool:
|
||||
"a common function to identify if cachable value is still valid"
|
||||
if updated_at == 0:
|
||||
return False
|
||||
if time.time() - ttl < updated_at:
|
||||
return True
|
||||
return False
|
||||
|
@ -8,6 +8,8 @@ import boto3 # type: ignore
|
||||
import jwt
|
||||
import requests # type: ignore
|
||||
|
||||
from . import cached_value_is_valid
|
||||
|
||||
|
||||
def get_key_and_app_from_aws() -> Tuple[str, int]:
|
||||
secret_name = "clickhouse_github_secret_key"
|
||||
@ -68,7 +70,7 @@ def get_access_token_by_key_app(private_key: str, app_id: int) -> str:
|
||||
|
||||
@dataclass
|
||||
class CachedToken:
|
||||
time: int
|
||||
time: float
|
||||
value: str
|
||||
updating: bool = False
|
||||
|
||||
@ -81,12 +83,9 @@ def get_cached_access_token() -> str:
|
||||
return _cached_token.value
|
||||
# Indicate that the value is updating now, so the cached value can be
|
||||
# used. The first setting and close-to-ttl are not counted as update
|
||||
if _cached_token.time != 0 or time.time() - 590 < _cached_token.time:
|
||||
_cached_token.updating = True
|
||||
else:
|
||||
_cached_token.updating = False
|
||||
_cached_token.updating = cached_value_is_valid(_cached_token.time, 590)
|
||||
private_key, app_id = get_key_and_app_from_aws()
|
||||
_cached_token.time = int(time.time())
|
||||
_cached_token.time = time.time()
|
||||
_cached_token.value = get_access_token_by_key_app(private_key, app_id)
|
||||
_cached_token.updating = False
|
||||
return _cached_token.value
|
||||
|
@ -9,13 +9,13 @@ from typing import Any, Dict, List
|
||||
|
||||
import boto3 # type: ignore
|
||||
|
||||
from lambda_shared import RunnerDescriptions, list_runners
|
||||
from lambda_shared import RunnerDescriptions, list_runners, cached_value_is_valid
|
||||
from lambda_shared.token import get_access_token_by_key_app, get_cached_access_token
|
||||
|
||||
|
||||
@dataclass
|
||||
class CachedInstances:
|
||||
time: int
|
||||
time: float
|
||||
value: dict
|
||||
updating: bool = False
|
||||
|
||||
@ -27,17 +27,12 @@ def get_cached_instances() -> dict:
|
||||
"""return cached instances description with updating it once per five minutes"""
|
||||
if time.time() - 250 < cached_instances.time or cached_instances.updating:
|
||||
return cached_instances.value
|
||||
# Indicate that the value is updating now, so the cached value can be
|
||||
# used. The first setting and close-to-ttl are not counted as update
|
||||
if cached_instances.time != 0 or time.time() - 300 < cached_instances.time:
|
||||
cached_instances.updating = True
|
||||
else:
|
||||
cached_instances.updating = False
|
||||
cached_instances.updating = cached_value_is_valid(cached_instances.time, 300)
|
||||
ec2_client = boto3.client("ec2")
|
||||
instances_response = ec2_client.describe_instances(
|
||||
Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
|
||||
)
|
||||
cached_instances.time = int(time.time())
|
||||
cached_instances.time = time.time()
|
||||
cached_instances.value = {
|
||||
instance["InstanceId"]: instance
|
||||
for reservation in instances_response["Reservations"]
|
||||
@ -47,6 +42,28 @@ def get_cached_instances() -> dict:
|
||||
return cached_instances.value
|
||||
|
||||
|
||||
@dataclass
|
||||
class CachedRunners:
|
||||
time: float
|
||||
value: RunnerDescriptions
|
||||
updating: bool = False
|
||||
|
||||
|
||||
cached_runners = CachedRunners(0, [])
|
||||
|
||||
|
||||
def get_cached_runners(access_token: str) -> RunnerDescriptions:
|
||||
"""From time to time request to GH api costs up to 3 seconds, and
|
||||
it's a disaster from the termination lambda perspective"""
|
||||
if time.time() - 5 < cached_runners.time or cached_instances.updating:
|
||||
return cached_runners.value
|
||||
cached_runners.updating = cached_value_is_valid(cached_runners.time, 15)
|
||||
cached_runners.value = list_runners(access_token)
|
||||
cached_runners.time = time.time()
|
||||
cached_runners.updating = False
|
||||
return cached_runners.value
|
||||
|
||||
|
||||
def how_many_instances_to_kill(event_data: dict) -> Dict[str, int]:
|
||||
data_array = event_data["CapacityToTerminate"]
|
||||
to_kill_by_zone = {} # type: Dict[str, int]
|
||||
@ -104,7 +121,7 @@ def main(access_token: str, event: dict) -> Dict[str, List[str]]:
|
||||
)
|
||||
print("Time spent on the requests to AWS: ", time.time() - start)
|
||||
|
||||
runners = list_runners(access_token)
|
||||
runners = get_cached_runners(access_token)
|
||||
runner_ids = set(runner.name for runner in runners)
|
||||
# We used to delete potential hosts to terminate from GitHub runners pool,
|
||||
# but the documentation states:
|
||||
|
@ -12,7 +12,8 @@ echo "Running init script"
|
||||
export DEBIAN_FRONTEND=noninteractive
|
||||
export RUNNER_HOME=/home/ubuntu/actions-runner
|
||||
|
||||
export RUNNER_URL="https://github.com/ClickHouse"
|
||||
export RUNNER_ORG="ClickHouse"
|
||||
export RUNNER_URL="https://github.com/${RUNNER_ORG}"
|
||||
# Funny fact, but metadata service has fixed IP
|
||||
INSTANCE_ID=$(ec2metadata --instance-id)
|
||||
export INSTANCE_ID
|
||||
@ -102,7 +103,8 @@ check_proceed_spot_termination() {
|
||||
runner_pid=$(pgrep Runner.Listener)
|
||||
if [ -n "$runner_pid" ]; then
|
||||
# Kill the runner to not allow it cancelling the job
|
||||
kill -9 "$runner_pid"
|
||||
# shellcheck disable=SC2046
|
||||
kill -9 $(list_children "$runner_pid")
|
||||
fi
|
||||
sudo -u ubuntu ./config.sh remove --token "$(get_runner_token)"
|
||||
terminate_and_exit
|
||||
@ -171,6 +173,7 @@ set -uo pipefail
|
||||
|
||||
echo "Runner's public DNS: $(ec2metadata --public-hostname)"
|
||||
echo "Runner's labels: ${LABELS}"
|
||||
echo "Runner's instance type: $(ec2metadata --instance-type)"
|
||||
EOF
|
||||
|
||||
# Create a post-run script that will restart docker daemon before the job started
|
||||
@ -234,6 +237,19 @@ is_job_assigned() {
|
||||
|| return 1
|
||||
}
|
||||
|
||||
list_children () {
|
||||
local children
|
||||
children=$(ps --ppid "$1" -o pid=)
|
||||
if [ -z "$children" ]; then
|
||||
return
|
||||
fi
|
||||
|
||||
for pid in $children; do
|
||||
list_children "$pid"
|
||||
done
|
||||
echo "$children"
|
||||
}
|
||||
|
||||
while true; do
|
||||
runner_pid=$(pgrep Runner.Listener)
|
||||
echo "Got runner pid '$runner_pid'"
|
||||
@ -268,17 +284,11 @@ while true; do
|
||||
RUNNER_AGE=$(( $(date +%s) - $(stat -c +%Y /proc/"$runner_pid" 2>/dev/null || date +%s) ))
|
||||
echo "The runner is launched $RUNNER_AGE seconds ago and still has hot received the job"
|
||||
if (( 60 < RUNNER_AGE )); then
|
||||
echo "Check if the instance should tear down"
|
||||
if ! no_terminating_metadata; then
|
||||
# Another check if the worker still didn't start
|
||||
if is_job_assigned; then
|
||||
echo "During the metadata check the job was assigned, continue"
|
||||
continue
|
||||
fi
|
||||
kill -9 "$runner_pid"
|
||||
sudo -u ubuntu ./config.sh remove --token "$(get_runner_token)"
|
||||
terminate_on_event
|
||||
fi
|
||||
echo "Attempt to delete the runner for a graceful shutdown"
|
||||
sudo -u ubuntu ./config.sh remove --token "$(get_runner_token)" \
|
||||
|| continue
|
||||
echo "Runner didn't launch or have assigned jobs after ${RUNNER_AGE} seconds, shutting down"
|
||||
terminate_and_exit
|
||||
fi
|
||||
fi
|
||||
sleep 5
|
||||
|
Loading…
Reference in New Issue
Block a user