mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #11616 from ClickHouse/aku/perf-benchmark
Add concurrent benchmark to performance test
This commit is contained in:
commit
593a0181bd
@ -131,6 +131,11 @@ function run_tests
|
|||||||
test_files=$(ls "$test_prefix"/*.xml)
|
test_files=$(ls "$test_prefix"/*.xml)
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
# Determine which concurrent benchmarks to run. For now, the only test
|
||||||
|
# we run as a concurrent benchmark is 'website'. Run it as benchmark if we
|
||||||
|
# are also going to run it as a normal test.
|
||||||
|
for test in $test_files; do echo $test; done | sed -n '/website/p' > benchmarks-to-run.txt
|
||||||
|
|
||||||
# Delete old report files.
|
# Delete old report files.
|
||||||
for x in {test-times,wall-clock-times}.tsv
|
for x in {test-times,wall-clock-times}.tsv
|
||||||
do
|
do
|
||||||
@ -161,6 +166,30 @@ function run_tests
|
|||||||
wait
|
wait
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Run some queries concurrently and report the resulting TPS. This additional
|
||||||
|
# (relatively) short test helps detect concurrency-related effects, because the
|
||||||
|
# main performance comparison testing is done query-by-query.
|
||||||
|
function run_benchmark
|
||||||
|
{
|
||||||
|
rm -rf benchmark ||:
|
||||||
|
mkdir benchmark ||:
|
||||||
|
|
||||||
|
# The list is built by run_tests.
|
||||||
|
for file in $(cat benchmarks-to-run.txt)
|
||||||
|
do
|
||||||
|
name=$(basename "$file" ".xml")
|
||||||
|
|
||||||
|
"$script_dir/perf.py" --print-queries "$file" > "benchmark/$name-queries.txt"
|
||||||
|
"$script_dir/perf.py" --print-settings "$file" > "benchmark/$name-settings.txt"
|
||||||
|
|
||||||
|
readarray -t settings < "benchmark/$name-settings.txt"
|
||||||
|
command=(clickhouse-benchmark --concurrency 6 --cumulative --iterations 1000 --randomize 1 --delay 0 --continue_on_errors "${settings[@]}")
|
||||||
|
|
||||||
|
"${command[@]}" --port 9001 --json "benchmark/$name-left.json" < "benchmark/$name-queries.txt"
|
||||||
|
"${command[@]}" --port 9002 --json "benchmark/$name-right.json" < "benchmark/$name-queries.txt"
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
function get_profiles_watchdog
|
function get_profiles_watchdog
|
||||||
{
|
{
|
||||||
sleep 6000
|
sleep 6000
|
||||||
@ -219,7 +248,7 @@ function build_log_column_definitions
|
|||||||
{
|
{
|
||||||
# FIXME This loop builds column definitons from TSVWithNamesAndTypes in an
|
# FIXME This loop builds column definitons from TSVWithNamesAndTypes in an
|
||||||
# absolutely atrocious way. This should be done by the file() function itself.
|
# absolutely atrocious way. This should be done by the file() function itself.
|
||||||
for x in {right,left}-{addresses,{query,query-thread,trace,metric}-log}.tsv
|
for x in {right,left}-{addresses,{query,query-thread,trace,{async-,}metric}-log}.tsv
|
||||||
do
|
do
|
||||||
paste -d' ' \
|
paste -d' ' \
|
||||||
<(sed -n '1{s/\t/\n/g;p;q}' "$x" | sed 's/\(^.*$\)/"\1"/') \
|
<(sed -n '1{s/\t/\n/g;p;q}' "$x" | sed 's/\(^.*$\)/"\1"/') \
|
||||||
@ -716,6 +745,9 @@ case "$stage" in
|
|||||||
# Ignore the errors to collect the log and build at least some report, anyway
|
# Ignore the errors to collect the log and build at least some report, anyway
|
||||||
time run_tests ||:
|
time run_tests ||:
|
||||||
;&
|
;&
|
||||||
|
"run_benchmark")
|
||||||
|
time run_benchmark 2> >(tee -a run-errors.tsv 1>&2) ||:
|
||||||
|
;&
|
||||||
"get_profiles")
|
"get_profiles")
|
||||||
# Getting profiles inexplicably hangs sometimes, so try to save some logs if
|
# Getting profiles inexplicably hangs sometimes, so try to save some logs if
|
||||||
# this happens again. Give the servers some time to collect all info, then
|
# this happens again. Give the servers some time to collect all info, then
|
||||||
|
@ -14,22 +14,15 @@ import traceback
|
|||||||
def tsv_escape(s):
|
def tsv_escape(s):
|
||||||
return s.replace('\\', '\\\\').replace('\t', '\\t').replace('\n', '\\n').replace('\r','')
|
return s.replace('\\', '\\\\').replace('\t', '\\t').replace('\n', '\\n').replace('\r','')
|
||||||
|
|
||||||
stage_start_seconds = time.perf_counter()
|
|
||||||
|
|
||||||
def report_stage_end(stage_name):
|
|
||||||
global stage_start_seconds
|
|
||||||
print('{}\t{}'.format(stage_name, time.perf_counter() - stage_start_seconds))
|
|
||||||
stage_start_seconds = time.perf_counter()
|
|
||||||
|
|
||||||
report_stage_end('start')
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description='Run performance test.')
|
parser = argparse.ArgumentParser(description='Run performance test.')
|
||||||
# Explicitly decode files as UTF-8 because sometimes we have Russian characters in queries, and LANG=C is set.
|
# Explicitly decode files as UTF-8 because sometimes we have Russian characters in queries, and LANG=C is set.
|
||||||
parser.add_argument('file', metavar='FILE', type=argparse.FileType('r', encoding='utf-8'), nargs=1, help='test description file')
|
parser.add_argument('file', metavar='FILE', type=argparse.FileType('r', encoding='utf-8'), nargs=1, help='test description file')
|
||||||
parser.add_argument('--host', nargs='*', default=['localhost'], help="Server hostname(s). Corresponds to '--port' options.")
|
parser.add_argument('--host', nargs='*', default=['localhost'], help="Server hostname(s). Corresponds to '--port' options.")
|
||||||
parser.add_argument('--port', nargs='*', default=[9000], help="Server port(s). Corresponds to '--host' options.")
|
parser.add_argument('--port', nargs='*', default=[9000], help="Server port(s). Corresponds to '--host' options.")
|
||||||
parser.add_argument('--runs', type=int, default=int(os.environ.get('CHPC_RUNS', 13)), help='Number of query runs per server. Defaults to CHPC_RUNS environment variable.')
|
parser.add_argument('--runs', type=int, default=int(os.environ.get('CHPC_RUNS', 13)), help='Number of query runs per server. Defaults to CHPC_RUNS environment variable.')
|
||||||
parser.add_argument('--no-long', type=bool, default=True, help='Skip the tests tagged as long.')
|
parser.add_argument('--long', action='store_true', help='Do not skip the tests tagged as long.')
|
||||||
|
parser.add_argument('--print-queries', action='store_true', help='Print test queries and exit.')
|
||||||
|
parser.add_argument('--print-settings', action='store_true', help='Print test settings and exit.')
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
test_name = os.path.splitext(os.path.basename(args.file[0].name))[0]
|
test_name = os.path.splitext(os.path.basename(args.file[0].name))[0]
|
||||||
@ -37,35 +30,6 @@ test_name = os.path.splitext(os.path.basename(args.file[0].name))[0]
|
|||||||
tree = et.parse(args.file[0])
|
tree = et.parse(args.file[0])
|
||||||
root = tree.getroot()
|
root = tree.getroot()
|
||||||
|
|
||||||
# Skip long tests
|
|
||||||
for tag in root.findall('.//tag'):
|
|
||||||
if tag.text == 'long':
|
|
||||||
print('skipped\tTest is tagged as long.')
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
# Check main metric
|
|
||||||
main_metric_element = root.find('main_metric/*')
|
|
||||||
if main_metric_element is not None and main_metric_element.tag != 'min_time':
|
|
||||||
raise Exception('Only the min_time main metric is supported. This test uses \'{}\''.format(main_metric_element.tag))
|
|
||||||
|
|
||||||
# FIXME another way to detect infinite tests. They should have an appropriate main_metric but sometimes they don't.
|
|
||||||
infinite_sign = root.find('.//average_speed_not_changing_for_ms')
|
|
||||||
if infinite_sign is not None:
|
|
||||||
raise Exception('Looks like the test is infinite (sign 1)')
|
|
||||||
|
|
||||||
# Print report threshold for the test if it is set.
|
|
||||||
if 'max_ignored_relative_change' in root.attrib:
|
|
||||||
print(f'report-threshold\t{root.attrib["max_ignored_relative_change"]}')
|
|
||||||
|
|
||||||
# Open connections
|
|
||||||
servers = [{'host': host, 'port': port} for (host, port) in zip(args.host, args.port)]
|
|
||||||
connections = [clickhouse_driver.Client(**server) for server in servers]
|
|
||||||
|
|
||||||
for s in servers:
|
|
||||||
print('server\t{}\t{}'.format(s['host'], s['port']))
|
|
||||||
|
|
||||||
report_stage_end('connect')
|
|
||||||
|
|
||||||
# Process query parameters
|
# Process query parameters
|
||||||
subst_elems = root.findall('substitutions/substitution')
|
subst_elems = root.findall('substitutions/substitution')
|
||||||
available_parameters = {} # { 'table': ['hits_10m', 'hits_100m'], ... }
|
available_parameters = {} # { 'table': ['hits_10m', 'hits_100m'], ... }
|
||||||
@ -84,7 +48,54 @@ def substitute_parameters(query_templates):
|
|||||||
for values_combo in itertools.product(*values)])
|
for values_combo in itertools.product(*values)])
|
||||||
return result
|
return result
|
||||||
|
|
||||||
report_stage_end('substitute')
|
# Build a list of test queries, processing all substitutions
|
||||||
|
test_query_templates = [q.text for q in root.findall('query')]
|
||||||
|
test_queries = substitute_parameters(test_query_templates)
|
||||||
|
|
||||||
|
# If we're only asked to print the queries, do that and exit
|
||||||
|
if args.print_queries:
|
||||||
|
for q in test_queries:
|
||||||
|
print(q)
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
# If we're only asked to print the settings, do that and exit. These are settings
|
||||||
|
# for clickhouse-benchmark, so we print them as command line arguments, e.g.
|
||||||
|
# '--max_memory_usage=10000000'.
|
||||||
|
if args.print_settings:
|
||||||
|
for s in root.findall('settings/*'):
|
||||||
|
print(f'--{s.tag}={s.text}')
|
||||||
|
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
# Skip long tests
|
||||||
|
if not args.long:
|
||||||
|
for tag in root.findall('.//tag'):
|
||||||
|
if tag.text == 'long':
|
||||||
|
print('skipped\tTest is tagged as long.')
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
# Check main metric to detect infinite tests. We shouldn't have such tests anymore,
|
||||||
|
# but we did in the past, and it is convenient to be able to process old tests.
|
||||||
|
main_metric_element = root.find('main_metric/*')
|
||||||
|
if main_metric_element is not None and main_metric_element.tag != 'min_time':
|
||||||
|
raise Exception('Only the min_time main metric is supported. This test uses \'{}\''.format(main_metric_element.tag))
|
||||||
|
|
||||||
|
# Another way to detect infinite tests. They should have an appropriate main_metric
|
||||||
|
# but sometimes they don't.
|
||||||
|
infinite_sign = root.find('.//average_speed_not_changing_for_ms')
|
||||||
|
if infinite_sign is not None:
|
||||||
|
raise Exception('Looks like the test is infinite (sign 1)')
|
||||||
|
|
||||||
|
# Print report threshold for the test if it is set.
|
||||||
|
if 'max_ignored_relative_change' in root.attrib:
|
||||||
|
print(f'report-threshold\t{root.attrib["max_ignored_relative_change"]}')
|
||||||
|
|
||||||
|
# Open connections
|
||||||
|
servers = [{'host': host, 'port': port} for (host, port) in zip(args.host, args.port)]
|
||||||
|
connections = [clickhouse_driver.Client(**server) for server in servers]
|
||||||
|
|
||||||
|
for s in servers:
|
||||||
|
print('server\t{}\t{}'.format(s['host'], s['port']))
|
||||||
|
|
||||||
# Run drop queries, ignoring errors. Do this before all other activity, because
|
# Run drop queries, ignoring errors. Do this before all other activity, because
|
||||||
# clickhouse_driver disconnects on error (this is not configurable), and the new
|
# clickhouse_driver disconnects on error (this is not configurable), and the new
|
||||||
@ -98,8 +109,6 @@ for c in connections:
|
|||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
report_stage_end('drop1')
|
|
||||||
|
|
||||||
# Apply settings.
|
# Apply settings.
|
||||||
# If there are errors, report them and continue -- maybe a new test uses a setting
|
# If there are errors, report them and continue -- maybe a new test uses a setting
|
||||||
# that is not in master, but the queries can still run. If we have multiple
|
# that is not in master, but the queries can still run. If we have multiple
|
||||||
@ -115,8 +124,6 @@ for c in connections:
|
|||||||
except:
|
except:
|
||||||
print(traceback.format_exc(), file=sys.stderr)
|
print(traceback.format_exc(), file=sys.stderr)
|
||||||
|
|
||||||
report_stage_end('settings')
|
|
||||||
|
|
||||||
# Check tables that should exist. If they don't exist, just skip this test.
|
# Check tables that should exist. If they don't exist, just skip this test.
|
||||||
tables = [e.text for e in root.findall('preconditions/table_exists')]
|
tables = [e.text for e in root.findall('preconditions/table_exists')]
|
||||||
for t in tables:
|
for t in tables:
|
||||||
@ -129,8 +136,6 @@ for t in tables:
|
|||||||
print(f'skipped\t{tsv_escape(skipped_message)}')
|
print(f'skipped\t{tsv_escape(skipped_message)}')
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
report_stage_end('preconditions')
|
|
||||||
|
|
||||||
# Run create queries
|
# Run create queries
|
||||||
create_query_templates = [q.text for q in root.findall('create_query')]
|
create_query_templates = [q.text for q in root.findall('create_query')]
|
||||||
create_queries = substitute_parameters(create_query_templates)
|
create_queries = substitute_parameters(create_query_templates)
|
||||||
@ -145,14 +150,7 @@ for c in connections:
|
|||||||
for q in fill_queries:
|
for q in fill_queries:
|
||||||
c.execute(q)
|
c.execute(q)
|
||||||
|
|
||||||
report_stage_end('fill')
|
|
||||||
|
|
||||||
# Run test queries
|
# Run test queries
|
||||||
test_query_templates = [q.text for q in root.findall('query')]
|
|
||||||
test_queries = substitute_parameters(test_query_templates)
|
|
||||||
|
|
||||||
report_stage_end('substitute2')
|
|
||||||
|
|
||||||
for query_index, q in enumerate(test_queries):
|
for query_index, q in enumerate(test_queries):
|
||||||
query_prefix = f'{test_name}.query{query_index}'
|
query_prefix = f'{test_name}.query{query_index}'
|
||||||
|
|
||||||
@ -199,13 +197,9 @@ for query_index, q in enumerate(test_queries):
|
|||||||
client_seconds = time.perf_counter() - start_seconds
|
client_seconds = time.perf_counter() - start_seconds
|
||||||
print(f'client-time\t{query_index}\t{client_seconds}\t{server_seconds}')
|
print(f'client-time\t{query_index}\t{client_seconds}\t{server_seconds}')
|
||||||
|
|
||||||
report_stage_end('benchmark')
|
|
||||||
|
|
||||||
# Run drop queries
|
# Run drop queries
|
||||||
drop_query_templates = [q.text for q in root.findall('drop_query')]
|
drop_query_templates = [q.text for q in root.findall('drop_query')]
|
||||||
drop_queries = substitute_parameters(drop_query_templates)
|
drop_queries = substitute_parameters(drop_query_templates)
|
||||||
for c in connections:
|
for c in connections:
|
||||||
for q in drop_queries:
|
for q in drop_queries:
|
||||||
c.execute(q)
|
c.execute(q)
|
||||||
|
|
||||||
report_stage_end('drop2')
|
|
||||||
|
@ -5,7 +5,9 @@ import ast
|
|||||||
import collections
|
import collections
|
||||||
import csv
|
import csv
|
||||||
import itertools
|
import itertools
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
|
import pprint
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
@ -101,7 +103,7 @@ def tableRow(cell_values, cell_attributes = []):
|
|||||||
for v, a in itertools.zip_longest(
|
for v, a in itertools.zip_longest(
|
||||||
cell_values, cell_attributes,
|
cell_values, cell_attributes,
|
||||||
fillvalue = '')
|
fillvalue = '')
|
||||||
if a is not None]))
|
if a is not None and v is not None]))
|
||||||
|
|
||||||
def tableHeader(r):
|
def tableHeader(r):
|
||||||
return tr(''.join([th(f) for f in r]))
|
return tr(''.join([th(f) for f in r]))
|
||||||
@ -321,6 +323,67 @@ if args.report == 'main':
|
|||||||
|
|
||||||
print_test_times()
|
print_test_times()
|
||||||
|
|
||||||
|
def print_benchmark_results():
|
||||||
|
json_reports = [json.load(open(f'benchmark/website-{x}.json')) for x in ['left', 'right']]
|
||||||
|
stats = [next(iter(x.values()))["statistics"] for x in json_reports]
|
||||||
|
qps = [x["QPS"] for x in stats]
|
||||||
|
queries = [x["num_queries"] for x in stats]
|
||||||
|
errors = [x["num_errors"] for x in stats]
|
||||||
|
relative_diff = (qps[1] - qps[0]) / max(0.01, qps[0]);
|
||||||
|
times_diff = max(qps) / max(0.01, min(qps))
|
||||||
|
|
||||||
|
all_rows = []
|
||||||
|
header = ['Benchmark', 'Metric', 'Old', 'New', 'Relative difference', 'Times difference'];
|
||||||
|
|
||||||
|
attrs = ['' for x in header]
|
||||||
|
row = ['website', 'queries', f'{queries[0]:d}', f'{queries[1]:d}', '--', '--']
|
||||||
|
attrs[0] = 'rowspan=2'
|
||||||
|
all_rows.append([row, attrs])
|
||||||
|
|
||||||
|
attrs = ['' for x in header]
|
||||||
|
row = [None, 'queries/s', f'{qps[0]:.3f}', f'{qps[1]:.3f}', f'{relative_diff:.3f}', f'x{times_diff:.3f}']
|
||||||
|
if abs(relative_diff) > 0.1:
|
||||||
|
# More queries per second is better.
|
||||||
|
if relative_diff > 0.:
|
||||||
|
attrs[4] = f'style="background: {color_good}"'
|
||||||
|
else:
|
||||||
|
attrs[4] = f'style="background: {color_bad}"'
|
||||||
|
else:
|
||||||
|
attrs[4] = ''
|
||||||
|
all_rows.append([row, attrs]);
|
||||||
|
|
||||||
|
if max(errors):
|
||||||
|
all_rows[0][1][0] = "rowspan=3"
|
||||||
|
row = [''] * (len(header))
|
||||||
|
attrs = ['' for x in header]
|
||||||
|
|
||||||
|
attrs[0] = None
|
||||||
|
row[1] = 'errors'
|
||||||
|
row[2] = f'{errors[0]:d}'
|
||||||
|
row[3] = f'{errors[1]:d}'
|
||||||
|
row[4] = '--'
|
||||||
|
row[5] = '--'
|
||||||
|
if errors[0]:
|
||||||
|
attrs[2] += f' style="background: {color_bad}" '
|
||||||
|
if errors[1]:
|
||||||
|
attrs[3] += f' style="background: {color_bad}" '
|
||||||
|
|
||||||
|
all_rows.append([row, attrs])
|
||||||
|
|
||||||
|
print(tableStart('Concurrent benchmarks'))
|
||||||
|
print(tableHeader(header))
|
||||||
|
for row, attrs in all_rows:
|
||||||
|
print(tableRow(row, attrs))
|
||||||
|
print(tableEnd())
|
||||||
|
|
||||||
|
try:
|
||||||
|
print_benchmark_results()
|
||||||
|
except:
|
||||||
|
report_errors.append(
|
||||||
|
traceback.format_exception_only(
|
||||||
|
*sys.exc_info()[:2])[-1])
|
||||||
|
pass
|
||||||
|
|
||||||
print_report_errors()
|
print_report_errors()
|
||||||
|
|
||||||
print("""
|
print("""
|
||||||
|
@ -60,11 +60,15 @@ public:
|
|||||||
bool cumulative_, bool secure_, const String & default_database_,
|
bool cumulative_, bool secure_, const String & default_database_,
|
||||||
const String & user_, const String & password_, const String & stage,
|
const String & user_, const String & password_, const String & stage,
|
||||||
bool randomize_, size_t max_iterations_, double max_time_,
|
bool randomize_, size_t max_iterations_, double max_time_,
|
||||||
const String & json_path_, size_t confidence_, const String & query_id_, const Settings & settings_)
|
const String & json_path_, size_t confidence_,
|
||||||
|
const String & query_id_, bool continue_on_errors_,
|
||||||
|
bool print_stacktrace_, const Settings & settings_)
|
||||||
:
|
:
|
||||||
concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
|
concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
|
||||||
cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
|
cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
|
||||||
json_path(json_path_), confidence(confidence_), query_id(query_id_), settings(settings_),
|
json_path(json_path_), confidence(confidence_), query_id(query_id_),
|
||||||
|
continue_on_errors(continue_on_errors_),
|
||||||
|
print_stacktrace(print_stacktrace_), settings(settings_),
|
||||||
shared_context(Context::createShared()), global_context(Context::createGlobal(shared_context.get())),
|
shared_context(Context::createShared()), global_context(Context::createGlobal(shared_context.get())),
|
||||||
pool(concurrency)
|
pool(concurrency)
|
||||||
{
|
{
|
||||||
@ -150,6 +154,8 @@ private:
|
|||||||
String json_path;
|
String json_path;
|
||||||
size_t confidence;
|
size_t confidence;
|
||||||
std::string query_id;
|
std::string query_id;
|
||||||
|
bool continue_on_errors;
|
||||||
|
bool print_stacktrace;
|
||||||
Settings settings;
|
Settings settings;
|
||||||
SharedContextHolder shared_context;
|
SharedContextHolder shared_context;
|
||||||
Context global_context;
|
Context global_context;
|
||||||
@ -163,6 +169,7 @@ private:
|
|||||||
struct Stats
|
struct Stats
|
||||||
{
|
{
|
||||||
std::atomic<size_t> queries{0};
|
std::atomic<size_t> queries{0};
|
||||||
|
size_t errors = 0;
|
||||||
size_t read_rows = 0;
|
size_t read_rows = 0;
|
||||||
size_t read_bytes = 0;
|
size_t read_bytes = 0;
|
||||||
size_t result_rows = 0;
|
size_t result_rows = 0;
|
||||||
@ -259,7 +266,7 @@ private:
|
|||||||
|
|
||||||
if (interrupt_listener.check())
|
if (interrupt_listener.check())
|
||||||
{
|
{
|
||||||
std::cout << "Stopping launch of queries. SIGINT received.\n";
|
std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -333,14 +340,14 @@ private:
|
|||||||
pcg64 generator(randomSeed());
|
pcg64 generator(randomSeed());
|
||||||
std::uniform_int_distribution<size_t> distribution(0, connection_entries.size() - 1);
|
std::uniform_int_distribution<size_t> distribution(0, connection_entries.size() - 1);
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
/// In these threads we do not accept INT signal.
|
/// In these threads we do not accept INT signal.
|
||||||
sigset_t sig_set;
|
sigset_t sig_set;
|
||||||
if (sigemptyset(&sig_set)
|
if (sigemptyset(&sig_set)
|
||||||
|| sigaddset(&sig_set, SIGINT)
|
|| sigaddset(&sig_set, SIGINT)
|
||||||
|| pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
|
|| pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
|
||||||
|
{
|
||||||
throwFromErrno("Cannot block signal.", ErrorCodes::CANNOT_BLOCK_SIGNAL);
|
throwFromErrno("Cannot block signal.", ErrorCodes::CANNOT_BLOCK_SIGNAL);
|
||||||
|
}
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
@ -350,19 +357,40 @@ private:
|
|||||||
{
|
{
|
||||||
extracted = queue.tryPop(query, 100);
|
extracted = queue.tryPop(query, 100);
|
||||||
|
|
||||||
if (shutdown || (max_iterations && queries_executed == max_iterations))
|
if (shutdown
|
||||||
|
|| (max_iterations && queries_executed == max_iterations))
|
||||||
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
execute(connection_entries, query, distribution(generator));
|
|
||||||
++queries_executed;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const auto connection_index = distribution(generator);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
execute(connection_entries, query, connection_index);
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
|
{
|
||||||
|
std::cerr << "An error occurred while processing the query '"
|
||||||
|
<< query << "'.\n";
|
||||||
|
if (!continue_on_errors)
|
||||||
{
|
{
|
||||||
shutdown = true;
|
shutdown = true;
|
||||||
std::cerr << "An error occurred while processing query:\n" << query << "\n";
|
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::cerr << getCurrentExceptionMessage(print_stacktrace,
|
||||||
|
true /*check embedded stack trace*/) << std::endl;
|
||||||
|
|
||||||
|
comparison_info_per_interval[connection_index]->errors++;
|
||||||
|
comparison_info_total[connection_index]->errors++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Count failed queries toward executed, so that we'd reach
|
||||||
|
// max_iterations even if every run fails.
|
||||||
|
++queries_executed;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void execute(EntryPtrs & connection_entries, Query & query, size_t connection_index)
|
void execute(EntryPtrs & connection_entries, Query & query, size_t connection_index)
|
||||||
@ -410,7 +438,12 @@ private:
|
|||||||
|
|
||||||
std::cerr
|
std::cerr
|
||||||
<< connections[i]->getDescription() << ", "
|
<< connections[i]->getDescription() << ", "
|
||||||
<< "queries " << info->queries << ", "
|
<< "queries " << info->queries << ", ";
|
||||||
|
if (info->errors)
|
||||||
|
{
|
||||||
|
std::cerr << "errors " << info->errors << ", ";
|
||||||
|
}
|
||||||
|
std::cerr
|
||||||
<< "QPS: " << (info->queries / seconds) << ", "
|
<< "QPS: " << (info->queries / seconds) << ", "
|
||||||
<< "RPS: " << (info->read_rows / seconds) << ", "
|
<< "RPS: " << (info->read_rows / seconds) << ", "
|
||||||
<< "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", "
|
<< "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", "
|
||||||
@ -477,11 +510,14 @@ private:
|
|||||||
print_key_value("MiBPS", info->read_bytes / info->work_time);
|
print_key_value("MiBPS", info->read_bytes / info->work_time);
|
||||||
print_key_value("RPS_result", info->result_rows / info->work_time);
|
print_key_value("RPS_result", info->result_rows / info->work_time);
|
||||||
print_key_value("MiBPS_result", info->result_bytes / info->work_time);
|
print_key_value("MiBPS_result", info->result_bytes / info->work_time);
|
||||||
print_key_value("num_queries", info->queries.load(), false);
|
print_key_value("num_queries", info->queries.load());
|
||||||
|
print_key_value("num_errors", info->errors, false);
|
||||||
|
|
||||||
json_out << "},\n";
|
json_out << "},\n";
|
||||||
json_out << double_quote << "query_time_percentiles" << ": {\n";
|
json_out << double_quote << "query_time_percentiles" << ": {\n";
|
||||||
|
|
||||||
|
if (info->queries != 0)
|
||||||
|
{
|
||||||
for (int percent = 0; percent <= 90; percent += 10)
|
for (int percent = 0; percent <= 90; percent += 10)
|
||||||
print_percentile(*info, percent);
|
print_percentile(*info, percent);
|
||||||
|
|
||||||
@ -489,6 +525,7 @@ private:
|
|||||||
print_percentile(*info, 99);
|
print_percentile(*info, 99);
|
||||||
print_percentile(*info, 99.9);
|
print_percentile(*info, 99.9);
|
||||||
print_percentile(*info, 99.99, false);
|
print_percentile(*info, 99.99, false);
|
||||||
|
}
|
||||||
|
|
||||||
json_out << "}\n";
|
json_out << "}\n";
|
||||||
json_out << (i == infos.size() - 1 ? "}\n" : "},\n");
|
json_out << (i == infos.size() - 1 ? "}\n" : "},\n");
|
||||||
@ -542,6 +579,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
|
|||||||
("stacktrace", "print stack traces of exceptions")
|
("stacktrace", "print stack traces of exceptions")
|
||||||
("confidence", value<size_t>()->default_value(5), "set the level of confidence for T-test [0=80%, 1=90%, 2=95%, 3=98%, 4=99%, 5=99.5%(default)")
|
("confidence", value<size_t>()->default_value(5), "set the level of confidence for T-test [0=80%, 1=90%, 2=95%, 3=98%, 4=99%, 5=99.5%(default)")
|
||||||
("query_id", value<std::string>()->default_value(""), "")
|
("query_id", value<std::string>()->default_value(""), "")
|
||||||
|
("continue_on_errors", "continue testing even if a query fails")
|
||||||
;
|
;
|
||||||
|
|
||||||
Settings settings;
|
Settings settings;
|
||||||
@ -583,6 +621,8 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
|
|||||||
options["json"].as<std::string>(),
|
options["json"].as<std::string>(),
|
||||||
options["confidence"].as<size_t>(),
|
options["confidence"].as<size_t>(),
|
||||||
options["query_id"].as<std::string>(),
|
options["query_id"].as<std::string>(),
|
||||||
|
options.count("continue_on_errors") > 0,
|
||||||
|
print_stacktrace,
|
||||||
settings);
|
settings);
|
||||||
return benchmark.run();
|
return benchmark.run();
|
||||||
}
|
}
|
||||||
|
@ -649,12 +649,22 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
|||||||
if (max_server_memory_usage == 0)
|
if (max_server_memory_usage == 0)
|
||||||
{
|
{
|
||||||
max_server_memory_usage = default_max_server_memory_usage;
|
max_server_memory_usage = default_max_server_memory_usage;
|
||||||
LOG_INFO(log, "Setting max_server_memory_usage was set to {}", formatReadableSizeWithBinarySuffix(max_server_memory_usage));
|
LOG_INFO(log, "Setting max_server_memory_usage was set to {}"
|
||||||
|
" ({} available * {:.2f} max_server_memory_usage_to_ram_ratio)",
|
||||||
|
formatReadableSizeWithBinarySuffix(max_server_memory_usage),
|
||||||
|
formatReadableSizeWithBinarySuffix(memory_amount),
|
||||||
|
max_server_memory_usage_to_ram_ratio);
|
||||||
}
|
}
|
||||||
else if (max_server_memory_usage > default_max_server_memory_usage)
|
else if (max_server_memory_usage > default_max_server_memory_usage)
|
||||||
{
|
{
|
||||||
max_server_memory_usage = default_max_server_memory_usage;
|
max_server_memory_usage = default_max_server_memory_usage;
|
||||||
LOG_INFO(log, "Setting max_server_memory_usage was lowered to {} because the system has low amount of memory", formatReadableSizeWithBinarySuffix(max_server_memory_usage));
|
LOG_INFO(log, "Setting max_server_memory_usage was lowered to {}"
|
||||||
|
" because the system has low amount of memory. The amount was"
|
||||||
|
" calculated as {} available"
|
||||||
|
" * {:.2f} max_server_memory_usage_to_ram_ratio",
|
||||||
|
formatReadableSizeWithBinarySuffix(max_server_memory_usage),
|
||||||
|
formatReadableSizeWithBinarySuffix(memory_amount),
|
||||||
|
max_server_memory_usage_to_ram_ratio);
|
||||||
}
|
}
|
||||||
|
|
||||||
total_memory_tracker.setOrRaiseHardLimit(max_server_memory_usage);
|
total_memory_tracker.setOrRaiseHardLimit(max_server_memory_usage);
|
||||||
|
@ -13,6 +13,13 @@
|
|||||||
#include <Poco/Exception.h>
|
#include <Poco/Exception.h>
|
||||||
#include <pcg_random.hpp>
|
#include <pcg_random.hpp>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Implementing the Reservoir Sampling algorithm. Incrementally selects from the added objects a random subset of the sample_count size.
|
/// Implementing the Reservoir Sampling algorithm. Incrementally selects from the added objects a random subset of the sample_count size.
|
||||||
/// Can approximately get quantiles.
|
/// Can approximately get quantiles.
|
||||||
@ -236,7 +243,7 @@ private:
|
|||||||
ResultType onEmpty() const
|
ResultType onEmpty() const
|
||||||
{
|
{
|
||||||
if (OnEmpty == ReservoirSamplerOnEmpty::THROW)
|
if (OnEmpty == ReservoirSamplerOnEmpty::THROW)
|
||||||
throw Poco::Exception("Quantile of empty ReservoirSampler");
|
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Quantile of empty ReservoirSampler");
|
||||||
else
|
else
|
||||||
return NanLikeValueConstructor<ResultType, std::is_floating_point_v<ResultType>>::getValue();
|
return NanLikeValueConstructor<ResultType, std::is_floating_point_v<ResultType>>::getValue();
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,13 @@
|
|||||||
#include <Common/NaNUtils.h>
|
#include <Common/NaNUtils.h>
|
||||||
#include <Poco/Exception.h>
|
#include <Poco/Exception.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Implementation of Reservoir Sampling algorithm. Incrementally selects from the added objects a random subset of the `sample_count` size.
|
/// Implementation of Reservoir Sampling algorithm. Incrementally selects from the added objects a random subset of the `sample_count` size.
|
||||||
/// Can approximately get quantiles.
|
/// Can approximately get quantiles.
|
||||||
@ -223,7 +230,7 @@ private:
|
|||||||
ResultType onEmpty() const
|
ResultType onEmpty() const
|
||||||
{
|
{
|
||||||
if (OnEmpty == ReservoirSamplerDeterministicOnEmpty::THROW)
|
if (OnEmpty == ReservoirSamplerDeterministicOnEmpty::THROW)
|
||||||
throw Poco::Exception("Quantile of empty ReservoirSamplerDeterministic");
|
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Quantile of empty ReservoirSamplerDeterministic");
|
||||||
else
|
else
|
||||||
return NanLikeValueConstructor<ResultType, std::is_floating_point_v<ResultType>>::getValue();
|
return NanLikeValueConstructor<ResultType, std::is_floating_point_v<ResultType>>::getValue();
|
||||||
}
|
}
|
||||||
|
@ -550,6 +550,9 @@ public:
|
|||||||
/// Gathers all changed values (e.g. for applying them later to another collection of settings).
|
/// Gathers all changed values (e.g. for applying them later to another collection of settings).
|
||||||
SettingsChanges changes() const;
|
SettingsChanges changes() const;
|
||||||
|
|
||||||
|
// A debugging aid.
|
||||||
|
std::string dumpChangesToString() const;
|
||||||
|
|
||||||
/// Applies change to concrete setting.
|
/// Applies change to concrete setting.
|
||||||
void applyChange(const SettingChange & change);
|
void applyChange(const SettingChange & change);
|
||||||
|
|
||||||
|
@ -219,6 +219,19 @@ SettingsChanges SettingsCollection<Derived>::changes() const
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template <class Derived>
|
||||||
|
std::string SettingsCollection<Derived>::dumpChangesToString() const
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
for (const auto & c : changes())
|
||||||
|
{
|
||||||
|
ss << c.name << " = "
|
||||||
|
<< applyVisitor(FieldVisitorToString(), c.value) << "\n";
|
||||||
|
}
|
||||||
|
return ss.str();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
template <class Derived>
|
template <class Derived>
|
||||||
void SettingsCollection<Derived>::applyChange(const SettingChange & change)
|
void SettingsCollection<Derived>::applyChange(const SettingChange & change)
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user