mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 10:52:30 +00:00
547 lines
19 KiB
Python
Executable File
547 lines
19 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import functools
|
|
import itertools
|
|
import logging
|
|
import math
|
|
import os
|
|
import pprint
|
|
import random
|
|
import re
|
|
import statistics
|
|
import string
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import xml.etree.ElementTree as et
|
|
from threading import Thread
|
|
|
|
import clickhouse_driver
|
|
from scipy import stats
|
|
|
|
# Only warnings and above are logged; each record carries a timestamp, the
# level, and the originating module.
logging.basicConfig(
    level="WARNING",
    format="%(asctime)s: %(levelname)s: %(module)s: %(message)s",
)

# Both timers start from the same instant: one measures the whole script, the
# other is restarted at the end of every reported stage.
stage_start_seconds = total_start_seconds = time.perf_counter()
|
|
|
|
|
|
# Thread subclass that does not hide an exception raised while the thread's
# function executes: the exception is recorded and re-raised from join().
class SafeThread(Thread):
    """Thread whose join() re-raises whatever the thread body raised.

    A plain Thread only reports an exception from its target through the
    threading excepthook; the joining code never sees it. Here run() records
    the exception and join() raises it in the caller's thread.
    """

    # sys.exc_info() triple captured by run(), or None if nothing was raised.
    run_exception = None

    def run(self):
        """Run the thread body, recording any exception it raises."""
        try:
            super().run()
        except BaseException:
            # Intentionally catches everything (same reach as a bare except):
            # whatever stopped the thread must be visible to join().
            self.run_exception = sys.exc_info()

    def join(self, timeout=None):
        """Wait for the thread, then re-raise its recorded exception, if any.

        Args:
            timeout: Optional number of seconds to wait, forwarded to
                Thread.join(). If the thread is still running when the wait
                ends, nothing has been recorded yet and the call returns
                normally.

        Raises:
            BaseException: The exception instance recorded by run().
        """
        super().join(timeout)
        if self.run_exception:
            raise self.run_exception[1]
|
|
|
|
|
|
def reportStageEnd(stage):
    """Print a tab-separated "stage" record and restart the stage timer.

    The record holds the stage name, the seconds elapsed since the previous
    stage ended, and the seconds elapsed since total_start_seconds was taken,
    both formatted with three decimal places.
    """
    global stage_start_seconds

    now = time.perf_counter()
    stage_elapsed = now - stage_start_seconds
    total_elapsed = now - total_start_seconds
    print(f"stage\t{stage}\t{stage_elapsed:.3f}\t{total_elapsed:.3f}")

    # The next stage is measured from this moment.
    stage_start_seconds = now
|
|
|
|
|
|
# Per-character rewrite table for tsv_escape(): backslash, tab and newline
# become two-character escape sequences; carriage return is dropped.
_TSV_ESCAPES = str.maketrans(
    {
        "\\": "\\\\",
        "\t": "\\t",
        "\n": "\\n",
        "\r": None,
    }
)


def tsv_escape(s):
    """Return *s* escaped so it fits in a single tab-separated field."""
    return s.translate(_TSV_ESCAPES)
|
|
|
|
|
|
parser = argparse.ArgumentParser(description="Run performance test.")

# The test description is the only positional argument. Explicitly decode it
# as UTF-8 because sometimes we have Russian characters in queries, and LANG=C
# is set.
parser.add_argument(
    "file",
    metavar="FILE",
    type=argparse.FileType("r", encoding="utf-8"),
    nargs=1,
    help="test description file",
)

