ClickHouse/tests/performance/scripts/perf.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

536 lines
18 KiB
Python
Raw Normal View History

2021-01-13 14:28:36 +00:00
#!/usr/bin/env python3
import argparse
2020-09-02 03:29:36 +00:00
import clickhouse_driver
import itertools
import functools
import math
import os
import pprint
import random
import re
2020-09-02 03:29:36 +00:00
import statistics
2020-03-03 13:38:45 +00:00
import string
2020-09-02 03:29:36 +00:00
import sys
2020-01-16 19:39:07 +00:00
import time
import traceback
2020-11-21 10:58:15 +00:00
import logging
2020-09-02 03:29:36 +00:00
import xml.etree.ElementTree as et
2020-09-23 08:21:55 +00:00
from threading import Thread
from scipy import stats
# Log warnings and above, tagged with the time, level and originating module.
logging.basicConfig(
    format="%(asctime)s: %(levelname)s: %(module)s: %(message)s",
    level=logging.WARNING,
)

# Reference points for the stage timing reports: the whole-run clock and the
# current-stage clock both start at this moment.
total_start_seconds = stage_start_seconds = time.perf_counter()
# Thread executor that does not hide an exception raised while the target
# function runs: it is captured in the worker and rethrown from join().
class SafeThread(Thread):
    """Thread whose join() re-raises whatever the thread body raised."""

    # sys.exc_info() triple captured by run(); stays None if nothing failed.
    run_exception = None

    def run(self):
        """Run the thread body, recording any exception it raises."""
        try:
            super().run()
        except BaseException:
            # Deliberately broad: whatever escapes the target has to reach the
            # joining thread, where join() raises it again.
            self.run_exception = sys.exc_info()

    def join(self, timeout=None):
        """Wait for the thread and re-raise the exception recorded by run().

        Args:
            timeout: forwarded unchanged to Thread.join(). If the wait times
                out before the thread body has failed, nothing is raised.

        Raises:
            The exception object captured in run(), if there is one.
        """
        super().join(timeout)
        if self.run_exception:
            raise self.run_exception[1]
2020-09-24 11:46:03 +00:00
def reportStageEnd(stage):
    """Print a TSV `stage` record and restart the per-stage clock.

    The record is "stage<TAB>name<TAB>stage seconds<TAB>total seconds", both
    durations formatted with three decimals. The stage duration is measured
    from the previous call (or from script start for the first call).
    """
    global stage_start_seconds

    now = time.perf_counter()
    stage_elapsed = now - stage_start_seconds
    total_elapsed = now - total_start_seconds
    print(f"stage\t{stage}\t{stage_elapsed:.3f}\t{total_elapsed:.3f}")

    # The next stage is measured from here.
    stage_start_seconds = now
2020-05-25 01:03:21 +00:00
def tsv_escape(s):
    r"""Escape `s` so that it fits into a single TSV field.

    Backslashes are doubled, tabs and newlines become the two-character
    sequences \t and \n, and carriage returns are removed.
    """
    # One pass over the string; every character is mapped independently, so
    # the backslashes produced for tabs and newlines are never escaped again.
    escapes = str.maketrans(
        {
            "\\": "\\\\",
            "\t": "\\t",
            "\n": "\\n",
            "\r": None,
        }
    )
    return s.translate(escapes)
2020-05-25 01:03:21 +00:00
2020-09-24 11:46:03 +00:00
parser = argparse.ArgumentParser(description="Run performance test.")

# Explicitly decode files as UTF-8 because sometimes we have Russian characters
# in queries, and LANG=C is set.
parser.add_argument(
    "file",
    nargs=1,
    metavar="FILE",
    type=argparse.FileType("r", encoding="utf-8"),
    help="test description file",
)

# Servers under test. Hosts and ports are matched up by position.
parser.add_argument(
    "--host",
    default=["localhost"],
    nargs="*",
    help="Space-separated list of server hostname(s). Corresponds to '--port' options.",
)
parser.add_argument(
    "--port",
    default=[9000],
    nargs="*",
    help="Space-separated list of server port(s). Corresponds to '--host' options.",
)

# Which queries to run, and how many times.
parser.add_argument(
    "--runs", default=1, type=int, help="Number of query runs per server."
)
parser.add_argument(
    "--max-queries",
    default=None,
    type=int,
    help="Test no more than this number of queries, chosen at random.",
)
parser.add_argument(
    "--queries-to-run",
    default=None,
    type=int,
    nargs="*",
    help="Space-separated list of indexes of queries to test.",
)

