def get_db_engine(args, database_name):
    """Return the engine clause to append to a CREATE DATABASE query.

    Args:
        args: parsed command-line arguments; only ``replicated_database``
            and ``db_engine`` are read.
        database_name: name of the database being created; it becomes the
            last component of the Replicated database path.

    Returns:
        A string starting with a space that can be appended directly to
        ``CREATE DATABASE <name>``, or an empty string when the default
        engine should be used.
    """
    if args.replicated_database:
        # Adjacent literals are used on purpose: a backslash continuation
        # *inside* a string literal keeps the indentation of the following
        # source line, which would end up embedded in the query text.
        # Only the middle literal is an f-string, so the {shard}/{replica}
        # macros in the last one need no brace escaping.
        return (
            " ON CLUSTER test_cluster_database_replicated "
            f"ENGINE=Replicated('/test/clickhouse/db/{database_name}', "
            "'{shard}', '{replica}')"
        )
    if args.db_engine:
        return " ENGINE=" + args.db_engine
    return ""  # Will use default engine
temporary database with unique name # And we will recreate and drop it for each test @@ -176,8 +182,14 @@ def configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file): database = 'test_{suffix}'.format(suffix=random_str()) with open(stderr_file, 'w') as stderr: - client_cmd = testcase_args.testcase_client + " " + get_additional_client_options(args) - clickhouse_proc_create = Popen(shlex.split(client_cmd), stdin=PIPE, stdout=PIPE, stderr=stderr, universal_newlines=True) + client_cmd = testcase_args.testcase_client + " " \ + + get_additional_client_options(args) + + clickhouse_proc_create = open_client_process( + universal_newlines=True, + client_args=client_cmd, + stderr_file=stderr) + try: clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(testcase_args, database)), timeout=testcase_args.timeout) except TimeoutExpired: @@ -237,8 +249,10 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std if need_drop_database: with open(stderr_file, 'a') as stderr: - clickhouse_proc_create = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=stderr, universal_newlines=True) + clickhouse_proc_create = open_client_process(client, universal_newlines=True, stderr_file=stderr) + seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20) + try: drop_database_query = "DROP DATABASE " + database if args.replicated_database: @@ -254,7 +268,7 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std raise total_time = (datetime.now() - start_time).total_seconds() - return clickhouse_proc_create, "", "Timeout dropping database {} after test".format(database), total_time + return clickhouse_proc_create, "", f"Timeout dropping database {database} after test", total_time shutil.rmtree(args.test_tmp_dir) @@ -286,12 +300,16 @@ def need_retry(stdout, stderr): def get_processlist(args): try: query = b"SHOW PROCESSLIST FORMAT Vertical" + if args.replicated_database: 
# collect server stacktraces using gdb
def get_stacktraces_from_gdb(server_pid):
    """Attach gdb to the server process and dump backtraces of all threads.

    Args:
        server_pid: PID of the process to attach to.

    Returns:
        The decoded gdb output, or None if gdb could not be run or exited
        with an error (the error is printed, never raised).
    """
    # An argument list needs no shell and therefore no quoting of the
    # "thread apply all backtrace" command.
    cmd = ["gdb", "-batch", "-ex", "thread apply all backtrace",
           "-p", str(server_pid)]
    try:
        return subprocess.check_output(cmd).decode('utf-8')
    except Exception as e:
        # Best effort: the caller falls back to system.stack_trace.
        print(f"Error occurred while receiving stack traces from gdb: {e}")
        return None


