mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 01:12:12 +00:00
279 lines
8.6 KiB
Python
279 lines
8.6 KiB
Python
from operator import eq
|
|
import os
|
|
import random
|
|
import time
|
|
import sys
|
|
from clickhouse_driver import Client
|
|
import numpy as np
|
|
import subprocess
|
|
import multiprocessing
|
|
from multiprocessing import Manager
|
|
|
|
warmup_runs = 10
|
|
calculated_runs = 10
|
|
seconds = 30
|
|
max_instances_number = 8
|
|
retest_number = 3
|
|
retest_tolerance = 10
|
|
|
|
|
|
def checkInt(str):
|
|
try:
|
|
int(str)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def setup_client(index):
|
|
if index < 4:
|
|
port_idx = index
|
|
else:
|
|
port_idx = index + 4
|
|
client = Client(
|
|
host="localhost",
|
|
database="default",
|
|
user="default",
|
|
password="",
|
|
port="900%d" % port_idx,
|
|
)
|
|
union_mode_query = "SET union_default_mode='DISTINCT'"
|
|
client.execute(union_mode_query)
|
|
return client
|
|
|
|
|
|
def warm_client(clientN, clientL, query, loop):
|
|
for c_idx in range(clientN):
|
|
for _ in range(loop):
|
|
clientL[c_idx].execute(query)
|
|
|
|
|
|
def read_queries(queries_list):
|
|
queries = list()
|
|
queries_id = list()
|
|
with open(queries_list, "r") as f:
|
|
for line in f:
|
|
line = line.rstrip()
|
|
line = line.split("$")
|
|
queries_id.append(line[0])
|
|
queries.append(line[1])
|
|
return queries_id, queries
|
|
|
|
|
|
def run_task(client, cname, query, loop, query_latency):
|
|
start_time = time.time()
|
|
for i in range(loop):
|
|
client.execute(query)
|
|
query_latency.append(client.last_query.elapsed)
|
|
|
|
end_time = time.time()
|
|
p95 = np.percentile(query_latency, 95)
|
|
print(
|
|
"CLIENT: {0} end. -> P95: %f, qps: %f".format(cname)
|
|
% (p95, loop / (end_time - start_time))
|
|
)
|
|
|
|
|
|
def run_multi_clients(clientN, clientList, query, loop):
|
|
client_pids = {}
|
|
start_time = time.time()
|
|
manager = multiprocessing.Manager()
|
|
query_latency_list0 = manager.list()
|
|
query_latency_list1 = manager.list()
|
|
query_latency_list2 = manager.list()
|
|
query_latency_list3 = manager.list()
|
|
query_latency_list4 = manager.list()
|
|
query_latency_list5 = manager.list()
|
|
query_latency_list6 = manager.list()
|
|
query_latency_list7 = manager.list()
|
|
|
|
for c_idx in range(clientN):
|
|
client_name = "Role_%d" % c_idx
|
|
if c_idx == 0:
|
|
client_pids[c_idx] = multiprocessing.Process(
|
|
target=run_task,
|
|
args=(clientList[c_idx], client_name, query, loop, query_latency_list0),
|
|
)
|
|
elif c_idx == 1:
|
|
client_pids[c_idx] = multiprocessing.Process(
|
|
target=run_task,
|
|
args=(clientList[c_idx], client_name, query, loop, query_latency_list1),
|
|
)
|
|
elif c_idx == 2:
|
|
client_pids[c_idx] = multiprocessing.Process(
|
|
target=run_task,
|
|
args=(clientList[c_idx], client_name, query, loop, query_latency_list2),
|
|
)
|
|
elif c_idx == 3:
|
|
client_pids[c_idx] = multiprocessing.Process(
|
|
target=run_task,
|
|
args=(clientList[c_idx], client_name, query, loop, query_latency_list3),
|
|
)
|
|
elif c_idx == 4:
|
|
client_pids[c_idx] = multiprocessing.Process(
|
|
target=run_task,
|
|
args=(clientList[c_idx], client_name, query, loop, query_latency_list4),
|
|
)
|
|
elif c_idx == 5:
|
|
client_pids[c_idx] = multiprocessing.Process(
|
|
target=run_task,
|
|
args=(clientList[c_idx], client_name, query, loop, query_latency_list5),
|
|
)
|
|
elif c_idx == 6:
|
|
client_pids[c_idx] = multiprocessing.Process(
|
|
target=run_task,
|
|
args=(clientList[c_idx], client_name, query, loop, query_latency_list6),
|
|
)
|
|
elif c_idx == 7:
|
|
client_pids[c_idx] = multiprocessing.Process(
|
|
target=run_task,
|
|
args=(clientList[c_idx], client_name, query, loop, query_latency_list7),
|
|
)
|
|
else:
|
|
print("ERROR: CLIENT number dismatch!!")
|
|
exit()
|
|
print("CLIENT: %s start" % client_name)
|
|
client_pids[c_idx].start()
|
|
|
|
for c_idx in range(clientN):
|
|
client_pids[c_idx].join()
|
|
end_time = time.time()
|
|
totalT = end_time - start_time
|
|
|
|
query_latencyTotal = list()
|
|
for item in query_latency_list0:
|
|
query_latencyTotal.append(item)
|
|
for item in query_latency_list1:
|
|
query_latencyTotal.append(item)
|
|
for item in query_latency_list2:
|
|
query_latencyTotal.append(item)
|
|
for item in query_latency_list3:
|
|
query_latencyTotal.append(item)
|
|
for item in query_latency_list4:
|
|
query_latencyTotal.append(item)
|
|
for item in query_latency_list5:
|
|
query_latencyTotal.append(item)
|
|
for item in query_latency_list6:
|
|
query_latencyTotal.append(item)
|
|
for item in query_latency_list7:
|
|
query_latencyTotal.append(item)
|
|
|
|
totalP95 = np.percentile(query_latencyTotal, 95) * 1000
|
|
return totalT, totalP95
|
|
|
|
|
|
def run_task_caculated(client, cname, query, loop):
|
|
query_latency = list()
|
|
start_time = time.time()
|
|
for i in range(loop):
|
|
client.execute(query)
|
|
query_latency.append(client.last_query.elapsed)
|
|
end_time = time.time()
|
|
p95 = np.percentile(query_latency, 95)
|
|
|
|
|
|
def run_multi_clients_caculated(clientN, clientList, query, loop):
|
|
client_pids = {}
|
|
start_time = time.time()
|
|
for c_idx in range(clientN):
|
|
client_name = "Role_%d" % c_idx
|
|
client_pids[c_idx] = multiprocessing.Process(
|
|
target=run_task_caculated,
|
|
args=(clientList[c_idx], client_name, query, loop),
|
|
)
|
|
client_pids[c_idx].start()
|
|
for c_idx in range(clientN):
|
|
client_pids[c_idx].join()
|
|
end_time = time.time()
|
|
totalT = end_time - start_time
|
|
return totalT
|
|
|
|
|
|
if __name__ == "__main__":
|
|
client_number = 1
|
|
queries = list()
|
|
queries_id = list()
|
|
|
|
if len(sys.argv) != 3:
|
|
print(
|
|
"usage: python3 client_stressing_test.py [queries_file_path] [client_number]"
|
|
)
|
|
sys.exit()
|
|
else:
|
|
queries_list = sys.argv[1]
|
|
client_number = int(sys.argv[2])
|
|
print(
|
|
"queries_file_path: %s, client_number: %d" % (queries_list, client_number)
|
|
)
|
|
if not os.path.isfile(queries_list) or not os.access(queries_list, os.R_OK):
|
|
print("please check the right path for queries file")
|
|
sys.exit()
|
|
if (
|
|
not checkInt(sys.argv[2])
|
|
or int(sys.argv[2]) > max_instances_number
|
|
or int(sys.argv[2]) < 1
|
|
):
|
|
print("client_number should be in [1~%d]" % max_instances_number)
|
|
sys.exit()
|
|
|
|
client_list = {}
|
|
queries_id, queries = read_queries(queries_list)
|
|
|
|
for c_idx in range(client_number):
|
|
client_list[c_idx] = setup_client(c_idx)
|
|
# clear cache
|
|
os.system("sync; echo 3 > /proc/sys/vm/drop_caches")
|
|
|
|
print("###Polit Run Begin")
|
|
for i in queries:
|
|
warm_client(client_number, client_list, i, 1)
|
|
print("###Polit Run End -> Start stressing....")
|
|
|
|
query_index = 0
|
|
for q in queries:
|
|
print(
|
|
"\n###START -> Index: %d, ID: %s, Query: %s"
|
|
% (query_index, queries_id[query_index], q)
|
|
)
|
|
warm_client(client_number, client_list, q, warmup_runs)
|
|
print("###Warm Done!")
|
|
for j in range(0, retest_number):
|
|
totalT = run_multi_clients_caculated(
|
|
client_number, client_list, q, calculated_runs
|
|
)
|
|
curr_loop = int(seconds * calculated_runs / totalT) + 1
|
|
print(
|
|
"###Calculation Done! -> loopN: %d, expected seconds:%d"
|
|
% (curr_loop, seconds)
|
|
)
|
|
|
|
print("###Stress Running! -> %d iterations......" % curr_loop)
|
|
|
|
totalT, totalP95 = run_multi_clients(
|
|
client_number, client_list, q, curr_loop
|
|
)
|
|
|
|
if totalT > (seconds - retest_tolerance) and totalT < (
|
|
seconds + retest_tolerance
|
|
):
|
|
break
|
|
else:
|
|
print(
|
|
"###totalT:%d is far way from expected seconds:%d. Run again ->j:%d!"
|
|
% (totalT, seconds, j)
|
|
)
|
|
|
|
print(
|
|
"###Completed! -> ID: %s, clientN: %d, totalT: %.2f s, latencyAVG: %.2f ms, P95: %.2f ms, QPS_Final: %.2f"
|
|
% (
|
|
queries_id[query_index],
|
|
client_number,
|
|
totalT,
|
|
totalT * 1000 / (curr_loop * client_number),
|
|
totalP95,
|
|
((curr_loop * client_number) / totalT),
|
|
)
|
|
)
|
|
query_index += 1
|
|
print("###Finished!")
|