# Time budgets, in whole seconds. Registered in a loop because they differ
# only in name, default and help text; the order is the order shown in --help.
for _option, _default_seconds, _help in (
    (
        "--max-query-seconds",
        15,
        "For how many seconds at most a query is allowed to run. The script finishes with error if this time is exceeded.",
    ),
    (
        "--prewarm-max-query-seconds",
        180,
        "For how many seconds at most a prewarm (cold storage) query is allowed to run. The script finishes with error if this time is exceeded.",
    ),
    (
        "--profile-seconds",
        0,
        "For how many seconds to profile a query for which the performance has changed.",
    ),
):
    parser.add_argument(_option, type=int, default=_default_seconds, help=_help)

# Boolean switches, all off by default.
for _option, _help in (
    ("--long", "Do not skip the tests tagged as long."),
    ("--print-queries", "Print test queries and exit."),
    ("--print-settings", "Print test settings and exit."),
    ("--keep-created-tables", "Don't drop the created tables after the test."),
    (
        "--use-existing-tables",
        "Don't create or drop the tables, use the existing ones instead.",
    ),
):
    parser.add_argument(_option, action="store_true", help=_help)

args = parser.parse_args()
reportStageEnd("start")
2020-09-24 11:46:03 +00:00
# The test is named after its description file: base name without extension.
description_file = args.file[0]
test_name, _extension = os.path.splitext(os.path.basename(description_file.name))

# Parse the XML test description; everything below reads from `root`.
tree = et.parse(description_file)
root = tree.getroot()
reportStageEnd("parse")
2020-09-24 11:46:03 +00:00
2020-03-03 13:38:45 +00:00
# Process query parameters: collect the values every substitution may take,
# keyed by the substitution name.
available_parameters = {}  # { 'table': ['hits_10m', 'hits_100m'], ... }
subst_elems = root.findall("substitutions/substitution")
for substitution in subst_elems:
    name = substitution.find("name").text
    values = [value.text for value in substitution.findall("values/value")]
    if not values:
        # A substitution without values would expand to zero queries.
        raise Exception(f"No values given for substitution {{{name}}}")
    available_parameters[name] = values
2020-06-23 12:09:54 +00:00
# Takes parallel lists of templates, substitutes them with all combos of
# parameters. The set of parameters is determined based on the first list.
# Note: keep the order of queries -- sometimes we have DROP IF EXISTS
# followed by CREATE in create queries section, so the order matters.
def substitute_parameters(query_templates, other_templates=()):
    """Expand `{name}` placeholders with every combination of parameter values.

    Args:
        query_templates: format-style template strings. The placeholder names
            found in each template select which entries of the module-level
            `available_parameters` mapping are combined for it.
        other_templates: optional sequence of template lists parallel to
            `query_templates`; `other_templates[j][i]` is formatted with the
            same values as `query_templates[i]`.

    Returns:
        The list of expanded queries, in template order, when
        `other_templates` is empty. Otherwise a tuple `(queries, others)`,
        where `others[j]` holds the expansions of `other_templates[j]`.

    Raises:
        KeyError: a placeholder names a parameter that is missing from
            `available_parameters`.
    """
    query_results = []
    # One independent list per parallel template list. `[[]] * n` would repeat
    # a single list object n times and mix all the outputs together.
    other_results = [[] for _ in other_templates]
    formatter = string.Formatter()
    for i, q in enumerate(query_templates):
        # We need stable order of keys here, so that the order of substitutions
        # is always the same, and the query indexes are consistent across test
        # runs.
        keys = sorted({n for _, n, _, _ in formatter.parse(q) if n})
        values = [available_parameters[k] for k in keys]
        for combo in itertools.product(*values):
            with_keys = dict(zip(keys, combo))
            query_results.append(q.format(**with_keys))
            for results, templates in zip(other_results, other_templates):
                results.append(templates[i].format(**with_keys))

    if other_templates:
        return query_results, other_results
    return query_results
# Build the list of test queries: every <query> template is expanded with all
# combinations of its parameters, keeping the document order.
test_queries = [
    query
    for template in root.findall("query")
    for query in substitute_parameters([template.text])
]
2020-09-28 12:09:08 +00:00
# If we're given a list of queries to run, check that it makes sense.
# All early exits below use sys.exit(): the bare `exit` name is installed by
# the `site` module for interactive use and is not meant for programs.
for i in args.queries_to_run or []:
    if not 0 <= i < len(test_queries):
        print(
            f"There is no query no. {i} in this test, only [0-{len(test_queries) - 1}] are present"
        )
        sys.exit(1)

