mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 19:12:03 +00:00
926 lines
37 KiB
Python
Executable File
926 lines
37 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import re
|
|
import json
|
|
|
|
from argparse import ArgumentParser
|
|
from argparse import FileType
|
|
from pprint import pprint
|
|
import shlex
|
|
import subprocess
|
|
from subprocess import check_call
|
|
from subprocess import Popen
|
|
from subprocess import PIPE
|
|
from subprocess import CalledProcessError
|
|
from datetime import datetime
|
|
from time import time, sleep
|
|
from errno import ESRCH
|
|
try:
|
|
import termcolor
|
|
except ImportError:
|
|
termcolor = None
|
|
from random import random
|
|
import subprocess
|
|
import multiprocessing
|
|
from contextlib import closing
|
|
|
|
|
|
# Substrings looked up in a failed test's stderr by need_retry(); when one of
# them is present the failure is treated as retryable by the caller.
MESSAGES_TO_RETRY = [
    "DB::Exception: ZooKeeper session has been expired",
    "Coordination::Exception: Connection loss",
    "Operation timed out",
    "ConnectionPoolWithFailover: Connection failed at try",
]
|
|
|
|
|
|
def json_minify(string):
    """
    Removes all js-style comments from json string. Allows to have comments in skip_list.json.
    The code taken from https://github.com/getify/JSON.minify/tree/python under the MIT license.

    Comments (``/* ... */`` and ``// ...``) are replaced by the same number of
    spaces and line breaks are kept, so the result has the same length as the
    input and a JSON parser reports unchanged line/column positions.

    :param string: JSON text that may contain js-style comments.
    :return: the text with every comment blanked out.
    """
    # Raw string: "\*" is not a valid str escape and triggers a warning otherwise.
    tokenizer = re.compile(r'"|(/\*)|(\*/)|(//)|\n|\r')
    end_slashes_re = re.compile(r'(\\)*$')

    in_string = False
    in_multi = False
    in_single = False

    new_str = []
    index = 0

    for match in re.finditer(tokenizer, string):
        if not (in_multi or in_single):
            new_str.append(string[index:match.start()])
        else:
            # Replace comments with white space so that the JSON parser reports
            # the correct column numbers on parsing errors.
            new_str.append(' ' * (match.start() - index))

        index = match.end()
        val = match.group()

        if val == '"' and not (in_multi or in_single):
            escaped = end_slashes_re.search(string, 0, match.start())

            # start of string or unescaped quote character to end string
            if not in_string or (escaped is None or len(escaped.group()) % 2 == 0):  # noqa
                in_string = not in_string
            index -= 1  # include " character in next catch
        elif not (in_string or in_multi or in_single):
            if val == '/*':
                in_multi = True
            elif val == '//':
                in_single = True
        elif val == '*/' and in_multi and not (in_string or in_single):
            in_multi = False
            new_str.append(' ' * len(val))
        elif val in '\r\n' and not (in_multi or in_string) and in_single:
            in_single = False
        elif not (in_multi or in_single):
            # Token found inside a JSON string: keep it verbatim.  The
            # parentheses matter: without them tokens that occur inside a
            # `//` comment (quotes, "/*", "//") were copied into the output.
            new_str.append(val)

        if val in '\r\n':
            new_str.append(val)
        elif in_multi or in_single:
            # Blank out the token that belongs to the comment.
            new_str.append(' ' * len(val))

    new_str.append(string[index:])
    return ''.join(new_str)
|
|
|
|
|
|
def remove_control_characters(s):
    """
    Decode numeric character references and drop control characters.

    Decimal (``&#NN;``) and hexadecimal (``&#xHH;``) references below 0x10000
    are replaced by the character they denote (larger ones are left as they
    are); afterwards the characters 0x00-0x08, 0x0b, 0x0e-0x1f and 0x7f are
    removed.

    https://github.com/html5lib/html5lib-python/issues/96#issuecomment-43438438
    """
    def decode_reference(found, base):
        code_point = int(found.group(1), base)
        if code_point >= 0x10000:
            # Out of range: keep the reference text untouched.
            return found.group(0)
        return chr(code_point)

    decoded = re.sub(r"&#(\d+);?", lambda found: decode_reference(found, 10), s)
    decoded = re.sub(r"&#[xX]([0-9a-fA-F]+);?", lambda found: decode_reference(found, 16), decoded)
    return re.sub(r"[\x00-\x08\x0b\x0e-\x1f\x7f]", "", decoded)
|
|
|
|
def get_db_engine(args):
    """Return the ``ENGINE=...`` suffix for CREATE DATABASE, or '' when no --db-engine was given."""
    engine = args.db_engine
    # An empty suffix lets the server pick its default engine.
    return (" ENGINE=" + engine) if engine else ""
