#!/usr/bin/python3

import os
import sys
import itertools
import clickhouse_driver
import xml.etree.ElementTree as et
import argparse
import pprint
import re
import string
import time
import traceback

def tsv_escape(s):
    return s.replace('\\', '\\\\').replace('\t', '\\t').replace('\n', '\\n').replace('\r', '')
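# For example, tsv_escape('a\tb') returns 'a\\tb': the literal tab becomes a
# backslash-t, so it cannot split a field in the tab-separated rows we print.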

parser = argparse.ArgumentParser(description='Run performance test.')
# Explicitly decode files as UTF-8 because sometimes we have Russian characters in queries, and LANG=C is set.
parser.add_argument('file', metavar='FILE', type=argparse.FileType('r', encoding='utf-8'), nargs=1, help='test description file')
parser.add_argument('--host', nargs='*', default=['localhost'], help="Server hostname(s). Corresponds to '--port' options.")
parser.add_argument('--port', nargs='*', default=[9000], help="Server port(s). Corresponds to '--host' options.")
parser.add_argument('--runs', type=int, default=int(os.environ.get('CHPC_RUNS', 7)), help='Number of query runs per server. Defaults to CHPC_RUNS environment variable.')
parser.add_argument('--long', action='store_true', help='Do not skip the tests tagged as long.')
parser.add_argument('--print-queries', action='store_true', help='Print test queries and exit.')
parser.add_argument('--print-settings', action='store_true', help='Print test settings and exit.')
args = parser.parse_args()

test_name = os.path.splitext(os.path.basename(args.file[0].name))[0]
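# E.g. a (hypothetical) test file 'sorting.xml' yields the test name 'sorting'.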

tree = et.parse(args.file[0])
root = tree.getroot()

# Process query parameters.
subst_elems = root.findall('substitutions/substitution')
available_parameters = {} # { 'table': ['hits_10m', 'hits_100m'], ... }
for e in subst_elems:
    available_parameters[e.find('name').text] = [v.text for v in e.findall('values/value')]
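
# The substitutions come from XML of this shape (a schematic example):
#   <substitutions>
#       <substitution>
#           <name>table</name>
#           <values><value>hits_10m</value><value>hits_100m</value></values>
#       </substitution>
#   </substitutions>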

# Takes parallel lists of templates, substitutes them with all combos of
# parameters. The set of parameters is determined based on the first list.
# Note: keep the order of queries -- sometimes we have DROP IF EXISTS
# followed by CREATE in create queries section, so the order matters.
def substitute_parameters(query_templates, other_templates=[]):
    query_results = []
    # Use a comprehension, not [[]] * n: the latter aliases one shared list
    # n times, so every column would receive every appended value.
    other_results = [[] for _ in other_templates]
    for i, q in enumerate(query_templates):
        keys = set(n for _, n, _, _ in string.Formatter().parse(q) if n)
        values = [available_parameters[k] for k in keys]
        combos = itertools.product(*values)
        for c in combos:
            with_keys = dict(zip(keys, c))
            query_results.append(q.format(**with_keys))
            for j, t in enumerate(other_templates):
                other_results[j].append(t[i].format(**with_keys))
    if len(other_templates):
        return query_results, other_results
    else:
        return query_results
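
# A schematic example: with available_parameters = {'table': ['hits_10m', 'hits_100m']},
# substitute_parameters(['select count() from {table}']) returns
# ['select count() from hits_10m', 'select count() from hits_100m'].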

# Build a list of test queries, substituting parameters into the query
# templates, and report the queries marked as short.
test_queries = []
for e in root.findall('query'):
    new_queries = []
    if 'short' in e.attrib:
        new_queries, [is_short] = substitute_parameters([e.text], [[e.attrib['short']]])
        for i, s in enumerate(is_short):
            # Don't print this if we only need to print the queries.
            if eval(s) and not args.print_queries:
                print(f'short\t{i + len(test_queries)}')
    else:
        new_queries = substitute_parameters([e.text])

    test_queries += new_queries
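
# Note that the 'short' attribute is a template as well: it goes through the
# same parameter substitution, and the result is evaluated as a Python
# expression by eval() above.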

# If we're only asked to print the queries, do that and exit.
if args.print_queries:
    for q in test_queries:
        print(q)
    exit(0)

# If we're only asked to print the settings, do that and exit. These are settings
# for clickhouse-benchmark, so we print them as command line arguments, e.g.
# '--max_memory_usage=10000000'.
if args.print_settings:
    for s in root.findall('settings/*'):
        print(f'--{s.tag}={s.text}')
    exit(0)

# Skip long tests.
if not args.long:
    for tag in root.findall('.//tag'):
        if tag.text == 'long':
            print('skipped\tTest is tagged as long.')
            sys.exit(0)

# Check main metric to detect infinite tests. We shouldn't have such tests anymore,
# but we did in the past, and it is convenient to be able to process old tests.
main_metric_element = root.find('main_metric/*')
if main_metric_element is not None and main_metric_element.tag != 'min_time':
    raise Exception('Only the min_time main metric is supported. This test uses \'{}\''.format(main_metric_element.tag))

# Another way to detect infinite tests. They should have an appropriate main_metric
# but sometimes they don't.
infinite_sign = root.find('.//average_speed_not_changing_for_ms')
if infinite_sign is not None:
    raise Exception('Looks like the test is infinite (sign 1)')

# Print report threshold for the test if it is set.
if 'max_ignored_relative_change' in root.attrib:
    print(f'report-threshold\t{root.attrib["max_ignored_relative_change"]}')
2019-12-26 17:35:41 +00:00
# Open connections
2020-01-21 18:15:25 +00:00
servers = [ { ' host ' : host , ' port ' : port } for ( host , port ) in zip ( args . host , args . port ) ]
2019-12-26 17:35:41 +00:00
connections = [ clickhouse_driver . Client ( * * server ) for server in servers ]
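# clickhouse_driver is the native-protocol Python client (hence the default
# port 9000 above, ClickHouse's native TCP port). One connection is opened per
# server, in the order the --host/--port pairs were given.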

for s in servers:
    print('server\t{}\t{}'.format(s['host'], s['port']))

# Run drop queries, ignoring errors. Do this before all other activity, because
# clickhouse_driver disconnects on error (this is not configurable), and the new
# connection loses the changes in settings.
drop_query_templates = [q.text for q in root.findall('drop_query')]
drop_queries = substitute_parameters(drop_query_templates)
for conn_index, c in enumerate(connections):
    for q in drop_queries:
        try:
            c.execute(q)
            print(f'drop\t{conn_index}\t{c.last_query.elapsed}\t{tsv_escape(q)}')
        except:
            pass

# Apply settings.
# If there are errors, report them and continue -- maybe a new test uses a setting
# that is not in master, but the queries can still run. If we have multiple
# settings and one of them throws an exception, all previous settings for this
# connection will be reset, because the driver reconnects on error (not
# configurable). So the end result is uncertain, but hopefully we'll be able to
# run at least some queries.
settings = root.findall('settings/*')
for conn_index, c in enumerate(connections):
    for s in settings:
        try:
            q = f"set {s.tag} = '{s.text}'"
            c.execute(q)
            print(f'set\t{conn_index}\t{c.last_query.elapsed}\t{tsv_escape(q)}')
        except:
            print(traceback.format_exc(), file=sys.stderr)
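
# Each successful statement above emits one TSV row: the event name ('set'),
# the connection index, the elapsed seconds, and the escaped query text. The
# drop/create/fill events below use the same row shape.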

# Check tables that should exist. If they don't exist, just skip this test.
tables = [e.text for e in root.findall('preconditions/table_exists')]
for t in tables:
    for c in connections:
        try:
            res = c.execute("select 1 from {} limit 1".format(t))
        except:
            exception_message = traceback.format_exception_only(*sys.exc_info()[:2])[-1]
            skipped_message = ' '.join(exception_message.split('\n')[:2])
            print(f'skipped\t{tsv_escape(skipped_message)}')
            sys.exit(0)
2019-12-26 17:35:41 +00:00
# Run create queries
create_query_templates = [ q . text for q in root . findall ( ' create_query ' ) ]
2020-03-03 13:38:45 +00:00
create_queries = substitute_parameters ( create_query_templates )
2020-06-27 00:45:00 +00:00
# Disallow temporary tables, because the clickhouse_driver reconnects on errors,
# and temporary tables are destroyed. We want to be able to continue after some
# errors.
for q in create_queries :
if re . search ( ' create temporary table ' , q , flags = re . IGNORECASE ) :
print ( f " Temporary tables are not allowed in performance tests: ' { q } ' " ,
file = sys . stderr )
sys . exit ( 1 )
for conn_index , c in enumerate ( connections ) :
2019-12-26 17:35:41 +00:00
for q in create_queries :
c . execute ( q )
2020-06-27 00:45:00 +00:00
print ( f ' create \t { conn_index } \t { c . last_query . elapsed } \t { tsv_escape ( q ) } ' )
2019-12-26 17:35:41 +00:00
# Run fill queries
fill_query_templates = [ q . text for q in root . findall ( ' fill_query ' ) ]
2020-03-03 13:38:45 +00:00
fill_queries = substitute_parameters ( fill_query_templates )
2020-06-27 00:45:00 +00:00
for conn_index , c in enumerate ( connections ) :
2019-12-26 17:35:41 +00:00
for q in fill_queries :
c . execute ( q )
2020-06-27 00:45:00 +00:00
print ( f ' fill \t { conn_index } \t { c . last_query . elapsed } \t { tsv_escape ( q ) } ' )

# Run test queries.
for query_index, q in enumerate(test_queries):
    query_prefix = f'{test_name}.query{query_index}'

    # We have some crazy long queries (about 100kB), so trim them to a sane
    # length. This means we can't use query text as an identifier and have to
    # use the test name + the test-wide query index.
    query_display_name = q
    if len(query_display_name) > 1000:
        query_display_name = f'{query_display_name[:1000]}...({query_index})'

    print(f'display-name\t{query_index}\t{tsv_escape(query_display_name)}')

    # Prewarm: run once on both servers. Helps to bring the data into memory,
    # precompile the queries, etc.
    # A query might not run on the old server if it uses a function added in the
    # new one. We want to run them on the new server only, so that the PR author
    # can ensure that the test works properly. Remember the errors we had on
    # each server.
    query_error_on_connection = [None] * len(connections)
    for conn_index, c in enumerate(connections):
        try:
            prewarm_id = f'{query_prefix}.prewarm0'
            res = c.execute(q, query_id=prewarm_id)
            print(f'prewarm\t{query_index}\t{prewarm_id}\t{conn_index}\t{c.last_query.elapsed}')
        except KeyboardInterrupt:
            raise
        except:
            # FIXME the driver reconnects on error and we lose settings, so this
            # might lead to further errors or unexpected behavior.
            query_error_on_connection[conn_index] = traceback.format_exc()
            continue

    # Report all errors that occurred during prewarm and decide what to do next.
    # If prewarm fails for the query on all servers -- skip the query and
    # continue testing the next query.
    # If prewarm fails on one of the servers, run the query on the rest of them.
    no_errors = []
    for i, e in enumerate(query_error_on_connection):
        if e:
            print(e, file=sys.stderr)
        else:
            no_errors.append(i)

    if len(no_errors) == 0:
        continue
    elif len(no_errors) < len(connections):
        print(f'partial\t{query_index}\t{no_errors}')

    # Now, perform measured runs.
    # Track the time spent by the client to process this query, so that we can
    # notice the queries that take long to process on the client side, e.g. by
    # sending excessive data.
    start_seconds = time.perf_counter()
    server_seconds = 0
    for run in range(0, args.runs):
        run_id = f'{query_prefix}.run{run}'
        for conn_index, c in enumerate(connections):
            if query_error_on_connection[conn_index]:
                continue
            try:
                res = c.execute(q, query_id=run_id)
            except Exception as e:
                # Add query id to the exception args to make debugging easier.
                # Note: most Python 3 exceptions have no 'message' attribute,
                # so e.args is the only safe place to put it.
                e.args = (run_id, *e.args)
                raise

            print(f'query\t{query_index}\t{run_id}\t{conn_index}\t{c.last_query.elapsed}')
            server_seconds += c.last_query.elapsed

            if c.last_query.elapsed > 10:
                # Stop processing pathologically slow queries, to avoid timing out
                # the entire test task. This shouldn't really happen, so we don't
                # need much handling for this case and can just exit.
                print(f'The query no. {query_index} is taking too long to run ({c.last_query.elapsed} s)',
                    file=sys.stderr)
                exit(2)

    client_seconds = time.perf_counter() - start_seconds
    print(f'client-time\t{query_index}\t{client_seconds}\t{server_seconds}')
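    # A client-time value much larger than the summed server time suggests the
    # query is expensive on the client side, e.g. it returns excessive data.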

# Run drop queries.
drop_queries = substitute_parameters(drop_query_templates)
for conn_index, c in enumerate(connections):
    for q in drop_queries:
        c.execute(q)
        print(f'drop\t{conn_index}\t{c.last_query.elapsed}\t{tsv_escape(q)}')