#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from multiprocessing import cpu_count
from subprocess import Popen, call, check_output, STDOUT
import os
import sys
import shutil
import argparse
import logging
import time
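
# Stress-test driver: runs several clickhouse-test workers in parallel with
# varied options and, when --hung-check is given, verifies afterwards that
# no queries are left hanging.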


def get_options(i):
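    """Build the clickhouse-test command-line options for worker number ``i``.

    The modulo checks below rotate database engines and client settings
    deterministically, so each parallel worker exercises a slightly
    different configuration.
    """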
    options = []
    client_options = []
    if 0 < i:
        options.append("--order=random")

    if i % 3 == 1:
        options.append("--db-engine=Ordinary")

    if i % 3 == 2:
        options.append('''--db-engine="Replicated('/test/db/test_{}', 's1', 'r1')"'''.format(i))
        client_options.append('allow_experimental_database_replicated=1')

    # If a database name is not specified, a new database is created for each functional test.
    # Run some threads with one database for all tests.
    if i % 2 == 1:
        options.append(" --database=test_{}".format(i))

    if i % 5 == 1:
        client_options.append("join_use_nulls=1")

    if i % 15 == 6:
        client_options.append("join_algorithm='partial_merge'")

    if i % 15 == 11:
        client_options.append("join_algorithm='auto'")
        client_options.append('max_rows_in_join=1000')

    if i == 13:
        client_options.append('memory_tracker_fault_probability=0.001')

    if client_options:
        options.append(" --client-option " + ' '.join(client_options))

    return ' '.join(options)


def run_func_test(cmd, output_prefix, num_processes, skip_tests_option, global_time_limit):
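    """Start ``num_processes`` clickhouse-test workers in parallel.

    Each worker writes to its own log file under ``output_prefix``; the
    corresponding Popen handles are returned so the caller can wait on them.
    """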
    global_time_limit_option = ''
    if global_time_limit:
        global_time_limit_option = "--global_time_limit={}".format(global_time_limit)

    output_paths = [os.path.join(output_prefix, "stress_test_run_{}.txt".format(i)) for i in range(num_processes)]
    pipes = []
    for i in range(0, len(output_paths)):
        f = open(output_paths[i], 'w')
        full_command = "{} {} {} {}".format(cmd, get_options(i), global_time_limit_option, skip_tests_option)
        logging.info("Run func tests '%s'", full_command)
        p = Popen(full_command, shell=True, stdout=f, stderr=f)
        pipes.append(p)
        time.sleep(0.5)
    return pipes


def compress_stress_logs(output_path, files_prefix):
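    """Pack the individual worker logs into one tarball and remove the originals."""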
cmd = f"cd {output_path} && tar -zcf stress_run_logs.tar.gz {files_prefix}* && rm {files_prefix}*"
|
|
check_output(cmd, shell=True)
|
|
|
|


def prepare_for_hung_check(drop_databases):
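    """Best-effort cleanup before the hung-queries check.

    Stops fault injection, restarts background processing, kills queries that
    are known to run for a long time and, if ``drop_databases`` is set, drops
    all non-system databases asynchronously. Returns True if long-running
    queries may still be present, False if none were found.
    """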
    # FIXME this function should not exist, but...

    # ThreadFuzzer significantly slows down the server and causes false-positive hung check failures
    call("clickhouse client -q 'SYSTEM STOP THREAD FUZZER'", shell=True, stderr=STDOUT, timeout=30)
    # We attach gdb to clickhouse-server before running tests
    # to print stacktraces of all crashes even if clickhouse cannot print them for some reason.
    # However, it obstructs checking for hung queries.
    logging.info("Will terminate gdb (if any)")
    call("kill -TERM $(pidof gdb)", shell=True, stderr=STDOUT, timeout=30)
    # Some tests set a memory limit for the default user that is too low and forget to reset it back.
    # It may cause SYSTEM queries to fail, so let's disable the memory limit.
    call("clickhouse client --max_memory_usage_for_user=0 -q 'SELECT 1 FORMAT Null'", shell=True, stderr=STDOUT, timeout=30)
    # Some tests execute SYSTEM STOP MERGES or similar queries.
    # It may cause some ALTERs to hang.
    # Possibly we should fix the tests and forbid using such queries without specifying a table.
    call("clickhouse client -q 'SYSTEM START MERGES'", shell=True, stderr=STDOUT, timeout=30)
    call("clickhouse client -q 'SYSTEM START DISTRIBUTED SENDS'", shell=True, stderr=STDOUT, timeout=30)
    call("clickhouse client -q 'SYSTEM START TTL MERGES'", shell=True, stderr=STDOUT, timeout=30)
    call("clickhouse client -q 'SYSTEM START MOVES'", shell=True, stderr=STDOUT, timeout=30)
    call("clickhouse client -q 'SYSTEM START FETCHES'", shell=True, stderr=STDOUT, timeout=30)
    call("clickhouse client -q 'SYSTEM START REPLICATED SENDS'", shell=True, stderr=STDOUT, timeout=30)
    call("clickhouse client -q 'SYSTEM START REPLICATION QUEUES'", shell=True, stderr=STDOUT, timeout=30)
    # Issue #21004, live views are experimental, so let's just suppress it
    call("""clickhouse client -q "KILL QUERY WHERE upper(query) LIKE 'WATCH %'" """, shell=True, stderr=STDOUT, timeout=30)
    # Kill other queries that are known to be slow.
    # This query from 01232_preparing_sets_race_condition_long may take up to 1000 seconds in slow builds.
    call("""clickhouse client -q "KILL QUERY WHERE query LIKE 'insert into tableB select %'" """, shell=True, stderr=STDOUT, timeout=30)
    # Long query from 00084_external_agregation
    call("""clickhouse client -q "KILL QUERY WHERE query LIKE 'SELECT URL, uniq(SearchPhrase) AS u FROM test.hits GROUP BY URL ORDER BY u %'" """, shell=True, stderr=STDOUT, timeout=30)
    if drop_databases:
        # Here we try to drop all databases in async mode. If some queries have really hung, then the drop will hang too.
        # Otherwise we get rid of queries that wait for the background pool. That can take a long time on slow builds (more than 900 seconds).
        databases = check_output('clickhouse client -q "SHOW DATABASES"', shell=True, timeout=30).decode('utf-8').strip().split()
        for db in databases:
            if db == "system":
                continue
            command = f'clickhouse client -q "DROP DATABASE {db}"'
            # we don't wait for the drop to finish
            Popen(command, shell=True)

    # Wait for the last queries to finish if any, but not longer than 300 seconds
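    # The scalar subquery below yields roughly the number of seconds until the most recently
    # started query reaches 300 s of runtime; sleepEachRow splits that wait into 300 small
    # sleeps, so the total sleep time is only as long as needed (capped by the timeout).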
call("""clickhouse client -q "select sleepEachRow((
|
|
select maxOrDefault(300 - elapsed) + 1 from system.processes where query not like '%from system.processes%' and elapsed < 300
|
|
) / 300) from numbers(300) format Null" """, shell=True, stderr=STDOUT, timeout=330)
|
|
|
|
    # Even if all clickhouse-test processes are finished, there are probably some sh scripts
    # which still run new queries. Let's ignore them.
    try:
        query = """clickhouse client -q "SELECT count() FROM system.processes WHERE elapsed > 300" """
        output = check_output(query, shell=True, stderr=STDOUT, timeout=30).decode('utf-8').strip()
        if int(output) == 0:
            return False
    except:
        pass
    return True
if __name__ == "__main__":
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
|
|
parser = argparse.ArgumentParser(description="ClickHouse script for running stresstest")
|
|
parser.add_argument("--test-cmd", default='/usr/bin/clickhouse-test')
|
|
parser.add_argument("--skip-func-tests", default='')
|
|
parser.add_argument("--client-cmd", default='clickhouse-client')
|
|
parser.add_argument("--server-log-folder", default='/var/log/clickhouse-server')
|
|
parser.add_argument("--output-folder")
|
|
parser.add_argument("--global-time-limit", type=int, default=1800)
|
|
parser.add_argument("--num-parallel", type=int, default=cpu_count())
|
|
parser.add_argument('--hung-check', action='store_true', default=False)
|
|
# make sense only for hung check
|
|
parser.add_argument('--drop-databases', action='store_true', default=False)
|
|
|
|
args = parser.parse_args()
|
|
if args.drop_databases and not args.hung_check:
|
|
raise Exception("--drop-databases only used in hung check (--hung-check)")
|
|
    func_pipes = run_func_test(args.test_cmd, args.output_folder, args.num_parallel, args.skip_func_tests, args.global_time_limit)

    logging.info("Will wait for functional tests to finish")
    while True:
        retcodes = []
        for p in func_pipes:
            if p.poll() is not None:
                retcodes.append(p.returncode)
        if len(retcodes) == len(func_pipes):
            break
        logging.info("Finished %s of %s processes", len(retcodes), len(func_pipes))
        time.sleep(5)

    logging.info("All processes finished")

    logging.info("Compressing stress logs")
    compress_stress_logs(args.output_folder, "stress_test_run_")
    logging.info("Logs compressed")

    if args.hung_check:
        have_long_running_queries = prepare_for_hung_check(args.drop_databases)
        logging.info("Checking if some queries hung")
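        # Run clickhouse-test on a trivial test with --hung-check so that it can
        # perform its hung-query detection against the server.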
cmd = "{} {} {}".format(args.test_cmd, "--hung-check", "00001_select_1")
|
|
res = call(cmd, shell=True, stderr=STDOUT)
|
|
hung_check_status = "No queries hung\tOK\n"
|
|
if res != 0 and have_long_running_queries:
|
|
logging.info("Hung check failed with exit code {}".format(res))
|
|
hung_check_status = "Hung check failed\tFAIL\n"
|
|
with open(os.path.join(args.output_folder, "test_results.tsv"), 'w+') as results:
|
|
results.write(hung_check_status)
|
|
|
|
logging.info("Stress test finished")
|