Add concurrent benchmark to performance test

After the main test, run queries from `website.xml` in parallel using
`clickhouse-benchmark`. This is useful for testing the effects of
concurrency on performance. The main comparison test can miss such
effects because it always runs queries sequentially, and many of them
are even single-threaded.
Alexander Kuzmenkov 2020-06-12 00:24:56 +03:00
parent d0987d56be
commit 7ba5063b7a
3 changed files with 90 additions and 58 deletions

View File

@@ -161,6 +161,20 @@ function run_tests
    wait
}
# Run some queries concurrently and report the resulting TPS. This additional
# (relatively) short test helps detect concurrency-related effects, because the
# main performance comparison testing is done query-by-query.
function run_benchmark
{
    rm -rf benchmark ||:
    mkdir benchmark ||:
    # TODO disable this when there is an explicit list of tests to run
    "$script_dir/perf.py" --print right/performance/website.xml > benchmark/website-queries.tsv
    clickhouse-benchmark --port 9001 --concurrency 6 --cumulative --iterations 1000 --randomize 1 --delay 0 --json benchmark/website-left.json < benchmark/website-queries.tsv
    clickhouse-benchmark --port 9002 --concurrency 6 --cumulative --iterations 1000 --randomize 1 --delay 0 --json benchmark/website-right.json < benchmark/website-queries.tsv
}
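The report stage (see the report changes below) consumes only the statistics.QPS field of these two JSON files. A minimal sketch of inspecting one of them by hand, assuming nothing about the rest of clickhouse-benchmark's JSON layout:

import json

# Load the cumulative statistics that clickhouse-benchmark wrote for the
# left (old) server and print its overall queries-per-second figure.
with open('benchmark/website-left.json') as f:
    left = json.load(f)
print(left['statistics']['QPS'])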
function get_profiles_watchdog
{
    sleep 6000
@@ -716,6 +730,9 @@ case "$stage" in
    # Ignore the errors to collect the log and build at least some report, anyway
    time run_tests ||:
    ;&
"run_benchmark")
    time run_benchmark 2> >(tee -a run-errors.tsv 1>&2) ||:
    ;&
"get_profiles")
    # Getting profiles inexplicably hangs sometimes, so try to save some logs if
    # this happens again. Give the servers some time to collect all info, then

View File

@@ -14,22 +14,14 @@ import traceback
def tsv_escape(s):
    return s.replace('\\', '\\\\').replace('\t', '\\t').replace('\n', '\\n').replace('\r','')
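A worked example of the escaping (not part of the diff): tabs and newlines become literal backslash escapes, so each record stays on one physical line, and carriage returns are dropped outright.

# 'a<TAB>b<NL>c<CR>' flattens to the seven characters a \ t b \ n c:
assert tsv_escape('a\tb\nc\r') == 'a\\tb\\nc'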
stage_start_seconds = time.perf_counter()
def report_stage_end(stage_name):
    global stage_start_seconds
    print('{}\t{}'.format(stage_name, time.perf_counter() - stage_start_seconds))
    stage_start_seconds = time.perf_counter()

report_stage_end('start')
parser = argparse.ArgumentParser(description='Run performance test.')
# Explicitly decode files as UTF-8 because sometimes we have Russian characters in queries, and LANG=C is set.
parser.add_argument('file', metavar='FILE', type=argparse.FileType('r', encoding='utf-8'), nargs=1, help='test description file')
parser.add_argument('--host', nargs='*', default=['localhost'], help="Server hostname(s). Corresponds to '--port' options.")
parser.add_argument('--port', nargs='*', default=[9000], help="Server port(s). Corresponds to '--host' options.")
parser.add_argument('--runs', type=int, default=int(os.environ.get('CHPC_RUNS', 13)), help='Number of query runs per server. Defaults to CHPC_RUNS environment variable.')
parser.add_argument('--no-long', type=bool, default=True, help='Skip the tests tagged as long.')
parser.add_argument('--runs', type=int, default=int(os.environ.get('CHPC_RUNS', 17)), help='Number of query runs per server. Defaults to CHPC_RUNS environment variable.')
parser.add_argument('--long', action='store_true', help='Do not skip the tests tagged as long.')
parser.add_argument('--print', action='store_true', help='Print test queries and exit.')
args = parser.parse_args()
test_name = os.path.splitext(os.path.basename(args.file[0].name))[0]
@@ -37,35 +29,6 @@ test_name = os.path.splitext(os.path.basename(args.file[0].name))[0]
tree = et.parse(args.file[0])
root = tree.getroot()
# Skip long tests
for tag in root.findall('.//tag'):
    if tag.text == 'long':
        print('skipped\tTest is tagged as long.')
        sys.exit(0)
# Check main metric
main_metric_element = root.find('main_metric/*')
if main_metric_element is not None and main_metric_element.tag != 'min_time':
    raise Exception('Only the min_time main metric is supported. This test uses \'{}\''.format(main_metric_element.tag))
# FIXME another way to detect infinite tests. They should have an appropriate main_metric but sometimes they don't.
infinite_sign = root.find('.//average_speed_not_changing_for_ms')
if infinite_sign is not None:
    raise Exception('Looks like the test is infinite (sign 1)')
# Print report threshold for the test if it is set.
if 'max_ignored_relative_change' in root.attrib:
    print(f'report-threshold\t{root.attrib["max_ignored_relative_change"]}')
# Open connections
servers = [{'host': host, 'port': port} for (host, port) in zip(args.host, args.port)]
connections = [clickhouse_driver.Client(**server) for server in servers]
for s in servers:
    print('server\t{}\t{}'.format(s['host'], s['port']))
report_stage_end('connect')
# Process query parameters
subst_elems = root.findall('substitutions/substitution')
available_parameters = {} # { 'table': ['hits_10m', 'hits_100m'], ... }
@@ -84,7 +47,45 @@ def substitute_parameters(query_templates):
            for values_combo in itertools.product(*values)])
    return result
report_stage_end('substitute')
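The visible tail of substitute_parameters suggests a cross-product expansion: each query template is instantiated once per combination of its parameters' values. A self-contained sketch of that idea, where the {param} placeholder syntax and the key extraction are assumptions rather than verbatim code:

import itertools
import string

def substitute_parameters(query_templates, available_parameters):
    # Instantiate every template once per combination of its parameters.
    result = []
    for q in query_templates:
        # Assumption: templates use str.format-style {param} placeholders.
        keys = list(set(n for _, n, _, _ in string.Formatter().parse(q) if n))
        values = [available_parameters[k] for k in keys]
        result.extend(q.format(**dict(zip(keys, combo)))
                      for combo in itertools.product(*values))
    return result

# One template and two 'table' values yield two concrete queries:
#   substitute_parameters(['SELECT count() FROM {table}'],
#                         {'table': ['hits_10m', 'hits_100m']})
#   -> ['SELECT count() FROM hits_10m', 'SELECT count() FROM hits_100m']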
# Build a list of test queries, processing all substitutions
test_query_templates = [q.text for q in root.findall('query')]
test_queries = substitute_parameters(test_query_templates)
# If we're only asked to print the queries, do that and exit
if args.print:
    for q in test_queries:
        print(q)
    exit(0)
# Skip long tests
if not args.long:
    for tag in root.findall('.//tag'):
        if tag.text == 'long':
            print('skipped\tTest is tagged as long.')
            sys.exit(0)
# Check main metric to detect infinite tests. We shouldn't have such tests anymore,
# but we did in the past, and it is convenient to be able to process old tests.
main_metric_element = root.find('main_metric/*')
if main_metric_element is not None and main_metric_element.tag != 'min_time':
    raise Exception('Only the min_time main metric is supported. This test uses \'{}\''.format(main_metric_element.tag))
# Another way to detect infinite tests. They should have an appropriate main_metric
# but sometimes they don't.
infinite_sign = root.find('.//average_speed_not_changing_for_ms')
if infinite_sign is not None:
    raise Exception('Looks like the test is infinite (sign 1)')
# Print report threshold for the test if it is set.
if 'max_ignored_relative_change' in root.attrib:
    print(f'report-threshold\t{root.attrib["max_ignored_relative_change"]}')
# Open connections
servers = [{'host': host, 'port': port} for (host, port) in zip(args.host, args.port)]
connections = [clickhouse_driver.Client(**server) for server in servers]
for s in servers:
    print('server\t{}\t{}'.format(s['host'], s['port']))
# Run drop queries, ignoring errors. Do this before all other activity, because
# clickhouse_driver disconnects on error (this is not configurable), and the new
@@ -98,8 +99,6 @@ for c in connections:
        except:
            pass
report_stage_end('drop1')
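The hunk above elides the body of the drop loop; reconstructed from the visible except/pass tail, it presumably reads as follows (a sketch, not the verbatim file):

for c in connections:
    for q in drop_queries:
        try:
            c.execute(q)
        except:
            # Ignore errors: the table may not exist yet, and a failed query
            # also kills the clickhouse_driver connection.
            pass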
# Apply settings.
# If there are errors, report them and continue -- maybe a new test uses a setting
# that is not in master, but the queries can still run. If we have multiple
@@ -115,8 +114,6 @@ for c in connections:
        except:
            print(traceback.format_exc(), file=sys.stderr)
report_stage_end('settings')
# Check tables that should exist. If they don't exist, just skip this test.
tables = [e.text for e in root.findall('preconditions/table_exists')]
for t in tables:
@@ -129,8 +126,6 @@ for t in tables:
            print(f'skipped\t{tsv_escape(skipped_message)}')
            sys.exit(0)
report_stage_end('preconditions')
# Run create queries
create_query_templates = [q.text for q in root.findall('create_query')]
create_queries = substitute_parameters(create_query_templates)
@@ -145,14 +140,7 @@ for c in connections:
    for q in fill_queries:
        c.execute(q)
report_stage_end('fill')
# Run test queries
test_query_templates = [q.text for q in root.findall('query')]
test_queries = substitute_parameters(test_query_templates)
report_stage_end('substitute2')
for query_index, q in enumerate(test_queries):
    query_prefix = f'{test_name}.query{query_index}'
@@ -199,13 +187,9 @@ for query_index, q in enumerate(test_queries):
    client_seconds = time.perf_counter() - start_seconds
    print(f'client-time\t{query_index}\t{client_seconds}\t{server_seconds}')
report_stage_end('benchmark')
# Run drop queries
drop_query_templates = [q.text for q in root.findall('drop_query')]
drop_queries = substitute_parameters(drop_query_templates)
for c in connections:
    for q in drop_queries:
        c.execute(q)
report_stage_end('drop2')

View File

@@ -5,6 +5,7 @@ import ast
import collections
import csv
import itertools
import json
import os
import sys
import traceback
@@ -321,6 +322,36 @@ if args.report == 'main':
    print_test_times()

    def print_benchmark_results():
        left_json = json.load(open('benchmark/website-left.json'))
        right_json = json.load(open('benchmark/website-right.json'))
        left_qps = left_json["statistics"]["QPS"]
        right_qps = right_json["statistics"]["QPS"]
        relative_diff = (right_qps - left_qps) / left_qps
        times_diff = max(right_qps, left_qps) / max(0.01, min(right_qps, left_qps))

        print(tableStart('Concurrent benchmarks'))
        print(tableHeader(['Benchmark', 'Old, queries/s', 'New, queries/s', 'Relative difference', 'Times difference']))

        row = ['website', f'{left_qps:.3f}', f'{right_qps:.3f}', f'{relative_diff:.3f}', f'x{times_diff:.3f}']
        attrs = ['' for r in row]
        if abs(relative_diff) > 0.1:
            # More queries per second is better.
            if relative_diff > 0.:
                attrs[3] = f'style="background: {color_good}"'
            else:
                attrs[3] = f'style="background: {color_bad}"'
        else:
            attrs[3] = ''

        print(tableRow(row, attrs))
        print(tableEnd())
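To make the 10% threshold concrete, a small worked example with hypothetical numbers:

# left_qps = 100.0, right_qps = 112.0 (hypothetical):
#   relative_diff = (112.0 - 100.0) / 100.0 = 0.12 -> above the 0.1 threshold
#   and positive, so the cell gets the color_good background.
#   times_diff = 112.0 / 100.0 = 1.12, rendered as 'x1.120'.
# left_qps = 100.0, right_qps = 95.0:
#   relative_diff = -0.05 -> within the 10% noise band, no highlighting.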
    try:
        print_benchmark_results()
    except:
        report_errors.append(
            traceback.format_exception_only(
                *sys.exc_info()[:2])[-1])
        pass
    print_report_errors()

    print("""