Merge pull request #44520 from ClickHouse/lambda-workflow-jobs
Add the lambda to collect data for workflow_jobs
This commit is contained in:
commit fb1e7fcd7c

tests/ci/workflow_jobs_lambda/app.py  (new file, 297 lines)
@@ -0,0 +1,297 @@
#!/usr/bin/env python

"""
Lambda gets the workflow_job events, see
https://docs.github.com/en/developers/webhooks-and-events/webhooks/webhook-events-and-payloads#workflow_job

Then it either posts it as is to play.clickhouse.com, or anonymizes the sensitive
fields for private repositories
"""

from base64 import b64decode
from dataclasses import dataclass
from typing import Any, List
import json
import logging
import time

import boto3  # type: ignore
import requests  # type: ignore

logging.getLogger().setLevel(logging.INFO)


@dataclass
class WorkflowJob:
    id: int
    run_id: int
    workflow_name: str
    head_branch: str
    run_url: str
    run_attempt: int
    node_id: str
    head_sha: str
    url: str
    html_url: str
    status: str
    conclusion: str
    started_at: str
    completed_at: str
    name: str
    steps: int  # just the number of steps, we don't keep the steps themselves
    check_run_url: str
    labels: List[str]
    runner_id: int
    runner_name: str
    runner_group_id: int
    runner_group_name: str
    repository: str

    def anonymize_url(self, url: str) -> str:
        return url.replace(self.repository, "ANONYMIZED_REPO")

    def anonymize(self):
        anm = "ANONYMIZED"
        self.workflow_name = anm
        self.head_branch = anm
        self.run_url = self.anonymize_url(self.run_url)
        self.node_id = anm
        self.url = self.anonymize_url(self.url)
        self.html_url = self.anonymize_url(self.html_url)
        self.name = anm
        self.check_run_url = self.anonymize_url(self.check_run_url)
        self.repository = anm

    def as_dict(self) -> dict:
        return self.__dict__
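
# Illustrative example (not part of the PR): for repository "acme/private-repo",
# anonymize() rewrites
#     run_url = "https://api.github.com/repos/acme/private-repo/actions/runs/1"
# into
#     run_url = "https://api.github.com/repos/ANONYMIZED_REPO/actions/runs/1"
# while free-form fields such as workflow_name, head_branch and name become the
# literal "ANONYMIZED".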


### VENDORING
def get_parameter_from_ssm(name, decrypt=True, client=None):
    if not client:
        client = boto3.client("ssm", region_name="us-east-1")
    return client.get_parameter(Name=name, WithDecryption=decrypt)["Parameter"]["Value"]
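
# The ClickHouse URL and credentials below are fetched at runtime from AWS SSM
# Parameter Store; WithDecryption=True covers SecureString parameters.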


class InsertException(Exception):
    pass


class ClickHouseHelper:
    def __init__(self, url=None):
        if url is None:
            url = get_parameter_from_ssm("clickhouse-test-stat-url")

        self.url = url
        self.auth = {
            "X-ClickHouse-User": get_parameter_from_ssm("clickhouse-test-stat-login"),
            "X-ClickHouse-Key": get_parameter_from_ssm("clickhouse-test-stat-password"),
        }

    @staticmethod
    def _insert_json_str_info_impl(url, auth, db, table, json_str):
        params = {
            "database": db,
            "query": f"INSERT INTO {table} FORMAT JSONEachRow",
            "date_time_input_format": "best_effort",
            "send_logs_level": "warning",
        }

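        # Retry policy: up to 5 attempts; request exceptions and HTTP 5xx
        # responses are retried, any other HTTP error raises InsertException
        # immediately, and the for-else below raises if every attempt failed.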
        for i in range(5):
            try:
                response = requests.post(
                    url, params=params, data=json_str, headers=auth
                )
            except Exception as e:
                error = f"Received exception while sending data to {url} on attempt {i}: {e}"
                logging.warning(error)
                continue

            logging.info("Response content '%s'", response.content)

            if response.ok:
                break

            error = (
                f"Cannot insert data into clickhouse at try {i}: "
                f"HTTP code {response.status_code}: '{response.text}'"
            )

            if response.status_code >= 500:
                # A retriable error
                time.sleep(1)
                continue

            logging.info(
                "Request headers '%s', body '%s'",
                response.request.headers,
                response.request.body,
            )

            raise InsertException(error)
        else:
            raise InsertException(error)

    def _insert_json_str_info(self, db, table, json_str):
        self._insert_json_str_info_impl(self.url, self.auth, db, table, json_str)

    def insert_event_into(self, db, table, event, safe=True):
        event_str = json.dumps(event)
        try:
            self._insert_json_str_info(db, table, event_str)
        except InsertException as e:
            logging.error(
                "Exception happened while inserting data into clickhouse: %s", e
            )
            if not safe:
                raise

    def insert_events_into(self, db, table, events, safe=True):
        jsons = [json.dumps(event) for event in events]

        try:
            self._insert_json_str_info(db, table, ",".join(jsons))
        except InsertException as e:
            logging.error(
                "Exception happened while inserting data into clickhouse: %s", e
            )
            if not safe:
                raise
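
        # Note: joining rows with commas works here because ClickHouse's
        # JSONEachRow input format skips whitespace between rows and accepts
        # commas after objects.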

    def _select_and_get_json_each_row(self, db, query):
        params = {
            "database": db,
            "query": query,
            "default_format": "JSONEachRow",
        }
        for i in range(5):
            response = None
            try:
                response = requests.get(self.url, params=params, headers=self.auth)
                response.raise_for_status()
                return response.text
            except Exception as ex:
                logging.warning("Cannot fetch data with exception %s", str(ex))
                if response:
                    logging.warning("Response text %s", response.text)
                time.sleep(0.1 * i)

        raise Exception("Cannot fetch data from clickhouse")

    def select_json_each_row(self, db, query):
        text = self._select_and_get_json_each_row(db, query)
        result = []
        for line in text.split("\n"):
            if line:
                result.append(json.loads(line))
        return result


### VENDORING END

clickhouse_client = ClickHouseHelper()
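
# Created once at import time: a warm Lambda container preserves module state,
# so the same client and its SSM-fetched credentials are reused across
# invocations; send_event_workflow_job() re-creates it if an insert fails.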


def send_event_workflow_job(workflow_job: WorkflowJob) -> None:
    # # SHOW CREATE TABLE default.workflow_jobs
    # CREATE TABLE default.workflow_jobs UUID 'c0351924-8ccd-47a6-9db0-e28a9eee2fdf'
    # (
    #     `id` UInt64,
    #     `run_id` UInt64,
    #     `workflow_name` LowCardinality(String),
    #     `head_branch` LowCardinality(String),
    #     `run_url` String,
    #     `run_attempt` UInt16,
    #     `node_id` String,
    #     `head_sha` String,
    #     `url` String,
    #     `html_url` String,
    #     `status` Enum8('queued' = 1, 'in_progress' = 2, 'completed' = 3, 'waiting' = 4),
    #     `conclusion` LowCardinality(String),
    #     `started_at` DateTime,
    #     `completed_at` DateTime,
    #     `name` LowCardinality(String),
    #     `steps` UInt16,
    #     `check_run_url` String,
    #     `labels` Array(LowCardinality(String)),
    #     `runner_id` UInt64,
    #     `runner_name` String,
    #     `runner_group_id` UInt64,
    #     `runner_group_name` LowCardinality(String),
    #     `repository` LowCardinality(String),
    #     `updated_at` DateTime DEFAULT now()
    # )
    # ENGINE = ReplicatedMergeTree('/clickhouse/tables/c0351924-8ccd-47a6-9db0-e28a9eee2fdf/{shard}', '{replica}')
    # PARTITION BY toStartOfMonth(started_at)
    # ORDER BY (id, updated_at)
    # SETTINGS index_granularity = 8192
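    #
    # An illustrative query against this table (not part of the PR):
    #   SELECT name, avg(completed_at - started_at) AS avg_duration_sec
    #   FROM default.workflow_jobs
    #   WHERE conclusion = 'success'
    #   GROUP BY name
    #   ORDER BY avg_duration_sec DESC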
    global clickhouse_client
    kwargs = {
        "db": "default",
        "table": "workflow_jobs",
        "event": workflow_job.as_dict(),
        "safe": False,
    }
    try:
        clickhouse_client.insert_event_into(**kwargs)
    except InsertException as ex:
        logging.exception(
            "Got an exception on insert, trying to update the client "
            "credentials and repeat",
            exc_info=ex,
        )
        clickhouse_client = ClickHouseHelper()
        clickhouse_client.insert_event_into(**kwargs)


def handler(event: dict, _: Any) -> dict:
    if event["isBase64Encoded"]:
        event_data = json.loads(b64decode(event["body"]))
    else:
        event_data = json.loads(event["body"])

    repo = event_data["repository"]
    wf_job = event_data["workflow_job"]
    workflow_job = WorkflowJob(
        wf_job["id"],
        wf_job["run_id"],
        wf_job["workflow_name"] or "",  # nullable
        wf_job["head_branch"],
        wf_job["run_url"],
        wf_job["run_attempt"],
        wf_job["node_id"],
        wf_job["head_sha"],
        wf_job["url"],
        wf_job["html_url"],
        wf_job["status"],
        wf_job["conclusion"] or "",  # nullable
        wf_job["started_at"],
        wf_job["completed_at"] or "1970-01-01T00:00:00",  # nullable date
        wf_job["name"],
        len(wf_job["steps"]),
        wf_job["check_run_url"],
        wf_job["labels"],
        wf_job["runner_id"] or 0,  # nullable
        wf_job["runner_name"] or "",  # nullable
        wf_job["runner_group_id"] or 0,  # nullable
        wf_job["runner_group_name"] or "",  # nullable
        repo["full_name"],
    )
    if repo["private"]:
        workflow_job.anonymize()

    logging.info("Got the next event: %s", workflow_job)
    send_event_workflow_job(workflow_job)

    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": '{"status": "OK"}',
    }
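
For reference, a minimal sketch of the Lambda proxy event shape that handler() above consumes. All payload values here are illustrative, not taken from the PR, and a real workflow_job delivery carries every field the WorkflowJob constructor reads:

    import json
    from base64 import b64encode

    # A trimmed workflow_job webhook body.
    body = json.dumps(
        {
            "repository": {"full_name": "example/repo", "private": False},
            "workflow_job": {"id": 1, "run_id": 2, "steps": []},
        }
    )

    # API Gateway may deliver the body base64-encoded, which is why handler()
    # checks the isBase64Encoded flag before parsing.
    event = {"isBase64Encoded": True, "body": b64encode(body.encode()).decode()}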
tests/ci/workflow_jobs_lambda/build_and_deploy_archive.sh  (symbolic link, 1 line)
@@ -0,0 +1 @@
../team_keys_lambda/build_and_deploy_archive.sh
tests/ci/workflow_jobs_lambda/requirements.txt  (new file, 1 line)
@@ -0,0 +1 @@
requests
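
Only requests needs to be vendored into the deployment package: the AWS Lambda Python runtime ships with boto3 preinstalled, hence its absence from requirements.txt.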