Move insert part for ClickHouseHelper to shared

This commit is contained in:
Mikhail f. Shiryaev 2023-05-26 17:17:49 +02:00
parent 2dca0eac1b
commit e8b03d7498
No known key found for this signature in database
GPG Key ID: 4B02ED204C7D93F4
3 changed files with 110 additions and 148 deletions

View File

@ -5,7 +5,7 @@ import json
import logging
import time
from collections import namedtuple
from typing import Any, List, Optional
from typing import Any, Dict, Iterable, List, Optional
import boto3 # type: ignore
import requests # type: ignore
@ -36,10 +36,14 @@ class CHException(Exception):
pass
class InsertException(CHException):
pass
class ClickHouseHelper:
def __init__(
self,
url: Optional[str] = None,
url: str,
user: Optional[str] = None,
password: Optional[str] = None,
):
@ -50,6 +54,89 @@ class ClickHouseHelper:
if password:
self.auth["X-ClickHouse-Key"] = password
@staticmethod
def _insert_json_str_info_impl(
url: str, auth: Dict[str, str], db: str, table: str, json_str: str
) -> None:
params = {
"database": db,
"query": f"INSERT INTO {table} FORMAT JSONEachRow",
"date_time_input_format": "best_effort",
"send_logs_level": "warning",
}
for i in range(5):
try:
response = requests.post(
url, params=params, data=json_str, headers=auth
)
except Exception as e:
error = f"Received exception while sending data to {url} on {i} attempt: {e}"
logging.warning(error)
continue
logging.info("Response content '%s'", response.content)
if response.ok:
break
error = (
"Cannot insert data into clickhouse at try "
+ str(i)
+ ": HTTP code "
+ str(response.status_code)
+ ": '"
+ str(response.text)
+ "'"
)
if response.status_code >= 500:
# A retriable error
time.sleep(1)
continue
logging.info(
"Request headers '%s', body '%s'",
response.request.headers,
response.request.body,
)
raise InsertException(error)
else:
raise InsertException(error)
def _insert_json_str_info(self, db: str, table: str, json_str: str) -> None:
self._insert_json_str_info_impl(self.url, self.auth, db, table, json_str)
def insert_event_into(
self, db: str, table: str, event: object, safe: bool = True
) -> None:
event_str = json.dumps(event)
try:
self._insert_json_str_info(db, table, event_str)
except InsertException as e:
logging.error(
"Exception happened during inserting data into clickhouse: %s", e
)
if not safe:
raise
def insert_events_into(
self, db: str, table: str, events: Iterable[object], safe: bool = True
) -> None:
jsons = []
for event in events:
jsons.append(json.dumps(event))
try:
self._insert_json_str_info(db, table, ",".join(jsons))
except InsertException as e:
logging.error(
"Exception happened during inserting data into clickhouse: %s", e
)
if not safe:
raise
def _select_and_get_json_each_row(self, db: str, query: str) -> str:
params = {
"database": db,

View File

@ -10,13 +10,11 @@ fields for private repositories
from base64 import b64decode
from dataclasses import dataclass
from typing import Any, List
from typing import Any, List, Optional
import json
import logging
import time
import boto3 # type: ignore
import requests # type: ignore
from lambda_shared import ClickHouseHelper, InsertException, get_parameter_from_ssm
logging.getLogger().setLevel(logging.INFO)
@ -66,137 +64,7 @@ class WorkflowJob:
return self.__dict__
### VENDORING
def get_parameter_from_ssm(name, decrypt=True, client=None):
if not client:
client = boto3.client("ssm", region_name="us-east-1")
return client.get_parameter(Name=name, WithDecryption=decrypt)["Parameter"]["Value"]
class InsertException(Exception):
pass
class ClickHouseHelper:
def __init__(self, url=None):
if url is None:
url = get_parameter_from_ssm("clickhouse-test-stat-url")
self.url = url
self.auth = {
"X-ClickHouse-User": get_parameter_from_ssm("clickhouse-test-stat-login"),
"X-ClickHouse-Key": get_parameter_from_ssm("clickhouse-test-stat-password"),
}
@staticmethod
def _insert_json_str_info_impl(url, auth, db, table, json_str):
params = {
"database": db,
"query": f"INSERT INTO {table} FORMAT JSONEachRow",
"date_time_input_format": "best_effort",
"send_logs_level": "warning",
}
for i in range(5):
try:
response = requests.post(
url, params=params, data=json_str, headers=auth
)
except Exception as e:
error = f"Received exception while sending data to {url} on {i} attempt: {e}"
logging.warning(error)
continue
logging.info("Response content '%s'", response.content)
if response.ok:
break
error = (
"Cannot insert data into clickhouse at try "
+ str(i)
+ ": HTTP code "
+ str(response.status_code)
+ ": '"
+ str(response.text)
+ "'"
)
if response.status_code >= 500:
# A retriable error
time.sleep(1)
continue
logging.info(
"Request headers '%s', body '%s'",
response.request.headers,
response.request.body,
)
raise InsertException(error)
else:
raise InsertException(error)
def _insert_json_str_info(self, db, table, json_str):
self._insert_json_str_info_impl(self.url, self.auth, db, table, json_str)
def insert_event_into(self, db, table, event, safe=True):
event_str = json.dumps(event)
try:
self._insert_json_str_info(db, table, event_str)
except InsertException as e:
logging.error(
"Exception happened during inserting data into clickhouse: %s", e
)
if not safe:
raise
def insert_events_into(self, db, table, events, safe=True):
jsons = []
for event in events:
jsons.append(json.dumps(event))
try:
self._insert_json_str_info(db, table, ",".join(jsons))
except InsertException as e:
logging.error(
"Exception happened during inserting data into clickhouse: %s", e
)
if not safe:
raise
def _select_and_get_json_each_row(self, db, query):
params = {
"database": db,
"query": query,
"default_format": "JSONEachRow",
}
for i in range(5):
response = None
try:
response = requests.get(self.url, params=params, headers=self.auth)
response.raise_for_status()
return response.text
except Exception as ex:
logging.warning("Cannot insert with exception %s", str(ex))
if response:
logging.warning("Reponse text %s", response.text)
time.sleep(0.1 * i)
raise Exception("Cannot fetch data from clickhouse")
def select_json_each_row(self, db, query):
text = self._select_and_get_json_each_row(db, query)
result = []
for line in text.split("\n"):
if line:
result.append(json.loads(line))
return result
### VENDORING END
clickhouse_client = ClickHouseHelper()
CH_CLIENT = None # type: Optional[ClickHouseHelper]
def send_event_workflow_job(workflow_job: WorkflowJob) -> None:
@ -232,23 +100,30 @@ def send_event_workflow_job(workflow_job: WorkflowJob) -> None:
# PARTITION BY toStartOfMonth(started_at)
# ORDER BY (id, updated_at)
# SETTINGS index_granularity = 8192
global clickhouse_client
kwargs = {
"db": "default",
"table": "workflow_jobs",
"event": workflow_job.as_dict(),
"safe": False,
}
global CH_CLIENT
CH_CLIENT = CH_CLIENT or ClickHouseHelper(
get_parameter_from_ssm("clickhouse-test-stat-url"),
get_parameter_from_ssm("clickhouse-test-stat-login"),
get_parameter_from_ssm("clickhouse-test-stat-password"),
)
try:
clickhouse_client.insert_event_into(**kwargs)
CH_CLIENT.insert_event_into(
"default", "workflow_jobs", workflow_job.as_dict(), False
)
except InsertException as ex:
logging.exception(
"Got an exception on insert, tryuing to update the client "
"credentials and repeat",
exc_info=ex,
)
clickhouse_client = ClickHouseHelper()
clickhouse_client.insert_event_into(**kwargs)
CH_CLIENT = ClickHouseHelper(
get_parameter_from_ssm("clickhouse-test-stat-url"),
get_parameter_from_ssm("clickhouse-test-stat-login"),
get_parameter_from_ssm("clickhouse-test-stat-password"),
)
CH_CLIENT.insert_event_into(
"default", "workflow_jobs", workflow_job.as_dict(), False
)
def handler(event: dict, context: Any) -> dict:

View File

@ -1 +1 @@
requests<2.30
../lambda_shared_package