|
|
|
|
def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file):
    """
    Run one test case and collect its output.

    When ``args.database`` is empty a database with a random ``test_XXXXXX``
    name is created before the run and dropped afterwards.  ``.sql`` cases are
    piped into the client; every other case file is executed directly by the
    shell.  The function waits until the process exits or ``args.timeout``
    seconds pass; on timeout the process is left running and the caller sees
    ``proc.returncode is None``.

    :param args: parsed command line (reads ``database``, ``client``, ``timeout``).
    :param ext: extension of the case file, e.g. ``.sql`` or ``.sh``.
    :param server_logs_level: value for the client's ``--send_logs_level``.
    :param client_options: extra client options as one string.
    :param case_file: path of the test case.
    :param stdout_file: file that receives the test's stdout.
    :param stderr_file: file that receives the test's stderr.
    :return: ``(proc, stdout, stderr, total_time)`` where stdout/stderr are the
        decoded file contents (database name replaced by ``default``) and
        total_time is in seconds.
    """
    if args.database:
        database = args.database
        os.environ.setdefault("CLICKHOUSE_DATABASE", database)
    else:
        # If --database is not specified, we will create temporary database with unique name
        # And we will recreate and drop it for each test
        def random_str(length=6):
            # Local import: at module level `random` is the function imported
            # by `from random import random`, not the module.
            import random
            import string
            alphabet = string.ascii_lowercase + string.digits
            return ''.join(random.choice(alphabet) for _ in range(length))
        database = 'test_{suffix}'.format(suffix=random_str())

        clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
        clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(args)))

        os.environ["CLICKHOUSE_DATABASE"] = database

    params = {
        'client': args.client + ' --database=' + database,
        'logs_level': server_logs_level,
        'options': client_options,
        'test': case_file,
        'stdout': stdout_file,
        'stderr': stderr_file,
    }

    pattern = '{test} > {stdout} 2> {stderr}'

    if ext == '.sql':
        pattern = "{client} --send_logs_level={logs_level} --testmode --multiquery {options} < " + pattern

    command = pattern.format(**params)

    proc = Popen(command, shell=True, env=os.environ)
    start_time = datetime.now()

    # Poll instead of wait() so that the timeout can be enforced.
    while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None:
        sleep(0.01)

    if not args.database:
        clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
        clickhouse_proc_create.communicate(("DROP DATABASE " + database))

    total_time = (datetime.now() - start_time).total_seconds()

    # Normalize randomized database names in stdout, stderr files.
    os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stdout_file))
    os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stderr_file))

    def read_output(path):
        # A missing file counts as empty output; the handle is closed right away.
        if not os.path.exists(path):
            return ''
        with open(path, 'rb') as output_file:
            return str(output_file.read(), errors='replace', encoding='utf-8')

    stdout = read_output(stdout_file)
    stderr = read_output(stderr_file)

    return proc, stdout, stderr, total_time
|
|
|
|
|
|
def need_retry(stderr):
    """Return True when *stderr* contains any of the MESSAGES_TO_RETRY substrings."""
    for retryable_message in MESSAGES_TO_RETRY:
        if retryable_message in stderr:
            return True
    return False
|
|
|
|
|
|
def get_processlist(client_cmd):
    """
    Return the output of ``SHOW PROCESSLIST FORMAT Vertical`` run through *client_cmd*.

    :param client_cmd: shell command line of the client.
    :return: decoded client output, or '' when the command fails.
    """
    try:
        return subprocess.check_output("{} --query 'SHOW PROCESSLIST FORMAT Vertical'".format(client_cmd), shell=True).decode('utf-8')
    except Exception:
        # Only real errors are treated as "no answer"; KeyboardInterrupt and
        # SystemExit are allowed to propagate.
        return ""  # server seems dead
|
|
|
|
|
|
# collect server stacktraces using gdb
|
|
def get_stacktraces_from_gdb(server_pid):
    """
    Run gdb in batch mode against *server_pid* and return its output.

    Any failure is reported in the returned text instead of being raised.
    """
    gdb_command = "gdb -batch -ex 'thread apply all backtrace' -p {}".format(server_pid)
    try:
        raw_output = subprocess.check_output(gdb_command, shell=True)
        return raw_output.decode('utf-8')
    except Exception as error:
        return "Error occured while receiving stack traces from gdb: {}".format(str(error))
|
|
|
|
|
|
# collect server stacktraces from system.stack_trace table
|
|
# it does not work in Sandbox
|
|
def get_stacktraces_from_clickhouse(client):
    """
    Query system.stack_trace through *client* and return the symbolized traces.

    Any failure is reported in the returned text instead of being raised.
    """
    command_template = (
        "{} --allow_introspection_functions=1 --query "
        "\"SELECT arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), arrayMap(x -> addressToLine(x), trace), "
        "arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace "
        "FROM system.stack_trace format Vertical\""
    )
    try:
        raw_output = subprocess.check_output(command_template.format(client), shell=True)
        return raw_output.decode('utf-8')
    except Exception as error:
        return "Error occured while receiving stack traces from client: {}".format(str(error))
|
|
|
|
|
|
def get_server_pid(server_tcp_port):
    """
    Return the pid reported by lsof for the listener on *server_tcp_port*.

    :return: the pid as int, or None when nothing was reported or the lookup failed.
    """
    lsof_pipeline = "lsof -i tcp:{port} -s tcp:LISTEN -Fp | awk '/^p[0-9]+$/{{print substr($0, 2)}}'".format(port=server_tcp_port)
    try:
        pid_text = subprocess.check_output(lsof_pipeline, shell=True)
        # Empty output: server dead.
        return int(pid_text) if pid_text else None
    except Exception:
        return None
