ClickHouse/tests/ci/get_robot_token.py

91 lines
2.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import logging
2022-11-10 16:11:23 +00:00
from dataclasses import dataclass
2023-08-23 19:24:33 +00:00
from typing import Any, Dict, List, Optional
import boto3 # type: ignore
2022-11-10 16:11:23 +00:00
from github import Github
from github.AuthenticatedUser import AuthenticatedUser
@dataclass
class Token:
    """A GitHub token together with the account and quota observed for it."""

    # Object returned by Github.get_user() for this token
    user: AuthenticatedUser
    # The token string itself
    value: str
    # Remaining-requests number read from Github.rate_limiting for this token
    rest: int
2023-08-23 19:24:33 +00:00
def get_parameter_from_ssm(
    name: str, decrypt: bool = True, client: Optional[Any] = None
) -> str:
    """Fetch the value of a single SSM parameter.

    Args:
        name: forwarded as ``Name`` to the client's ``get_parameter`` call.
        decrypt: forwarded as ``WithDecryption``.
        client: SSM client to use; when falsy, a boto3 "ssm" client for the
            us-east-1 region is created.

    Returns:
        The ``["Parameter"]["Value"]`` entry of the response.
    """
    ssm = client or boto3.client("ssm", region_name="us-east-1")
    response = ssm.get_parameter(Name=name, WithDecryption=decrypt)
    return response["Parameter"]["Value"]  # type: ignore
def get_parameters_from_ssm(
    names: List[str], decrypt: bool = True, client: Optional[Any] = None
) -> Dict[str, str]:
    """Fetch several SSM parameters and return them as a name -> value dict.

    Duplicate names are requested only once.

    Args:
        names: parameter names to request.
        decrypt: forwarded as ``WithDecryption``.
        client: SSM client to use; when falsy, a boto3 "ssm" client for the
            us-east-1 region is created.

    Returns:
        Mapping of each returned parameter's ``Name`` to its ``Value``.
    """
    ssm = client or boto3.client("ssm", region_name="us-east-1")
    unique_names = list(set(names))
    # the get_parameters returns up to 10 values, so the call is split by 10
    batch_size = 10
    fetched = {}  # type: Dict[str, str]
    for start in range(0, len(unique_names), batch_size):
        batch = unique_names[start : start + batch_size]
        response = ssm.get_parameters(Names=batch, WithDecryption=decrypt)
        for parameter in response["Parameters"]:
            fetched[parameter["Name"]] = parameter["Value"]
    return fetched
2021-10-20 11:48:27 +00:00
2023-04-28 14:01:53 +00:00
# Token chosen by get_best_robot_token(); stays None until that function has
# selected one, after which later calls reuse it
ROBOT_TOKEN = None  # type: Optional[Token]
def get_best_robot_token(tokens_path: str = "/github-tokens") -> str:
    """Return the GitHub token that has the most remaining API requests.

    The tokens are read (decrypted) from SSM under ``tokens_path``. Each one is
    checked for its remaining request number and the one with the largest
    value is selected. The selection is stored in the module-level
    ``ROBOT_TOKEN``, so later calls return the stored value without contacting
    SSM or GitHub again.

    Args:
        tokens_path: SSM path under which the tokens are stored.

    Returns:
        The value of the selected token.

    Raises:
        AssertionError: if no parameters are found under ``tokens_path``.
    """
    global ROBOT_TOKEN
    if ROBOT_TOKEN is not None:
        return ROBOT_TOKEN.value

    client = boto3.client("ssm", region_name="us-east-1")
    # get_parameters_by_path returns its results in pages; walk every page so
    # that tokens beyond the first response are considered too
    paginator = client.get_paginator("get_parameters_by_path")
    tokens = {
        p["Name"]: p["Value"]
        for page in paginator.paginate(Path=tokens_path, WithDecryption=True)
        for p in page["Parameters"]
    }
    assert tokens, f"no tokens found in SSM under {tokens_path}"

    for value in tokens.values():
        gh = Github(value, per_page=100)
        # Do not spend additional request to API by accessing user.login unless
        # the token is chosen by the remaining requests number
        user = gh.get_user()
        rest, _ = gh.rate_limiting
        logging.info("Get token with %s remaining requests", rest)
        # Keep the first token seen, then replace it only by a strictly better one
        if ROBOT_TOKEN is None or ROBOT_TOKEN.rest < rest:
            ROBOT_TOKEN = Token(user, value, rest)

    assert ROBOT_TOKEN
    logging.info(
        "User %s with %s remaining requests is used",
        ROBOT_TOKEN.user.login,
        ROBOT_TOKEN.rest,
    )
    return ROBOT_TOKEN.value