Add insert_file to ClickHouseHelper, make insert_json_str public

This commit is contained in:
Mikhail f. Shiryaev 2023-08-09 21:11:21 +02:00
parent 1405346830
commit 239d198cf2
No known key found for this signature in database
GPG Key ID: 4B02ED204C7D93F4

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
from typing import List
from pathlib import Path
from typing import Dict, List, Optional
import json
import logging
import time
@ -16,18 +17,78 @@ class InsertException(Exception):
class ClickHouseHelper:
def __init__(self, url=None):
def __init__(
self, url: Optional[str] = None, auth: Optional[Dict[str, str]] = None
):
if url is None:
url = get_parameter_from_ssm("clickhouse-test-stat-url")
self.url = url
self.auth = {
self.auth = auth or {
"X-ClickHouse-User": get_parameter_from_ssm("clickhouse-test-stat-login"),
"X-ClickHouse-Key": get_parameter_from_ssm("clickhouse-test-stat-password"),
}
@staticmethod
def _insert_json_str_info_impl(url, auth, db, table, json_str):
def insert_file(
url: str,
auth: Optional[Dict[str, str]],
query: str,
file: Path,
additional_options: Optional[Dict[str, str]] = None,
) -> None:
params = {
"query": query,
"date_time_input_format": "best_effort",
"send_logs_level": "warning",
}
if additional_options:
for k, v in additional_options.items():
params[k] = v
with open(file, "rb") as data_fd:
for i in range(5):
try:
response = requests.post(
url, params=params, data=data_fd, headers=auth
)
except Exception as e:
error = f"Received exception while sending data to {url} on {i} attempt: {e}"
logging.warning(error)
continue
logging.info("Response content '%s'", response.content)
if response.ok:
break
error = (
"Cannot insert data into clickhouse at try "
+ str(i)
+ ": HTTP code "
+ str(response.status_code)
+ ": '"
+ str(response.text)
+ "'"
)
if response.status_code >= 500:
# A retriable error
time.sleep(1)
continue
logging.info(
"Request headers '%s', body '%s'",
response.request.headers,
response.request.body,
)
raise InsertException(error)
else:
raise InsertException(error)
@staticmethod
def insert_json_str(url, auth, db, table, json_str):
params = {
"database": db,
"query": f"INSERT INTO {table} FORMAT JSONEachRow",
@ -76,7 +137,7 @@ class ClickHouseHelper:
raise InsertException(error)
def _insert_json_str_info(self, db, table, json_str):
self._insert_json_str_info_impl(self.url, self.auth, db, table, json_str)
self.insert_json_str(self.url, self.auth, db, table, json_str)
def insert_event_into(self, db, table, event, safe=True):
event_str = json.dumps(event)