# If we're only asked to print the queries, do that and exit.
if args.print_queries:
    for i in args.queries_to_run or range(len(test_queries)):
        print(test_queries[i])
    sys.exit(0)

# If we're only asked to print the settings, do that and exit. These are settings
# for clickhouse-benchmark, so we print them as command line arguments, e.g.
# '--max_memory_usage=10000000'.
if args.print_settings:
    for s in root.findall("settings/*"):
        print(f"--{s.tag}={s.text}")
    sys.exit(0)

# Skip long tests
if not args.long:
    for tag in root.findall(".//tag"):
        if tag.text == "long":
            print("skipped\tTest is tagged as long.")
            sys.exit(0)
2020-03-03 10:47:32 +00:00
2020-04-28 07:45:35 +00:00
# Relative change below which a difference is ignored. The test description
# may override the default; the value is printed only when it does.
ignored_relative_change = 0.05
threshold_override = root.attrib.get("max_ignored_relative_change")
if threshold_override is not None:
    ignored_relative_change = float(threshold_override)
    print(f"report-threshold\t{ignored_relative_change}")

reportStageEnd("before-connect")
2020-09-24 11:46:03 +00:00
# Open connections. Hosts and ports are paired by position; when one list is
# shorter than the other, its first entry fills the gaps.
servers = []
for host, port in itertools.zip_longest(args.host, args.port):
    servers.append({"host": host or args.host[0], "port": port or args.port[0]})

# Force settings_is_important to fail queries on unknown settings.
all_connections = [
    clickhouse_driver.Client(
        host=server["host"], port=server["port"], settings_is_important=True
    )
    for server in servers
]

for server_index, server in enumerate(servers):
    print(f"server\t{server_index}\t{server['host']}\t{server['port']}")

reportStageEnd("connect")
2020-09-24 11:46:03 +00:00
if not args.use_existing_tables:
    # Run drop queries, ignoring errors. Do this before all other activity,
    # because clickhouse_driver disconnects on error (this is not configurable),
    # and the new connection loses the changes in settings.
    drop_query_templates = [q.text for q in root.findall("drop_query")]
    drop_queries = substitute_parameters(drop_query_templates)
    for conn_index, c in enumerate(all_connections):
        for q in drop_queries:
            try:
                c.execute(q)
                print(f"drop\t{conn_index}\t{c.last_query.elapsed}\t{tsv_escape(q)}")
            except Exception:
                # Best effort: a failed drop is skipped silently. Only
                # ordinary errors are ignored, so Ctrl-C and interpreter exit
                # still stop the script.
                pass

    reportStageEnd("drop-1")
2020-09-24 11:46:03 +00:00
2020-06-05 12:53:47 +00:00
# Apply the settings from the test description to every connection.
settings = root.findall("settings/*")
for connection in all_connections:
    for setting in settings:
        # requires clickhouse-driver >= 1.1.5 to accept arbitrary new settings
        # (https://github.com/mymarilyn/clickhouse-driver/pull/142)
        connection.settings[setting.tag] = setting.text

    # We have to perform a query to make sure the settings work. Otherwise an
    # unknown setting will lead to failing precondition check, and we will skip
    # the test, which is wrong.
    connection.execute("select 1")

reportStageEnd("settings")
2020-09-24 11:46:03 +00:00
if not args.use_existing_tables:
    # Run create and fill queries. We will run them simultaneously for both
    # servers, to save time. The weird XML search + filter is because we want to
    # keep the relative order of elements, and etree doesn't support the
    # appropriate xpath query.
    create_query_templates = []
    for element in root.findall("./*"):
        if element.tag in ("create_query", "fill_query"):
            create_query_templates.append(element.text)
    create_queries = substitute_parameters(create_query_templates)

    # Disallow temporary tables, because the clickhouse_driver reconnects on
    # errors, and temporary tables are destroyed. We want to be able to continue
    # after some errors.
    for create_query in create_queries:
        uses_temporary_table = re.search(
            "create temporary table", create_query, flags=re.IGNORECASE
        )
        if uses_temporary_table:
            print(
                f"Temporary tables are not allowed in performance tests: '{create_query}'",
                file=sys.stderr,
            )
            sys.exit(1)

    def do_create(connection, index, queries):
        """Run `queries` in order on `connection`, printing a `create` record for each."""
        for query in queries:
            connection.execute(query)
            elapsed_seconds = connection.last_query.elapsed
            print(f"create\t{index}\t{elapsed_seconds}\t{tsv_escape(query)}")

    # One worker per server. SafeThread.join() rethrows a failure of the
    # worker, so a failed create query aborts the script here.
    threads = []
    for index, connection in enumerate(all_connections):
        threads.append(
            SafeThread(target=do_create, args=(connection, index, create_queries))
        )

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    reportStageEnd("create")
2020-09-24 11:46:03 +00:00
# Sync the data to disk first, so that writeback does not affect the measurements.
os.system("sync")
reportStageEnd("sync")

