CI: CiBuddy with channel dispatcher

This commit is contained in:
Max Kainov 2024-08-06 16:55:04 +02:00
parent 77fd21555f
commit f2591bd1a6
2 changed files with 81 additions and 25 deletions

View File

@ -1,7 +1,7 @@
import argparse
import json
import os
from typing import Union, Dict
from typing import Union, Dict, List
import boto3
import requests
@ -9,20 +9,44 @@ from botocore.exceptions import ClientError
from pr_info import PRInfo
from ci_config import CI
from ci_utils import WithIter
class Channels(metaclass=WithIter):
# Channel names must match json keys in ParameterStore
ALERTS = "alerts-channel"
INFO = "info-channel"
DRY_RUN = "dry-ryn-channel"
DEFAULT = "default"
class CIBuddy:
Channels = Channels
_HEADERS = {"Content-Type": "application/json"}
def __init__(self, dry_run=False):
self.repo = os.getenv("GITHUB_REPOSITORY", "")
self.dry_run = dry_run
res = self._get_webhooks()
self.test_channel = ""
self.dev_ci_channel = ""
self.channels = {}
if res:
self.test_channel = json.loads(res)["test_channel"]
self.dev_ci_channel = json.loads(res)["ci_channel"]
channels = json.loads(res)
for channel in Channels:
if channel in channels:
self.channels[channel] = channels[channel]
for channel in Channels:
if channel not in self.channels:
if Channels.DEFAULT in self.channels:
print(
f"ERROR: missing config for channel [{channel}] - will use default channel instead"
)
self.channels[channel] = self.channels[Channels.DEFAULT]
else:
print(
f"ERROR: missing config for channel [{channel}] - will disable notification"
)
self.channels[channel] = ""
self.job_name = os.getenv("CHECK_NAME", "unknown")
pr_info = PRInfo()
self.pr_number = pr_info.number
@ -63,22 +87,33 @@ class CIBuddy:
return json_string
def post(self, message, dry_run=None):
if dry_run is None:
dry_run = self.dry_run
print(f"Posting slack message, dry_run [{dry_run}]")
if dry_run:
url = self.test_channel
def post(self, message: str, channels: List[str]) -> None:
print(f"Posting slack message, dry_run [{self.dry_run}]")
if self.dry_run:
urls = [self.channels[Channels.DRY_RUN]]
else:
url = self.dev_ci_channel
urls = []
for channel in channels:
url = self.channels[channel]
if url:
urls.append(url)
else:
print(f"WARNING: no channel config for [{channel}] - skip")
data = {"text": message}
try:
requests.post(url, headers=self._HEADERS, data=json.dumps(data), timeout=10)
for url in urls:
requests.post(
url, headers=self._HEADERS, data=json.dumps(data), timeout=10
)
except Exception as e:
print(f"ERROR: Failed to post message, ex {e}")
def _post_formatted(
self, title: str, body: Union[Dict, str], with_wf_link: bool
self,
title: str,
body: Union[Dict, str],
with_wf_link: bool,
channels: Union[List[str], str],
) -> None:
message = title
if isinstance(body, dict):
@ -96,31 +131,49 @@ class CIBuddy:
run_id = os.getenv("GITHUB_RUN_ID", "")
if with_wf_link and run_id:
message += f" *workflow*: <https://github.com/{self.repo}/actions/runs/{run_id}|{run_id}>\n"
self.post(message)
self.post(
message, channels=[channels] if isinstance(channels, str) else channels
)
def post_info(
self, title: str, body: Union[Dict, str], with_wf_link: bool = True
self,
title: str,
body: Union[Dict, str],
with_wf_link: bool = True,
channels: Union[List[str], str] = Channels.INFO,
) -> None:
title_extended = f":white_circle: *{title}*\n\n"
self._post_formatted(title_extended, body, with_wf_link)
self._post_formatted(title_extended, body, with_wf_link, channels=channels)
def post_done(
self, title: str, body: Union[Dict, str], with_wf_link: bool = True
self,
title: str,
body: Union[Dict, str],
with_wf_link: bool = True,
channels: Union[List[str], str] = Channels.INFO,
) -> None:
title_extended = f":white_check_mark: *{title}*\n\n"
self._post_formatted(title_extended, body, with_wf_link)
self._post_formatted(title_extended, body, with_wf_link, channels=channels)
def post_warning(
self, title: str, body: Union[Dict, str], with_wf_link: bool = True
self,
title: str,
body: Union[Dict, str],
with_wf_link: bool = True,
channels: Union[List[str], str] = Channels.ALERTS,
) -> None:
title_extended = f":warning: *{title}*\n\n"
self._post_formatted(title_extended, body, with_wf_link)
self._post_formatted(title_extended, body, with_wf_link, channels=channels)
def post_critical(
self, title: str, body: Union[Dict, str], with_wf_link: bool = True
self,
title: str,
body: Union[Dict, str],
with_wf_link: bool = True,
channels: Union[List[str], str] = Channels.ALERTS,
) -> None:
title_extended = f":black_circle: *{title}*\n\n"
self._post_formatted(title_extended, body, with_wf_link)
self._post_formatted(title_extended, body, with_wf_link, channels=channels)
def post_job_error(
self,
@ -129,6 +182,7 @@ class CIBuddy:
with_instance_info: bool = True,
with_wf_link: bool = True,
critical: bool = False,
channel: Union[List[str], str] = Channels.ALERTS,
) -> None:
instance_id, instance_type = "unknown", "unknown"
if with_instance_info:
@ -159,7 +213,7 @@ class CIBuddy:
run_id = os.getenv("GITHUB_RUN_ID", "")
if with_wf_link and run_id:
message += f" *workflow*: <https://github.com/{self.repo}/actions/runs/{run_id}|{run_id}>\n"
self.post(message)
self.post(message, channels=[channel] if isinstance(channel, str) else channel)
def parse_args():

View File

@ -821,7 +821,9 @@ if __name__ == "__main__":
else:
title = "Failed: " + title
CIBuddy(dry_run=args.dry_run).post_critical(
title, dataclasses.asdict(release_info)
title,
dataclasses.asdict(release_info),
channels=[CIBuddy.Channels.ALERTS, CIBuddy.Channels.INFO],
)
if args.set_progress_started: