Using formatted string literals, extracted sort funcs in tester

This commit is contained in:
Mike Kot 2021-08-05 17:15:51 +03:00
parent 71e5cfe3ca
commit af536b1b5e

View File

@ -11,6 +11,7 @@ import copy
import traceback import traceback
from argparse import ArgumentParser from argparse import ArgumentParser
from typing import Tuple, Union, Optional, TextIO
import shlex import shlex
import subprocess import subprocess
from subprocess import Popen from subprocess import Popen
@ -20,10 +21,12 @@ from subprocess import TimeoutExpired
from datetime import datetime from datetime import datetime
from time import time, sleep from time import time, sleep
from errno import ESRCH from errno import ESRCH
try: try:
import termcolor import termcolor
except ImportError: except ImportError:
termcolor = None termcolor = None
import random import random
import string import string
import multiprocessing import multiprocessing
@ -81,7 +84,7 @@ def stop_tests():
def json_minify(string): def json_minify(string):
""" """
Removes all js-style comments from json string. Allows to have comments in skip_list.json. Removes all js-style comments from json string. Allows to have comments in skip_list.json.
The code taken from https://github.com/getify/JSON.minify/tree/python under the MIT license. The code was taken from https://github.com/getify/JSON.minify/tree/python under the MIT license.
""" """
tokenizer = re.compile(r'"|(/\*)|(\*/)|(//)|\n|\r') tokenizer = re.compile(r'"|(/\*)|(\*/)|(//)|\n|\r')
@ -148,13 +151,17 @@ def remove_control_characters(s):
s = re.sub(r"[\x00-\x08\x0b\x0e-\x1f\x7f]", "", s) s = re.sub(r"[\x00-\x08\x0b\x0e-\x1f\x7f]", "", s)
return s return s
def get_db_engine(args, database_name): def get_db_engine(args, database_name):
if args.replicated_database: if args.replicated_database:
return " ON CLUSTER test_cluster_database_replicated ENGINE=Replicated('/test/clickhouse/db/{}', '{{shard}}', '{{replica}}')".format(database_name) return f" ON CLUSTER test_cluster_database_replicated \
ENGINE=Replicated('/test/clickhouse/db/{database_name}', \
'{{shard}}', '{{replica}}')"
if args.db_engine: if args.db_engine:
return " ENGINE=" + args.db_engine return " ENGINE=" + args.db_engine
return "" # Will use default engine return "" # Will use default engine
def configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file): def configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file):
testcase_args = copy.deepcopy(args) testcase_args = copy.deepcopy(args)
@ -166,7 +173,6 @@ def configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file):
database = testcase_args.database database = testcase_args.database
os.environ.setdefault("CLICKHOUSE_DATABASE", database) os.environ.setdefault("CLICKHOUSE_DATABASE", database)
os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir) os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir)
else: else:
# If --database is not specified, we will create temporary database with unique name # If --database is not specified, we will create temporary database with unique name
# And we will recreate and drop it for each test # And we will recreate and drop it for each test
@ -176,8 +182,14 @@ def configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file):
database = 'test_{suffix}'.format(suffix=random_str()) database = 'test_{suffix}'.format(suffix=random_str())
with open(stderr_file, 'w') as stderr: with open(stderr_file, 'w') as stderr:
client_cmd = testcase_args.testcase_client + " " + get_additional_client_options(args) client_cmd = testcase_args.testcase_client + " " \
clickhouse_proc_create = Popen(shlex.split(client_cmd), stdin=PIPE, stdout=PIPE, stderr=stderr, universal_newlines=True) + get_additional_client_options(args)
clickhouse_proc_create = open_client_process(
universal_newlines=True,
client_args=client_cmd,
stderr_file=stderr)
try: try:
clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(testcase_args, database)), timeout=testcase_args.timeout) clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(testcase_args, database)), timeout=testcase_args.timeout)
except TimeoutExpired: except TimeoutExpired:
@ -237,8 +249,10 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
if need_drop_database: if need_drop_database:
with open(stderr_file, 'a') as stderr: with open(stderr_file, 'a') as stderr:
clickhouse_proc_create = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=stderr, universal_newlines=True) clickhouse_proc_create = open_client_process(client, universal_newlines=True, stderr_file=stderr)
seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20) seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20)
try: try:
drop_database_query = "DROP DATABASE " + database drop_database_query = "DROP DATABASE " + database
if args.replicated_database: if args.replicated_database:
@ -254,7 +268,7 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
raise raise
total_time = (datetime.now() - start_time).total_seconds() total_time = (datetime.now() - start_time).total_seconds()
return clickhouse_proc_create, "", "Timeout dropping database {} after test".format(database), total_time return clickhouse_proc_create, "", f"Timeout dropping database {database} after test", total_time
shutil.rmtree(args.test_tmp_dir) shutil.rmtree(args.test_tmp_dir)
@ -286,12 +300,16 @@ def need_retry(stdout, stderr):
def get_processlist(args): def get_processlist(args):
try: try:
query = b"SHOW PROCESSLIST FORMAT Vertical" query = b"SHOW PROCESSLIST FORMAT Vertical"
if args.replicated_database: if args.replicated_database:
query = b"SELECT materialize((hostName(), tcpPort())) as host, * " \ query = b"SELECT materialize((hostName(), tcpPort())) as host, * " \
b"FROM clusterAllReplicas('test_cluster_database_replicated', system.processes) " \ b"FROM clusterAllReplicas('test_cluster_database_replicated', system.processes) " \
b"WHERE query NOT LIKE '%system.processes%' FORMAT Vertical" b"WHERE query NOT LIKE '%system.processes%' FORMAT Vertical"
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
clickhouse_proc = open_client_process(args.client)
(stdout, _) = clickhouse_proc.communicate((query), timeout=20) (stdout, _) = clickhouse_proc.communicate((query), timeout=20)
return False, stdout.decode('utf-8') return False, stdout.decode('utf-8')
except Exception as ex: except Exception as ex:
print("Exception", ex) print("Exception", ex)
@ -301,47 +319,90 @@ def get_processlist(args):
# collect server stacktraces using gdb # collect server stacktraces using gdb
def get_stacktraces_from_gdb(server_pid): def get_stacktraces_from_gdb(server_pid):
try: try:
cmd = "gdb -batch -ex 'thread apply all backtrace' -p {}".format(server_pid) cmd = f"gdb -batch -ex 'thread apply all backtrace' -p {server_pid}"
return subprocess.check_output(cmd, shell=True).decode('utf-8') return subprocess.check_output(cmd, shell=True).decode('utf-8')
except Exception as ex: except Exception as e:
print("Error occured while receiving stack traces from gdb: {}".format(str(ex))) print(f"Error occurred while receiving stack traces from gdb: {e}")
return None return None
# collect server stacktraces from system.stack_trace table # collect server stacktraces from system.stack_trace table
# it does not work in Sandbox # it does not work in Sandbox
def get_stacktraces_from_clickhouse(client, replicated_database=False): def get_stacktraces_from_clickhouse(client, replicated_database=False):
try: replicated_msg = \
if replicated_database: "{} --allow_introspection_functions=1 --skip_unavailable_shards=1 --query \
return subprocess.check_output("{} --allow_introspection_functions=1 --skip_unavailable_shards=1 --query " \"SELECT materialize((hostName(), tcpPort())) as host, thread_id, \
"\"SELECT materialize((hostName(), tcpPort())) as host, thread_id, " arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), \
"arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), arrayMap(x -> addressToLine(x), trace), " arrayMap(x -> addressToLine(x), trace), \
"arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace " arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace \
"FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') " FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') \
"ORDER BY host, thread_id format Vertical\"".format(client), shell=True, stderr=subprocess.STDOUT).decode('utf-8') ORDER BY host, thread_id FORMAT Vertical\"".format(client)
return subprocess.check_output("{} --allow_introspection_functions=1 --query " msg = \
"\"SELECT arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), arrayMap(x -> addressToLine(x), trace), " "{} --allow_introspection_functions=1 --query \
"arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace " \"SELECT arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), \
"FROM system.stack_trace format Vertical\"".format(client), shell=True, stderr=subprocess.STDOUT).decode('utf-8') arrayMap(x -> addressToLine(x), trace), \
except Exception as ex: arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace \
print("Error occured while receiving stack traces from client: {}".format(str(ex))) FROM system.stack_trace FORMAT Vertical\"".format(client)
try:
return subprocess.check_output(
replicated_msg if replicated_database else msg,
shell=True, stderr=subprocess.STDOUT).decode('utf-8')
except Exception as e:
print(f"Error occurred while receiving stack traces from client: {e}")
return None return None
def get_server_pid(server_tcp_port):
def print_stacktraces() -> None:
    """Print stack traces of the ClickHouse server, trying gdb first.

    When a server PID is found and the run does not use a replicated
    database, traces are collected with gdb. If gdb fails or returns
    fewer than 1000 characters, the traces are requested through the
    client instead. If nothing could be collected, a warning built
    with ``colored`` is printed.
    """
    server_pid = get_server_pid()
    bt = None

    if server_pid and not args.replicated_database:
        print("")
        print(f"Located ClickHouse server process {server_pid} listening at TCP port {args.tcp_port}")
        print("Collecting stacktraces from all running threads with gdb:")

        bt = get_stacktraces_from_gdb(server_pid)

        # get_stacktraces_from_gdb() returns None when gdb fails; calling
        # len() on it would raise TypeError and skip the fallback below.
        if bt is not None and len(bt) < 1000:
            print("Got suspiciously small stacktraces: ", bt)
            bt = None

    if bt is None:
        print("\nCollecting stacktraces from system.stacktraces table:")

        bt = get_stacktraces_from_clickhouse(
            args.client, args.replicated_database)

    if bt is not None:
        print(bt)
        return

    print(colored(
        f"\nUnable to locate ClickHouse server process listening at TCP port {args.tcp_port}. "
        "It must have crashed or exited prematurely!",
        args, "red", attrs=["bold"]))
def get_server_pid():
# lsof does not work in stress tests for some reason # lsof does not work in stress tests for some reason
cmd_lsof = "lsof -i tcp:{port} -s tcp:LISTEN -Fp | awk '/^p[0-9]+$/{{print substr($0, 2)}}'".format(port=server_tcp_port) cmd_lsof = f"lsof -i tcp:{args.tcp_port} -s tcp:LISTEN -Fp | awk '/^p[0-9]+$/{{print substr($0, 2)}}'"
cmd_pidof = "pidof -s clickhouse-server" cmd_pidof = "pidof -s clickhouse-server"
commands = [cmd_lsof, cmd_pidof] commands = [cmd_lsof, cmd_pidof]
output = None output = None
for cmd in commands: for cmd in commands:
try: try:
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True) output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True)
if output: if output:
return int(output) return int(output)
except Exception as e: except Exception as e:
print("Cannot get server pid with {}, got {}: {}".format(cmd, output, e)) print(f"Cannot get server pid with {cmd}, got {output}: {e}")
return None # most likely server dead
return None # most likely server is dead
def colored(text, args, color=None, on_color=None, attrs=None): def colored(text, args, color=None, on_color=None, attrs=None):
@ -357,6 +418,14 @@ server_died = multiprocessing.Event()
stop_tests_triggered_lock = multiprocessing.Lock() stop_tests_triggered_lock = multiprocessing.Lock()
stop_tests_triggered = multiprocessing.Event() stop_tests_triggered = multiprocessing.Event()
queue = multiprocessing.Queue(maxsize=1) queue = multiprocessing.Queue(maxsize=1)
def print_test_time(test_time) -> str:
    """Return the elapsed-time suffix for a test status line.

    Despite the name, nothing is printed: the formatted text is returned.
    An empty string is returned when ``args.print_time`` is falsy.
    """
    if not args.print_time:
        return ''

    return " {0:.2f} sec.".format(test_time)