|
|
|
|
|
|
def colored(text, args, color=None, on_color=None, attrs=None):
    """
    Colorize *text* with termcolor when it is importable and output is a tty
    (or --force-color was given); otherwise return *text* unchanged.
    """
    use_color = termcolor and (sys.stdout.isatty() or args.force_color)
    if not use_color:
        return text
    return termcolor.colored(text, color, on_color, attrs)
|
|
|
|
|
|
# Set to True by run_tests_array() when the health check fails or a test's
# stderr shows a fatal/connection error; run_tests_array() and main() stop
# starting new tests/suites once it is set.
SERVER_DIED = False
# Status passed to sys.exit() at the end of main(); set to 1 on test failures
# or when hung queries are found.
exit_code = 0
# Deadline (compared against time()) computed in main() from
# --global_time_limit; None means no global limit.
stop_time = None
|
|
|
|
|
|
# def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total):
|
|
def run_tests_array(all_tests_with_params):
    """
    Run one batch of test cases, print a verdict per test and a summary.

    Takes a single tuple so that it can be used with ``Pool.map``.  Reads the
    module-level ``args`` and ``server_logs_level``; updates the globals
    ``SERVER_DIED`` and ``exit_code`` (``exit_code`` becomes 1 when at least
    one test of the batch failed).

    :param all_tests_with_params: tuple
        ``(all_tests, suite, suite_dir, suite_tmp_dir, run_total)`` - the case
        file names, the suite name, the suite directory, the directory for
        ``.stdout``/``.stderr`` files and the batch count (unused here).
    """
    all_tests, suite, suite_dir, suite_tmp_dir, run_total = all_tests_with_params
    global exit_code
    global SERVER_DIED
    global stop_time

    OP_SQUARE_BRACKET = colored("[", args, attrs=['bold'])
    CL_SQUARE_BRACKET = colored("]", args, attrs=['bold'])

    MSG_FAIL = OP_SQUARE_BRACKET + colored(" FAIL ", args, "red", attrs=['bold']) + CL_SQUARE_BRACKET
    MSG_UNKNOWN = OP_SQUARE_BRACKET + colored(" UNKNOWN ", args, "yellow", attrs=['bold']) + CL_SQUARE_BRACKET
    MSG_OK = OP_SQUARE_BRACKET + colored(" OK ", args, "green", attrs=['bold']) + CL_SQUARE_BRACKET
    MSG_SKIPPED = OP_SQUARE_BRACKET + colored(" SKIPPED ", args, "cyan", attrs=['bold']) + CL_SQUARE_BRACKET

    passed_total = 0
    skipped_total = 0
    failures_total = 0
    failures = 0
    # Number of consecutive failed tests; the batch is aborted at 20.
    failures_chain = 0

    client_options = get_additional_client_options(args)

    def print_test_time(test_time):
        if args.print_time:
            print(" {0:.2f} sec.".format(test_time), end='')

    if len(all_tests):
        print("\nRunning {} {} tests.".format(len(all_tests), suite) + "\n")

    for case in all_tests:
        if SERVER_DIED:
            break

        if stop_time and time() > stop_time:
            print("\nStop tests run because global time limit is exceeded.\n")
            break

        case_file = os.path.join(suite_dir, case)
        (name, ext) = os.path.splitext(case)

        try:
            sys.stdout.flush()
            sys.stdout.write("{0:72}".format(name + ": "))

            if args.skip and any(s in name for s in args.skip):
                print(MSG_SKIPPED + " - skip")
                skipped_total += 1
            elif not args.zookeeper and ('zookeeper' in name
                    or 'replica' in name):
                print(MSG_SKIPPED + " - no zookeeper")
                skipped_total += 1
            elif not args.shard and ('shard' in name
                    or 'distributed' in name
                    or 'global' in name):
                print(MSG_SKIPPED + " - no shard")
                skipped_total += 1
            elif not args.no_long and ('long' in name
                    # Tests for races and deadlocks usually are run in a loop
                    # for significant amount of time
                    or 'deadlock' in name
                    or 'race' in name):
                # args.no_long is stored by a 'store_false' option, so it is
                # False exactly when --no-long was passed.
                print(MSG_SKIPPED + " - no long")
                skipped_total += 1
            else:
                disabled_file = os.path.join(suite_dir, name) + '.disabled'

                if os.path.exists(disabled_file) and not args.disabled:
                    with open(disabled_file, 'r') as disabled_reason:
                        message = disabled_reason.read()
                    print(MSG_SKIPPED + " - " + message)
                    # Disabled tests are skipped tests too; count them in the summary.
                    skipped_total += 1
                else:

                    if args.testname:
                        clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
                        clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite)))

                        if clickhouse_proc.returncode != 0:
                            failures += 1
                            print("Server does not respond to health check")
                            SERVER_DIED = True
                            break

                    reference_file = os.path.join(suite_dir, name) + '.reference'
                    stdout_file = os.path.join(suite_tmp_dir, name) + '.stdout'
                    stderr_file = os.path.join(suite_tmp_dir, name) + '.stderr'

                    proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)

                    if proc.returncode is None:
                        # Still running after the timeout: kill it.
                        try:
                            proc.kill()
                        except OSError as e:
                            # ESRCH: the process has already gone.
                            if e.errno != ESRCH:
                                raise

                        failures += 1
                        print(MSG_FAIL, end='')
                        print_test_time(total_time)
                        print(" - Timeout!")
                    else:
                        # Retry failures whose stderr matches need_retry(),
                        # sleeping 2, 4, ... seconds between attempts.
                        counter = 1
                        while proc.returncode != 0 and need_retry(stderr):
                            proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
                            sleep(2**counter)
                            counter += 1
                            if counter > 6:
                                break

                        if proc.returncode != 0:
                            failures += 1
                            failures_chain += 1
                            print(MSG_FAIL, end='')
                            print_test_time(total_time)
                            print(" - return code {}".format(proc.returncode))

                            if stderr:
                                print(stderr)

                            # Stop on fatal errors like segmentation fault. They are sent to client via logs.
                            if ' <Fatal> ' in stderr:
                                SERVER_DIED = True

                            if args.stop and ('Connection refused' in stderr or 'Attempt to read after eof' in stderr) and not 'Received exception from server' in stderr:
                                SERVER_DIED = True

                            if os.path.isfile(stdout_file):
                                print(", result:\n")
                                # `stdout` already holds the file content decoded
                                # with errors='replace'; re-reading the file with
                                # the default strict decoding could raise and
                                # leaked the file handle.
                                print('\n'.join(stdout.split('\n')[:100]))

                        elif stderr:
                            failures += 1
                            failures_chain += 1
                            print(MSG_FAIL, end='')
                            print_test_time(total_time)
                            print(" - having stderror:\n{}".format(
                                '\n'.join(stderr.split('\n')[:100])))
                        elif 'Exception' in stdout:
                            failures += 1
                            failures_chain += 1
                            print(MSG_FAIL, end='')
                            print_test_time(total_time)
                            print(" - having exception:\n{}".format(
                                '\n'.join(stdout.split('\n')[:100])))
                        elif not os.path.isfile(reference_file):
                            print(MSG_UNKNOWN, end='')
                            print_test_time(total_time)
                            print(" - no reference file")
                        else:
                            result_is_different = subprocess.call(['diff', '-q', reference_file, stdout_file], stdout=PIPE)

                            if result_is_different:
                                diff = Popen(['diff', '-U', str(args.unified), reference_file, stdout_file], stdout=PIPE, universal_newlines=True).communicate()[0]
                                failures += 1
                                print(MSG_FAIL, end='')
                                print_test_time(total_time)
                                print(" - result differs with reference:\n{}".format(diff))
                            else:
                                passed_total += 1
                                failures_chain = 0
                                print(MSG_OK, end='')
                                print_test_time(total_time)
                                print()
                                # Output files are kept only for failed tests.
                                if os.path.exists(stdout_file):
                                    os.remove(stdout_file)
                                if os.path.exists(stderr_file):
                                    os.remove(stderr_file)
        except KeyboardInterrupt:
            print(colored("Break tests execution", args, "red"))
            raise
        except Exception:
            # An error in the runner itself is reported as a failure of the
            # current test and the batch goes on.
            import traceback
            exc_type, exc_value, tb = sys.exc_info()
            failures += 1
            print("{0} - Test internal error: {1}\n{2}\n{3}".format(MSG_FAIL, exc_type.__name__, exc_value, "\n".join(traceback.format_tb(tb, 10))))

        if failures_chain >= 20:
            break

    failures_total = failures_total + failures

    if failures_total > 0:
        print(colored("\nHaving {failures_total} errors! {passed_total} tests passed. {skipped_total} tests skipped.".format(passed_total = passed_total, skipped_total = skipped_total, failures_total = failures_total), args, "red", attrs=["bold"]))
        exit_code = 1
    else:
        print(colored("\n{passed_total} tests passed. {skipped_total} tests skipped.".format(passed_total = passed_total, skipped_total = skipped_total), args, "green", attrs=["bold"]))