# Optional arguments as (flag, add_argument keyword arguments) pairs,
# registered in this order.
_OPTIONS = (
    (
        "--host",
        {
            "nargs": "*",
            "default": ["localhost"],
            "help": "Space-separated list of server hostname(s). Corresponds to '--port' options.",
        },
    ),
    (
        "--port",
        {
            "nargs": "*",
            "default": [9000],
            "help": "Space-separated list of server port(s). Corresponds to '--host' options.",
        },
    ),
    (
        "--runs",
        {"type": int, "default": 1, "help": "Number of query runs per server."},
    ),
    (
        "--max-queries",
        {
            "type": int,
            "default": None,
            "help": "Test no more than this number of queries, chosen at random.",
        },
    ),
    (
        "--queries-to-run",
        {
            "nargs": "*",
            "type": int,
            "default": None,
            "help": "Space-separated list of indexes of queries to test.",
        },
    ),
    (
        "--max-query-seconds",
        {
            "type": int,
            "default": 15,
            "help": "For how many seconds at most a query is allowed to run. The script finishes with error if this time is exceeded.",
        },
    ),
    (
        "--prewarm-max-query-seconds",
        {
            "type": int,
            "default": 180,
            "help": "For how many seconds at most a prewarm (cold storage) query is allowed to run. The script finishes with error if this time is exceeded.",
        },
    ),
    (
        "--profile-seconds",
        {
            "type": int,
            "default": 0,
            "help": "For how many seconds to profile a query for which the performance has changed.",
        },
    ),
    (
        "--long",
        {"action": "store_true", "help": "Do not skip the tests tagged as long."},
    ),
    (
        "--print-queries",
        {"action": "store_true", "help": "Print test queries and exit."},
    ),
    (
        "--print-settings",
        {"action": "store_true", "help": "Print test settings and exit."},
    ),
    (
        "--keep-created-tables",
        {
            "action": "store_true",
            "help": "Don't drop the created tables after the test.",
        },
    ),
    (
        "--use-existing-tables",
        {
            "action": "store_true",
            "help": "Don't create or drop the tables, use the existing ones instead.",
        },
    ),
)
for _flag, _add_argument_kwargs in _OPTIONS:
    parser.add_argument(_flag, **_add_argument_kwargs)
|
|
args = parser.parse_args()

reportStageEnd("start")

# "file" is declared with nargs=1, so it arrives as a one-element list holding
# the already opened test description.
test_file = args.file[0]

# The test name is the description file's base name without its extension.
test_basename = os.path.basename(test_file.name)
test_name = os.path.splitext(test_basename)[0]

tree = et.parse(test_file)
root = tree.getroot()

reportStageEnd("parse")
|
|
|
|
# Collect the query parameters declared in the test description: every
# <substitution> maps a name to the list of values it may take.
subst_elems = root.findall("substitutions/substitution")
available_parameters = {}  # { 'table': ['hits_10m', 'hits_100m'], ... }
for subst in subst_elems:
    name = subst.find("name").text
    values = [value_elem.text for value_elem in subst.findall("values/value")]
    if values:
        available_parameters[name] = values
    else:
        # A substitution without values can never be expanded.
        raise Exception(f"No values given for substitution {{{name}}}")
|
|
|
|
|
|
# Takes parallel lists of templates, substitutes them with all combos of
# parameters. The set of parameters is determined based on the first list.
# Note: keep the order of queries -- sometimes we have DROP IF EXISTS
# followed by CREATE in create queries section, so the order matters.
def substitute_parameters(query_templates, other_templates=None, parameters=None):
    """Expand format-style templates over every combination of parameter values.

    Args:
        query_templates: List of str.format templates. The named fields of
            each template decide which parameters are combined for it.
        other_templates: Optional list of template lists, each parallel to
            query_templates. Entry i of each list is formatted with the same
            values as query_templates[i].
        parameters: Optional mapping from parameter name to its list of
            values. Defaults to the module-level available_parameters.

    Returns:
        The list of expanded queries when other_templates is empty or omitted;
        otherwise a tuple (queries, others), where others[j] is the expansion
        of other_templates[j] in the same order as queries.

    Raises:
        KeyError: If a template names a parameter missing from the mapping
            (assuming the mapping is a dict).
    """
    if other_templates is None:
        other_templates = []
    if parameters is None:
        parameters = available_parameters

    formatter = string.Formatter()
    query_results = []
    # One independent list per parallel template list. `[[]] * n` would repeat
    # a single shared list, mixing the outputs of all template lists together.
    other_results = [[] for _ in other_templates]
    for i, q in enumerate(query_templates):
        # We need stable order of keys here, so that the order of substitutions
        # is always the same, and the query indexes are consistent across test
        # runs.
        keys = sorted({n for _, n, _, _ in formatter.parse(q) if n})
        values = [parameters[k] for k in keys]
        for combo in itertools.product(*values):
            with_keys = dict(zip(keys, combo))
            query_results.append(q.format(**with_keys))
            for j, t in enumerate(other_templates):
                other_results[j].append(t[i].format(**with_keys))

    if len(other_templates):
        return query_results, other_results
    return query_results