restarted_tests = [] # (test, stderr) restarted_tests = [] # (test, stderr)
# def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total): # def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total):
@ -385,15 +454,10 @@ def run_tests_array(all_tests_with_params):
client_options = get_additional_client_options(args) client_options = get_additional_client_options(args)
def print_test_time(test_time):
if args.print_time:
return " {0:.2f} sec.".format(test_time)
else:
return ''
if num_tests > 0: if num_tests > 0:
about = 'about ' if is_concurrent else '' about = 'about ' if is_concurrent else ''
print(f"\nRunning {about}{num_tests} {suite} tests ({multiprocessing.current_process().name}).\n") proc_name = multiprocessing.current_process().name
print(f"\nRunning {about}{num_tests} {suite} tests ({proc_name}).\n")
while True: while True:
if is_concurrent: if is_concurrent:
@ -459,7 +523,6 @@ def run_tests_array(all_tests_with_params):
message = open(disabled_file, 'r').read() message = open(disabled_file, 'r').read()
status += MSG_SKIPPED + " - " + message + "\n" status += MSG_SKIPPED + " - " + message + "\n"
else: else:
if args.testname: if args.testname:
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True) clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
failed_to_check = False failed_to_check = False
@ -599,7 +662,12 @@ def run_tests_array(all_tests_with_params):
except: except:
exc_type, exc_value, tb = sys.exc_info() exc_type, exc_value, tb = sys.exc_info()
failures += 1 failures += 1
print("{0} - Test internal error: {1}\n{2}\n{3}".format(MSG_FAIL, exc_type.__name__, exc_value, "\n".join(traceback.format_tb(tb, 10))))
exc_name = exc_type.__name__
traceback_str = "\n".join(traceback.format_tb(tb, 10))
print(f"{MSG_FAIL} - Test internal error: {exc_name}")
print(f"{exc_value}\n{traceback_str}")
if failures_chain >= 20: if failures_chain >= 20:
stop_tests() stop_tests()
@ -627,9 +695,11 @@ server_logs_level = "warning"
def check_server_started(client, retry_count): def check_server_started(client, retry_count):
print("Connecting to ClickHouse server...", end='') print("Connecting to ClickHouse server...", end='')
sys.stdout.flush() sys.stdout.flush()
while retry_count > 0: while retry_count > 0:
clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) clickhouse_proc = open_client_process(client)
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT 1") (stdout, stderr) = clickhouse_proc.communicate(b"SELECT 1")
if clickhouse_proc.returncode == 0 and stdout.startswith(b"1"): if clickhouse_proc.returncode == 0 and stdout.startswith(b"1"):
@ -679,7 +749,7 @@ class BuildFlags():
def collect_build_flags(client): def collect_build_flags(client):
clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) clickhouse_proc = open_client_process(client)
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'") (stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
result = [] result = []
@ -695,7 +765,7 @@ def collect_build_flags(client):
else: else:
raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr)) raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) clickhouse_proc = open_client_process(client)
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'") (stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'")
if clickhouse_proc.returncode == 0: if clickhouse_proc.returncode == 0:
@ -706,7 +776,7 @@ def collect_build_flags(client):
else: else:
raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr)) raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) clickhouse_proc = open_client_process(client)
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'UNBUNDLED'") (stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'UNBUNDLED'")
if clickhouse_proc.returncode == 0: if clickhouse_proc.returncode == 0:
@ -715,7 +785,7 @@ def collect_build_flags(client):
else: else:
raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr)) raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) clickhouse_proc = open_client_process(client)
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.settings WHERE name = 'default_database_engine'") (stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.settings WHERE name = 'default_database_engine'")
if clickhouse_proc.returncode == 0: if clickhouse_proc.returncode == 0:
@ -724,7 +794,7 @@ def collect_build_flags(client):
else: else:
raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr)) raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) clickhouse_proc = open_client_process(client)
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'") (stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'")
if clickhouse_proc.returncode == 0: if clickhouse_proc.returncode == 0:
@ -736,6 +806,56 @@ def collect_build_flags(client):
return result return result
def suite_key_func(item: str) -> Union[float, Tuple[int, str]]:
    """Sort key for test-suite directory names.

    With ``args.order == 'random'`` every item gets a random float key.
    Otherwise the name is split at the first underscore and the key is
    ``(int(prefix), suffix)``. Names without an underscore get
    ``(99998, '')`` and names with a non-numeric prefix get ``(99997, '')``.
    """
    if args.order == 'random':
        return random.random()

    prefix, separator, suffix = item.partition('_')

    if not separator:
        # No underscore in the name at all.
        return 99998, ''

    try:
        return int(prefix), suffix
    except ValueError:
        return 99997, ''