# By default, test all queries.
query_count = len(test_queries)
queries_to_run = range(query_count)

if args.max_queries:
    # If specified, test a limited number of queries chosen at random.
    sample_size = min(query_count, args.max_queries)
    queries_to_run = random.sample(range(query_count), sample_size)

if args.queries_to_run:
    # An explicit list of query indexes takes precedence over everything else.
    queries_to_run = args.queries_to_run
2020-06-23 12:09:54 +00:00
# Run test queries. For every selected query: prewarm it once on each server,
# run the measured iterations on the servers where the prewarm succeeded, and,
# when exactly two servers were measured and their times differ noticeably,
# spend up to --profile-seconds on extra runs with the profiler enabled.
# Every step prints a tab-separated record to stdout.
profile_total_seconds = 0
for query_index in queries_to_run:
    q = test_queries[query_index]
    query_prefix = f"{test_name}.query{query_index}"

    # We have some crazy long queries (about 100kB), so trim them to a sane
    # length. This means we can't use query text as an identifier and have to
    # use the test name + the test-wide query index.
    query_display_name = q
    if len(query_display_name) > 1000:
        query_display_name = f"{query_display_name[:1000]}...({query_index})"

    print(f"display-name\t{query_index}\t{tsv_escape(query_display_name)}")

    # Prewarm: run once on both servers. Helps to bring the data into memory,
    # precompile the queries, etc.
    # A query might not run on the old server if it uses a function added in the
    # new one. We want to run them on the new server only, so that the PR author
    # can ensure that the test works properly. Remember the errors we had on
    # each server.
    query_error_on_connection = [None] * len(all_connections)
    for conn_index, c in enumerate(all_connections):
        try:
            prewarm_id = f"{query_prefix}.prewarm0"

            try:
                # During the warm-up runs, we will also:
                # * detect queries that are exceedingly long, to fail fast,
                # * collect profiler traces, which might be helpful for analyzing
                #   test coverage. We disable profiler for normal runs because
                #   it makes the results unstable.
                c.execute(
                    q,
                    query_id=prewarm_id,
                    settings={
                        "max_execution_time": args.prewarm_max_query_seconds,
                        "query_profiler_real_time_period_ns": 10000000,
                        "query_profiler_cpu_time_period_ns": 10000000,
                        "metrics_perf_events_enabled": 1,
                        "memory_profiler_step": "4Mi",
                    },
                )
            except clickhouse_driver.errors.Error as e:
                # Add query id to the exception to make debugging easier.
                e.args = (prewarm_id, *e.args)
                e.message = prewarm_id + ": " + e.message
                raise

            print(
                f"prewarm\t{query_index}\t{prewarm_id}\t{conn_index}\t{c.last_query.elapsed}"
            )
        except Exception:
            # Only ordinary errors are recorded; KeyboardInterrupt and
            # SystemExit are not Exception subclasses and stop the script.
            # FIXME the driver reconnects on error and we lose settings, so this
            # might lead to further errors or unexpected behavior.
            query_error_on_connection[conn_index] = traceback.format_exc()

    # Report all errors that occurred during prewarm and decide what to do next.
    # If prewarm fails for the query on all servers -- skip the query and
    # continue testing the next query.
    # If prewarm fails on one of the servers, run the query on the rest of them.
    no_errors = []
    for i, e in enumerate(query_error_on_connection):
        if e:
            print(e, file=sys.stderr)
        else:
            no_errors.append(i)

    if not no_errors:
        continue
    if len(no_errors) < len(all_connections):
        print(f"partial\t{query_index}\t{no_errors}")

    this_query_connections = [all_connections[index] for index in no_errors]

    # Now, perform measured runs.
    # Track the time spent by the client to process this query, so that we can
    # notice the queries that take long to process on the client side, e.g. by
    # sending excessive data.
    start_seconds = time.perf_counter()
    server_seconds = 0
    run = 0

    # Arrays of run times for each connection.
    all_server_times = [[] for _ in this_query_connections]

    while True:
        run_id = f"{query_prefix}.run{run}"

        for conn_index, c in enumerate(this_query_connections):
            try:
                c.execute(
                    q,
                    query_id=run_id,
                    settings={"max_execution_time": args.max_query_seconds},
                )
            except clickhouse_driver.errors.Error as e:
                # Add query id to the exception to make debugging easier.
                e.args = (run_id, *e.args)
                e.message = run_id + ": " + e.message
                raise

            elapsed = c.last_query.elapsed
            all_server_times[conn_index].append(elapsed)

            server_seconds += elapsed
            print(f"query\t{query_index}\t{run_id}\t{conn_index}\t{elapsed}")

            if elapsed > args.max_query_seconds:
                # Do not stop processing pathologically slow queries,
                # since this may hide errors in other queries.
                print(
                    f"The query no. {query_index} is taking too long to run ({elapsed} s)",
                    file=sys.stderr,
                )

        # Be careful with the counter, after this line it's the next iteration
        # already.
        run += 1

        avg_time_per_server = server_seconds / len(this_query_connections)
        # We break if all the min stop conditions are met (1 second and
        # args.runs iterations) or at least one of the max stop conditions is
        # met (8 seconds or 500 iterations).
        if (avg_time_per_server >= 1 and run >= args.runs) or (
            avg_time_per_server >= 8 or run >= 500
        ):
            break

    client_seconds = time.perf_counter() - start_seconds
    print(f"client-time\t{query_index}\t{client_seconds}\t{server_seconds}")

    # Run additional profiling queries to collect profile data, but only if test
    # times appeared to be different. We have to do it after normal runs because
    # otherwise it will affect test statistics too much.
    if len(all_server_times) != 2:
        continue

    if len(all_server_times[0]) < 3:
        # Don't fail if for some reason there are not enough measurements.
        continue

    pvalue = stats.ttest_ind(
        all_server_times[0], all_server_times[1], equal_var=False
    ).pvalue
    median = [statistics.median(t) for t in all_server_times]
    # Keep this consistent with the value used in report. Should eventually move
    # to (median[1] - median[0]) / min(median), which is compatible with "times"
    # difference we use in report (max(median) / min(median)).
    relative_diff = (median[1] - median[0]) / median[0]
    print(f"diff\t{query_index}\t{median[0]}\t{median[1]}\t{relative_diff}\t{pvalue}")
    if abs(relative_diff) < ignored_relative_change or pvalue > 0.05:
        continue

    # Perform profile runs for fixed amount of time. Don't limit the number
    # of runs, because we also have short queries.
    profile_start_seconds = time.perf_counter()
    run = 0
    while time.perf_counter() - profile_start_seconds < args.profile_seconds:
        run_id = f"{query_prefix}.profile{run}"

        for conn_index, c in enumerate(this_query_connections):
            try:
                c.execute(
                    q,
                    query_id=run_id,
                    settings={
                        "query_profiler_real_time_period_ns": 10000000,
                        "query_profiler_cpu_time_period_ns": 10000000,
                        "metrics_perf_events_enabled": 1,
                    },
                )
                print(
                    f"profile\t{query_index}\t{run_id}\t{conn_index}\t{c.last_query.elapsed}"
                )
            except clickhouse_driver.errors.Error as e:
                # Add query id to the exception to make debugging easier.
                e.args = (run_id, *e.args)
                e.message = run_id + ": " + e.message
                raise

        run += 1

    profile_total_seconds += time.perf_counter() - profile_start_seconds

print(f"profile-total\t{profile_total_seconds}")

reportStageEnd("run")
2020-09-24 11:46:03 +00:00
# Run drop queries again to clean up, unless the tables must be kept or were
# not created by this script. Unlike the first drop pass, errors are not
# ignored here.
if not (args.keep_created_tables or args.use_existing_tables):
    drop_queries = substitute_parameters(drop_query_templates)
    for server_index, connection in enumerate(all_connections):
        for drop_query in drop_queries:
            connection.execute(drop_query)
            elapsed_seconds = connection.last_query.elapsed
            print(f"drop\t{server_index}\t{elapsed_seconds}\t{tsv_escape(drop_query)}")

    reportStageEnd("drop-2")