|
|
|
|
|
|
# Log level requested from the server: main() exports it as
# CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL and run_tests_array() passes it to
# run_single_test(), which uses it for the client's --send_logs_level.
server_logs_level = "warning"
|
|
|
|
|
|
def check_server_started(client, retry_count):
    """
    Probe the server with ``SELECT 1`` until it answers.

    Exit code 210 of the client means "connection refused" and is retried up
    to *retry_count* times with a 0.5 second pause; any other failure aborts
    immediately.

    :param client: client command line (split with shlex).
    :param retry_count: number of refused connections to tolerate.
    :return: True when the server answered, False otherwise.
    """
    print("Connecting to ClickHouse server...", end='')
    sys.stdout.flush()

    attempts_left = retry_count
    while attempts_left > 0:
        probe = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
        (out, err) = probe.communicate(b"SELECT 1")
        status = probe.returncode

        if status == 0 and out.startswith(b"1"):
            print(" OK")
            sys.stdout.flush()
            return True

        if status != 210:
            # Other kind of error, fail.
            print('')
            print("Client invocation failed with code ", status, ": ")
            print("stdout: ")
            print(out)
            print("stderr: ")
            print(err)
            sys.stdout.flush()
            return False

        # Connection refused, retry
        print('.', end = '')
        sys.stdout.flush()
        attempts_left -= 1
        sleep(0.5)

    print('')
    print('All connection tries failed')
    sys.stdout.flush()

    return False