def tests_in_suite_key_func(item: str) -> Union[int, float]:
    """Sort key for test file names inside one suite.

    With ``args.order == 'random'`` every item gets a random float key.
    Otherwise the key is the integer prefix before the first underscore,
    negated unless ``args.order == 'asc'``. Names without an underscore
    get 99998 and names with a non-numeric prefix get 99997; these
    sentinel keys are not negated.
    """
    if args.order == 'random':
        return random.random()

    direction = 1 if args.order == 'asc' else -1

    prefix, separator, _ = item.partition('_')

    if not separator:
        # No underscore in the name at all.
        return 99998

    try:
        return direction * int(prefix)
    except ValueError:
        return 99997
def extract_key(key: str) -> str:
    """Run the config extractor for ``key`` and return its output.

    The command is ``args.extract_from_config``, then
    `` --try --config ``, then ``args.configserver`` with ``key``
    appended verbatim, so ``key`` must carry its own leading space.
    The exit status is ignored.
    """
    command = (
        args.extract_from_config
        + " --try --config "
        + args.configserver
        + key
    )
    _, output = subprocess.getstatusoutput(command)
    return output
def open_client_process(
        client_args: str,
        universal_newlines: bool = False,
        stderr_file: Optional[TextIO] = None):
    """Start a client process with piped stdin and stdout.

    ``client_args`` is a command line that is split with ``shlex.split``.
    stderr goes to ``stderr_file`` when one is given, otherwise to a pipe.
    ``universal_newlines`` is passed to ``Popen`` as ``True`` when truthy
    and as ``None`` otherwise.
    """
    if stderr_file is not None:
        stderr_target = stderr_file
    else:
        stderr_target = PIPE

    if universal_newlines:
        newlines_mode = True
    else:
        newlines_mode = None

    argv = shlex.split(client_args)

    return Popen(
        argv, stdin=PIPE, stdout=PIPE,
        stderr=stderr_target,
        universal_newlines=newlines_mode)