|
|
|
|
|
|
# Build the list of test queries by substituting parameters into every query
# template, in the order the templates appear in the test description.
test_queries = []
for query_elem in root.findall("query"):
    test_queries.extend(substitute_parameters([query_elem.text]))
|
|
|
|
# If we're given a list of queries to run, check that it makes sense: every
# index must refer to an existing test query.
for i in args.queries_to_run or []:
    if i < 0 or i >= len(test_queries):
        print(
            f"There is no query no. {i} in this test, only [{0}-{len(test_queries) - 1}] are present"
        )
        # sys.exit rather than the site-provided exit(): the latter is meant
        # for interactive use and is missing when the site module is not
        # loaded.
        sys.exit(1)

# If we're only asked to print the queries, do that and exit.
if args.print_queries:
    for i in args.queries_to_run or range(0, len(test_queries)):
        print(test_queries[i])
    sys.exit(0)

# If we're only asked to print the settings, do that and exit. These are settings
# for clickhouse-benchmark, so we print them as command line arguments, e.g.
# '--max_memory_usage=10000000'.
if args.print_settings:
    for s in root.findall("settings/*"):
        print(f"--{s.tag}={s.text}")

    sys.exit(0)
|
|
|
|
# Skip the test when it carries a "long" tag, unless --long was given.
if not args.long and any(tag.text == "long" for tag in root.findall(".//tag")):
    print("skipped\tTest is tagged as long.")
    sys.exit(0)

# Print report threshold for the test if it is set; otherwise silently use the
# default.
threshold_attr = root.attrib.get("max_ignored_relative_change")
if threshold_attr is None:
    ignored_relative_change = 0.05
else:
    ignored_relative_change = float(threshold_attr)
    print(f"report-threshold\t{ignored_relative_change}")

reportStageEnd("before-connect")
|
|
|
|
# Open connections. Hosts and ports are paired positionally; when one list is
# shorter than the other, the missing entries fall back to the first host or
# the first port.
servers = []
for host, port in itertools.zip_longest(args.host, args.port):
    servers.append(
        {
            "host": host or args.host[0],
            "port": port or args.port[0],
        }
    )

# Force settings_is_important to fail queries on unknown settings.
all_connections = []
for server in servers:
    all_connections.append(
        clickhouse_driver.Client(settings_is_important=True, **server)
    )

for server_index, server in enumerate(servers):
    print(f'server\t{server_index}\t{server["host"]}\t{server["port"]}')

reportStageEnd("connect")
|
|
|
|
if not args.use_existing_tables:
    # Run drop queries, ignoring errors. Do this before all other activity,
    # because clickhouse_driver disconnects on error (this is not configurable),
    # and the new connection loses the changes in settings.
    drop_query_templates = [q.text for q in root.findall("drop_query")]
    drop_queries = substitute_parameters(drop_query_templates)
    for conn_index, c in enumerate(all_connections):
        for q in drop_queries:
            try:
                c.execute(q)
                print(f"drop\t{conn_index}\t{c.last_query.elapsed}\t{tsv_escape(q)}")
            except Exception:
                # Best effort: a failed drop is deliberately ignored.
                # KeyboardInterrupt and SystemExit are not Exception
                # subclasses, so they still stop the script, matching the
                # query loops that re-raise KeyboardInterrupt explicitly.
                pass

    # NOTE(review): this copy of the file has lost its indentation; the stage
    # report is assumed to belong inside the `if` -- confirm against upstream.
    reportStageEnd("drop-1")