|
|
|
|
|
|
class BuildFlags:
    """Names of server build/configuration traits; collect_tests_to_skip() uses them as skip-list keys."""

    # Sanitizers, detected from CXX_FLAGS.
    THREAD = 'thread-sanitizer'
    ADDRESS = 'address-sanitizer'
    UNDEFINED = 'ub-sanitizer'
    MEMORY = 'memory-sanitizer'

    # Build kind, detected from BUILD_TYPE and UNBUNDLED.
    DEBUG = 'debug-build'
    UNBUNDLED = 'unbundled-build'
    RELEASE = 'release-build'

    # Server settings.
    DATABASE_ORDINARY = 'database-ordinary'
    POLYMORPHIC_PARTS = 'polymorphic-parts'
|
|
|
|
|
|
def _query_build_info(client, query):
    """Send *query* (bytes) to the client's stdin and return its raw stdout; raise if the client fails."""
    clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
    (stdout, stderr) = clickhouse_proc.communicate(query)
    if clickhouse_proc.returncode != 0:
        raise Exception("Cannot get information about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
    return stdout


def collect_build_flags(client):
    """
    Ask the server how it was built and configured.

    :param client: client command line (split with shlex).
    :return: list of BuildFlags values that apply to the server.
    :raises Exception: when any of the queries cannot be executed.
    """
    result = []

    cxx_flags = _query_build_info(client, b"SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
    # At most one sanitizer flag is reported; the first match wins.
    if b'-fsanitize=thread' in cxx_flags:
        result.append(BuildFlags.THREAD)
    elif b'-fsanitize=address' in cxx_flags:
        result.append(BuildFlags.ADDRESS)
    elif b'-fsanitize=undefined' in cxx_flags:
        result.append(BuildFlags.UNDEFINED)
    elif b'-fsanitize=memory' in cxx_flags:
        result.append(BuildFlags.MEMORY)

    build_type = _query_build_info(client, b"SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'")
    if b'Debug' in build_type:
        result.append(BuildFlags.DEBUG)
    elif b'RelWithDebInfo' in build_type or b'Release' in build_type:
        result.append(BuildFlags.RELEASE)

    unbundled = _query_build_info(client, b"SELECT value FROM system.build_options WHERE name = 'UNBUNDLED'")
    if b'ON' in unbundled or b'1' in unbundled:
        result.append(BuildFlags.UNBUNDLED)

    database_engine = _query_build_info(client, b"SELECT value FROM system.settings WHERE name = 'default_database_engine'")
    if b'Ordinary' in database_engine:
        result.append(BuildFlags.DATABASE_ORDINARY)

    wide_part_threshold = _query_build_info(client, b"SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'")
    if wide_part_threshold == b'0\n':
        result.append(BuildFlags.POLYMORPHIC_PARTS)

    return result
|
|
|
|
|
|
def main(args):
    """
    Run every selected test suite and exit the process.

    Checks that the server answers, collects the skip list, exports the
    CLICKHOUSE_* environment used by the tests, autodetects the zookeeper and
    shard settings when they were not given, runs the suites found in
    ``args.queries`` and optionally checks for hung queries.

    Never returns: ends with ``sys.exit`` (status 1 when no test was run,
    otherwise the global ``exit_code``).

    :param args: parsed command line.
    :raises Exception: when the server does not answer ``SELECT 1``.
    """
    global SERVER_DIED
    global stop_time
    global exit_code
    global server_logs_level

    def is_data_present():
        clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
        (stdout, stderr) = clickhouse_proc.communicate(b"EXISTS TABLE test.hits")
        if clickhouse_proc.returncode != 0:
            raise CalledProcessError(clickhouse_proc.returncode, args.client, stderr)

        return stdout.startswith(b'1')

    if not check_server_started(args.client, args.server_check_retries):
        raise Exception(
            "Server is not responding. Cannot execute 'SELECT 1' query. "
            "Note: if you are using unbundled mode, you also have to specify -c option.")

    build_flags = collect_build_flags(args.client)

    if args.use_skip_list:
        tests_to_skip_from_list = collect_tests_to_skip(args.skip_list_path, build_flags)
    else:
        tests_to_skip_from_list = set([])

    if args.skip:
        args.skip = set(args.skip) | tests_to_skip_from_list
    else:
        args.skip = tests_to_skip_from_list

    base_dir = os.path.abspath(args.queries)
    tmp_dir = os.path.abspath(args.tmp)

    # Keep same default values as in queries/shell_config.sh
    os.environ.setdefault("CLICKHOUSE_BINARY", args.binary)
    #os.environ.setdefault("CLICKHOUSE_CLIENT", args.client)
    os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)
    if args.configclient:
        os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)
    os.environ.setdefault("CLICKHOUSE_TMP", tmp_dir)

    # Force to print server warnings in stderr
    # Shell scripts could change logging level
    os.environ.setdefault("CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL", server_logs_level)

    # This code is bad as the time is not monotonic
    if args.global_time_limit:
        stop_time = time() + args.global_time_limit

    if args.zookeeper is None:
        # Autodetect: count the non-empty lines printed for the 'zookeeper' config key.
        code, out = subprocess.getstatusoutput(args.extract_from_config + " --try --config " + args.configserver + ' --key zookeeper | grep . | wc -l')
        try:
            if int(out) > 0:
                args.zookeeper = True
            else:
                args.zookeeper = False
        except ValueError:
            args.zookeeper = False

    if args.shard is None:
        # Autodetect: shard tests need the server to listen on 127.0.0.2 or '::'.
        code, out = subprocess.getstatusoutput(args.extract_from_config + " --try --config " + args.configserver + ' --key listen_host | grep -E "127.0.0.2|::"')
        if out:
            args.shard = True
        else:
            args.shard = False

    if args.database and args.database != "test":
        clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
        clickhouse_proc_create.communicate(("CREATE DATABASE IF NOT EXISTS " + args.database + get_db_engine(args)))

    clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    clickhouse_proc_create.communicate(("CREATE DATABASE IF NOT EXISTS test" + get_db_engine(args)))

    def is_test_from_dir(suite_dir, case):
        case_file = os.path.join(suite_dir, case)
        (name, ext) = os.path.splitext(case)
        return os.path.isfile(case_file) and (ext == '.sql' or ext == '.sh' or ext == '.py')

    def sute_key_func(item):
        # Sort key for suite directories: numeric prefix first, then the rest.
        if args.order == 'random':
            return random()

        if -1 == item.find('_'):
            return 99998, ''

        prefix, suffix = item.split('_', 1)

        try:
            return int(prefix), suffix
        except ValueError:
            return 99997, ''

    total_tests_run = 0
    for suite in sorted(os.listdir(base_dir), key=sute_key_func):
        if SERVER_DIED:
            break

        suite_dir = os.path.join(base_dir, suite)
        suite_re_obj = re.search('^[0-9]+_(.*)$', suite)
        if not suite_re_obj: #skip .gitignore and so on
            continue

        suite_tmp_dir = os.path.join(tmp_dir, suite)
        if not os.path.exists(suite_tmp_dir):
            os.makedirs(suite_tmp_dir)

        suite = suite_re_obj.group(1)
        if os.path.isdir(suite_dir):

            if 'stateful' in suite and not args.no_stateful and not is_data_present():
                print("Won't run stateful tests because test data wasn't loaded.")
                continue
            if 'stateless' in suite and args.no_stateless:
                print("Won't run stateless tests because they were manually disabled.")
                continue
            if 'stateful' in suite and args.no_stateful:
                print("Won't run stateful tests because they were manually disabled.")
                continue

            # Reverse sort order: we want run newest test first.
            # And not reverse subtests
            def key_func(item):
                if args.order == 'random':
                    return random()

                reverse = 1 if args.order == 'asc' else -1

                # Every non-random key is a (number, suffix) tuple.  Returning a
                # bare int for some names made sort() compare int with tuple,
                # which raises TypeError on Python 3.
                if -1 == item.find('_'):
                    return 99998, ''

                prefix, suffix = item.split('_', 1)

                try:
                    return reverse * int(prefix), suffix
                except ValueError:
                    return 99997, ''

            all_tests = os.listdir(suite_dir)
            all_tests = [case for case in all_tests if is_test_from_dir(suite_dir, case)]
            if args.test:
                all_tests = [t for t in all_tests if any([re.search(r, t) for r in args.test])]
            all_tests.sort(key=key_func)

            run_n, run_total = args.parallel.split('/')
            run_n = float(run_n)
            run_total = float(run_total)
            tests_n = len(all_tests)
            if run_total > tests_n:
                run_total = tests_n
            # Also skips empty suites, so `jobs` below is never 0.
            if run_n > run_total:
                continue

            jobs = args.jobs
            if jobs > tests_n:
                jobs = tests_n
            if jobs > run_total:
                run_total = jobs

            batch_size = len(all_tests) // jobs
            all_tests_array = []
            for i in range(0, len(all_tests), batch_size):
                all_tests_array.append((all_tests[i:i+batch_size], suite, suite_dir, suite_tmp_dir, run_total))

            if jobs > 1:
                with closing(multiprocessing.Pool(processes=jobs)) as pool:
                    pool.map(run_tests_array, all_tests_array)
            else:
                # NOTE(review): with jobs == 1 there is a single batch, so a
                # --parallel value N/M with N > 1 indexes past the end of the
                # list here - confirm the intended semantics of --parallel.
                run_tests_array(all_tests_array[int(run_n)-1])

            total_tests_run += tests_n

    if args.hung_check:

        # Some queries may execute in background for some time after test was finished. This is normal.
        for n in range(1, 60):
            processlist = get_processlist(args.client)
            if not processlist:
                break
            sleep(1)

        if processlist:
            print(colored("\nFound hung queries in processlist:", args, "red", attrs=["bold"]))
            print(processlist)

            clickhouse_tcp_port = os.getenv("CLICKHOUSE_PORT_TCP", '9000')
            server_pid = get_server_pid(clickhouse_tcp_port)
            if server_pid:
                print("\nLocated ClickHouse server process {} listening at TCP port {}".format(server_pid, clickhouse_tcp_port))

                # It does not work in Sandbox
                #print("\nCollecting stacktraces from system.stacktraces table:")
                #print(get_stacktraces_from_clickhouse(args.client))

                print("\nCollecting stacktraces from all running threads with gdb:")
                print(get_stacktraces_from_gdb(server_pid))
            else:
                print(
                    colored(
                        "\nUnable to locate ClickHouse server process listening at TCP port {}. "
                        "It must have crashed or exited prematurely!".format(clickhouse_tcp_port),
                        args, "red", attrs=["bold"]))

            exit_code = 1
        else:
            print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))

    if total_tests_run == 0:
        print("No tests were run.")
        sys.exit(1)

    sys.exit(exit_code)