def do_run_tests(jobs, suite, suite_dir, suite_tmp_dir, all_tests, parallel_tests, sequential_tests, parallel): def do_run_tests(jobs, suite, suite_dir, suite_tmp_dir, all_tests, parallel_tests, sequential_tests, parallel):
if jobs > 1 and len(parallel_tests) > 0: if jobs > 1 and len(parallel_tests) > 0:
print("Found", len(parallel_tests), "parallel tests and", len(sequential_tests), "sequential tests") print("Found", len(parallel_tests), "parallel tests and", len(sequential_tests), "sequential tests")
@ -790,7 +910,7 @@ def removesuffix(text, *suffixes):
Added in python 3.9 Added in python 3.9
https://www.python.org/dev/peps/pep-0616/ https://www.python.org/dev/peps/pep-0616/
This version can work with severtal possible suffixes This version can work with several possible suffixes
""" """
for suffix in suffixes: for suffix in suffixes:
if suffix and text.endswith(suffix): if suffix and text.endswith(suffix):
@ -875,7 +995,7 @@ def main(args):
global server_logs_level global server_logs_level
def is_data_present(): def is_data_present():
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE) clickhouse_proc = open_client_process(args.client)
(stdout, stderr) = clickhouse_proc.communicate(b"EXISTS TABLE test.hits") (stdout, stderr) = clickhouse_proc.communicate(b"EXISTS TABLE test.hits")
if clickhouse_proc.returncode != 0: if clickhouse_proc.returncode != 0:
raise CalledProcessError(clickhouse_proc.returncode, args.client, stderr) raise CalledProcessError(clickhouse_proc.returncode, args.client, stderr)
@ -885,9 +1005,10 @@ def main(args):
if not check_server_started(args.client, args.server_check_retries): if not check_server_started(args.client, args.server_check_retries):
raise Exception( raise Exception(
"Server is not responding. Cannot execute 'SELECT 1' query. \ "Server is not responding. Cannot execute 'SELECT 1' query. \
Note: if you are using split build, you may have to specify -c option.") If you are using split build, you have to specify -c option.")
build_flags = collect_build_flags(args.client) build_flags = collect_build_flags(args.client)
if args.replicated_database: if args.replicated_database:
build_flags.append(BuildFlags.DATABASE_REPLICATED) build_flags.append(BuildFlags.DATABASE_REPLICATED)
@ -911,6 +1032,7 @@ def main(args):
os.environ.setdefault("CLICKHOUSE_BINARY", args.binary) os.environ.setdefault("CLICKHOUSE_BINARY", args.binary)
#os.environ.setdefault("CLICKHOUSE_CLIENT", args.client) #os.environ.setdefault("CLICKHOUSE_CLIENT", args.client)
os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver) os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)
if args.configclient: if args.configclient:
os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient) os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)
@ -923,52 +1045,35 @@ def main(args):
stop_time = time() + args.global_time_limit stop_time = time() + args.global_time_limit
if args.zookeeper is None: if args.zookeeper is None:
_, out = subprocess.getstatusoutput(args.extract_from_config + " --try --config " + args.configserver + ' --key zookeeper | grep . | wc -l')
try: try:
if int(out) > 0: args.zookeeper = int(extract_key(" --key zookeeper | grep . | wc -l")) > 0
args.zookeeper = True
else:
args.zookeeper = False
except ValueError: except ValueError:
args.zookeeper = False args.zookeeper = False
if args.shard is None: if args.shard is None:
_, out = subprocess.getstatusoutput(args.extract_from_config + " --try --config " + args.configserver + ' --key listen_host | grep -E "127.0.0.2|::"') args.shard = bool(extract_key(' --key listen_host | grep -E "127.0.0.2|::"'))
if out:
args.shard = True
else:
args.shard = False
def create_common_database(args, db_name): def create_common_database(args, db_name):
create_database_retries = 0 create_database_retries = 0
while create_database_retries < MAX_RETRIES: while create_database_retries < MAX_RETRIES:
client_cmd = args.client + " " + get_additional_client_options(args) client_cmd = args.client + " " + get_additional_client_options(args)
clickhouse_proc_create = Popen(shlex.split(client_cmd), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
clickhouse_proc_create = open_client_process(client_cmd, universal_newlines=True)
(stdout, stderr) = clickhouse_proc_create.communicate(("CREATE DATABASE IF NOT EXISTS " + db_name + get_db_engine(args, db_name))) (stdout, stderr) = clickhouse_proc_create.communicate(("CREATE DATABASE IF NOT EXISTS " + db_name + get_db_engine(args, db_name)))
if not need_retry(stdout, stderr): if not need_retry(stdout, stderr):
break break
create_database_retries += 1 create_database_retries += 1
if args.database and args.database != "test": if args.database and args.database != "test":
create_common_database(args, args.database) create_common_database(args, args.database)
create_common_database(args, "test") create_common_database(args, "test")
def sute_key_func(item):
if args.order == 'random':
return random.random()
if -1 == item.find('_'):
return 99998, ''
prefix, suffix = item.split('_', 1)
try:
return int(prefix), suffix
except ValueError:
return 99997, ''
total_tests_run = 0 total_tests_run = 0
for suite in sorted(os.listdir(base_dir), key=sute_key_func):
for suite in sorted(os.listdir(base_dir), key=suite_key_func):
if server_died.is_set(): if server_died.is_set():
break break
@ -982,8 +1087,8 @@ def main(args):
os.makedirs(suite_tmp_dir) os.makedirs(suite_tmp_dir)
suite = suite_re_obj.group(1) suite = suite_re_obj.group(1)
if os.path.isdir(suite_dir):
if os.path.isdir(suite_dir):
if 'stateful' in suite and not args.no_stateful and not is_data_present(): if 'stateful' in suite and not args.no_stateful and not is_data_present():
print("Won't run stateful tests because test data wasn't loaded.") print("Won't run stateful tests because test data wasn't loaded.")
continue continue
@ -994,29 +1099,14 @@ def main(args):
print("Won't run stateful tests because they were manually disabled.") print("Won't run stateful tests because they were manually disabled.")
continue continue
# Reverse sort order: we want run newest test first. all_tests = get_tests_list(
# And not reverse subtests suite_dir, args.test, args.test_runs, tests_in_suite_key_func)
def key_func(item):
if args.order == 'random':
return random.random()
reverse = 1 if args.order == 'asc' else -1
if -1 == item.find('_'):
return 99998
prefix, _ = item.split('_', 1)
try:
return reverse * int(prefix)
except ValueError:
return 99997
all_tests = get_tests_list(suite_dir, args.test, args.test_runs, key_func)
jobs = args.jobs jobs = args.jobs
parallel_tests = [] parallel_tests = []
sequential_tests = [] sequential_tests = []
for test in all_tests: for test in all_tests:
if any(s in test for s in args.sequential): if any(s in test for s in args.sequential):
sequential_tests.append(test) sequential_tests.append(test)
@ -1042,44 +1132,21 @@ def main(args):
else: else:
print(colored("Seems like server hung and cannot respond to queries", args, "red", attrs=["bold"])) print(colored("Seems like server hung and cannot respond to queries", args, "red", attrs=["bold"]))
clickhouse_tcp_port = os.getenv("CLICKHOUSE_PORT_TCP", '9000')
server_pid = get_server_pid(clickhouse_tcp_port)
bt = None
if server_pid and not args.replicated_database:
print("\nLocated ClickHouse server process {} listening at TCP port {}".format(server_pid, clickhouse_tcp_port))
print("\nCollecting stacktraces from all running threads with gdb:")
bt = get_stacktraces_from_gdb(server_pid)
if len(bt) < 1000:
print("Got suspiciously small stacktraces: ", bt)
bt = None
if bt is None:
print("\nCollecting stacktraces from system.stacktraces table:")
bt = get_stacktraces_from_clickhouse(args.client, args.replicated_database)
if bt is None:
print(
colored(
"\nUnable to locate ClickHouse server process listening at TCP port {}. "
"It must have crashed or exited prematurely!".format(clickhouse_tcp_port),
args, "red", attrs=["bold"]))
else:
print(bt)
print_stacktraces()
exit_code.value = 1 exit_code.value = 1
else: else:
print(colored("\nNo queries hung.", args, "green", attrs=["bold"])) print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))
if len(restarted_tests) > 0: if len(restarted_tests) > 0:
print("\nSome tests were restarted:\n") print("\nSome tests were restarted:\n")
for (test_case, stderr) in restarted_tests: for (test_case, stderr) in restarted_tests:
print(test_case) print(test_case + "\n" + stderr + "\n")
print(stderr)
print("\n")
if total_tests_run == 0: if total_tests_run == 0:
print("No tests were run.") print("No tests were run.")
sys.exit(1) sys.exit(1)
else:
print("All tests have finished.")
sys.exit(exit_code.value) sys.exit(exit_code.value)
@ -1196,9 +1263,11 @@ if __name__ == '__main__':
parser.add_argument('--no-long', action='store_false', dest='no_long', help='Do not run long tests') parser.add_argument('--no-long', action='store_false', dest='no_long', help='Do not run long tests')
parser.add_argument('--client-option', nargs='+', help='Specify additional client argument') parser.add_argument('--client-option', nargs='+', help='Specify additional client argument')
parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time') parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time')
group=parser.add_mutually_exclusive_group(required=False) group=parser.add_mutually_exclusive_group(required=False)
group.add_argument('--zookeeper', action='store_true', default=None, dest='zookeeper', help='Run zookeeper related tests') group.add_argument('--zookeeper', action='store_true', default=None, dest='zookeeper', help='Run zookeeper related tests')
group.add_argument('--no-zookeeper', action='store_false', default=None, dest='zookeeper', help='Do not run zookeeper related tests') group.add_argument('--no-zookeeper', action='store_false', default=None, dest='zookeeper', help='Do not run zookeeper related tests')
group=parser.add_mutually_exclusive_group(required=False) group=parser.add_mutually_exclusive_group(required=False)
group.add_argument('--shard', action='store_true', default=None, dest='shard', help='Run sharding related tests (required to clickhouse-server listen 127.0.0.2 127.0.0.3)') group.add_argument('--shard', action='store_true', default=None, dest='shard', help='Run sharding related tests (required to clickhouse-server listen 127.0.0.2 127.0.0.3)')
group.add_argument('--no-shard', action='store_false', default=None, dest='shard', help='Do not run shard related tests') group.add_argument('--no-shard', action='store_false', default=None, dest='shard', help='Do not run shard related tests')
@ -1206,7 +1275,7 @@ if __name__ == '__main__':
args = parser.parse_args() args = parser.parse_args()
if args.queries and not os.path.isdir(args.queries): if args.queries and not os.path.isdir(args.queries):
print("Cannot access the specified directory with queries (" + args.queries + ")", file=sys.stderr) print(f"Cannot access the specified directory with queries ({args.queries})", file=sys.stderr)
sys.exit(1) sys.exit(1)
# Autodetect the directory with queries if not specified # Autodetect the directory with queries if not specified
@ -1257,10 +1326,13 @@ if __name__ == '__main__':
if args.configclient: if args.configclient:
args.client += ' --config-file=' + args.configclient args.client += ' --config-file=' + args.configclient
if os.getenv("CLICKHOUSE_HOST"): if os.getenv("CLICKHOUSE_HOST"):
args.client += ' --host=' + os.getenv("CLICKHOUSE_HOST") args.client += ' --host=' + os.getenv("CLICKHOUSE_HOST")
if os.getenv("CLICKHOUSE_PORT_TCP"):
args.client += ' --port=' + os.getenv("CLICKHOUSE_PORT_TCP") args.tcp_port = int(os.getenv("CLICKHOUSE_PORT_TCP", 9000))
args.client += f" --port={args.tcp_port}"
if os.getenv("CLICKHOUSE_DATABASE"): if os.getenv("CLICKHOUSE_DATABASE"):
args.client += ' --database=' + os.getenv("CLICKHOUSE_DATABASE") args.client += ' --database=' + os.getenv("CLICKHOUSE_DATABASE")