|
|
|
|
# Apply the settings from the test description to every connection.
settings = root.findall("settings/*")
for connection in all_connections:
    for setting in settings:
        # requires clickhouse-driver >= 1.1.5 to accept arbitrary new settings
        # (https://github.com/mymarilyn/clickhouse-driver/pull/142)
        connection.settings[setting.tag] = setting.text

    # We have to perform a query to make sure the settings work. Otherwise an
    # unknown setting will lead to failing precondition check, and we will skip
    # the test, which is wrong.
    connection.execute("select 1")

reportStageEnd("settings")
|
|
|
|
if not args.use_existing_tables:
    # Run create and fill queries. We will run them simultaneously for both
    # servers, to save time. The weird XML search + filter is because we want to
    # keep the relative order of elements, and etree doesn't support the
    # appropriate xpath query.
    create_tags = ("create_query", "fill_query")
    create_query_templates = [
        elem.text for elem in root.findall("./*") if elem.tag in create_tags
    ]
    create_queries = substitute_parameters(create_query_templates)

    # Disallow temporary tables, because the clickhouse_driver reconnects on
    # errors, and temporary tables are destroyed. We want to be able to continue
    # after some errors.
    temporary_table_re = re.compile("create temporary table", flags=re.IGNORECASE)
    for create_query in create_queries:
        if temporary_table_re.search(create_query) is None:
            continue
        print(
            f"Temporary tables are not allowed in performance tests: '{create_query}'",
            file=sys.stderr,
        )
        sys.exit(1)

    def do_create(connection, index, queries):
        """Run *queries* in order on *connection*, printing a "create" record for each."""
        for query in queries:
            connection.execute(query)
            elapsed = connection.last_query.elapsed
            print(f"create\t{index}\t{elapsed}\t{tsv_escape(query)}")

    # One thread per server; all threads are created first, then started, then
    # joined. SafeThread.join() re-raises an exception raised by do_create.
    threads = []
    for index, connection in enumerate(all_connections):
        threads.append(
            SafeThread(target=do_create, args=(connection, index, create_queries))
        )

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    # NOTE(review): this copy of the file has lost its indentation; the stage
    # report is assumed to belong inside the `if` -- confirm against upstream.
    reportStageEnd("create")
|
|
|
|
# Sync the data to disk first, so that writeback does not affect the measured
# performance.
os.system("sync")
reportStageEnd("sync")

# By default, test all queries.
query_count = len(test_queries)
queries_to_run = range(0, query_count)

# If specified, test a limited number of queries chosen at random.
if args.max_queries:
    sample_size = min(query_count, args.max_queries)
    queries_to_run = random.sample(range(0, query_count), sample_size)

# An explicit list of query indexes takes precedence over both of the above.
if args.queries_to_run:
    queries_to_run = args.queries_to_run