|
|
|
|
|
|
def find_binary(name):
    """
    Tell whether *name* can be executed.

    *name* is tried as a path first, then in every PATH directory and finally
    in /usr/local/bin and /usr/bin.

    :param name: file name or path of the binary.
    :return: True when an executable entry was found, False otherwise.
    """
    if os.path.exists(name) and os.access(name, os.X_OK):
        return True

    # PATH may be unset; treat that as an empty search path instead of
    # failing on None.split().
    search_dirs = os.environ.get("PATH", "").split(':')
    # maybe it wasn't in PATH
    search_dirs += ['/usr/local/bin', '/usr/bin']

    return any(os.access(os.path.join(path, name), os.X_OK) for path in search_dirs)
|
|
|
|
|
|
def get_additional_client_options(args):
    """Render the --client-option values as command line options (``--a --b``); '' when there are none."""
    extra_options = args.client_option
    if not extra_options:
        return ''
    return ' '.join(['--' + option for option in extra_options])
|
|
|
|
|
|
def get_additional_client_options_url(args):
    """Render the --client-option values as URL parameters (``a&b``); '' when there are none."""
    extra_options = args.client_option
    return '&'.join(extra_options) if extra_options else ''
|
|
|
|
|
|
def collect_tests_to_skip(skip_list_path, build_flags):
    """
    Read the skip list and return the tests to skip for this server.

    The file is JSON (js-style comments allowed) that maps a build flag to a
    list of test names.  Flags that have no entry in the file contribute
    nothing.

    :param skip_list_path: path of the skip list; a missing file yields an empty set.
    :param build_flags: iterable of build flag names.
    :return: set of test names.
    """
    result = set()
    if not os.path.exists(skip_list_path):
        return result

    with open(skip_list_path, 'r') as skip_list_file:
        content = skip_list_file.read()

    # allows to have comments in skip_list.json
    skip_dict = json.loads(json_minify(content))
    for build_flag in build_flags:
        # .get(): a flag without an entry must not abort the whole run with KeyError.
        result |= set(skip_dict.get(build_flag, []))

    if len(result) > 0:
        print("Found file with skip-list {}, {} test will be skipped".format(skip_list_path, len(result)))

    return result
