mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
66bc619c3e
Add function zookeeperSessionUptime(), fix some flaky tests
1397 lines
53 KiB
Python
Executable File
1397 lines
53 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import shutil
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import signal
|
|
import re
|
|
import copy
|
|
import traceback
|
|
import math
|
|
|
|
from argparse import ArgumentParser
|
|
from typing import Tuple, Union, Optional, TextIO, Dict, Set, List
|
|
import shlex
|
|
import subprocess
|
|
from subprocess import Popen
|
|
from subprocess import PIPE
|
|
from subprocess import CalledProcessError
|
|
from subprocess import TimeoutExpired
|
|
from datetime import datetime
|
|
from time import time, sleep
|
|
from errno import ESRCH
|
|
|
|
try:
|
|
import termcolor
|
|
except ImportError:
|
|
termcolor = None
|
|
|
|
import random
|
|
import string
|
|
import multiprocessing
|
|
import socket
|
|
from contextlib import closing
|
|
|
|
USE_JINJA = True
|
|
try:
|
|
import jinja2
|
|
except ImportError:
|
|
USE_JINJA = False
|
|
print('WARNING: jinja2 not installed! Template tests will be skipped.')
|
|
|
|
DISTRIBUTED_DDL_TIMEOUT_MSG = "is executing longer than distributed_ddl_task_timeout"
|
|
|
|
MESSAGES_TO_RETRY = [
|
|
"ConnectionPoolWithFailover: Connection failed at try",
|
|
"DB::Exception: New table appeared in database being dropped or detached. Try again",
|
|
"is already started to be removing by another replica right now",
|
|
"DB::Exception: Cannot enqueue query",
|
|
DISTRIBUTED_DDL_TIMEOUT_MSG # FIXME
|
|
]
|
|
|
|
MAX_RETRIES = 3
|
|
|
|
TEST_FILE_EXTENSIONS = ['.sql', '.sql.j2', '.sh', '.py', '.expect']
|
|
|
|
class Terminated(KeyboardInterrupt):
|
|
pass
|
|
|
|
def signal_handler(sig, frame):
|
|
raise Terminated(f'Terminated with {sig} signal')
|
|
|
|
def stop_tests():
|
|
global stop_tests_triggered_lock
|
|
global stop_tests_triggered
|
|
|
|
with stop_tests_triggered_lock:
|
|
if not stop_tests_triggered.is_set():
|
|
stop_tests_triggered.set()
|
|
|
|
# send signal to all processes in group to avoid hung check triggering
|
|
# (to avoid terminating clickhouse-test itself, the signal should be ignored)
|
|
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
|
os.killpg(os.getpgid(os.getpid()), signal.SIGTERM)
|
|
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
|
|
|
|
|
def get_db_engine(args, database_name):
|
|
if args.replicated_database:
|
|
return f" ON CLUSTER test_cluster_database_replicated \
|
|
ENGINE=Replicated('/test/clickhouse/db/{database_name}', \
|
|
'{{shard}}', '{{replica}}')"
|
|
if args.db_engine:
|
|
return " ENGINE=" + args.db_engine
|
|
return "" # Will use default engine
|
|
|
|
|
|
def configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file):
|
|
testcase_args = copy.deepcopy(args)
|
|
|
|
testcase_args.testcase_start_time = datetime.now()
|
|
testcase_basename = os.path.basename(case_file)
|
|
testcase_args.testcase_client = f"{testcase_args.client} --log_comment='{testcase_basename}'"
|
|
|
|
if testcase_args.database:
|
|
database = testcase_args.database
|
|
os.environ.setdefault("CLICKHOUSE_DATABASE", database)
|
|
os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir)
|
|
else:
|
|
# If --database is not specified, we will create temporary database with unique name
|
|
# And we will recreate and drop it for each test
|
|
def random_str(length=6):
|
|
alphabet = string.ascii_lowercase + string.digits
|
|
return ''.join(random.choice(alphabet) for _ in range(length))
|
|
database = 'test_{suffix}'.format(suffix=random_str())
|
|
|
|
with open(stderr_file, 'w') as stderr:
|
|
client_cmd = testcase_args.testcase_client + " " \
|
|
+ get_additional_client_options(args)
|
|
|
|
clickhouse_proc_create = open_client_process(
|
|
universal_newlines=True,
|
|
client_args=client_cmd,
|
|
stderr_file=stderr)
|
|
|
|
try:
|
|
clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(testcase_args, database)), timeout=testcase_args.timeout)
|
|
except TimeoutExpired:
|
|
total_time = (datetime.now() - testcase_args.testcase_start_time).total_seconds()
|
|
return clickhouse_proc_create, "", "Timeout creating database {} before test".format(database), total_time
|
|
|
|
os.environ["CLICKHOUSE_DATABASE"] = database
|
|
# Set temporary directory to match the randomly generated database,
|
|
# because .sh tests also use it for temporary files and we want to avoid
|
|
# collisions.
|
|
testcase_args.test_tmp_dir = os.path.join(suite_tmp_dir, database)
|
|
os.mkdir(testcase_args.test_tmp_dir)
|
|
os.environ.setdefault("CLICKHOUSE_TMP", testcase_args.test_tmp_dir)
|
|
|
|
testcase_args.testcase_database = database
|
|
|
|
return testcase_args
|
|
|
|
def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file):
|
|
client = args.testcase_client
|
|
start_time = args.testcase_start_time
|
|
database = args.testcase_database
|
|
|
|
# This is for .sh tests
|
|
os.environ["CLICKHOUSE_LOG_COMMENT"] = case_file
|
|
|
|
params = {
|
|
'client': client + ' --database=' + database,
|
|
'logs_level': server_logs_level,
|
|
'options': client_options,
|
|
'test': case_file,
|
|
'stdout': stdout_file,
|
|
'stderr': stderr_file,
|
|
}
|
|
|
|
# >> append to stderr (but not stdout since it is not used there),
|
|
# because there are also output of per test database creation
|
|
if not args.database:
|
|
pattern = '{test} > {stdout} 2>> {stderr}'
|
|
else:
|
|
pattern = '{test} > {stdout} 2> {stderr}'
|
|
|
|
if ext == '.sql':
|
|
pattern = "{client} --send_logs_level={logs_level} --testmode --multiquery {options} < " + pattern
|
|
|
|
command = pattern.format(**params)
|
|
|
|
proc = Popen(command, shell=True, env=os.environ)
|
|
|
|
while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None:
|
|
sleep(0.01)
|
|
|
|
need_drop_database = not args.database
|
|
if need_drop_database and args.no_drop_if_fail:
|
|
maybe_passed = (proc.returncode == 0) and (proc.stderr is None) and (proc.stdout is None or 'Exception' not in proc.stdout)
|
|
need_drop_database = not maybe_passed
|
|
|
|
if need_drop_database:
|
|
with open(stderr_file, 'a') as stderr:
|
|
clickhouse_proc_create = open_client_process(client, universal_newlines=True, stderr_file=stderr)
|
|
|
|
seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20)
|
|
|
|
try:
|
|
drop_database_query = "DROP DATABASE " + database
|
|
if args.replicated_database:
|
|
drop_database_query += " ON CLUSTER test_cluster_database_replicated"
|
|
clickhouse_proc_create.communicate((drop_database_query), timeout=seconds_left)
|
|
except TimeoutExpired:
|
|
# kill test process because it can also hung
|
|
if proc.returncode is None:
|
|
try:
|
|
proc.kill()
|
|
except OSError as e:
|
|
if e.errno != ESRCH:
|
|
raise
|
|
|
|
total_time = (datetime.now() - start_time).total_seconds()
|
|
return clickhouse_proc_create, "", f"Timeout dropping database {database} after test", total_time
|
|
|
|
shutil.rmtree(args.test_tmp_dir)
|
|
|
|
total_time = (datetime.now() - start_time).total_seconds()
|
|
|
|
# Normalize randomized database names in stdout, stderr files.
|
|
os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stdout_file))
|
|
if args.hide_db_name:
|
|
os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stderr_file))
|
|
if args.replicated_database:
|
|
os.system("LC_ALL=C sed -i -e 's|/auto_{{shard}}||g' {file}".format(file=stdout_file))
|
|
os.system("LC_ALL=C sed -i -e 's|auto_{{replica}}||g' {file}".format(file=stdout_file))
|
|
|
|
# Normalize hostname in stdout file.
|
|
os.system("LC_ALL=C sed -i -e 's/{hostname}/localhost/g' {file}".format(hostname=socket.gethostname(), file=stdout_file))
|
|
|
|
stdout = open(stdout_file, 'rb').read() if os.path.exists(stdout_file) else b''
|
|
stdout = str(stdout, errors='replace', encoding='utf-8')
|
|
stderr = open(stderr_file, 'rb').read() if os.path.exists(stderr_file) else b''
|
|
stderr = str(stderr, errors='replace', encoding='utf-8')
|
|
|
|
return proc, stdout, stderr, total_time
|
|
|
|
|
|
def get_zookeeper_session_uptime(args):
|
|
try:
|
|
query = b"SELECT zookeeperSessionUptime()"
|
|
|
|
if args.replicated_database:
|
|
query = b"SELECT min(materialize(zookeeperSessionUptime())) " \
|
|
b"FROM clusterAllReplicas('test_cluster_database_replicated', system.one) "
|
|
|
|
clickhouse_proc = open_client_process(args.client)
|
|
|
|
(stdout, _) = clickhouse_proc.communicate((query), timeout=20)
|
|
|
|
return int(stdout.decode('utf-8').strip())
|
|
except Exception as ex:
|
|
print("Exception", ex)
|
|
return None
|
|
|
|
|
|
def need_retry(args, stdout, stderr, total_time):
|
|
# Sometimes we may get unexpected exception like "Replica is readonly" or "Shutdown is called for table"
|
|
# instead of "Session expired" or "Connection loss"
|
|
# Retry if session was expired during test execution
|
|
session_uptime = get_zookeeper_session_uptime(args)
|
|
if session_uptime is not None and session_uptime < math.ceil(total_time):
|
|
return True
|
|
|
|
return any(msg in stdout for msg in MESSAGES_TO_RETRY) or any(msg in stderr for msg in MESSAGES_TO_RETRY)
|
|
|
|
|
|
def get_processlist(args):
|
|
try:
|
|
query = b"SHOW PROCESSLIST FORMAT Vertical"
|
|
|
|
if args.replicated_database:
|
|
query = b"SELECT materialize((hostName(), tcpPort())) as host, * " \
|
|
b"FROM clusterAllReplicas('test_cluster_database_replicated', system.processes) " \
|
|
b"WHERE query NOT LIKE '%system.processes%' FORMAT Vertical"
|
|
|
|
clickhouse_proc = open_client_process(args.client)
|
|
|
|
(stdout, _) = clickhouse_proc.communicate((query), timeout=20)
|
|
|
|
return False, stdout.decode('utf-8')
|
|
except Exception as ex:
|
|
print("Exception", ex)
|
|
return True, ""
|
|
|
|
|
|
# collect server stacktraces using gdb
|
|
def get_stacktraces_from_gdb(server_pid):
|
|
try:
|
|
cmd = f"gdb -batch -ex 'thread apply all backtrace' -p {server_pid}"
|
|
return subprocess.check_output(cmd, shell=True).decode('utf-8')
|
|
except Exception as e:
|
|
print(f"Error occurred while receiving stack traces from gdb: {e}")
|
|
return None
|
|
|
|
|
|
# collect server stacktraces from system.stack_trace table
|
|
# it does not work in Sandbox
|
|
def get_stacktraces_from_clickhouse(client, replicated_database=False):
|
|
replicated_msg = \
|
|
"{} --allow_introspection_functions=1 --skip_unavailable_shards=1 --query \
|
|
\"SELECT materialize((hostName(), tcpPort())) as host, thread_id, \
|
|
arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), \
|
|
arrayMap(x -> addressToLine(x), trace), \
|
|
arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace \
|
|
FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') \
|
|
ORDER BY host, thread_id FORMAT Vertical\"".format(client)
|
|
|
|
msg = \
|
|
"{} --allow_introspection_functions=1 --query \
|
|
\"SELECT arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), \
|
|
arrayMap(x -> addressToLine(x), trace), \
|
|
arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace \
|
|
FROM system.stack_trace FORMAT Vertical\"".format(client)
|
|
|
|
try:
|
|
return subprocess.check_output(
|
|
replicated_msg if replicated_database else msg,
|
|
shell=True, stderr=subprocess.STDOUT).decode('utf-8')
|
|
except Exception as e:
|
|
print(f"Error occurred while receiving stack traces from client: {e}")
|
|
return None
|
|
|
|
|
|
def print_stacktraces() -> None:
|
|
server_pid = get_server_pid()
|
|
|
|
bt = None
|
|
|
|
if server_pid and not args.replicated_database:
|
|
print("")
|
|
print(f"Located ClickHouse server process {server_pid} listening at TCP port {args.tcp_port}")
|
|
print("Collecting stacktraces from all running threads with gdb:")
|
|
|
|
bt = get_stacktraces_from_gdb(server_pid)
|
|
|
|
if len(bt) < 1000:
|
|
print("Got suspiciously small stacktraces: ", bt)
|
|
bt = None
|
|
|
|
if bt is None:
|
|
print("\nCollecting stacktraces from system.stacktraces table:")
|
|
|
|
bt = get_stacktraces_from_clickhouse(
|
|
args.client, args.replicated_database)
|
|
|
|
if bt is not None:
|
|
print(bt)
|
|
return
|
|
|
|
print(colored(
|
|
f"\nUnable to locate ClickHouse server process listening at TCP port {args.tcp_port}. "
|
|
"It must have crashed or exited prematurely!",
|
|
args, "red", attrs=["bold"]))
|
|
|
|
|
|
def get_server_pid():
|
|
# lsof does not work in stress tests for some reason
|
|
cmd_lsof = f"lsof -i tcp:{args.tcp_port} -s tcp:LISTEN -Fp | awk '/^p[0-9]+$/{{print substr($0, 2)}}'"
|
|
cmd_pidof = "pidof -s clickhouse-server"
|
|
|
|
commands = [cmd_lsof, cmd_pidof]
|
|
output = None
|
|
|
|
for cmd in commands:
|
|
try:
|
|
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True)
|
|
if output:
|
|
return int(output)
|
|
except Exception as e:
|
|
print(f"Cannot get server pid with {cmd}, got {output}: {e}")
|
|
|
|
return None # most likely server is dead
|
|
|
|
|
|
def colored(text, args, color=None, on_color=None, attrs=None):
|
|
if termcolor and (sys.stdout.isatty() or args.force_color):
|
|
return termcolor.colored(text, color, on_color, attrs)
|
|
else:
|
|
return text
|
|
|
|
|
|
stop_time = None
|
|
exit_code = multiprocessing.Value("i", 0)
|
|
server_died = multiprocessing.Event()
|
|
stop_tests_triggered_lock = multiprocessing.Lock()
|
|
stop_tests_triggered = multiprocessing.Event()
|
|
queue = multiprocessing.Queue(maxsize=1)
|
|
|
|
|
|
def print_test_time(test_time) -> str:
|
|
if args.print_time:
|
|
return " {0:.2f} sec.".format(test_time)
|
|
else:
|
|
return ''
|
|
|
|
|
|
# should skip test, should increment skipped_total, skip reason
|
|
def should_skip_test(name: str, test_ext: str, suite_dir: str, all_tags: Dict[str, Set[str]]) -> Tuple[bool, bool, str]:
|
|
tags = all_tags.get(name + test_ext)
|
|
|
|
should_skip = False
|
|
increment_skip_count = False
|
|
skip_reason = ''
|
|
|
|
if tags and ('disabled' in tags) and not args.disabled:
|
|
should_skip = True
|
|
increment_skip_count = False
|
|
skip_reason = 'disabled'
|
|
|
|
elif os.path.exists(os.path.join(suite_dir, name) + '.disabled') and not args.disabled:
|
|
should_skip = True
|
|
increment_skip_count = False
|
|
skip_reason = 'disabled'
|
|
|
|
elif args.skip and any(s in name for s in args.skip):
|
|
should_skip = True
|
|
increment_skip_count = True
|
|
skip_reason = 'skip'
|
|
|
|
elif not USE_JINJA and test_ext.endswith("j2"):
|
|
should_skip = True
|
|
increment_skip_count = True
|
|
skip_reason = 'no jinja'
|
|
|
|
elif tags and (('zookeeper' in tags) or ('replica' in tags)) and not args.zookeeper:
|
|
should_skip = True
|
|
increment_skip_count = True
|
|
skip_reason = 'no zookeeper'
|
|
|
|
elif tags and (('shard' in tags) or ('distributed' in tags) or ('global' in tags)) and not args.shard:
|
|
should_skip = True
|
|
increment_skip_count = True
|
|
skip_reason = 'no shard'
|
|
|
|
elif tags and ('no-fasttest' in tags) and args.fast_tests_only:
|
|
should_skip = True
|
|
increment_skip_count = True
|
|
skip_reason = 'running fast tests only'
|
|
|
|
elif tags and (('long' in tags) or ('deadlock' in tags) or ('race' in tags)) and args.no_long:
|
|
# Tests for races and deadlocks usually are run in a loop for a significant amount of time
|
|
should_skip = True
|
|
increment_skip_count = True
|
|
skip_reason = 'not running long tests'
|
|
|
|
elif tags and ('no-replicated-database' in tags) and args.replicated_database:
|
|
should_skip = True
|
|
increment_skip_count = True
|
|
skip_reason = 'replicated-database'
|
|
|
|
elif tags:
|
|
for build_flag in args.build_flags:
|
|
if 'no-' + build_flag in tags:
|
|
should_skip = True
|
|
increment_skip_count = True
|
|
skip_reason = build_flag
|
|
break
|
|
|
|
return should_skip, increment_skip_count, skip_reason
|
|
|
|
|
|
def send_test_name_failed(suite: str, case: str) -> bool:
|
|
clickhouse_proc = open_client_process(args.client, universal_newlines=True)
|
|
|
|
failed_to_check = False
|
|
|
|
pid = os.getpid()
|
|
query = f"SELECT 'Running test {suite}/{case} from pid={pid}';"
|
|
|
|
try:
|
|
clickhouse_proc.communicate((query), timeout=20)
|
|
except:
|
|
failed_to_check = True
|
|
|
|
return failed_to_check or clickhouse_proc.returncode != 0
|
|
|
|
|
|
restarted_tests = [] # (test, stderr)
|
|
|
|
# def run_tests_array(all_tests, num_tests, suite, suite_dir, suite_tmp_dir, all_tags):
|
|
def run_tests_array(all_tests_with_params):
|
|
all_tests, num_tests, suite, suite_dir, suite_tmp_dir, all_tags = all_tests_with_params
|
|
global stop_time
|
|
global exit_code
|
|
global server_died
|
|
|
|
OP_SQUARE_BRACKET = colored("[", args, attrs=['bold'])
|
|
CL_SQUARE_BRACKET = colored("]", args, attrs=['bold'])
|
|
|
|
MSG_FAIL = OP_SQUARE_BRACKET + colored(" FAIL ", args, "red", attrs=['bold']) + CL_SQUARE_BRACKET
|
|
MSG_UNKNOWN = OP_SQUARE_BRACKET + colored(" UNKNOWN ", args, "yellow", attrs=['bold']) + CL_SQUARE_BRACKET
|
|
MSG_OK = OP_SQUARE_BRACKET + colored(" OK ", args, "green", attrs=['bold']) + CL_SQUARE_BRACKET
|
|
MSG_SKIPPED = OP_SQUARE_BRACKET + colored(" SKIPPED ", args, "cyan", attrs=['bold']) + CL_SQUARE_BRACKET
|
|
|
|
passed_total = 0
|
|
skipped_total = 0
|
|
failures_total = 0
|
|
failures = 0
|
|
failures_chain = 0
|
|
start_time = datetime.now()
|
|
|
|
is_concurrent = multiprocessing.current_process().name != "MainProcess"
|
|
|
|
client_options = get_additional_client_options(args)
|
|
|
|
if num_tests > 0:
|
|
about = 'about ' if is_concurrent else ''
|
|
proc_name = multiprocessing.current_process().name
|
|
print(f"\nRunning {about}{num_tests} {suite} tests ({proc_name}).\n")
|
|
|
|
while True:
|
|
if is_concurrent:
|
|
case = queue.get()
|
|
if not case:
|
|
break
|
|
else:
|
|
if all_tests:
|
|
case = all_tests.pop(0)
|
|
else:
|
|
break
|
|
|
|
if server_died.is_set():
|
|
stop_tests()
|
|
break
|
|
|
|
if stop_time and time() > stop_time:
|
|
print("\nStop tests run because global time limit is exceeded.\n")
|
|
stop_tests()
|
|
break
|
|
|
|
case_file = os.path.join(suite_dir, case)
|
|
(name, ext) = os.path.splitext(case)
|
|
|
|
try:
|
|
status = ''
|
|
if not is_concurrent:
|
|
sys.stdout.flush()
|
|
sys.stdout.write("{0:72}".format(removesuffix(name, ".gen", ".sql") + ": "))
|
|
# This flush is needed so you can see the test name of the long
|
|
# running test before it will finish. But don't do it in parallel
|
|
# mode, so that the lines don't mix.
|
|
sys.stdout.flush()
|
|
else:
|
|
status = "{0:72}".format(removesuffix(name, ".gen", ".sql") + ": ")
|
|
|
|
skip_test, increment_skip_count, skip_reason = \
|
|
should_skip_test(name, ext, suite_dir, all_tags)
|
|
|
|
if skip_test:
|
|
status += MSG_SKIPPED + f" - {skip_reason}\n"
|
|
|
|
if increment_skip_count:
|
|
skipped_total += 1
|
|
else:
|
|
if args.testname and send_test_name_failed(suite, case):
|
|
failures += 1
|
|
print("Server does not respond to health check")
|
|
server_died.set()
|
|
stop_tests()
|
|
break
|
|
|
|
file_suffix = ('.' + str(os.getpid())) if is_concurrent and args.test_runs > 1 else ''
|
|
reference_file = get_reference_file(suite_dir, name)
|
|
stdout_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stdout'
|
|
stderr_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stderr'
|
|
|
|
testcase_args = configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file)
|
|
proc, stdout, stderr, total_time = run_single_test(testcase_args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
|
|
|
|
if proc.returncode is None:
|
|
try:
|
|
proc.kill()
|
|
except OSError as e:
|
|
if e.errno != ESRCH:
|
|
raise
|
|
|
|
failures += 1
|
|
status += MSG_FAIL
|
|
status += print_test_time(total_time)
|
|
status += " - Timeout!\n"
|
|
if stderr:
|
|
status += stderr
|
|
status += 'Database: ' + testcase_args.testcase_database
|
|
else:
|
|
counter = 1
|
|
while need_retry(args, stdout, stderr, total_time):
|
|
restarted_tests.append((case_file, stderr))
|
|
testcase_args = configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file)
|
|
proc, stdout, stderr, total_time = run_single_test(testcase_args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
|
|
sleep(2**counter)
|
|
counter += 1
|
|
if MAX_RETRIES < counter:
|
|
if args.replicated_database:
|
|
if DISTRIBUTED_DDL_TIMEOUT_MSG in stderr:
|
|
server_died.set()
|
|
break
|
|
|
|
if proc.returncode != 0:
|
|
failures += 1
|
|
failures_chain += 1
|
|
status += MSG_FAIL
|
|
status += print_test_time(total_time)
|
|
status += ' - return code {}\n'.format(proc.returncode)
|
|
|
|
if stderr:
|
|
status += stderr
|
|
|
|
# Stop on fatal errors like segmentation fault. They are sent to client via logs.
|
|
if ' <Fatal> ' in stderr:
|
|
server_died.set()
|
|
|
|
if testcase_args.stop \
|
|
and ('Connection refused' in stderr or 'Attempt to read after eof' in stderr) \
|
|
and 'Received exception from server' not in stderr:
|
|
server_died.set()
|
|
|
|
if os.path.isfile(stdout_file):
|
|
status += ", result:\n\n"
|
|
status += '\n'.join(
|
|
open(stdout_file).read().split('\n')[:100])
|
|
status += '\n'
|
|
|
|
status += "\nstdout:\n{}\n".format(stdout)
|
|
status += 'Database: ' + testcase_args.testcase_database
|
|
|
|
elif stderr:
|
|
failures += 1
|
|
failures_chain += 1
|
|
status += MSG_FAIL
|
|
status += print_test_time(total_time)
|
|
status += " - having stderror:\n{}\n".format(
|
|
'\n'.join(stderr.split('\n')[:100]))
|
|
status += "\nstdout:\n{}\n".format(stdout)
|
|
status += 'Database: ' + testcase_args.testcase_database
|
|
elif 'Exception' in stdout:
|
|
failures += 1
|
|
failures_chain += 1
|
|
status += MSG_FAIL
|
|
status += print_test_time(total_time)
|
|
status += " - having exception in stdout:\n{}\n".format(
|
|
'\n'.join(stdout.split('\n')[:100]))
|
|
status += 'Database: ' + testcase_args.testcase_database
|
|
elif '@@SKIP@@' in stdout:
|
|
skipped_total += 1
|
|
skip_reason = stdout.replace('@@SKIP@@', '').rstrip("\n")
|
|
status += MSG_SKIPPED + f" - {skip_reason}\n"
|
|
elif reference_file is None:
|
|
status += MSG_UNKNOWN
|
|
status += print_test_time(total_time)
|
|
status += " - no reference file\n"
|
|
status += 'Database: ' + testcase_args.testcase_database
|
|
else:
|
|
result_is_different = subprocess.call(['diff', '-q', reference_file, stdout_file], stdout=PIPE)
|
|
|
|
if result_is_different:
|
|
diff = Popen(['diff', '-U', str(testcase_args.unified), reference_file, stdout_file], stdout=PIPE, universal_newlines=True).communicate()[0]
|
|
failures += 1
|
|
status += MSG_FAIL
|
|
status += print_test_time(total_time)
|
|
status += " - result differs with reference:\n{}\n".format(diff)
|
|
status += 'Database: ' + testcase_args.testcase_database
|
|
else:
|
|
if testcase_args.test_runs > 1 and total_time > 60 and 'long' not in name:
|
|
# We're in Flaky Check mode, check the run time as well while we're at it.
|
|
failures += 1
|
|
failures_chain += 1
|
|
status += MSG_FAIL
|
|
status += print_test_time(total_time)
|
|
status += " - Test runs too long (> 60s). Make it faster.\n"
|
|
status += 'Database: ' + testcase_args.testcase_database
|
|
else:
|
|
passed_total += 1
|
|
failures_chain = 0
|
|
status += MSG_OK
|
|
status += print_test_time(total_time)
|
|
status += "\n"
|
|
if os.path.exists(stdout_file):
|
|
os.remove(stdout_file)
|
|
if os.path.exists(stderr_file):
|
|
os.remove(stderr_file)
|
|
|
|
if status and not status.endswith('\n'):
|
|
status += '\n'
|
|
|
|
sys.stdout.write(status)
|
|
sys.stdout.flush()
|
|
except KeyboardInterrupt as e:
|
|
print(colored("Break tests execution", args, "red"))
|
|
stop_tests()
|
|
raise e
|
|
except:
|
|
exc_type, exc_value, tb = sys.exc_info()
|
|
failures += 1
|
|
|
|
exc_name = exc_type.__name__
|
|
traceback_str = "\n".join(traceback.format_tb(tb, 10))
|
|
|
|
print(f"{MSG_FAIL} - Test internal error: {exc_name}")
|
|
print(f"{exc_value}\n{traceback_str}")
|
|
|
|
if failures_chain >= 20:
|
|
stop_tests()
|
|
break
|
|
|
|
failures_total = failures_total + failures
|
|
|
|
if failures_total > 0:
|
|
print(colored(f"\nHaving {failures_total} errors! {passed_total} tests passed."
|
|
f" {skipped_total} tests skipped. {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
|
|
f' ({multiprocessing.current_process().name}).',
|
|
args, "red", attrs=["bold"]))
|
|
exit_code.value = 1
|
|
else:
|
|
print(colored(f"\n{passed_total} tests passed. {skipped_total} tests skipped."
|
|
f" {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
|
|
f' ({multiprocessing.current_process().name}).',
|
|
args, "green", attrs=["bold"]))
|
|
|
|
sys.stdout.flush()
|
|
|
|
|
|
server_logs_level = "warning"
|
|
|
|
|
|
def check_server_started(client, retry_count):
|
|
print("Connecting to ClickHouse server...", end='')
|
|
|
|
sys.stdout.flush()
|
|
|
|
while retry_count > 0:
|
|
clickhouse_proc = open_client_process(client)
|
|
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT 1")
|
|
|
|
if clickhouse_proc.returncode == 0 and stdout.startswith(b"1"):
|
|
print(" OK")
|
|
sys.stdout.flush()
|
|
return True
|
|
|
|
if clickhouse_proc.returncode == 210:
|
|
# Connection refused, retry
|
|
print('.', end='')
|
|
sys.stdout.flush()
|
|
retry_count -= 1
|
|
sleep(0.5)
|
|
continue
|
|
|
|
# FIXME Some old comment, maybe now CH supports Python3 ?
|
|
# We can't print this, because for some reason this is python 2,
|
|
# and args appeared in 3.3. To hell with it.
|
|
# print(''.join(clickhouse_proc.args))
|
|
|
|
# Other kind of error, fail.
|
|
|
|
code: int = clickhouse_proc.returncode
|
|
|
|
print(f"\nClient invocation failed with code {code}:\n\
|
|
stdout: {stdout}\n\
|
|
stderr: {stderr}")
|
|
|
|
sys.stdout.flush()
|
|
|
|
return False
|
|
|
|
print('\nAll connection tries failed')
|
|
sys.stdout.flush()
|
|
|
|
return False
|
|
|
|
|
|
class BuildFlags():
|
|
THREAD = 'tsan'
|
|
ADDRESS = 'asan'
|
|
UNDEFINED = 'ubsan'
|
|
MEMORY = 'msan'
|
|
DEBUG = 'debug'
|
|
UNBUNDLED = 'unbundled'
|
|
RELEASE = 'release'
|
|
ORDINARY_DATABASE = 'ordinary-database'
|
|
POLYMORPHIC_PARTS = 'polymorphic-parts'
|
|
|
|
|
|
def collect_build_flags(client):
|
|
clickhouse_proc = open_client_process(client)
|
|
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
|
|
result = []
|
|
|
|
if clickhouse_proc.returncode == 0:
|
|
if b'-fsanitize=thread' in stdout:
|
|
result.append(BuildFlags.THREAD)
|
|
elif b'-fsanitize=address' in stdout:
|
|
result.append(BuildFlags.ADDRESS)
|
|
elif b'-fsanitize=undefined' in stdout:
|
|
result.append(BuildFlags.UNDEFINED)
|
|
elif b'-fsanitize=memory' in stdout:
|
|
result.append(BuildFlags.MEMORY)
|
|
else:
|
|
raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
|
|
|
|
clickhouse_proc = open_client_process(client)
|
|
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'")
|
|
|
|
if clickhouse_proc.returncode == 0:
|
|
if b'Debug' in stdout:
|
|
result.append(BuildFlags.DEBUG)
|
|
elif b'RelWithDebInfo' in stdout or b'Release' in stdout:
|
|
result.append(BuildFlags.RELEASE)
|
|
else:
|
|
raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
|
|
|
|
clickhouse_proc = open_client_process(client)
|
|
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.build_options WHERE name = 'UNBUNDLED'")
|
|
|
|
if clickhouse_proc.returncode == 0:
|
|
if b'ON' in stdout or b'1' in stdout:
|
|
result.append(BuildFlags.UNBUNDLED)
|
|
else:
|
|
raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
|
|
|
|
clickhouse_proc = open_client_process(client)
|
|
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.settings WHERE name = 'default_database_engine'")
|
|
|
|
if clickhouse_proc.returncode == 0:
|
|
if b'Ordinary' in stdout:
|
|
result.append(BuildFlags.ORDINARY_DATABASE)
|
|
else:
|
|
raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
|
|
|
|
clickhouse_proc = open_client_process(client)
|
|
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'")
|
|
|
|
if clickhouse_proc.returncode == 0:
|
|
if stdout == b'0\n':
|
|
result.append(BuildFlags.POLYMORPHIC_PARTS)
|
|
else:
|
|
raise Exception("Cannot get inforamtion about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
|
|
|
|
return result
|
|
|
|
|
|
def suite_key_func(item: str) -> Union[int, Tuple[int, str]]:
|
|
if args.order == 'random':
|
|
return random.random()
|
|
|
|
if -1 == item.find('_'):
|
|
return 99998, ''
|
|
|
|
prefix, suffix = item.split('_', 1)
|
|
|
|
try:
|
|
return int(prefix), suffix
|
|
except ValueError:
|
|
return 99997, ''
|
|
|
|
|
|
def tests_in_suite_key_func(item: str) -> int:
|
|
if args.order == 'random':
|
|
return random.random()
|
|
|
|
reverse = 1 if args.order == 'asc' else -1
|
|
|
|
if -1 == item.find('_'):
|
|
return 99998
|
|
|
|
prefix, _ = item.split('_', 1)
|
|
|
|
try:
|
|
return reverse * int(prefix)
|
|
except ValueError:
|
|
return 99997
|
|
|
|
|
|
def extract_key(key: str) -> str:
|
|
return subprocess.getstatusoutput(
|
|
args.extract_from_config +
|
|
" --try --config " +
|
|
args.configserver + key)[1]
|
|
|
|
|
|
def open_client_process(
|
|
client_args: str,
|
|
universal_newlines: bool = False,
|
|
stderr_file: Optional[TextIO] = None):
|
|
return Popen(
|
|
shlex.split(client_args), stdin=PIPE, stdout=PIPE,
|
|
stderr=stderr_file if stderr_file is not None else PIPE,
|
|
universal_newlines=True if universal_newlines else None)
|
|
|
|
|
|
|
|
def do_run_tests(jobs, suite, suite_dir, suite_tmp_dir, all_tests, all_tags, parallel_tests, sequential_tests, parallel):
|
|
if jobs > 1 and len(parallel_tests) > 0:
|
|
print("Found", len(parallel_tests), "parallel tests and", len(sequential_tests), "sequential tests")
|
|
run_n, run_total = parallel.split('/')
|
|
run_n = float(run_n)
|
|
run_total = float(run_total)
|
|
tests_n = len(parallel_tests)
|
|
if run_total > tests_n:
|
|
run_total = tests_n
|
|
|
|
if jobs > tests_n:
|
|
jobs = tests_n
|
|
if jobs > run_total:
|
|
run_total = jobs
|
|
|
|
batch_size = max(1, len(parallel_tests) // jobs)
|
|
parallel_tests_array = []
|
|
for _ in range(jobs):
|
|
parallel_tests_array.append((None, batch_size, suite, suite_dir, suite_tmp_dir, all_tags))
|
|
|
|
with closing(multiprocessing.Pool(processes=jobs)) as pool:
|
|
pool.map_async(run_tests_array, parallel_tests_array)
|
|
|
|
for suit in parallel_tests:
|
|
queue.put(suit)
|
|
|
|
for _ in range(jobs):
|
|
queue.put(None)
|
|
|
|
queue.close()
|
|
|
|
pool.join()
|
|
|
|
run_tests_array((sequential_tests, len(sequential_tests), suite, suite_dir, suite_tmp_dir, all_tags))
|
|
return len(sequential_tests) + len(parallel_tests)
|
|
else:
|
|
num_tests = len(all_tests)
|
|
run_tests_array((all_tests, num_tests, suite, suite_dir, suite_tmp_dir, all_tags))
|
|
return num_tests
|
|
|
|
|
|
def is_test_from_dir(suite_dir, case):
|
|
case_file = os.path.join(suite_dir, case)
|
|
# We could also test for executable files (os.access(case_file, os.X_OK),
|
|
# but it interferes with 01610_client_spawn_editor.editor, which is invoked
|
|
# as a query editor in the test, and must be marked as executable.
|
|
return os.path.isfile(case_file) and any(case_file.endswith(suppotred_ext) for suppotred_ext in TEST_FILE_EXTENSIONS)
|
|
|
|
|
|
def removesuffix(text, *suffixes):
|
|
"""
|
|
Added in python 3.9
|
|
https://www.python.org/dev/peps/pep-0616/
|
|
|
|
This version can work with several possible suffixes
|
|
"""
|
|
for suffix in suffixes:
|
|
if suffix and text.endswith(suffix):
|
|
return text[:-len(suffix)]
|
|
return text
|
|
|
|
|
|
def render_test_template(j2env, suite_dir, test_name):
|
|
"""
|
|
Render template for test and reference file if needed
|
|
"""
|
|
|
|
if j2env is None:
|
|
return test_name
|
|
|
|
test_base_name = removesuffix(test_name, ".sql.j2", ".sql")
|
|
|
|
reference_file_name = test_base_name + ".reference.j2"
|
|
reference_file_path = os.path.join(suite_dir, reference_file_name)
|
|
if os.path.isfile(reference_file_path):
|
|
tpl = j2env.get_template(reference_file_name)
|
|
tpl.stream().dump(os.path.join(suite_dir, test_base_name) + ".gen.reference")
|
|
|
|
if test_name.endswith(".sql.j2"):
|
|
tpl = j2env.get_template(test_name)
|
|
generated_test_name = test_base_name + ".gen.sql"
|
|
tpl.stream().dump(os.path.join(suite_dir, generated_test_name))
|
|
return generated_test_name
|
|
|
|
return test_name
|
|
|
|
|
|
def get_selected_tests(suite_dir, patterns):
|
|
"""
|
|
Find all files with tests, filter, render templates
|
|
"""
|
|
|
|
j2env = jinja2.Environment(
|
|
loader=jinja2.FileSystemLoader(suite_dir),
|
|
keep_trailing_newline=True,
|
|
) if USE_JINJA else None
|
|
|
|
for test_name in os.listdir(suite_dir):
|
|
if not is_test_from_dir(suite_dir, test_name):
|
|
continue
|
|
if patterns and not any(re.search(pattern, test_name) for pattern in patterns):
|
|
continue
|
|
if USE_JINJA and test_name.endswith(".gen.sql"):
|
|
continue
|
|
test_name = render_test_template(j2env, suite_dir, test_name)
|
|
yield test_name
|
|
|
|
|
|
def get_tests_list(suite_dir, patterns, test_runs, sort_key):
|
|
"""
|
|
Return list of tests file names to run
|
|
"""
|
|
|
|
all_tests = list(get_selected_tests(suite_dir, patterns))
|
|
all_tests = all_tests * test_runs
|
|
all_tests.sort(key=sort_key)
|
|
return all_tests
|
|
|
|
|
|
def get_reference_file(suite_dir, name):
|
|
"""
|
|
Returns reference file name for specified test
|
|
"""
|
|
|
|
name = removesuffix(name, ".gen")
|
|
for ext in ['.reference', '.gen.reference']:
|
|
reference_file = os.path.join(suite_dir, name) + ext
|
|
if os.path.isfile(reference_file):
|
|
return reference_file
|
|
return None
|
|
|
|
|
|
def main(args):
|
|
global server_died
|
|
global stop_time
|
|
global exit_code
|
|
global server_logs_level
|
|
|
|
def is_data_present():
|
|
clickhouse_proc = open_client_process(args.client)
|
|
(stdout, stderr) = clickhouse_proc.communicate(b"EXISTS TABLE test.hits")
|
|
if clickhouse_proc.returncode != 0:
|
|
raise CalledProcessError(clickhouse_proc.returncode, args.client, stderr)
|
|
|
|
return stdout.startswith(b'1')
|
|
|
|
if not check_server_started(args.client, args.server_check_retries):
|
|
msg = "Server is not responding. Cannot execute 'SELECT 1' query. \
|
|
If you are using split build, you have to specify -c option."
|
|
if args.hung_check:
|
|
print(msg)
|
|
pid = get_server_pid()
|
|
print("Got server pid", pid)
|
|
print_stacktraces()
|
|
raise Exception(msg)
|
|
|
|
args.build_flags = collect_build_flags(args.client)
|
|
|
|
if args.skip:
|
|
args.skip = set(args.skip)
|
|
|
|
base_dir = os.path.abspath(args.queries)
|
|
tmp_dir = os.path.abspath(args.tmp)
|
|
|
|
# Keep same default values as in queries/shell_config.sh
|
|
os.environ.setdefault("CLICKHOUSE_BINARY", args.binary)
|
|
# os.environ.setdefault("CLICKHOUSE_CLIENT", args.client)
|
|
os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)
|
|
|
|
if args.configclient:
|
|
os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)
|
|
|
|
# Force to print server warnings in stderr
|
|
# Shell scripts could change logging level
|
|
os.environ.setdefault("CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL", server_logs_level)
|
|
|
|
# This code is bad as the time is not monotonic
|
|
if args.global_time_limit:
|
|
stop_time = time() + args.global_time_limit
|
|
|
|
if args.zookeeper is None:
|
|
try:
|
|
args.zookeeper = int(extract_key(" --key zookeeper | grep . | wc -l")) > 0
|
|
except ValueError:
|
|
args.zookeeper = False
|
|
|
|
if args.shard is None:
|
|
args.shard = bool(extract_key(' --key listen_host | grep -E "127.0.0.2|::"'))
|
|
|
|
def create_common_database(args, db_name):
|
|
create_database_retries = 0
|
|
while create_database_retries < MAX_RETRIES:
|
|
start_time = datetime.now()
|
|
|
|
client_cmd = args.client + " " + get_additional_client_options(args)
|
|
|
|
clickhouse_proc_create = open_client_process(client_cmd, universal_newlines=True)
|
|
|
|
(stdout, stderr) = clickhouse_proc_create.communicate(("CREATE DATABASE IF NOT EXISTS " + db_name + get_db_engine(args, db_name)))
|
|
|
|
total_time = (datetime.now() - start_time).total_seconds()
|
|
|
|
if not need_retry(args, stdout, stderr, total_time):
|
|
break
|
|
create_database_retries += 1
|
|
|
|
if args.database and args.database != "test":
|
|
create_common_database(args, args.database)
|
|
|
|
create_common_database(args, "test")
|
|
|
|
total_tests_run = 0
|
|
|
|
for suite in sorted(os.listdir(base_dir), key=suite_key_func):
|
|
if server_died.is_set():
|
|
break
|
|
|
|
suite_dir = os.path.join(base_dir, suite)
|
|
suite_re_obj = re.search('^[0-9]+_(.*)$', suite)
|
|
if not suite_re_obj: # skip .gitignore and so on
|
|
continue
|
|
|
|
suite_tmp_dir = os.path.join(tmp_dir, suite)
|
|
if not os.path.exists(suite_tmp_dir):
|
|
os.makedirs(suite_tmp_dir)
|
|
|
|
suite = suite_re_obj.group(1)
|
|
|
|
if os.path.isdir(suite_dir):
|
|
if 'stateful' in suite and not args.no_stateful and not is_data_present():
|
|
print("Won't run stateful tests because test data wasn't loaded.")
|
|
continue
|
|
if 'stateless' in suite and args.no_stateless:
|
|
print("Won't run stateless tests because they were manually disabled.")
|
|
continue
|
|
if 'stateful' in suite and args.no_stateful:
|
|
print("Won't run stateful tests because they were manually disabled.")
|
|
continue
|
|
|
|
all_tests = get_tests_list(
|
|
suite_dir, args.test, args.test_runs, tests_in_suite_key_func)
|
|
|
|
all_tags = read_test_tags(suite_dir, all_tests)
|
|
|
|
sequential_tests = []
|
|
if args.sequential:
|
|
for test in all_tests:
|
|
if any(s in test for s in args.sequential):
|
|
sequential_tests.append(test)
|
|
else:
|
|
sequential_tests = collect_sequential_list(all_tags)
|
|
|
|
sequential_tests_set = set(sequential_tests)
|
|
parallel_tests = [test for test in all_tests if test not in sequential_tests_set]
|
|
|
|
total_tests_run += do_run_tests(
|
|
args.jobs, suite, suite_dir, suite_tmp_dir, all_tests, all_tags, parallel_tests, sequential_tests, args.parallel)
|
|
|
|
if server_died.is_set():
|
|
exit_code.value = 1
|
|
|
|
if args.hung_check:
|
|
|
|
# Some queries may execute in background for some time after test was finished. This is normal.
|
|
for _ in range(1, 60):
|
|
timeout, processlist = get_processlist(args)
|
|
if timeout or not processlist:
|
|
break
|
|
sleep(1)
|
|
|
|
if timeout or processlist:
|
|
if processlist:
|
|
print(colored("\nFound hung queries in processlist:", args, "red", attrs=["bold"]))
|
|
print(processlist)
|
|
else:
|
|
print(colored("Seems like server hung and cannot respond to queries", args, "red", attrs=["bold"]))
|
|
|
|
|
|
print_stacktraces()
|
|
exit_code.value = 1
|
|
else:
|
|
print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))
|
|
|
|
if len(restarted_tests) > 0:
|
|
print("\nSome tests were restarted:\n")
|
|
|
|
for (test_case, stderr) in restarted_tests:
|
|
print(test_case + "\n" + stderr + "\n")
|
|
|
|
if total_tests_run == 0:
|
|
print("No tests were run.")
|
|
sys.exit(1)
|
|
else:
|
|
print("All tests have finished.")
|
|
|
|
sys.exit(exit_code.value)
|
|
|
|
|
|
def find_binary(name):
|
|
if os.path.exists(name) and os.access(name, os.X_OK):
|
|
return True
|
|
paths = os.environ.get("PATH").split(':')
|
|
for path in paths:
|
|
if os.access(os.path.join(path, name), os.X_OK):
|
|
return True
|
|
|
|
# maybe it wasn't in PATH
|
|
if os.access(os.path.join('/usr/local/bin', name), os.X_OK):
|
|
return True
|
|
if os.access(os.path.join('/usr/bin', name), os.X_OK):
|
|
return True
|
|
return False
|
|
|
|
|
|
def get_additional_client_options(args):
|
|
if args.client_option:
|
|
return ' '.join('--' + option for option in args.client_option)
|
|
|
|
return ''
|
|
|
|
|
|
def get_additional_client_options_url(args):
|
|
if args.client_option:
|
|
return '&'.join(args.client_option)
|
|
|
|
return ''
|
|
|
|
|
|
def read_test_tags(suite_dir: str, all_tests: List[str]) -> Dict[str, Set[str]]:
|
|
def get_comment_sign(filename):
|
|
if filename.endswith('.sql') or filename.endswith('.sql.j2'):
|
|
return '--'
|
|
elif filename.endswith('.sh') or filename.endswith('.py') or filename.endswith('.expect'):
|
|
return '#'
|
|
else:
|
|
raise Exception(f'Unknown file_extension: {filename}')
|
|
|
|
def parse_tags_from_line(line, comment_sign):
|
|
if not line.startswith(comment_sign):
|
|
return None
|
|
tags_str = line[len(comment_sign):].lstrip()
|
|
tags_prefix = "Tags:"
|
|
if not tags_str.startswith(tags_prefix):
|
|
return None
|
|
tags_str = tags_str[len(tags_prefix):]
|
|
tags = tags_str.split(',')
|
|
tags = {tag.strip() for tag in tags}
|
|
return tags
|
|
|
|
def is_shebang(line):
|
|
return line.startswith('#!')
|
|
|
|
def load_tags_from_file(filepath):
|
|
with open(filepath, 'r') as file:
|
|
try:
|
|
line = file.readline()
|
|
if is_shebang(line):
|
|
line = file.readline()
|
|
except UnicodeDecodeError:
|
|
return []
|
|
return parse_tags_from_line(line, get_comment_sign(filepath))
|
|
|
|
all_tags = {}
|
|
start_time = datetime.now()
|
|
for test_name in all_tests:
|
|
tags = load_tags_from_file(os.path.join(suite_dir, test_name))
|
|
if tags:
|
|
all_tags[test_name] = tags
|
|
elapsed = (datetime.now() - start_time).total_seconds()
|
|
if elapsed > 1:
|
|
print(f"Tags for suite {suite_dir} read in {elapsed:.2f} seconds")
|
|
return all_tags
|
|
|
|
|
|
def collect_sequential_list(all_tags: Dict[str, Set[str]]) -> List[str]:
|
|
res = []
|
|
for test_name, tags in all_tags.items():
|
|
if ('no-parallel' in tags) or ('sequential' in tags):
|
|
res.append(test_name)
|
|
return res
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Move to a new process group and kill it at exit so that we don't have any
|
|
# infinite tests processes left
|
|
# (new process group is required to avoid killing some parent processes)
|
|
os.setpgid(0, 0)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGHUP, signal_handler)
|
|
|
|
parser = ArgumentParser(description='ClickHouse functional tests')
|
|
parser.add_argument('-q', '--queries', help='Path to queries dir')
|
|
parser.add_argument('--tmp', help='Path to tmp dir')
|
|
|
|
parser.add_argument('-b', '--binary', default='clickhouse',
|
|
help='Path to clickhouse (if monolithic build, clickhouse-server otherwise) binary or name of binary in PATH')
|
|
|
|
parser.add_argument('-c', '--client',
|
|
help='Path to clickhouse-client (if split build, useless otherwise) binary of name of binary in PATH')
|
|
|
|
parser.add_argument('--extract_from_config', help='extract-from-config program')
|
|
parser.add_argument('--configclient', help='Client config (if you use not default ports)')
|
|
parser.add_argument('--configserver', default='/etc/clickhouse-server/config.xml', help='Preprocessed server config')
|
|
parser.add_argument('-o', '--output', help='Output xUnit compliant test report directory')
|
|
parser.add_argument('-t', '--timeout', type=int, default=600, help='Timeout for each test case in seconds')
|
|
parser.add_argument('--global_time_limit', type=int, help='Stop if executing more than specified time (after current test finished)')
|
|
parser.add_argument('test', nargs='*', help='Optional test case name regex')
|
|
parser.add_argument('-d', '--disabled', action='store_true', default=False, help='Also run disabled tests')
|
|
parser.add_argument('--stop', action='store_true', default=None, dest='stop', help='Stop on network errors')
|
|
parser.add_argument('--order', default='desc', choices=['asc', 'desc', 'random'], help='Run order')
|
|
parser.add_argument('--testname', action='store_true', default=None, dest='testname', help='Make query with test name before test run')
|
|
parser.add_argument('--hung-check', action='store_true', default=False)
|
|
parser.add_argument('--force-color', action='store_true', default=False)
|
|
parser.add_argument('--database', help='Database for tests (random name test_XXXXXX by default)')
|
|
parser.add_argument('--no-drop-if-fail', action='store_true', help='Do not drop database for test if test has failed')
|
|
parser.add_argument('--hide-db-name', action='store_true', help='Replace random database name with "default" in stderr')
|
|
parser.add_argument('--parallel', default='1/1', help='One parallel test run number/total')
|
|
parser.add_argument('-j', '--jobs', default=1, nargs='?', type=int, help='Run all tests in parallel')
|
|
parser.add_argument('--test-runs', default=1, nargs='?', type=int, help='Run each test many times (useful for e.g. flaky check)')
|
|
parser.add_argument('-U', '--unified', default=3, type=int, help='output NUM lines of unified context')
|
|
parser.add_argument('-r', '--server-check-retries', default=30, type=int, help='Num of tries to execute SELECT 1 before tests started')
|
|
parser.add_argument('--db-engine', help='Database engine name')
|
|
parser.add_argument('--replicated-database', action='store_true', default=False, help='Run tests with Replicated database engine')
|
|
parser.add_argument('--fast-tests-only', action='store_true', default=False, help='Run only fast tests (the tests without the "no-fasttest" tag)')
|
|
parser.add_argument('--no-stateless', action='store_true', help='Disable all stateless tests')
|
|
parser.add_argument('--no-stateful', action='store_true', help='Disable all stateful tests')
|
|
parser.add_argument('--skip', nargs='+', help="Skip these tests")
|
|
parser.add_argument('--sequential', nargs='+', help="Run these tests sequentially even if --parallel specified")
|
|
parser.add_argument('--no-long', action='store_true', dest='no_long', help='Do not run long tests')
|
|
parser.add_argument('--client-option', nargs='+', help='Specify additional client argument')
|
|
parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time')
|
|
|
|
group = parser.add_mutually_exclusive_group(required=False)
|
|
group.add_argument('--zookeeper', action='store_true', default=None, dest='zookeeper', help='Run zookeeper related tests')
|
|
group.add_argument('--no-zookeeper', action='store_false', default=None, dest='zookeeper', help='Do not run zookeeper related tests')
|
|
|
|
group = parser.add_mutually_exclusive_group(required=False)
|
|
group.add_argument('--shard', action='store_true', default=None, dest='shard', help='Run sharding related tests (required to clickhouse-server listen 127.0.0.2 127.0.0.3)')
|
|
group.add_argument('--no-shard', action='store_false', default=None, dest='shard', help='Do not run shard related tests')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.queries and not os.path.isdir(args.queries):
|
|
print(f"Cannot access the specified directory with queries ({args.queries})", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Autodetect the directory with queries if not specified
|
|
if args.queries is None:
|
|
args.queries = 'queries'
|
|
|
|
if not os.path.isdir(args.queries):
|
|
# If we're running from the repo
|
|
args.queries = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'queries')
|
|
|
|
if not os.path.isdir(args.queries):
|
|
# Next we're going to try some system directories, don't write 'stdout' files into them.
|
|
if args.tmp is None:
|
|
args.tmp = '/tmp/clickhouse-test'
|
|
|
|
args.queries = '/usr/local/share/clickhouse-test/queries'
|
|
|
|
if not os.path.isdir(args.queries):
|
|
args.queries = '/usr/share/clickhouse-test/queries'
|
|
|
|
if not os.path.isdir(args.queries):
|
|
print("Failed to detect path to the queries directory. Please specify it with '--queries' option.", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
print("Using queries from '" + args.queries + "' directory")
|
|
|
|
if args.tmp is None:
|
|
args.tmp = args.queries
|
|
if args.client is None:
|
|
if find_binary(args.binary + '-client'):
|
|
args.client = args.binary + '-client'
|
|
|
|
print("Using " + args.client + " as client program (expecting split build)")
|
|
elif find_binary(args.binary):
|
|
args.client = args.binary + ' client'
|
|
|
|
print("Using " + args.client + " as client program (expecting monolithic build)")
|
|
else:
|
|
print("No 'clickhouse' or 'clickhouse-client' client binary found", file=sys.stderr)
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
if args.configclient:
|
|
args.client += ' --config-file=' + args.configclient
|
|
|
|
if os.getenv("CLICKHOUSE_HOST"):
|
|
args.client += ' --host=' + os.getenv("CLICKHOUSE_HOST")
|
|
|
|
args.tcp_port = int(os.getenv("CLICKHOUSE_PORT_TCP", "9000"))
|
|
args.client += f" --port={args.tcp_port}"
|
|
|
|
if os.getenv("CLICKHOUSE_DATABASE"):
|
|
args.client += ' --database=' + os.getenv("CLICKHOUSE_DATABASE")
|
|
|
|
if args.client_option:
|
|
# Set options for client
|
|
if 'CLICKHOUSE_CLIENT_OPT' in os.environ:
|
|
os.environ['CLICKHOUSE_CLIENT_OPT'] += ' '
|
|
else:
|
|
os.environ['CLICKHOUSE_CLIENT_OPT'] = ''
|
|
|
|
os.environ['CLICKHOUSE_CLIENT_OPT'] += get_additional_client_options(args)
|
|
|
|
# Set options for curl
|
|
if 'CLICKHOUSE_URL_PARAMS' in os.environ:
|
|
os.environ['CLICKHOUSE_URL_PARAMS'] += '&'
|
|
else:
|
|
os.environ['CLICKHOUSE_URL_PARAMS'] = ''
|
|
|
|
os.environ['CLICKHOUSE_URL_PARAMS'] += get_additional_client_options_url(args)
|
|
|
|
if args.extract_from_config is None:
|
|
if os.access(args.binary + '-extract-from-config', os.X_OK):
|
|
args.extract_from_config = args.binary + '-extract-from-config'
|
|
else:
|
|
args.extract_from_config = args.binary + ' extract-from-config'
|
|
|
|
if args.jobs is None:
|
|
args.jobs = multiprocessing.cpu_count()
|
|
|
|
main(args)
|