# Mirror of https://github.com/ClickHouse/ClickHouse.git
# Synced 2024-12-01 03:52:15 +00:00
# 102 lines, 2.9 KiB, Python
import io
|
|
import os
|
|
import time
|
|
|
|
import pandas as pd
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.packages.urllib3.util.retry import Retry
|
|
|
|
# Connection settings for the ClickHouse server. Each value can be overridden
# through the environment variable of the same name.
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "127.0.0.1")
CLICKHOUSE_PORT_HTTP = os.environ.get("CLICKHOUSE_PORT_HTTP", "8123")

# Base URL of the server's HTTP interface, in the form "http://<host>:<port>/".
CLICKHOUSE_SERVER_URL_STR = f"http://{CLICKHOUSE_HOST}:{CLICKHOUSE_PORT_HTTP}/"

# Database name sent as the "database" URL parameter with every request.
CLICKHOUSE_DATABASE = os.environ.get("CLICKHOUSE_DATABASE", "test")
|
|
|
|
|
|
class ClickHouseClient:
    """Small HTTP client that POSTs queries to a ClickHouse server.

    Every request goes to ``self.host`` and carries a fixed set of default URL
    parameters (execution limits and the target database). Callers can add to
    or override those parameters through the ``settings`` argument.
    """

    def __init__(self, host=CLICKHOUSE_SERVER_URL_STR):
        """Store the base URL that all requests are sent to."""
        self.host = host

    @staticmethod
    def _default_params():
        """Return a fresh dict of the URL parameters sent with every request."""
        return {
            "timeout_before_checking_execution_speed": 120,
            "max_execution_time": 6000,
            "database": CLICKHOUSE_DATABASE,
        }

    def query(
        self,
        query,
        connection_timeout=500,
        settings=None,
        binary_result=False,
        with_retries=True,
    ):
        """Send ``query`` as the POST body and return the response payload.

        Args:
            query: Query text, sent as the request body.
            connection_timeout: Value passed to ``requests.post`` as ``timeout``.
            settings: Optional mapping of extra URL parameters; entries
                override the defaults on key collision.
            binary_result: When true, return the raw response bytes
                (``r.content``) instead of the decoded text (``r.text``).
            with_retries: When true, up to 30 attempts are made, sleeping
                10 s, 20 s, 30 s, ... between consecutive failures. When
                false, a single attempt is made.

        Returns:
            The response body of the first attempt that returns HTTP 200.

        Raises:
            ValueError: If the last attempt returns a non-200 status; the
                message is the response text. Exceptions raised by
                ``requests.post`` itself are not caught here.
        """
        number_of_tries = 30 if with_retries else 1
        delay = 10

        # Caller-supplied settings win over the defaults.
        params = {**self._default_params(), **(settings or {})}

        for attempt in range(number_of_tries):
            r = requests.post(
                self.host, params=params, timeout=connection_timeout, data=query
            )
            if r.status_code == 200:
                return r.content if binary_result else r.text

            if with_retries:
                print("ATTENTION: try #%d failed" % attempt)

            # The final failure is always reported to the caller rather than
            # letting the loop end and return None.
            if attempt == number_of_tries - 1:
                raise ValueError(r.text)

            print(query)
            print(r.text)
            # Linear back-off before the next attempt.
            time.sleep(delay * (attempt + 1))

    def query_return_df(self, query, connection_timeout=500):
        """Run ``query`` and parse the text response into a DataFrame.

        The response is read with ``pandas.read_csv`` using a tab separator,
        so the query is expected to produce tab-separated output.
        """
        data = self.query(query, connection_timeout)
        return pd.read_csv(io.StringIO(data), sep="\t")

    def query_with_data(self, query, data, connection_timeout=500, settings=None):
        """Send ``query`` as a URL parameter and ``data`` as the POST body.

        Unlike ``query``, this makes a single attempt with no retries.

        Args:
            query: Query text, sent as the ``query`` URL parameter.
            data: Request body, sent with ``Content-Type: application/binary``.
            connection_timeout: Value passed to ``requests.post`` as ``timeout``.
            settings: Optional mapping of extra URL parameters; entries
                override the defaults (and ``query``) on key collision.

        Returns:
            The response text when the server answers with HTTP 200.

        Raises:
            ValueError: On any non-200 status; the message is the response text.
        """
        params = {
            "query": query,
            **self._default_params(),
            **(settings or {}),
        }
        headers = {"Content-Type": "application/binary"}

        r = requests.post(
            self.host,
            params=params,
            timeout=connection_timeout,
            data=data,
            headers=headers,
        )
        if r.status_code == 200:
            return r.text
        raise ValueError(r.text)
|
|
|
|
|
|
class _DefaultTimeoutHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that supplies a default timeout when a request sets none."""

    def __init__(self, *args, timeout=None, **kwargs):
        self._default_timeout = timeout
        super().__init__(*args, **kwargs)

    def send(
        self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None
    ):
        # A timeout given explicitly on the request takes precedence.
        if timeout is None:
            timeout = self._default_timeout
        return super().send(
            request,
            stream=stream,
            timeout=timeout,
            verify=verify,
            cert=cert,
            proxies=proxies,
        )


def requests_session_with_retries(retries=3, timeout=180):
    """Create a ``requests.Session`` with retries and a default timeout.

    Args:
        retries: Used as the ``total``, ``read`` and ``connect`` limits of the
            ``Retry`` policy mounted for both ``http://`` and ``https://``.
        timeout: Timeout applied to every request made through the session
            that does not pass its own ``timeout``.

    Returns:
        The configured session.
    """
    session = requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
    )
    # requests ignores a ``timeout`` attribute set on a Session, so the
    # default is enforced in the transport adapter instead.
    adapter = _DefaultTimeoutHTTPAdapter(max_retries=retry, timeout=timeout)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    # Kept for callers that read the attribute; it has no effect on requests.
    session.timeout = timeout
    return session
|