|
|
|
|
# Script entry point: parse the command line, fill in every value that was
# not given (queries directory, tmp directory, client command, helper
# program, job count) and hand over to main().
# NOTE(review): the nesting below was reconstructed from a copy of the file
# that lost its indentation - confirm against the upstream source.
if __name__ == '__main__':
    parser=ArgumentParser(description='ClickHouse functional tests')
    parser.add_argument('-q', '--queries', help='Path to queries dir')
    parser.add_argument('--tmp', help='Path to tmp dir')

    parser.add_argument('-b', '--binary', default='clickhouse',
        help='Path to clickhouse (if bundled, clickhouse-server otherwise) binary or name of binary in PATH')

    parser.add_argument('-c', '--client',
        help='Path to clickhouse-client (if unbundled, useless otherwise) binary of name of binary in PATH')

    parser.add_argument('--extract_from_config', help='extract-from-config program')
    parser.add_argument('--configclient', help='Client config (if you use not default ports)')
    parser.add_argument('--configserver', default= '/etc/clickhouse-server/config.xml', help='Preprocessed server config')
    parser.add_argument('-o', '--output', help='Output xUnit compliant test report directory')
    parser.add_argument('-t', '--timeout', type=int, default=600, help='Timeout for each test case in seconds')
    parser.add_argument('--global_time_limit', type=int, help='Stop if executing more than specified time (after current test finished)')
    parser.add_argument('test', nargs='*', help='Optional test case name regex')
    parser.add_argument('-d', '--disabled', action='store_true', default=False, help='Also run disabled tests')
    parser.add_argument('--stop', action='store_true', default=None, dest='stop', help='Stop on network errors')
    parser.add_argument('--order', default='desc', choices=['asc', 'desc', 'random'], help='Run order')
    parser.add_argument('--testname', action='store_true', default=None, dest='testname', help='Make query with test name before test run')
    parser.add_argument('--hung-check', action='store_true', default=False)
    parser.add_argument('--force-color', action='store_true', default=False)
    parser.add_argument('--database', help='Database for tests (random name test_XXXXXX by default)')
    parser.add_argument('--parallel', default='1/1', help='One parallel test run number/total')
    # nargs='?' without const: a bare `-j` yields None, replaced by the CPU count below.
    parser.add_argument('-j', '--jobs', default=1, nargs='?', type=int, help='Run all tests in parallel')
    parser.add_argument('-U', '--unified', default=3, type=int, help='output NUM lines of unified context')
    parser.add_argument('-r', '--server-check-retries', default=30, type=int, help='Num of tries to execute SELECT 1 before tests started')
    parser.add_argument('--skip-list-path', help="Path to skip-list file")
    parser.add_argument('--use-skip-list', action='store_true', default=False, help="Use skip list to skip tests if found")
    parser.add_argument('--db-engine', help='Database engine name')

    parser.add_argument('--no-stateless', action='store_true', help='Disable all stateless tests')
    parser.add_argument('--no-stateful', action='store_true', help='Disable all stateful tests')
    parser.add_argument('--skip', nargs='+', help="Skip these tests")
    # 'store_false': args.no_long is True by default and False when --no-long is
    # passed; run_tests_array() skips long tests when it is False.
    parser.add_argument('--no-long', action='store_false', dest='no_long', help='Do not run long tests')
    parser.add_argument('--client-option', nargs='+', help='Specify additional client argument')
    parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time')
    # Tri-state options: None (neither flag given) makes main() autodetect the value.
    group=parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--zookeeper', action='store_true', default=None, dest='zookeeper', help='Run zookeeper related tests')
    group.add_argument('--no-zookeeper', action='store_false', default=None, dest='zookeeper', help='Do not run zookeeper related tests')
    group=parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--shard', action='store_true', default=None, dest='shard', help='Run sharding related tests (required to clickhouse-server listen 127.0.0.2 127.0.0.3)')
    group.add_argument('--no-shard', action='store_false', default=None, dest='shard', help='Do not run shard related tests')

    args = parser.parse_args()

    # An explicitly given queries directory must exist.
    if args.queries and not os.path.isdir(args.queries):
        print("Cannot access the specified directory with queries (" + args.queries + ")", file=sys.stderr)
        exit(1)

    # Autodetect the directory with queries if not specified
    if args.queries is None:
        # Candidates are tried in order: ./queries, 'queries' next to this
        # script, then the two system-wide locations.
        args.queries = 'queries'

        if not os.path.isdir(args.queries):
            # If we're running from the repo
            args.queries = os.path.join(os.path.dirname(os.path.abspath( __file__ )), 'queries')

        if not os.path.isdir(args.queries):
            # Next we're going to try some system directories, don't write 'stdout' files into them.
            if args.tmp is None:
                args.tmp = '/tmp/clickhouse-test'

            args.queries = '/usr/local/share/clickhouse-test/queries'

        if not os.path.isdir(args.queries):
            args.queries = '/usr/share/clickhouse-test/queries'

        if not os.path.isdir(args.queries):
            print("Failed to detect path to the queries directory. Please specify it with '--queries' option.", file=sys.stderr)
            exit(1)

    print("Using queries from '" + args.queries + "' directory")

    if args.skip_list_path is None:
        args.skip_list_path = os.path.join(args.queries, 'skip_list.json')

    # Without --tmp the test output files are written next to the queries.
    if args.tmp is None:
        args.tmp = args.queries
    if args.client is None:
        # Prefer a separate '<binary>-client' executable, else use '<binary> client'.
        if find_binary(args.binary + '-client'):
            args.client = args.binary + '-client'

            print("Using " + args.client + " as client program (expecting unbundled mode)")
        elif find_binary(args.binary):
            args.client = args.binary + ' client'

            print("Using " + args.client + " as client program (expecting bundled mode)")
        else:
            print("No 'clickhouse' or 'clickhouse-client' client binary found", file=sys.stderr)
            parser.print_help()
            exit(1)

        # Connection parameters are appended to the autodetected client command.
        if args.configclient:
            args.client += ' --config-file=' + args.configclient
        if os.getenv("CLICKHOUSE_HOST"):
            args.client += ' --host=' + os.getenv("CLICKHOUSE_HOST")
        if os.getenv("CLICKHOUSE_PORT_TCP"):
            args.client += ' --port=' + os.getenv("CLICKHOUSE_PORT_TCP")
        if os.getenv("CLICKHOUSE_DATABASE"):
            args.client += ' --database=' + os.getenv("CLICKHOUSE_DATABASE")

    if args.client_option:
        # Set options for client
        if 'CLICKHOUSE_CLIENT_OPT' in os.environ:
            os.environ['CLICKHOUSE_CLIENT_OPT'] += ' '
        else:
            os.environ['CLICKHOUSE_CLIENT_OPT'] = ''

        os.environ['CLICKHOUSE_CLIENT_OPT'] += get_additional_client_options(args)

        # Set options for curl
        if 'CLICKHOUSE_URL_PARAMS' in os.environ:
            os.environ['CLICKHOUSE_URL_PARAMS'] += '&'
        else:
            os.environ['CLICKHOUSE_URL_PARAMS'] = ''

        os.environ['CLICKHOUSE_URL_PARAMS'] += get_additional_client_options_url(args)

    if args.extract_from_config is None:
        # Same choice as for the client: separate executable or a subcommand of the binary.
        if os.access(args.binary + '-extract-from-config', os.X_OK):
            args.extract_from_config = args.binary + '-extract-from-config'
        else:
            args.extract_from_config = args.binary + ' extract-from-config'

    if args.jobs is None:
        args.jobs = multiprocessing.cpu_count()

    main(args)
|