ClickHouse/tests/ci/clickhouse_helper.py

#!/usr/bin/env python3
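"""Helpers for sending CI check and test results to ClickHouse over its HTTP
interface, with simple retry logic around inserts and selects."""
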
import json
import logging
import time
from pathlib import Path
from typing import Dict, List, Optional

import requests  # type: ignore

from get_robot_token import get_parameter_from_ssm
from pr_info import PRInfo
from report import TestResults


class InsertException(Exception):
    pass


class ClickHouseHelper:
    def __init__(
        self, url: Optional[str] = None, auth: Optional[Dict[str, str]] = None
    ):
        if url is None:
            url = get_parameter_from_ssm("clickhouse-test-stat-url")

        self.url = url
        self.auth = auth or {
            "X-ClickHouse-User": get_parameter_from_ssm("clickhouse-test-stat-login"),
            "X-ClickHouse-Key": get_parameter_from_ssm("clickhouse-test-stat-password"),
        }

    @staticmethod
    def insert_file(
        url: str,
        auth: Optional[Dict[str, str]],
        query: str,
        file: Path,
        additional_options: Optional[Dict[str, str]] = None,
    ) -> None:
        params = {
            "query": query,
            "date_time_input_format": "best_effort",
            "send_logs_level": "warning",
        }
        if additional_options:
            params.update(additional_options)
        with open(file, "rb") as data_fd:
            ClickHouseHelper._insert_post(
                url, params=params, data=data_fd, headers=auth
            )
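
    # A minimal usage sketch (assumed, not from this module: the endpoint,
    # table name, and file below are hypothetical examples):
    #
    #   ClickHouseHelper.insert_file(
    #       url="https://clickhouse.example.com:8443",  # hypothetical endpoint
    #       auth=None,
    #       query="INSERT INTO checks FORMAT TSV",
    #       file=Path("results.tsv"),
    #   )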

    @staticmethod
    def insert_json_str(url, auth, db, table, json_str):
        params = {
            "database": db,
            "query": f"INSERT INTO {table} FORMAT JSONEachRow",
            "date_time_input_format": "best_effort",
            "send_logs_level": "warning",
        }
        ClickHouseHelper._insert_post(url, params=params, data=json_str, headers=auth)

    @staticmethod
    def _insert_post(*args, **kwargs):
        # Recover the target URL (positional or keyword) for error messages
        url = ""
        if args:
            url = args[0]
        url = kwargs.get("url", url)
        kwargs["timeout"] = kwargs.get("timeout", 100)

        for i in range(5):
            try:
                response = requests.post(*args, **kwargs)
            except Exception as e:
                error = (
                    f"Received exception while sending data to {url} "
                    f"on attempt {i}: {e}"
                )
                logging.warning(error)
                continue

            logging.info("Response content '%s'", response.content)
            if response.ok:
                break

            error = (
                f"Cannot insert data into clickhouse on attempt {i}: HTTP code "
                f"{response.status_code}: '{response.text}'"
            )
            if response.status_code >= 500:
                # A retryable server-side error
                time.sleep(1)
                continue

            # Client-side (4xx) errors are not retryable; log the request
            # for debugging and give up immediately
            logging.info(
                "Request headers '%s', body '%s'",
                response.request.headers,
                response.request.body,
            )
            raise InsertException(error)
        else:
            # The loop finished without a successful attempt
            raise InsertException(error)

    def _insert_json_str_info(self, db, table, json_str):
        self.insert_json_str(self.url, self.auth, db, table, json_str)

    def insert_event_into(self, db, table, event, safe=True):
        event_str = json.dumps(event)
        try:
            self._insert_json_str_info(db, table, event_str)
        except InsertException as e:
            logging.error(
                "Exception while inserting data into clickhouse: %s", e
            )
            if not safe:
                raise

    def insert_events_into(self, db, table, events, safe=True):
        jsons = [json.dumps(event) for event in events]
        try:
            self._insert_json_str_info(db, table, ",".join(jsons))
        except InsertException as e:
            logging.error(
                "Exception while inserting data into clickhouse: %s", e
            )
            if not safe:
                raise

    def _select_and_get_json_each_row(self, db, query):
        params = {
            "database": db,
            "query": query,
            "default_format": "JSONEachRow",
        }
        for i in range(5):
            response = None
            try:
                response = requests.get(self.url, params=params, headers=self.auth)
                response.raise_for_status()
                return response.text
            except Exception as ex:
                logging.warning("Cannot fetch data with exception %s", str(ex))
                # `bool(response)` is False for non-2xx codes, so check for None
                if response is not None:
                    logging.warning("Response text %s", response.text)
            time.sleep(0.1 * i)
        raise Exception("Cannot fetch data from clickhouse")

    def select_json_each_row(self, db, query):
        text = self._select_and_get_json_each_row(db, query)
        result = []
        for line in text.split("\n"):
            if line:
                result.append(json.loads(line))
        return result
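

# A minimal read/write round trip (assumed, not from this module: the
# database and table names below are hypothetical):
#
#   helper = ClickHouseHelper()
#   helper.insert_event_into("default", "checks", {"check_name": "Style check"})
#   rows = helper.select_json_each_row("default", "SELECT * FROM checks LIMIT 10")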


# Obtain the machine type from the EC2 Instance Metadata Service (IMDS)
def get_instance_type():
    url = "http://169.254.169.254/latest/meta-data/instance-type"
    for i in range(5):
        try:
            response = requests.get(url, timeout=1)
            if response.status_code == 200:
                return response.text
        except Exception as e:
            error = f"Received exception while requesting {url} on attempt {i}: {e}"
            logging.warning(error)
            continue
    return ""


def prepare_tests_results_for_clickhouse(
    pr_info: PRInfo,
    test_results: TestResults,
    check_status: str,
    check_duration: float,
    check_start_time: str,
    report_url: str,
    check_name: str,
) -> List[dict]:
    # Defaults for direct commits to master; overridden below for pull requests
    pull_request_url = "https://github.com/ClickHouse/ClickHouse/commits/master"
    base_ref = "master"
    head_ref = "master"
    base_repo = pr_info.repo_full_name
    head_repo = pr_info.repo_full_name
    if pr_info.number != 0:
        pull_request_url = pr_info.pr_html_url
        base_ref = pr_info.base_ref
        base_repo = pr_info.base_name
        head_ref = pr_info.head_ref
        head_repo = pr_info.head_name

    common_properties = dict(
        pull_request_number=pr_info.number,
        commit_sha=pr_info.sha,
        commit_url=pr_info.commit_html_url,
        check_name=check_name,
        check_status=check_status,
        check_duration_ms=int(float(check_duration) * 1000),
        check_start_time=check_start_time,
        report_url=report_url,
        pull_request_url=pull_request_url,
        base_ref=base_ref,
        base_repo=base_repo,
        head_ref=head_ref,
        head_repo=head_repo,
        task_url=pr_info.task_url,
        instance_type=get_instance_type(),
    )
    # Always publish a total record for all checks. For checks with individual
    # tests, also publish a record per test.
    result = [common_properties]
    for test_result in test_results:
        current_row = common_properties.copy()
        test_name = test_result.name
        test_status = test_result.status

        test_time = test_result.time or 0
        current_row["test_duration_ms"] = int(test_time * 1000)
        current_row["test_name"] = test_name
        current_row["test_status"] = test_status
        if test_result.raw_logs:
            # Protect against overly large blobs that contain garbage
            current_row["test_context_raw"] = test_result.raw_logs[: 32 * 1024]
        else:
            current_row["test_context_raw"] = ""

        result.append(current_row)
    return result
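

# A minimal end-to-end sketch of how CI jobs might use this module (assumed,
# not from this file: the database/table names and argument values below are
# hypothetical):
#
#   pr_info = PRInfo()
#   rows = prepare_tests_results_for_clickhouse(
#       pr_info,
#       test_results,
#       check_status="success",
#       check_duration=12.3,
#       check_start_time="2023-01-01 00:00:00",
#       report_url="https://example.com/report",
#       check_name="Style check",
#   )
#   ClickHouseHelper().insert_events_into("default", "checks", rows)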