# collect server stacktraces from system.stack_trace table
# it does not work in Sandbox
def get_stacktraces_from_clickhouse(client, replicated_database=False):
    """Query system.stack_trace through the client and return the output.

    Args:
        client: client command line as a single string; it is split with
            ``shlex.split`` like the other client invocations in this file.
        replicated_database: when true, stack traces are collected from all
            replicas of ``test_cluster_database_replicated`` and unavailable
            shards are skipped.

    Returns:
        The decoded client output (stdout and stderr merged), or None if the
        client could not be run or exited with an error.
    """
    # NOTE: '\n' is an actual newline character inside the SQL string
    # literal, exactly as it was when the command went through the shell.
    trace_expr = (
        "arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), "
        "arrayMap(x -> addressToLine(x), trace), "
        "arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace"
    )

    options = ["--allow_introspection_functions=1"]

    if replicated_database:
        options.append("--skip_unavailable_shards=1")
        query = (
            "SELECT materialize((hostName(), tcpPort())) as host, thread_id, "
            f"{trace_expr} "
            "FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') "
            "ORDER BY host, thread_id FORMAT Vertical"
        )
    else:
        query = f"SELECT {trace_expr} FROM system.stack_trace FORMAT Vertical"

    try:
        # shlex.split stays inside the try block: malformed quoting in the
        # client string must not turn this best-effort helper into a crash.
        command = shlex.split(client) + options + ["--query", query]
        return subprocess.check_output(
            command, stderr=subprocess.STDOUT).decode('utf-8')
    except Exception as e:
        print(f"Error occurred while receiving stack traces from client: {e}")
        return None


def print_stacktraces() -> None:
    """Print server stack traces, preferring gdb and falling back to SQL.

    Reads the module-level ``args`` (``replicated_database``, ``tcp_port``,
    ``client``). gdb is tried only when a local server PID was found and the
    run is not in replicated-database mode; if gdb yields nothing usable the
    traces are requested from the server itself. When both sources fail, a
    highlighted message saying the server was not located is printed.
    """
    server_pid = get_server_pid()

    bt = None

    if server_pid and not args.replicated_database:
        print("")
        print(f"Located ClickHouse server process {server_pid} listening at TCP port {args.tcp_port}")
        print("Collecting stacktraces from all running threads with gdb:")

        bt = get_stacktraces_from_gdb(server_pid)

        # The gdb helper returns None on failure, so the length may only be
        # checked for a real result; len(None) would raise TypeError and skip
        # the fallback below.
        if bt is not None and len(bt) < 1000:
            print("Got suspiciously small stacktraces: ", bt)
            bt = None

    if bt is None:
        print("\nCollecting stacktraces from system.stack_trace table:")

        bt = get_stacktraces_from_clickhouse(
            args.client, args.replicated_database)

    if bt is not None:
        print(bt)
        return

    print(colored(
        f"\nUnable to locate ClickHouse server process listening at TCP port {args.tcp_port}. "
        "It must have crashed or exited prematurely!",
        args, "red", attrs=["bold"]))


def get_server_pid():
    """Return the PID of the ClickHouse server, or None if it was not found.

    Two commands are tried in order: ``lsof`` on the TCP port taken from the
    module-level ``args.tcp_port``, then ``pidof -s clickhouse-server``.
    """
    # lsof does not work in stress tests for some reason
    cmd_lsof = f"lsof -i tcp:{args.tcp_port} -s tcp:LISTEN -Fp | awk '/^p[0-9]+$/{{print substr($0, 2)}}'"
    cmd_pidof = "pidof -s clickhouse-server"

    for cmd in (cmd_lsof, cmd_pidof):
        # Reset per command so the error message never reports the output
        # of the previous command.
        output = None
        try:
            output = subprocess.check_output(
                cmd, shell=True, stderr=subprocess.STDOUT,
                universal_newlines=True)
            pids = output.split()
            if pids:
                # Use the first PID if the command printed several lines.
                return int(pids[0])
        except Exception as e:
            print(f"Cannot get server pid with {cmd}, got {output}: {e}")

    return None  # most likely server is dead