|
|
|
|
# Run test queries. For every selected query: purge jemalloc arenas, do one
# prewarm run per server, do the measured runs on the servers where the
# prewarm succeeded, and, if the two servers differ noticeably, do additional
# profiling runs. Every step prints a tab-separated record to stdout.
profile_total_seconds = 0
for query_index in queries_to_run:
    q = test_queries[query_index]
    # Prefix of every query_id sent to the server for this query.
    query_prefix = f"{test_name}.query{query_index}"

    # We have some crazy long queries (about 100kB), so trim them to a sane
    # length. This means we can't use query text as an identifier and have to
    # use the test name + the test-wide query index.
    query_display_name = q
    if len(query_display_name) > 1000:
        query_display_name = f"{query_display_name[:1000]}...({query_index})"

    print(f"display-name\t{query_index}\t{tsv_escape(query_display_name)}")

    # Best effort: a server on which the purge statement fails is skipped
    # silently, but Ctrl-C still interrupts the script.
    for conn_index, c in enumerate(all_connections):
        try:
            c.execute("SYSTEM JEMALLOC PURGE")

            print(f"purging jemalloc arenas\t{conn_index}\t{c.last_query.elapsed}")
        except KeyboardInterrupt:
            raise
        except:
            continue

    # Prewarm: run once on both servers. Helps to bring the data into memory,
    # precompile the queries, etc.
    # A query might not run on the old server if it uses a function added in the
    # new one. We want to run them on the new server only, so that the PR author
    # can ensure that the test works properly. Remember the errors we had on
    # each server.
    query_error_on_connection = [None] * len(all_connections)
    for conn_index, c in enumerate(all_connections):
        try:
            prewarm_id = f"{query_prefix}.prewarm0"

            try:
                # During the warm-up runs, we will also:
                # * detect queries that are exceedingly long, to fail fast,
                # * collect profiler traces, which might be helpful for analyzing
                #   test coverage. We disable profiler for normal runs because
                #   it makes the results unstable.
                res = c.execute(
                    q,
                    query_id=prewarm_id,
                    settings={
                        "max_execution_time": args.prewarm_max_query_seconds,
                        "query_profiler_real_time_period_ns": 10000000,
                        "query_profiler_cpu_time_period_ns": 10000000,
                        "metrics_perf_events_enabled": 1,
                        "memory_profiler_step": "4Mi",
                    },
                )
            except clickhouse_driver.errors.Error as e:
                # Add query id to the exception to make debugging easier.
                e.args = (prewarm_id, *e.args)
                e.message = prewarm_id + ": " + e.message
                raise

            print(
                f"prewarm\t{query_index}\t{prewarm_id}\t{conn_index}\t{c.last_query.elapsed}"
            )
        except KeyboardInterrupt:
            raise
        except:
            # FIXME the driver reconnects on error and we lose settings, so this
            # might lead to further errors or unexpected behavior.
            # The formatted traceback is stored per connection and reported
            # after all servers have been tried.
            query_error_on_connection[conn_index] = traceback.format_exc()
            continue

    # Report all errors that occurred during prewarm and decide what to do next.
    # If prewarm fails for the query on all servers -- skip the query and
    # continue testing the next query.
    # If prewarm fails on one of the servers, run the query on the rest of them.
    no_errors = []  # indexes of the connections whose prewarm succeeded
    for i, e in enumerate(query_error_on_connection):
        if e:
            print(e, file=sys.stderr)
        else:
            no_errors.append(i)

    if len(no_errors) == 0:
        continue
    elif len(no_errors) < len(all_connections):
        print(f"partial\t{query_index}\t{no_errors}")

    this_query_connections = [all_connections[index] for index in no_errors]

    # Now, perform measured runs.
    # Track the time spent by the client to process this query, so that we can
    # notice the queries that take long to process on the client side, e.g. by
    # sending excessive data.
    start_seconds = time.perf_counter()
    server_seconds = 0
    # NOTE(review): profile_seconds is assigned here but not read anywhere in
    # the visible code.
    profile_seconds = 0
    run = 0

    # Arrays of run times for each connection.
    all_server_times = []
    for conn_index, c in enumerate(this_query_connections):
        all_server_times.append([])

    while True:
        run_id = f"{query_prefix}.run{run}"

        # One run of the query on every server that passed the prewarm. Unlike
        # the prewarm, a driver error here is re-raised and not caught again
        # in this loop.
        for conn_index, c in enumerate(this_query_connections):
            try:
                res = c.execute(
                    q,
                    query_id=run_id,
                    settings={"max_execution_time": args.max_query_seconds},
                )
            except clickhouse_driver.errors.Error as e:
                # Add query id to the exception to make debugging easier.
                e.args = (run_id, *e.args)
                e.message = run_id + ": " + e.message
                raise

            elapsed = c.last_query.elapsed
            all_server_times[conn_index].append(elapsed)

            server_seconds += elapsed
            print(f"query\t{query_index}\t{run_id}\t{conn_index}\t{elapsed}")

            if elapsed > args.max_query_seconds:
                # Do not stop processing pathologically slow queries,
                # since this may hide errors in other queries.
                print(
                    f"The query no. {query_index} is taking too long to run ({elapsed} s)",
                    file=sys.stderr,
                )

        # Be careful with the counter, after this line it's the next iteration
        # already.
        run += 1

        # Accumulated server time over all runs so far, averaged over servers.
        avg_time_per_server = server_seconds / len(this_query_connections)

        # We break if all the min stop conditions are met (1 second and
        # args.runs iterations) or at least one of the max stop conditions is
        # met (8 seconds or 500 iterations).
        if (avg_time_per_server >= 1 and run >= args.runs) or (
            avg_time_per_server >= 8 or run >= 500
        ):
            break

    client_seconds = time.perf_counter() - start_seconds
    print(f"client-time\t{query_index}\t{client_seconds}\t{server_seconds}")

    # Run additional profiling queries to collect profile data, but only if
    # test times appeared to be different. We have to do it after normal runs
    # because otherwise it will affect test statistics too much.
    # The comparison below needs exactly two servers.
    if len(all_server_times) != 2:
        continue

    if len(all_server_times[0]) < 3:
        # Don't fail if for some reason there are not enough measurements.
        continue

    # Two-sample t-test on the run times of the two servers, without assuming
    # equal variances (equal_var=False).
    pvalue = stats.ttest_ind(
        all_server_times[0], all_server_times[1], equal_var=False
    ).pvalue
    median = [statistics.median(t) for t in all_server_times]
    # Keep this consistent with the value used in report. Should eventually move
    # to (median[1] - median[0]) / min(median), which is compatible with "times"
    # difference we use in report (max(median) / min(median)).
    relative_diff = (median[1] - median[0]) / median[0]
    print(f"diff\t{query_index}\t{median[0]}\t{median[1]}\t{relative_diff}\t{pvalue}")
    # Skip profiling when the change is below the report threshold or the
    # p-value is above 0.05.
    if abs(relative_diff) < ignored_relative_change or pvalue > 0.05:
        continue

    # Perform profile runs for fixed amount of time. Don't limit the number
    # of runs, because we also have short queries.
    # With the default --profile-seconds of 0 the loop body never executes.
    profile_start_seconds = time.perf_counter()
    run = 0
    while time.perf_counter() - profile_start_seconds < args.profile_seconds:
        run_id = f"{query_prefix}.profile{run}"

        for conn_index, c in enumerate(this_query_connections):
            try:
                res = c.execute(
                    q,
                    query_id=run_id,
                    settings={
                        "query_profiler_real_time_period_ns": 10000000,
                        "query_profiler_cpu_time_period_ns": 10000000,
                        "metrics_perf_events_enabled": 1,
                    },
                )
                print(
                    f"profile\t{query_index}\t{run_id}\t{conn_index}\t{c.last_query.elapsed}"
                )
            except clickhouse_driver.errors.Error as e:
                # Add query id to the exception to make debugging easier.
                e.args = (run_id, *e.args)
                e.message = run_id + ": " + e.message
                raise

        run += 1

    profile_total_seconds += time.perf_counter() - profile_start_seconds

# Total wall-clock time spent in profiling runs over all queries.
print(f"profile-total\t{profile_total_seconds}")

reportStageEnd("run")
|
|
|
|
# Run drop queries, unless the tables must be kept or were not created by this
# script. Unlike the initial drop, errors are not ignored here.
if not (args.keep_created_tables or args.use_existing_tables):
    drop_queries = substitute_parameters(drop_query_templates)
    # Every drop query on every connection, connection by connection.
    for (conn_index, connection), drop_query in itertools.product(
        enumerate(all_connections), drop_queries
    ):
        connection.execute(drop_query)
        elapsed = connection.last_query.elapsed
        print(f"drop\t{conn_index}\t{elapsed}\t{tsv_escape(drop_query)}")

    # NOTE(review): this copy of the file has lost its indentation; the stage
    # report is assumed to belong inside the `if` -- confirm against upstream.
    reportStageEnd("drop-2")
|