multiprocessing.current_process().name + print(f"\nRunning {about}{num_tests} {suite} tests ({proc_name}).\n") while True: if is_concurrent: @@ -459,7 +523,6 @@ def run_tests_array(all_tests_with_params): message = open(disabled_file, 'r').read() status += MSG_SKIPPED + " - " + message + "\n" else: - if args.testname: clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True) failed_to_check = False @@ -599,7 +662,12 @@ def run_tests_array(all_tests_with_params): except: exc_type, exc_value, tb = sys.exc_info() failures += 1 - print("{0} - Test internal error: {1}\n{2}\n{3}".format(MSG_FAIL, exc_type.__name__, exc_value, "\n".join(traceback.format_tb(tb, 10)))) + + exc_name = exc_type.__name__ + traceback_str = "\n".join(traceback.format_tb(tb, 10)) + + print(f"{MSG_FAIL} - Test internal error: {exc_name}") + print(f"{exc_value}\n{traceback_str}") if failures_chain >= 20: stop_tests() @@ -627,9 +695,11 @@ server_logs_level = "warning" def check_server_started(client, retry_count): print("Connecting to ClickHouse server...", end='') + sys.stdout.flush() + while retry_count > 0: - clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) + clickhouse_proc = open_client_process(client) (stdout, stderr) = clickhouse_proc.communicate(b"SELECT 1") if clickhouse_proc.returncode == 0 and stdout.startswith(b"1"): @@ -679,7 +749,7 @@ class BuildFlags(): def collect_build_flags(client): - clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) + clickhouse_proc = open_client_process(client) (stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'") result = [] @@ -695,7 +765,7 @@ def collect_build_flags(client): else: raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr)) - clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, 
stdout=PIPE, stderr=PIPE) + clickhouse_proc = open_client_process(client) (stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'") if clickhouse_proc.returncode == 0: @@ -706,7 +776,7 @@ def collect_build_flags(client): else: raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr)) - clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) + clickhouse_proc = open_client_process(client) (stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'UNBUNDLED'") if clickhouse_proc.returncode == 0: @@ -715,7 +785,7 @@ def collect_build_flags(client): else: raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr)) - clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) + clickhouse_proc = open_client_process(client) (stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.settings WHERE name = 'default_database_engine'") if clickhouse_proc.returncode == 0: @@ -724,7 +794,7 @@ def collect_build_flags(client): else: raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr)) - clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE) + clickhouse_proc = open_client_process(client) (stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'") if clickhouse_proc.returncode == 0: @@ -736,6 +806,56 @@ def collect_build_flags(client): return result +def suite_key_func(item: str) -> Union[int, Tuple[int, str]]: + if args.order == 'random': + return random.random() + + if -1 == item.find('_'): + return 99998, '' + + prefix, suffix = item.split('_', 1) + + try: + return int(prefix), suffix + 
except ValueError: + return 99997, '' + + +def tests_in_suite_key_func(item: str) -> int: + if args.order == 'random': + return random.random() + + reverse = 1 if args.order == 'asc' else -1 + + if -1 == item.find('_'): + return 99998 + + prefix, _ = item.split('_', 1) + + try: + return reverse * int(prefix) + except ValueError: + return 99997 + + +def extract_key(key: str) -> str: + return subprocess.getstatusoutput( + args.extract_from_config + + " --try --config " + + args.configserver + key)[1] + + +def open_client_process( + client_args: str, + universal_newlines: bool = False, + stderr_file: Optional[TextIO] = None): + return Popen( + shlex.split(client_args), stdin=PIPE, stdout=PIPE, + stderr=stderr_file if stderr_file is not None else PIPE, + universal_newlines=True if universal_newlines else None) + + + def do_run_tests(jobs, suite, suite_dir, suite_tmp_dir, all_tests, parallel_tests, sequential_tests, parallel): if jobs > 1 and len(parallel_tests) > 0: print("Found", len(parallel_tests), "parallel tests and", len(sequential_tests), "sequential tests") @@ -790,7 +910,7 @@ def removesuffix(text, *suffixes): Added in python 3.9 https://www.python.org/dev/peps/pep-0616/ - This version can work with severtal possible suffixes + This version can work with several possible suffixes """ for suffix in suffixes: if suffix and text.endswith(suffix): @@ -875,7 +995,7 @@ def main(args): global server_logs_level def is_data_present(): - clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE) + clickhouse_proc = open_client_process(args.client) (stdout, stderr) = clickhouse_proc.communicate(b"EXISTS TABLE test.hits") if clickhouse_proc.returncode != 0: raise CalledProcessError(clickhouse_proc.returncode, args.client, stderr) @@ -885,9 +1005,10 @@ def main(args): if not check_server_started(args.client, args.server_check_retries): raise Exception( "Server is not responding. Cannot execute 'SELECT 1' query. 
\ - Note: if you are using split build, you may have to specify -c option.") + If you are using split build, you have to specify -c option.") build_flags = collect_build_flags(args.client) + if args.replicated_database: build_flags.append(BuildFlags.DATABASE_REPLICATED) @@ -911,6 +1032,7 @@ def main(args): os.environ.setdefault("CLICKHOUSE_BINARY", args.binary) #os.environ.setdefault("CLICKHOUSE_CLIENT", args.client) os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver) + if args.configclient: os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient) @@ -923,52 +1045,35 @@ def main(args): stop_time = time() + args.global_time_limit if args.zookeeper is None: - _, out = subprocess.getstatusoutput(args.extract_from_config + " --try --config " + args.configserver + ' --key zookeeper | grep . | wc -l') try: - if int(out) > 0: - args.zookeeper = True - else: - args.zookeeper = False + args.zookeeper = int(extract_key(" --key zookeeper | grep . | wc -l")) > 0 except ValueError: args.zookeeper = False if args.shard is None: - _, out = subprocess.getstatusoutput(args.extract_from_config + " --try --config " + args.configserver + ' --key listen_host | grep -E "127.0.0.2|::"') - if out: - args.shard = True - else: - args.shard = False + args.shard = bool(extract_key(' --key listen_host | grep -E "127.0.0.2|::"')) def create_common_database(args, db_name): create_database_retries = 0 while create_database_retries < MAX_RETRIES: client_cmd = args.client + " " + get_additional_client_options(args) - clickhouse_proc_create = Popen(shlex.split(client_cmd), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True) + + clickhouse_proc_create = open_client_process(client_cmd, universal_newlines=True) + (stdout, stderr) = clickhouse_proc_create.communicate(("CREATE DATABASE IF NOT EXISTS " + db_name + get_db_engine(args, db_name))) + if not need_retry(stdout, stderr): break create_database_retries += 1 if args.database and args.database != "test": 
create_common_database(args, args.database) + create_common_database(args, "test") - def sute_key_func(item): - if args.order == 'random': - return random.random() - - if -1 == item.find('_'): - return 99998, '' - - prefix, suffix = item.split('_', 1) - - try: - return int(prefix), suffix - except ValueError: - return 99997, '' - total_tests_run = 0 - for suite in sorted(os.listdir(base_dir), key=sute_key_func): + + for suite in sorted(os.listdir(base_dir), key=suite_key_func): if server_died.is_set(): break @@ -982,8 +1087,8 @@ def main(args): os.makedirs(suite_tmp_dir) suite = suite_re_obj.group(1) - if os.path.isdir(suite_dir): + if os.path.isdir(suite_dir): if 'stateful' in suite and not args.no_stateful and not is_data_present(): print("Won't run stateful tests because test data wasn't loaded.") continue @@ -994,29 +1099,14 @@ def main(args): print("Won't run stateful tests because they were manually disabled.") continue - # Reverse sort order: we want run newest test first. - # And not reverse subtests - def key_func(item): - if args.order == 'random': - return random.random() - - reverse = 1 if args.order == 'asc' else -1 - - if -1 == item.find('_'): - return 99998 - - prefix, _ = item.split('_', 1) - - try: - return reverse * int(prefix) - except ValueError: - return 99997 - - all_tests = get_tests_list(suite_dir, args.test, args.test_runs, key_func) + all_tests = get_tests_list( + suite_dir, args.test, args.test_runs, tests_in_suite_key_func) jobs = args.jobs + parallel_tests = [] sequential_tests = [] + for test in all_tests: if any(s in test for s in args.sequential): sequential_tests.append(test) @@ -1042,44 +1132,21 @@ def main(args): else: print(colored("Seems like server hung and cannot respond to queries", args, "red", attrs=["bold"])) - clickhouse_tcp_port = os.getenv("CLICKHOUSE_PORT_TCP", '9000') - server_pid = get_server_pid(clickhouse_tcp_port) - bt = None - if server_pid and not args.replicated_database: - print("\nLocated ClickHouse server 
process {} listening at TCP port {}".format(server_pid, clickhouse_tcp_port)) - print("\nCollecting stacktraces from all running threads with gdb:") - bt = get_stacktraces_from_gdb(server_pid) - if len(bt) < 1000: - print("Got suspiciously small stacktraces: ", bt) - bt = None - if bt is None: - print("\nCollecting stacktraces from system.stacktraces table:") - bt = get_stacktraces_from_clickhouse(args.client, args.replicated_database) - if bt is None: - print( - colored( - "\nUnable to locate ClickHouse server process listening at TCP port {}. " - "It must have crashed or exited prematurely!".format(clickhouse_tcp_port), - args, "red", attrs=["bold"])) - else: - print(bt) + print_stacktraces() exit_code.value = 1 else: print(colored("\nNo queries hung.", args, "green", attrs=["bold"])) if len(restarted_tests) > 0: print("\nSome tests were restarted:\n") + for (test_case, stderr) in restarted_tests: - print(test_case) - print(stderr) - print("\n") + print(test_case + "\n" + stderr + "\n") if total_tests_run == 0: print("No tests were run.") sys.exit(1) - else: - print("All tests have finished.") sys.exit(exit_code.value) @@ -1196,9 +1263,11 @@ if __name__ == '__main__': parser.add_argument('--no-long', action='store_false', dest='no_long', help='Do not run long tests') parser.add_argument('--client-option', nargs='+', help='Specify additional client argument') parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time') + group=parser.add_mutually_exclusive_group(required=False) group.add_argument('--zookeeper', action='store_true', default=None, dest='zookeeper', help='Run zookeeper related tests') group.add_argument('--no-zookeeper', action='store_false', default=None, dest='zookeeper', help='Do not run zookeeper related tests') + group=parser.add_mutually_exclusive_group(required=False) group.add_argument('--shard', action='store_true', default=None, dest='shard', help='Run sharding related tests (required to 
clickhouse-server listen 127.0.0.2 127.0.0.3)') group.add_argument('--no-shard', action='store_false', default=None, dest='shard', help='Do not run shard related tests') @@ -1206,7 +1275,7 @@ if __name__ == '__main__': args = parser.parse_args() if args.queries and not os.path.isdir(args.queries): - print("Cannot access the specified directory with queries (" + args.queries + ")", file=sys.stderr) + print(f"Cannot access the specified directory with queries ({args.queries})", file=sys.stderr) sys.exit(1) # Autodetect the directory with queries if not specified @@ -1257,10 +1326,13 @@ if __name__ == '__main__': if args.configclient: args.client += ' --config-file=' + args.configclient + if os.getenv("CLICKHOUSE_HOST"): args.client += ' --host=' + os.getenv("CLICKHOUSE_HOST") - if os.getenv("CLICKHOUSE_PORT_TCP"): - args.client += ' --port=' + os.getenv("CLICKHOUSE_PORT_TCP") + + args.tcp_port = int(os.getenv("CLICKHOUSE_PORT_TCP", 9000)) + args.client += f" --port={args.tcp_port}" + if os.getenv("CLICKHOUSE_DATABASE"): args.client += ' --database=' + os.getenv("CLICKHOUSE_DATABASE")