ClickHouse/tests/clickhouse-test

1554 lines
60 KiB
Plaintext
Raw Normal View History

2020-10-02 16:54:07 +00:00
#!/usr/bin/env python3
# pylint: disable=too-many-return-statements
# pylint: disable=global-variable-not-assigned
import enum
import shutil
import sys
import os
import os.path
import signal
import re
import copy
import traceback
import math
# Not requests, to avoid requiring extra dependency.
import http.client
import urllib.parse
import json
# for crc32
import zlib
from argparse import ArgumentParser
from typing import Tuple, Union, Optional, Dict, Set, List
import subprocess
from subprocess import Popen
from subprocess import PIPE
2016-09-02 16:26:09 +00:00
from datetime import datetime
2020-08-26 17:44:03 +00:00
from time import time, sleep
2016-09-02 16:26:09 +00:00
from errno import ESRCH
try:
import termcolor
except ImportError:
termcolor = None
import random
import string
2019-06-03 17:36:27 +00:00
import multiprocessing
2021-08-02 13:51:33 +00:00
import socket
from contextlib import closing
USE_JINJA = True
try:
import jinja2
except ImportError:
USE_JINJA = False
print('WARNING: jinja2 not installed! Template tests will be skipped.')
2019-03-13 16:47:02 +00:00
# Substrings looked for in a test's stdout/stderr; need_retry() treats any
# match as a reason to retry the test.
MESSAGES_TO_RETRY = [
    "ConnectionPoolWithFailover: Connection failed at try",
    "DB::Exception: New table appeared in database being dropped or detached. Try again",
    "is already started to be removing by another replica right now",
    "DB::Exception: Cannot enqueue query",
    "is executing longer than distributed_ddl_task_timeout",  # FIXME
]

# Compared against a test's run counter when deciding whether to retry it.
MAX_RETRIES = 3

# Known test file extensions.
TEST_FILE_EXTENSIONS = ['.sql', '.sql.j2', '.sh', '.py', '.expect']
def stringhash(s):
    """Return the CRC32 checksum of *s* encoded as UTF-8.

    The built-in hash() is only consistent within a single process
    invocation (https://stackoverflow.com/a/42089311), so it is not
    used here.
    """
    encoded = s.encode('utf-8')
    return zlib.crc32(encoded)
class HTTPError(Exception):
    """Error carrying a message and a numeric code.

    str() renders it as 'Code: <code>. <message>'.
    """

    def __init__(self, message=None, code=None):
        super().__init__(message)
        self.message = message
        self.code = code

    def __str__(self):
        return f'Code: {self.code}. {self.message}'
# Helpers to execute queries via HTTP interface.
def clickhouse_execute_http(base_args, query, timeout=30, settings=None, default_format=None):
    """POST *query* to the server's HTTP interface and return the raw response body.

    base_args       -- object providing tcp_host, http_port and
                       client_options_query_str
    query           -- query text, passed as the 'query' URL parameter
    timeout         -- socket timeout; its int() value is also sent as the
                       server-side connect/receive/send timeout settings
    settings        -- optional dict of extra URL parameters (may override
                       the defaults built here)
    default_format  -- optional value for the 'default_format' parameter

    Returns the response body as bytes.
    Raises HTTPError when the response status is not 200.
    """
    client = http.client.HTTPConnection(
        host=base_args.tcp_host,
        port=base_args.http_port,
        timeout=timeout)

    timeout = int(timeout)
    params = {
        'query': query,

        # hung check in stress tests may remove the database,
        # hence we should use 'system'.
        'database': 'system',

        'connect_timeout': timeout,
        'receive_timeout': timeout,
        'send_timeout': timeout,

        'http_connection_timeout': timeout,
        'http_receive_timeout': timeout,
        'http_send_timeout': timeout,
    }
    if settings is not None:
        params.update(settings)
    if default_format is not None:
        params['default_format'] = default_format

    # Always release the socket, including when the request or read fails;
    # otherwise every query leaves a connection open until it is collected.
    try:
        client.request('POST', '/?' + base_args.client_options_query_str + urllib.parse.urlencode(params))
        res = client.getresponse()
        data = res.read()
    finally:
        client.close()

    if res.status != 200:
        raise HTTPError(data.decode(), res.status)

    return data
def clickhouse_execute(base_args, query, timeout=30, settings=None):
    """Run *query* through clickhouse_execute_http() and return the body with
    leading/trailing whitespace stripped."""
    response = clickhouse_execute_http(base_args, query, timeout, settings)
    return response.strip()
def clickhouse_execute_json(base_args, query, timeout=30, settings=None):
    """Run *query* with default format JSONEachRow and parse the result.

    Returns a list with one json.loads() result per response line, or None
    when the response body is empty.
    """
    data = clickhouse_execute_http(base_args, query, timeout, settings, 'JSONEachRow')
    if not data:
        return None
    return [json.loads(row) for row in data.strip().splitlines()]
class Terminated(KeyboardInterrupt):
    """KeyboardInterrupt subclass used to report termination by a signal."""
def signal_handler(sig, frame):
    """Signal handler: raise Terminated naming the received signal.

    *frame* is accepted to match the handler signature and is not used.
    """
    message = f'Terminated with {sig} signal'
    raise Terminated(message)
def stop_tests():
    """Flag the run as stopping and send SIGTERM to the whole process group.

    Only the first call (the one that finds stop_tests_triggered unset) does
    the work; every call prints "Stopping tests" while holding the lock.
    """
    global stop_tests_triggered_lock
    global stop_tests_triggered
    global restarted_tests

    with stop_tests_triggered_lock:
        print("Stopping tests")
        if stop_tests_triggered.is_set():
            return
        stop_tests_triggered.set()

        # materialize multiprocessing.Manager().list() object before
        # sending SIGTERM since this object is a proxy, that requires
        # communicating with manager thread, but after SIGTERM will be
        # send, this thread will die, and you will get
        # ConnectionRefusedError error for any access to "restarted_tests"
        # variable.
        restarted_tests = [*restarted_tests]

        # send signal to all processes in group to avoid hung check triggering
        # (to avoid terminating clickhouse-test itself, the signal should be ignored)
        own_group = os.getpgid(os.getpid())
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        os.killpg(own_group, signal.SIGTERM)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
2021-02-15 10:26:34 +00:00
def get_db_engine(args, database_name):
    """Return the clause appended after the database name in CREATE DATABASE.

    args.replicated_database takes precedence over args.db_engine; when
    neither is set an empty string is returned.
    """
    if args.replicated_database:
        return (
            " ON CLUSTER test_cluster_database_replicated "
            f"ENGINE=Replicated('/test/clickhouse/db/{database_name}', "
            "'{shard}', '{replica}')"
        )
    if args.db_engine:
        return " ENGINE=" + args.db_engine
    return ""  # Will use default engine
def get_zookeeper_session_uptime(args):
    """Return zookeeperSessionUptime() as an int, or None if it cannot be obtained.

    With args.replicated_database the minimum over all replicas of
    'test_cluster_database_replicated' is requested instead of the local value.
    """
    try:
        if args.replicated_database:
            return int(clickhouse_execute(args, (
                "\n"
                "SELECT min(materialize(zookeeperSessionUptime()))\n"
                "FROM clusterAllReplicas('test_cluster_database_replicated', system.one)\n"
            )))
        return int(clickhouse_execute(args, 'SELECT zookeeperSessionUptime()'))
    except Exception:
        # Best effort: any query/parsing failure means "unknown". Only
        # Exception is caught so that KeyboardInterrupt (and the Terminated
        # subclass raised by the signal handler) still propagates.
        return None
def need_retry(args, stdout, stderr, total_time):
    """Tell whether a test's output or the ZooKeeper session state calls for a retry."""
    if args.check_zookeeper_session:
        # Sometimes we may get unexpected exception like "Replica is readonly" or "Shutdown is called for table"
        # instead of "Session expired" or "Connection loss"
        # Retry if session was expired during test execution.
        # If ZooKeeper is configured, then it's more reliable than checking stderr,
        # but the following condition is always true if ZooKeeper is not configured.
        session_uptime = get_zookeeper_session_uptime(args)
        if session_uptime is not None and session_uptime < math.ceil(total_time):
            return True

    for output in (stdout, stderr):
        if any(msg in output for msg in MESSAGES_TO_RETRY):
            return True
    return False
2019-03-13 16:47:02 +00:00
def get_processlist(args):
    """Return the server process list as parsed JSON rows (see clickhouse_execute_json).

    With args.replicated_database, system.processes is read from every replica
    of 'test_cluster_database_replicated', excluding queries that mention
    system.processes themselves.
    """
    if not args.replicated_database:
        return clickhouse_execute_json(args, 'SHOW PROCESSLIST')

    query = (
        "\n"
        "SELECT materialize((hostName(), tcpPort())) as host, *\n"
        "FROM clusterAllReplicas('test_cluster_database_replicated', system.processes)\n"
        "WHERE query NOT LIKE '%system.processes%'\n"
    )
    return clickhouse_execute_json(args, query)
2020-03-23 18:17:07 +00:00
# collect server stacktraces using gdb
def get_stacktraces_from_gdb(server_pid):
    """Run gdb in batch mode against *server_pid* and return its output as text.

    Returns None (after printing the error) if the command fails.
    """
    cmd = f"gdb -batch -ex 'thread apply all backtrace' -p {server_pid}"
    try:
        raw_output = subprocess.check_output(cmd, shell=True)
        return raw_output.decode('utf-8')
    except Exception as e:
        print(f"Error occurred while receiving stack traces from gdb: {e}")
        return None
2020-03-23 18:17:07 +00:00
# collect server stacktraces from system.stack_trace table
# it does not work in Sandbox
def get_stacktraces_from_clickhouse(args):
    """Query system.stack_trace through the command-line client and return its output.

    With args.replicated_database the table is read from all replicas of
    'test_cluster_database_replicated'. Returns None (after printing the
    error) if the client command fails.
    """
    settings_str = ' '.join([
        get_additional_client_options(args),
        '--allow_introspection_functions=1',
        '--skip_unavailable_shards=1',
    ])

    # Column expression shared by both query variants.
    symbolized_trace = (
        "arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), "
        "arrayMap(x -> addressToLine(x), trace), "
        "arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace "
    )

    if args.replicated_database:
        query = (
            "SELECT materialize((hostName(), tcpPort())) as host, thread_id, "
            + symbolized_trace
            + "FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') "
            "ORDER BY host, thread_id FORMAT Vertical"
        )
    else:
        query = "SELECT " + symbolized_trace + "FROM system.stack_trace FORMAT Vertical"

    command = "{} {} --query \"".format(args.client, settings_str) + query + "\""

    try:
        raw_output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT)
        return raw_output.decode('utf-8')
    except Exception as e:
        print(f"Error occurred while receiving stack traces from client: {e}")
        return None
def print_stacktraces() -> None:
    """Print server stack traces, preferring gdb and falling back to a query.

    gdb is tried only when a server pid was found and the run is not in
    replicated-database mode. If no traces can be obtained either way, a
    message saying the server process could not be located is printed.
    Reads the module-level `args`.
    """
    server_pid = get_server_pid()

    bt = None

    if server_pid and not args.replicated_database:
        print("")
        print(f"Located ClickHouse server process {server_pid} listening at TCP port {args.tcp_port}")
        print("Collecting stacktraces from all running threads with gdb:")

        bt = get_stacktraces_from_gdb(server_pid)

        # get_stacktraces_from_gdb() returns None on failure; len(None) would
        # raise TypeError and prevent the fallback below from running.
        if bt is not None and len(bt) < 1000:
            print("Got suspiciously small stacktraces: ", bt)
            bt = None

    if bt is None:
        print("\nCollecting stacktraces from system.stacktraces table:")

        bt = get_stacktraces_from_clickhouse(args)

    if bt is not None:
        print(bt)
        return

    print(colored(
        f"\nUnable to locate ClickHouse server process listening at TCP port {args.tcp_port}. "
        "It must have crashed or exited prematurely!",
        args, "red", attrs=["bold"]))
def get_server_pid():
    """Return the server pid as an int, or None if no command yields one.

    Tries lsof on args.tcp_port first, then pidof. Reads the module-level `args`.
    """
    candidate_commands = (
        # lsof does not work in stress tests for some reason
        f"lsof -i tcp:{args.tcp_port} -s tcp:LISTEN -Fp | awk '/^p[0-9]+$/{{print substr($0, 2)}}'",
        "pidof -s clickhouse-server",
    )

    output = None
    for cmd in candidate_commands:
        try:
            output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True)
            if output:
                return int(output)
        except Exception as e:
            print(f"Cannot get server pid with {cmd}, got {output}: {e}")

    return None  # most likely server is dead
def colored(text, args, color=None, on_color=None, attrs=None):
    """Colorize *text* with termcolor when it is available and output is a TTY
    or args.force_color is set; otherwise return *text* unchanged."""
    use_color = termcolor and (sys.stdout.isatty() or args.force_color)
    if not use_color:
        return text
    return termcolor.colored(text, color, on_color, attrs)
class TestStatus(enum.Enum):
    """Possible statuses of a test result."""

    FAIL = "FAIL"
    UNKNOWN = "UNKNOWN"
    OK = "OK"
    SKIPPED = "SKIPPED"
class FailureReason(enum.Enum):
    """Reasons attached to a test result; each value is the text shown to the user."""

    # FAIL reasons
    TIMEOUT = "Timeout!"
    SERVER_DIED = "server died"
    EXIT_CODE = "return code: "
    STDERR = "having stderror: "
    # The word "having" was duplicated in this message.
    EXCEPTION = "having exception in stdout: "
    RESULT_DIFF = "result differs with reference: "
    TOO_LONG = "Test runs too long (> 60s). Make it faster."
    INTERNAL_QUERY_FAIL = "Internal query (CREATE/DROP DATABASE) failed:"

    # SKIPPED reasons
    DISABLED = "disabled"
    SKIP = "skip"
    NO_JINJA = "no jinja"
    NO_ZOOKEEPER = "no zookeeper"
    NO_SHARD = "no shard"
    FAST_ONLY = "running fast tests only"
    NO_LONG = "not running long tests"
    REPLICATED_DB = "replicated-database"
    S3_STORAGE = "s3-storage"
    BUILD = "not running for current build"

    # UNKNOWN reasons
    NO_REFERENCE = "no reference file"
    INTERNAL_ERROR = "Test internal error: "
class SettingsRandomizer:
    """Produces random values for a fixed set of settings."""

    # Setting name -> zero-argument callable returning a fresh random value.
    settings = {
        "max_insert_threads": lambda: 0 if random.random() < 0.5 else random.randint(1, 16),
        "group_by_two_level_threshold": lambda: 1 if random.random() < 0.1 else 2 ** 60 if random.random() < 0.11 else 100000,
        "group_by_two_level_threshold_bytes": lambda: 1 if random.random() < 0.1 else 2 ** 60 if random.random() < 0.11 else 50000000,
        "distributed_aggregation_memory_efficient": lambda: random.randint(0, 1),
        "fsync_metadata": lambda: random.randint(0, 1),
        "priority": lambda: int(abs(random.gauss(0, 2))),
        "output_format_parallel_formatting": lambda: random.randint(0, 1),
        "input_format_parallel_parsing": lambda: random.randint(0, 1),
    }

    @staticmethod
    def get_random_settings():
        """Return a list of 'name=value' strings, one per entry of `settings`."""
        return [
            f"{name}={make_value()}"
            for name, make_value in SettingsRandomizer.settings.items()
        ]
class TestResult:
    """Outcome of one test run: status, optional reason, duration and description."""

    def __init__(self, case_name: str, status: TestStatus, reason: Optional[FailureReason], total_time: float, description: str):
        self.case_name: str = case_name
        self.status: TestStatus = status
        self.reason: Optional[FailureReason] = reason
        self.total_time: float = total_time
        self.description: str = description
        # Only ever switched on by check_if_need_retry().
        self.need_retry: bool = False

    def check_if_need_retry(self, args, stdout, stderr, runs_count):
        """Set self.need_retry when a FAIL result qualifies for another run.

        All three must hold: the status is FAIL, need_retry() accepts the
        output, and runs_count does not exceed MAX_RETRIES.
        """
        failed = self.status == TestStatus.FAIL
        if failed and need_retry(args, stdout, stderr, self.total_time) and runs_count <= MAX_RETRIES:
            self.need_retry = True
class TestCase:
@staticmethod
def get_description_from_exception_info(exc_info):
exc_type, exc_value, tb = exc_info
exc_name = exc_type.__name__
traceback_str = "\n".join(traceback.format_tb(tb, 10))
description = f"\n{exc_name}\n{exc_value}\n{traceback_str}"
return description
@staticmethod
def get_reference_file(suite_dir, name):
    """
    Returns reference file name for specified test, or None if neither
    '<name>.reference' nor '<name>.gen.reference' exists in suite_dir
    (a trailing '.gen' is removed from name first)
    """
    base_path = os.path.join(suite_dir, removesuffix(name, ".gen"))
    candidates = (base_path + '.reference', base_path + '.gen.reference')
    return next((path for path in candidates if os.path.isfile(path)), None)
@staticmethod
def configure_testcase_args(args, case_file, suite_tmp_dir):
testcase_args = copy.deepcopy(args)
testcase_args.testcase_start_time = datetime.now()
testcase_basename = os.path.basename(case_file)
testcase_args.testcase_client = f"{testcase_args.client} --log_comment='{testcase_basename}'"
testcase_args.testcase_basename = testcase_basename
2021-08-06 14:38:28 +00:00
if testcase_args.database:
database = testcase_args.database
os.environ.setdefault("CLICKHOUSE_DATABASE", database)
os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir)
else:
# If --database is not specified, we will create temporary database with unique name
# And we will recreate and drop it for each test
def random_str(length=6):
alphabet = string.ascii_lowercase + string.digits
# NOTE: it is important not to use default random generator, since it shares state.
return ''.join(random.SystemRandom().choice(alphabet) for _ in range(length))
2021-08-06 14:38:28 +00:00
database = 'test_{suffix}'.format(suffix=random_str())
2021-08-06 14:38:28 +00:00
clickhouse_execute(args, "CREATE DATABASE " + database + get_db_engine(testcase_args, database), settings={
'log_comment': testcase_args.testcase_basename,
})
2021-08-06 14:38:28 +00:00
os.environ["CLICKHOUSE_DATABASE"] = database
# Set temporary directory to match the randomly generated database,
# because .sh tests also use it for temporary files and we want to avoid
# collisions.
testcase_args.test_tmp_dir = os.path.join(suite_tmp_dir, database)
os.mkdir(testcase_args.test_tmp_dir)
os.environ.setdefault("CLICKHOUSE_TMP", testcase_args.test_tmp_dir)
testcase_args.testcase_database = database
return testcase_args
def add_random_settings(self, client_options):
    """Publish this test's random settings and return extended client options.

    Writes CLICKHOUSE_URL_PARAMS and CLICKHOUSE_CLIENT_OPT in os.environ
    (base values followed by the random settings). Tests tagged
    'no-random-settings' get client_options back unchanged and the
    environment is left alone.
    """
    if self.tags and 'no-random-settings' in self.tags:
        return client_options

    url_params = '&'.join(self.random_settings)
    if len(self.base_url_params) != 0:
        url_params = self.base_url_params + '&' + url_params
    os.environ['CLICKHOUSE_URL_PARAMS'] = url_params

    new_options = " --allow_repeated_settings --" + " --".join(self.random_settings)
    os.environ['CLICKHOUSE_CLIENT_OPT'] = self.base_client_options + new_options + ' '
    return client_options + new_options
def remove_random_settings_from_env(self):
    """Reset CLICKHOUSE_URL_PARAMS and CLICKHOUSE_CLIENT_OPT to the base values stored on self."""
    os.environ.update({
        'CLICKHOUSE_URL_PARAMS': self.base_url_params,
        'CLICKHOUSE_CLIENT_OPT': self.base_client_options,
    })
def add_info_about_settings(self, description):
    """Append a line listing the random settings to *description*.

    Returns *description* unchanged for tests tagged 'no-random-settings'.
    """
    if self.tags and 'no-random-settings' in self.tags:
        return description

    settings_line = "Settings used in the test: --" + " --".join(self.random_settings)
    return description + "\n" + settings_line + "\n"
def __init__(self, suite, case: str, args, is_concurrent: bool):
    """Set up file paths, tags, random settings and base environment values for one test case."""
    self.case: str = case  # case file name
    self.tags: Set[str] = suite.all_tags.get(case, set())

    self.case_file: str = os.path.join(suite.suite_path, case)
    (self.name, self.ext) = os.path.splitext(case)

    # Output files get a per-process suffix only for concurrent runs with
    # more than one run per test.
    file_suffix = ''
    if is_concurrent and args.test_runs > 1:
        file_suffix = '.' + str(os.getpid())

    self.reference_file = self.get_reference_file(suite.suite_path, self.name)
    output_base = os.path.join(suite.suite_tmp_path, self.name) + file_suffix
    self.stdout_file = output_base + '.stdout'
    self.stderr_file = output_base + '.stderr'

    self.testcase_args = None
    self.runs_count = 0

    self.random_settings = SettingsRandomizer.get_random_settings()
    # Environment values as they were at construction time.
    self.base_url_params = os.environ.get('CLICKHOUSE_URL_PARAMS', '')
    self.base_client_options = os.environ.get('CLICKHOUSE_CLIENT_OPT', '')
# should skip test, should increment skipped_total, skip reason
def should_skip_test(self, suite) -> Optional[FailureReason]:
    """Return the reason this test must be skipped, or None if it should run.

    Checks are applied in a fixed order and the first match wins. Reads the
    module-level `args` and `USE_JINJA`.
    """
    tags = self.tags

    def tagged(*names):
        # True when the test carries at least one of the given tags.
        return bool(tags) and any(name in tags for name in names)

    if not args.disabled:
        if tagged('disabled'):
            return FailureReason.DISABLED
        if os.path.exists(os.path.join(suite.suite_path, self.name) + '.disabled'):
            return FailureReason.DISABLED

    if args.skip and any(s in self.name for s in args.skip):
        return FailureReason.SKIP

    if not USE_JINJA and self.ext.endswith("j2"):
        return FailureReason.NO_JINJA

    if tagged('zookeeper', 'replica') and not args.zookeeper:
        return FailureReason.NO_ZOOKEEPER

    if tagged('shard', 'distributed', 'global') and not args.shard:
        return FailureReason.NO_SHARD

    if tagged('no-fasttest') and args.fast_tests_only:
        return FailureReason.FAST_ONLY

    # Tests for races and deadlocks usually are run in a loop for a significant amount of time
    if tagged('long', 'deadlock', 'race') and args.no_long:
        return FailureReason.NO_LONG

    if tagged('no-replicated-database') and args.replicated_database:
        return FailureReason.REPLICATED_DB

    if tagged('no-s3-storage') and args.s3_storage:
        return FailureReason.S3_STORAGE

    if tags:
        # 'no-<flag>' excludes the test from builds having that flag.
        for build_flag in args.build_flags:
            if 'no-' + build_flag in tags:
                return FailureReason.BUILD
        # 'use-<x>' (compared with dashes turned into underscores) requires
        # the matching build flag.
        for tag in tags:
            normalized = tag.replace('-', '_')
            if normalized.startswith('use_') and normalized not in args.build_flags:
                return FailureReason.BUILD

    return None
def process_result_impl(self, proc, stdout: str, stderr: str, total_time: float):
    """Classify a finished (or timed-out) run and return a TestResult.

    proc        -- the Popen object of the test, or a falsy value when there
                   is none (the process checks are then skipped)
    stdout      -- decoded contents of the test's stdout file
    stderr      -- decoded contents of the test's stderr file
    total_time  -- run time in seconds

    Checks, in order: still running (killed, TIMEOUT), non-zero exit code,
    non-empty stderr, 'Exception' in stdout, '@@SKIP@@' in stdout, missing
    reference file, difference from the reference, and run time in
    multi-run mode. On success the stdout/stderr files are removed.
    """
    description = ""

    if proc:
        if proc.returncode is None:
            try:
                proc.kill()
            except OSError as e:
                # The process may have exited between the check and the kill.
                if e.errno != ESRCH:
                    raise

            if stderr:
                description += stderr
            return TestResult(self.name, TestStatus.FAIL, FailureReason.TIMEOUT, total_time, description)

        if proc.returncode != 0:
            reason = FailureReason.EXIT_CODE
            description += str(proc.returncode)

            if stderr:
                description += "\n"
                description += stderr

            # Stop on fatal errors like segmentation fault. They are sent to client via logs.
            if ' <Fatal> ' in stderr:
                reason = FailureReason.SERVER_DIED

            if self.testcase_args.stop \
                    and ('Connection refused' in stderr or 'Attempt to read after eof' in stderr) \
                    and 'Received exception from server' not in stderr:
                reason = FailureReason.SERVER_DIED

            if os.path.isfile(self.stdout_file):
                description += ", result:\n\n"
                # Close the file deterministically and tolerate undecodable
                # bytes, so that broken output is reported as a test failure
                # instead of raising UnicodeDecodeError here.
                with open(self.stdout_file, encoding='utf-8', errors='replace') as stdout_fd:
                    description += '\n'.join(stdout_fd.read().splitlines()[:100])
                description += '\n'

            description += "\nstdout:\n{}\n".format(stdout)
            return TestResult(self.name, TestStatus.FAIL, reason, total_time, description)

    if stderr:
        description += "\n{}\n".format('\n'.join(stderr.splitlines()[:100]))
        description += "\nstdout:\n{}\n".format(stdout)
        return TestResult(self.name, TestStatus.FAIL, FailureReason.STDERR, total_time, description)

    if 'Exception' in stdout:
        description += "\n{}\n".format('\n'.join(stdout.splitlines()[:100]))
        return TestResult(self.name, TestStatus.FAIL, FailureReason.EXCEPTION, total_time, description)

    if '@@SKIP@@' in stdout:
        skip_reason = stdout.replace('@@SKIP@@', '').rstrip("\n")
        description += " - "
        description += skip_reason
        return TestResult(self.name, TestStatus.SKIPPED, FailureReason.SKIP, total_time, description)

    if self.reference_file is None:
        return TestResult(self.name, TestStatus.UNKNOWN, FailureReason.NO_REFERENCE, total_time, description)

    # Only the exit status matters here; the output is discarded. DEVNULL is
    # used because subprocess.call() must not be given stdout=PIPE.
    result_is_different = subprocess.call(
        ['diff', '-q', self.reference_file, self.stdout_file], stdout=subprocess.DEVNULL)

    if result_is_different:
        diff = Popen(['diff', '-U', str(self.testcase_args.unified), self.reference_file, self.stdout_file], stdout=PIPE,
                     universal_newlines=True).communicate()[0]
        description += "\n{}\n".format(diff)
        return TestResult(self.name, TestStatus.FAIL, FailureReason.RESULT_DIFF, total_time, description)

    if self.testcase_args.test_runs > 1 and total_time > 60 and 'long' not in self.tags:
        # We're in Flaky Check mode, check the run time as well while we're at it.
        return TestResult(self.name, TestStatus.FAIL, FailureReason.TOO_LONG, total_time, description)

    if os.path.exists(self.stdout_file):
        os.remove(self.stdout_file)
    if os.path.exists(self.stderr_file):
        os.remove(self.stderr_file)

    return TestResult(self.name, TestStatus.OK, None, total_time, description)
@staticmethod
def print_test_time(test_time) -> str:
    """Return ' <seconds with 2 decimals> sec.' when args.print_time is set, else ''.

    Reads the module-level `args`.
    """
    return " {0:.2f} sec.".format(test_time) if args.print_time else ''
def process_result(self, result: TestResult, messages):
    """Build the full, printable description of *result* and store it on the result.

    messages -- mapping from result status to the leading status text

    The text is: status message, optional timing, optional ' - <reason>',
    the result's own description, and for failed tests the database name
    when it is known. Returns the same *result* object.
    """
    description_full = messages[result.status]
    description_full += self.print_test_time(result.total_time)
    if result.reason is not None:
        description_full += " - "
        description_full += result.reason.value

    description_full += result.description
    description_full += "\n"

    # testcase_args stays None when run() failed before configuring the test
    # (e.g. the health check or CREATE DATABASE failed); there is no
    # database name to report then.
    if result.status == TestStatus.FAIL and self.testcase_args is not None:
        description_full += 'Database: ' + self.testcase_args.testcase_database

    result.description = description_full
    return result
@staticmethod
def send_test_name_failed(suite: str, case: str) -> None:
    """Send a trivial SELECT naming the test and this process's pid to the server.

    Nothing is returned; a failure shows up as an exception raised by
    clickhouse_execute(). Reads the module-level `args`.
    """
    pid = os.getpid()
    clickhouse_execute(args, f"SELECT 'Running test {suite}/{case} from pid={pid}'")
def run_single_test(self, server_logs_level, client_options):
    """Execute the test once and collect its output.

    Uses self.testcase_args (start time, client command, database, timeout).
    The test is started through the shell with stdout/stderr redirected to
    self.stdout_file / self.stderr_file, and polled until it exits or the
    timeout measured from testcase_start_time elapses. The per-test database
    and temporary directory are then dropped (unless --database was given, or
    no_drop_if_fail applies), and the output files are normalized with sed.

    Returns (proc, stdout, stderr, total_time). If dropping the database
    times out, returns (None, "", <message>, total_time) instead.
    """
    args = self.testcase_args
    client = args.testcase_client
    start_time = args.testcase_start_time
    database = args.testcase_database

    # This is for .sh tests
    os.environ["CLICKHOUSE_LOG_COMMENT"] = args.testcase_basename

    params = {
        'client': client + ' --database=' + database,
        'logs_level': server_logs_level,
        'options': client_options,
        'test': self.case_file,
        'stdout': self.stdout_file,
        'stderr': self.stderr_file,
    }

    # Both output files are truncated ('>' / '2>'); the redirection is the
    # same whether or not --database was given.
    pattern = '{test} > {stdout} 2> {stderr}'

    if self.ext == '.sql':
        pattern = "{client} --send_logs_level={logs_level} --testmode --multiquery {options} < " + pattern

    command = pattern.format(**params)

    proc = Popen(command, shell=True, env=os.environ)

    while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None:
        sleep(0.01)

    need_drop_database = not args.database
    if need_drop_database and args.no_drop_if_fail:
        maybe_passed = (proc.returncode == 0) and (proc.stderr is None) and (
            proc.stdout is None or 'Exception' not in proc.stdout)
        need_drop_database = not maybe_passed

    if need_drop_database:
        seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20)
        try:
            clickhouse_execute(args, "DROP DATABASE " + database, timeout=seconds_left, settings={
                'log_comment': args.testcase_basename,
            })
        except socket.timeout:
            total_time = (datetime.now() - start_time).total_seconds()
            return None, "", f"Timeout dropping database {database} after test", total_time

        shutil.rmtree(args.test_tmp_dir)

    total_time = (datetime.now() - start_time).total_seconds()

    # Normalize randomized database names in stdout, stderr files.
    os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=self.stdout_file))
    if args.hide_db_name:
        os.system(
            "LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=self.stderr_file))
    if args.replicated_database:
        os.system("LC_ALL=C sed -i -e 's|/auto_{{shard}}||g' {file}".format(file=self.stdout_file))
        os.system("LC_ALL=C sed -i -e 's|auto_{{replica}}||g' {file}".format(file=self.stdout_file))

    # Normalize hostname in stdout file.
    os.system("LC_ALL=C sed -i -e 's/{hostname}/localhost/g' {file}".format(hostname=socket.gethostname(),
                                                                              file=self.stdout_file))

    # Read both files as bytes and decode leniently; the handles are closed
    # right away instead of being left to the garbage collector.
    stdout = b''
    if os.path.exists(self.stdout_file):
        with open(self.stdout_file, 'rb') as stdout_fd:
            stdout = stdout_fd.read()
    stdout = str(stdout, errors='replace', encoding='utf-8')

    stderr = b''
    if os.path.exists(self.stderr_file):
        with open(self.stderr_file, 'rb') as stderr_fd:
            stderr = stderr_fd.read()
    stderr = str(stderr, errors='replace', encoding='utf-8')

    return proc, stdout, stderr, total_time
def run(self, args, suite, client_options, server_logs_level):
    """Run this test case once and return its TestResult.

    Skipped tests return immediately. With args.testname a health-check
    query is sent first; if it fails the test is reported as SERVER_DIED.
    Errors are mapped to results: HTTPError -> INTERNAL_QUERY_FAIL,
    connection refused/reset -> SERVER_DIED, anything else -> UNKNOWN /
    INTERNAL_ERROR. KeyboardInterrupt is always propagated. The random
    settings are removed from the environment in every case.
    """
    try:
        skip_reason = self.should_skip_test(suite)
        if skip_reason is not None:
            return TestResult(self.name, TestStatus.SKIPPED, skip_reason, 0., "")

        if args.testname:
            try:
                self.send_test_name_failed(suite.suite, self.case)
            except Exception:
                # Only Exception: an interrupt arriving during the health
                # check must reach the KeyboardInterrupt handler below
                # rather than be reported as a dead server.
                return TestResult(self.name, TestStatus.FAIL, FailureReason.SERVER_DIED, 0.,
                                  "\nServer does not respond to health check\n")

        self.runs_count += 1
        self.testcase_args = self.configure_testcase_args(args, self.case_file, suite.suite_tmp_path)
        client_options = self.add_random_settings(client_options)
        proc, stdout, stderr, total_time = self.run_single_test(server_logs_level, client_options)

        result = self.process_result_impl(proc, stdout, stderr, total_time)
        result.check_if_need_retry(args, stdout, stderr, self.runs_count)
        if result.status == TestStatus.FAIL:
            result.description = self.add_info_about_settings(result.description)
        return result
    except KeyboardInterrupt as e:
        raise e
    except HTTPError:
        return TestResult(self.name, TestStatus.FAIL,
                          FailureReason.INTERNAL_QUERY_FAIL,
                          0.,
                          self.add_info_about_settings(self.get_description_from_exception_info(sys.exc_info())))
    except (ConnectionRefusedError, ConnectionResetError):
        return TestResult(self.name, TestStatus.FAIL,
                          FailureReason.SERVER_DIED,
                          0.,
                          self.add_info_about_settings(self.get_description_from_exception_info(sys.exc_info())))
    except:
        # Catch-all kept as is; KeyboardInterrupt was already re-raised above.
        return TestResult(self.name, TestStatus.UNKNOWN,
                          FailureReason.INTERNAL_ERROR,
                          0.,
                          self.get_description_from_exception_info(sys.exc_info()))
    finally:
        self.remove_random_settings_from_env()
class TestSuite:
@staticmethod
def tests_in_suite_key_func(item: str) -> int:
    """Sort key for test file names, driven by the module-level args.order.

    'random' yields a random float. Otherwise the integer before the first
    underscore is used (negated unless the order is 'asc'); names without an
    underscore sort as 99998 and non-numeric prefixes as 99997.
    """
    if args.order == 'random':
        return random.random()

    prefix, separator, _rest = item.partition('_')
    if not separator:
        return 99998

    try:
        number = int(prefix)
    except ValueError:
        return 99997
    return number if args.order == 'asc' else -number
@staticmethod
def render_test_template(j2env, suite_dir, test_name):
"""
Render template for test and reference file if needed
"""
if j2env is None:
return test_name
test_base_name = removesuffix(test_name, ".sql.j2", ".sql")
reference_file_name = test_base_name + ".reference.j2"
reference_file_path = os.path.join(suite_dir, reference_file_name)
if os.path.isfile(reference_file_path):
tpl = j2env.get_template(reference_file_name)
tpl.stream().dump(os.path.join(suite_dir, test_base_name) + ".gen.reference")
if test_name.endswith(".sql.j2"):
tpl = j2env.get_template(test_name)
generated_test_name = test_base_name + ".gen.sql"
tpl.stream().dump(os.path.join(suite_dir, generated_test_name))
return generated_test_name
return test_name
@staticmethod
def read_test_tags(suite_dir: str, all_tests: List[str]) -> Dict[str, Set[str]]:
def get_comment_sign(filename):
if filename.endswith('.sql') or filename.endswith('.sql.j2'):
return '--'
elif filename.endswith('.sh') or filename.endswith('.py') or filename.endswith('.expect'):
return '#'
else:
raise Exception(f'Unknown file_extension: {filename}')
def parse_tags_from_line(line, comment_sign):
if not line.startswith(comment_sign):
return None
tags_str = line[len(comment_sign):].lstrip()
tags_prefix = "Tags:"
if not tags_str.startswith(tags_prefix):
return None
tags_str = tags_str[len(tags_prefix):]
tags = tags_str.split(',')
tags = {tag.strip() for tag in tags}
return tags
def is_shebang(line):
return line.startswith('#!')
def load_tags_from_file(filepath):
with open(filepath, 'r') as file:
try:
line = file.readline()
if is_shebang(line):
line = file.readline()
except UnicodeDecodeError:
return []
return parse_tags_from_line(line, get_comment_sign(filepath))
all_tags = {}
start_time = datetime.now()
for test_name in all_tests:
tags = load_tags_from_file(os.path.join(suite_dir, test_name))
if tags:
all_tags[test_name] = tags
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > 1:
print(f"Tags for suite {suite_dir} read in {elapsed:.2f} seconds")
return all_tags
def __init__(self, args, suite_path: str, suite_tmp_path: str, suite: str):
    """Collect the suite's tests and tags and split the tests into sequential and parallel lists.

    When both args.run_by_hash_num and args.run_by_hash_total are given,
    only tests whose stringhash() modulo the total equals the number are kept.
    """
    self.args = args
    self.suite_path: str = suite_path
    self.suite_tmp_path: str = suite_tmp_path
    self.suite: str = suite

    def accept_all(_name):
        return True

    def accept_by_hash(name):
        return stringhash(name) % args.run_by_hash_total == args.run_by_hash_num

    filter_func = accept_all
    if args.run_by_hash_num is not None and args.run_by_hash_total is not None:
        if args.run_by_hash_num > args.run_by_hash_total:
            raise Exception(f"Incorrect run by hash, value {args.run_by_hash_num} bigger than total {args.run_by_hash_total}")
        filter_func = accept_by_hash

    self.all_tests: List[str] = self.get_tests_list(self.tests_in_suite_key_func, filter_func)
    self.all_tags: Dict[str, Set[str]] = self.read_test_tags(self.suite_path, self.all_tests)

    self.sequential_tests = []
    self.parallel_tests = []
    for test_name in self.all_tests:
        bucket = self.sequential_tests if self.is_sequential_test(test_name) else self.parallel_tests
        bucket.append(test_name)
def is_sequential_test(self, test_name):
    """
    Tell whether the test must not be run in parallel with other tests.

    A test is sequential if its name contains any of the --sequential
    substrings, or if it has the 'no-parallel' or the 'sequential' tag.
    """
    # Use the arguments the suite was created with instead of relying on
    # a module-level `args` variable being defined.
    sequential_patterns = self.args.sequential
    if sequential_patterns:
        if any(s in test_name for s in sequential_patterns):
            return True

    if test_name not in self.all_tags:
        return False

    tags = self.all_tags[test_name]
    return ('no-parallel' in tags) or ('sequential' in tags)
def get_tests_list(self, sort_key, filter_func):
    """
    Build the ordered list of test file names to run: every selected test
    is repeated test_runs times and the result is sorted by sort_key
    """
    selected = list(self.get_selected_tests(filter_func))
    repeated = selected * self.args.test_runs
    return sorted(repeated, key=sort_key)
def get_selected_tests(self, filter_func):
    """
    Generate names of the tests to run: list the suite directory, keep
    the files that pass all the filters and render templates
    """
    if USE_JINJA:
        j2env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(self.suite_path),
            keep_trailing_newline=True,
        )
    else:
        j2env = None

    for file_name in os.listdir(self.suite_path):
        # The checks are applied in a fixed order and stop at the first
        # one that rejects the file.
        is_selected = is_test_from_dir(self.suite_path, file_name)
        if is_selected and self.args.test:
            is_selected = any(re.search(pattern, file_name) for pattern in self.args.test)
        if is_selected and USE_JINJA:
            is_selected = not file_name.endswith(".gen.sql")
        if is_selected and filter_func(file_name):
            yield self.render_test_template(j2env, self.suite_path, file_name)
@staticmethod
def read_test_suite(args, suite_dir_name: str):
    """
    Create a TestSuite for a directory from the queries directory.

    :param args: parsed command line arguments
    :param suite_dir_name: name of an entry of the queries directory
    :return: TestSuite, or None if the entry is not a suite directory
             or the suite must not be run
    """
    def is_data_present():
        return int(clickhouse_execute(args, 'EXISTS TABLE test.hits'))

    base_dir = os.path.abspath(args.queries)
    tmp_dir = os.path.abspath(args.tmp)
    suite_path = os.path.join(base_dir, suite_dir_name)

    suite_re_obj = re.search('^[0-9]+_(.*)$', suite_dir_name)
    if not suite_re_obj:  # skip .gitignore and so on
        return None
    suite = suite_re_obj.group(1)

    # Check it before touching the tmp directory, so that nothing is created
    # for entries which are not suites.
    if not os.path.isdir(suite_path):
        return None

    suite_tmp_path = os.path.join(tmp_dir, suite_dir_name)
    # exist_ok avoids a race between the existence check and the creation.
    os.makedirs(suite_tmp_path, exist_ok=True)

    if 'stateful' in suite and not args.no_stateful and not is_data_present():
        print("Won't run stateful tests because test data wasn't loaded.")
        return None
    if 'stateless' in suite and args.no_stateless:
        print("Won't run stateless tests because they were manually disabled.")
        return None
    if 'stateful' in suite and args.no_stateful:
        print("Won't run stateful tests because they were manually disabled.")
        return None

    return TestSuite(args, suite_path, suite_tmp_path, suite)
# Placeholders for the state shared between the functions of the script.
# The real objects are assigned in the `if __name__ == '__main__'` section.

# Time limit and result of the whole run
stop_time = exit_code = None
# Set when the server has died
server_died = None
# Coordination of stopping the tests
stop_tests_triggered_lock = stop_tests_triggered = None
# Queue with the names of the tests for worker processes
queue = None
# Manager and the list of results of the restarted tests owned by it
multiprocessing_manager = restarted_tests = None
# def run_tests_array(all_tests: List[str], num_tests: int, test_suite: TestSuite):
def run_tests_array(all_tests_with_params):
    """
    Run a batch of tests one by one and print a summary for the batch.

    :param all_tests_with_params: tuple (all_tests, num_tests, test_suite).
        In the main process all_tests is the list of test names to run
        (it is not modified). In any other process the list is ignored and
        test names are taken from the global queue until a falsy item is
        received. num_tests is only used in the announcement message.

    The run stops early if the server has died, the global time limit is
    exceeded or 20 tests in a row have failed. If at least one test has
    failed, the shared exit code is set to 1.
    """
    all_tests, num_tests, test_suite = all_tests_with_params
    global stop_time
    global exit_code
    global server_died
    global restarted_tests

    OP_SQUARE_BRACKET = colored("[", args, attrs=['bold'])
    CL_SQUARE_BRACKET = colored("]", args, attrs=['bold'])

    MSG_FAIL = OP_SQUARE_BRACKET + colored(" FAIL ", args, "red", attrs=['bold']) + CL_SQUARE_BRACKET
    MSG_UNKNOWN = OP_SQUARE_BRACKET + colored(" UNKNOWN ", args, "yellow", attrs=['bold']) + CL_SQUARE_BRACKET
    MSG_OK = OP_SQUARE_BRACKET + colored(" OK ", args, "green", attrs=['bold']) + CL_SQUARE_BRACKET
    MSG_SKIPPED = OP_SQUARE_BRACKET + colored(" SKIPPED ", args, "cyan", attrs=['bold']) + CL_SQUARE_BRACKET

    MESSAGES = {TestStatus.FAIL: MSG_FAIL, TestStatus.UNKNOWN: MSG_UNKNOWN, TestStatus.OK: MSG_OK, TestStatus.SKIPPED: MSG_SKIPPED}

    passed_total = 0
    skipped_total = 0
    failures_total = 0
    failures_chain = 0
    start_time = datetime.now()

    is_concurrent = multiprocessing.current_process().name != "MainProcess"

    client_options = get_additional_client_options(args)

    if num_tests > 0:
        about = 'about ' if is_concurrent else ''
        proc_name = multiprocessing.current_process().name
        print(f"\nRunning {about}{num_tests} {test_suite.suite} tests ({proc_name}).\n")

    # Iterate instead of popping from the head of the list: it is O(1) per
    # test and leaves the list of the caller intact.
    pending_tests = iter(all_tests) if all_tests else iter(())

    while True:
        if is_concurrent:
            case = queue.get(timeout=args.timeout * 1.1)
            if not case:
                break
        else:
            try:
                case = next(pending_tests)
            except StopIteration:
                break

        if server_died.is_set():
            stop_tests()
            break

        if stop_time and time() > stop_time:
            print("\nStop tests run because global time limit is exceeded.\n")
            stop_tests()
            break

        test_case = TestCase(test_suite, case, args, is_concurrent)

        try:
            description = ''
            if not is_concurrent:
                sys.stdout.flush()
                sys.stdout.write("{0:72}".format(removesuffix(test_case.name, ".gen", ".sql") + ": "))
                # This flush is needed so you can see the test name of the long
                # running test before it will finish. But don't do it in parallel
                # mode, so that the lines don't mix.
                sys.stdout.flush()
            else:
                description = "{0:72}".format(removesuffix(test_case.name, ".gen", ".sql") + ": ")

            # Rerun the test while its processed result asks for a retry,
            # remembering every result which led to a restart.
            while True:
                test_result = test_case.run(args, test_suite, client_options, server_logs_level)
                test_result = test_case.process_result(test_result, MESSAGES)
                if not test_result.need_retry:
                    break
                restarted_tests.append(test_result)

            if test_result.status == TestStatus.OK:
                passed_total += 1
                failures_chain = 0
            elif test_result.status == TestStatus.FAIL:
                failures_total += 1
                failures_chain += 1
                if test_result.reason == FailureReason.SERVER_DIED:
                    server_died.set()
                    stop_tests()
            elif test_result.status == TestStatus.SKIPPED:
                skipped_total += 1

            description += test_result.description

            if description and not description.endswith('\n'):
                description += '\n'

            sys.stdout.write(description)
            sys.stdout.flush()
        except KeyboardInterrupt:
            print(colored("Break tests execution", args, "red"))
            stop_tests()
            raise

        if failures_chain >= 20:
            stop_tests()
            break

    if failures_total > 0:
        print(colored(f"\nHaving {failures_total} errors! {passed_total} tests passed."
                      f" {skipped_total} tests skipped. {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
                      f' ({multiprocessing.current_process().name}).',
                      args, "red", attrs=["bold"]))
        exit_code.value = 1
    else:
        print(colored(f"\n{passed_total} tests passed. {skipped_total} tests skipped."
                      f" {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
                      f' ({multiprocessing.current_process().name}).',
                      args, "green", attrs=["bold"]))

    sys.stdout.flush()
# Server logs level which run_tests_array passes to every test run; main()
# also uses it as the default for CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL.
server_logs_level = "warning"
def check_server_started(args):
    """
    Wait until the server executes 'SELECT 1'.

    Makes up to args.server_check_retries attempts, sleeping for half
    a second after every refused or reset connection.

    :return: True if the query has succeeded, False if all attempts failed
    """
    print("Connecting to ClickHouse server...", end='', flush=True)

    for _ in range(args.server_check_retries):
        try:
            clickhouse_execute(args, 'SELECT 1')
        except (ConnectionRefusedError, ConnectionResetError):
            print('.', end='', flush=True)
            sleep(0.5)
        else:
            print(" OK", flush=True)
            return True

    print('\nAll connection tries failed', flush=True)
    return False
class BuildFlags:
    """
    Names of the properties of the server under test
    which are detected by collect_build_flags
    """

    # Sanitizers
    THREAD = 'tsan'
    ADDRESS = 'asan'
    UNDEFINED = 'ubsan'
    MEMORY = 'msan'

    # Build type
    DEBUG = 'debug'
    RELEASE = 'release'

    # Server settings
    ORDINARY_DATABASE = 'ordinary-database'
    POLYMORPHIC_PARTS = 'polymorphic-parts'
def collect_build_flags(args):
    """
    Ask the server about the properties of its build and configuration.

    :param args: parsed command line arguments (used to connect to the server)
    :return: list of strings: at most one sanitizer flag, at most one build
             type flag, flags for the default database engine and for
             min_bytes_for_wide_part, lowercased names of the enabled USE_*
             build options and 'cpu-<processor>'
    """
    result = []

    value = clickhouse_execute(args, "SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
    # Only the first matching sanitizer is reported, in this order.
    sanitizers = (
        (b'-fsanitize=thread', BuildFlags.THREAD),
        (b'-fsanitize=address', BuildFlags.ADDRESS),
        (b'-fsanitize=undefined', BuildFlags.UNDEFINED),
        (b'-fsanitize=memory', BuildFlags.MEMORY),
    )
    for compiler_option, flag in sanitizers:
        if compiler_option in value:
            result.append(flag)
            break

    value = clickhouse_execute(args, "SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'")
    if b'Debug' in value:
        result.append(BuildFlags.DEBUG)
    elif b'RelWithDebInfo' in value or b'Release' in value:
        result.append(BuildFlags.RELEASE)

    value = clickhouse_execute(args, "SELECT value FROM system.settings WHERE name = 'default_database_engine'")
    # Strip the response like the other reads below do, so that surrounding
    # whitespace (e.g. a trailing newline) cannot defeat the exact comparison.
    if value.strip() == b'Ordinary':
        result.append(BuildFlags.ORDINARY_DATABASE)

    value = int(clickhouse_execute(args, "SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'"))
    if value == 0:
        result.append(BuildFlags.POLYMORPHIC_PARTS)

    use_flags = clickhouse_execute(args, "SELECT name FROM system.build_options WHERE name like 'USE_%' AND value in ('ON', '1')")
    result.extend(use_flag.decode().lower() for use_flag in use_flags.strip().splitlines())

    system_processor = clickhouse_execute(args, "SELECT value FROM system.build_options WHERE name = 'SYSTEM_PROCESSOR' LIMIT 1").strip()
    if system_processor:
        result.append(f'cpu-{system_processor.decode().lower()}')

    return result
def suite_key_func(item: str) -> Union[int, Tuple[int, str]]:
    """
    Sort key for the entries of the queries directory.

    With random order the key is a random number. Otherwise names like
    '<number>_<suffix>' are ordered by the number and then by the suffix,
    and all the other names are placed after them.
    """
    if args.order == 'random':
        return random.random()

    prefix, separator, suffix = item.partition('_')
    if not separator:
        return 99998, ''

    try:
        number = int(prefix)
    except ValueError:
        return 99997, ''
    return number, suffix
def extract_key(key: str) -> str:
    """
    Run the extract-from-config program for the server config through the
    shell and return its output; `key` is appended to the command as is
    """
    command = "".join((
        args.extract_from_config,
        " --try --config ",
        args.configserver,
        key,
    ))
    _status, output = subprocess.getstatusoutput(command)
    return output
def do_run_tests(jobs, test_suite: TestSuite, parallel):
    """
    Run all tests of the suite.

    With jobs > 1 and at least one parallel test, the parallel tests are
    passed through the global queue to a pool of worker processes, and the
    sequential tests are run afterwards in the current process. Otherwise
    all tests are run one by one in the current process.

    :param jobs: maximum number of worker processes
    :param test_suite: suite to run
    :param parallel: value of the --parallel option; it is not used,
                     the parameter is kept for compatibility
    :return: number of tests of the suite which were scheduled to run
    """
    parallel_count = len(test_suite.parallel_tests)

    if jobs <= 1 or parallel_count == 0:
        num_tests = len(test_suite.all_tests)
        run_tests_array((test_suite.all_tests, num_tests, test_suite))
        return num_tests

    # Count the tests before running them: the result must not depend on
    # what run_tests_array does with the lists.
    sequential_count = len(test_suite.sequential_tests)
    print("Found", parallel_count, "parallel tests and", sequential_count, "sequential tests")

    jobs = min(jobs, parallel_count)
    batch_size = max(1, parallel_count // jobs)

    # Workers take the test names from the queue, so no list is passed.
    parallel_tests_array = [(None, batch_size, test_suite) for _ in range(jobs)]

    with closing(multiprocessing.Pool(processes=jobs)) as pool:
        pool.map_async(run_tests_array, parallel_tests_array)

        for suit in test_suite.parallel_tests:
            queue.put(suit, timeout=args.timeout * 1.1)

        # One end marker for every worker
        for _ in range(jobs):
            queue.put(None, timeout=args.timeout * 1.1)

        # The queue is a module-level object shared by all suites, so it is
        # not closed here: put() fails on a closed queue and the next suite
        # could not be run in parallel.

    pool.join()

    run_tests_array((test_suite.sequential_tests, sequential_count, test_suite))
    return sequential_count + parallel_count
def is_test_from_dir(suite_dir, case):
    """
    Check that `case` is a regular file in the suite directory
    with one of the supported test file extensions
    """
    case_file = os.path.join(suite_dir, case)
    # We could also test for executable files (os.access(case_file, os.X_OK),
    # but it interferes with 01610_client_spawn_editor.editor, which is invoked
    # as a query editor in the test, and must be marked as executable.
    if not os.path.isfile(case_file):
        return False
    return case_file.endswith(tuple(TEST_FILE_EXTENSIONS))
def removesuffix(text, *suffixes):
    """
    Remove the first of the given suffixes which the text ends with.
    At most one suffix is removed, empty suffixes are ignored.

    str.removesuffix was added in python 3.9
    https://www.python.org/dev/peps/pep-0616/
    Unlike it, this version can work with several possible suffixes
    """
    matching = next((s for s in suffixes if s and text.endswith(s)), None)
    if matching is None:
        return text
    return text[:len(text) - len(matching)]
def main(args):
    """
    Run all test suites from the queries directory and exit the process.

    Waits for the server, collects its build flags, sets defaults for the
    environment variables used by the tests, creates the common databases,
    runs every suite and finally reports hung queries (with --hung-check)
    and restarted tests.

    Always finishes with sys.exit(): with 1 if no tests were run, otherwise
    with the value of the shared exit code.

    :param args: parsed command line arguments
    :raises Exception: if the server does not respond to 'SELECT 1'
    """
    global server_died
    global stop_time
    global exit_code
    global server_logs_level
    global restarted_tests

    if not check_server_started(args):
        msg = ("Server is not responding. Cannot execute 'SELECT 1' query. "
               "If you are using split build, you have to specify -c option.")
        if args.hung_check:
            print(msg)
            pid = get_server_pid()
            print("Got server pid", pid)
            print_stacktraces()
        raise Exception(msg)

    args.build_flags = collect_build_flags(args)

    if args.skip:
        args.skip = set(args.skip)

    base_dir = os.path.abspath(args.queries)

    # Keep same default values as in queries/shell_config.sh
    os.environ.setdefault("CLICKHOUSE_BINARY", args.binary)
    # os.environ.setdefault("CLICKHOUSE_CLIENT", args.client)
    os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)

    if args.configclient:
        os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)

    # Force to print server warnings in stderr
    # Shell scripts could change logging level
    os.environ.setdefault("CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL", server_logs_level)

    # This code is bad as the time is not monotonic
    if args.global_time_limit:
        stop_time = time() + args.global_time_limit

    # Detect from the server config whether ZooKeeper is configured,
    # unless it was specified explicitly.
    if args.zookeeper is None:
        try:
            args.zookeeper = int(extract_key(" --key zookeeper | grep . | wc -l")) > 0
        except ValueError:
            args.zookeeper = False

    # The same for the addresses required by the tests with shards
    if args.shard is None:
        args.shard = bool(extract_key(' --key listen_host | grep -E "127.0.0.2|::"'))

    def create_common_database(args, db_name):
        """
        Create the database if it does not exist, making up to MAX_RETRIES
        attempts while the error is considered retriable by need_retry
        """
        for _ in range(MAX_RETRIES):
            start_time = datetime.now()
            try:
                clickhouse_execute(args, "CREATE DATABASE IF NOT EXISTS " + db_name + get_db_engine(args, db_name))
            except HTTPError as e:
                total_time = (datetime.now() - start_time).total_seconds()
                if not need_retry(args, e.message, e.message, total_time):
                    break
            else:
                # The query has succeeded, there is nothing to retry.
                return

    if args.database and args.database != "test":
        create_common_database(args, args.database)

    create_common_database(args, "test")

    total_tests_run = 0

    for suite in sorted(os.listdir(base_dir), key=suite_key_func):
        if server_died.is_set():
            break

        test_suite = TestSuite.read_test_suite(args, suite)
        if test_suite is None:
            continue

        total_tests_run += do_run_tests(args.jobs, test_suite, args.parallel)

    if server_died.is_set():
        exit_code.value = 1

    if args.hung_check:
        # Some queries may execute in background for some time after test was finished. This is normal.
        for _ in range(1, 60):
            processlist = get_processlist(args)
            if not processlist:
                break
            sleep(1)

        if processlist:
            print(colored("\nFound hung queries in processlist:", args, "red", attrs=["bold"]))
            print(json.dumps(processlist, indent=4))
            print_stacktraces()
            exit_code.value = 1
        else:
            print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))

    if len(restarted_tests) > 0:
        print("\nSome tests were restarted:\n")

        for test_result in restarted_tests:
            print("\n{0:72}: ".format(test_result.case_name))
            # replace it with lowercase to avoid parsing retried tests as failed
            for status in TestStatus:
                test_result.description = test_result.description.replace(status.value, status.value.lower())
            print(test_result.description)

    if total_tests_run == 0:
        print("No tests were run.")
        sys.exit(1)
    else:
        print("All tests have finished.")

    sys.exit(exit_code.value)
def find_binary(name):
    """
    Check whether an executable with the given name or path can be found.

    The name is tried as is, then in every directory from PATH and finally
    in /usr/local/bin and /usr/bin.

    :return: True if an executable entry was found, False otherwise
    """
    if os.path.exists(name) and os.access(name, os.X_OK):
        return True

    # PATH may be unset: treat it as empty instead of failing on None.
    # Empty entries are skipped, the name itself was already checked above.
    search_dirs = [path for path in os.environ.get("PATH", "").split(os.pathsep) if path]

    # maybe it wasn't in PATH
    search_dirs += ['/usr/local/bin', '/usr/bin']

    return any(os.access(os.path.join(path, name), os.X_OK) for path in search_dirs)
def get_additional_client_options(args):
    """
    Format the values of --client-option as command line options:
    every value is prefixed with '--', the options are separated by spaces
    """
    options = args.client_option
    if not options:
        return ''
    return ' '.join(['--' + option for option in options])
def get_additional_client_options_url(args):
    """
    Format the values of --client-option as URL query parameters:
    the values are joined with '&' as is
    """
    options = args.client_option
    return '&'.join(options) if options else ''
if __name__ == '__main__':
    # State shared with the worker processes
    stop_time = None
    exit_code = multiprocessing.Value("i", 0)
    server_died = multiprocessing.Event()
    stop_tests_triggered_lock = multiprocessing.Lock()
    stop_tests_triggered = multiprocessing.Event()
    queue = multiprocessing.Queue(maxsize=1)
    multiprocessing_manager = multiprocessing.Manager()
    restarted_tests = multiprocessing_manager.list()

    # Move to a new process group and kill it at exit so that we don't have any
    # infinite tests processes left
    # (new process group is required to avoid killing some parent processes)
    os.setpgid(0, 0)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler)

    # Note: argparse applies %-formatting to help strings,
    # so a literal '%' has to be written as '%%'.
    parser = ArgumentParser(description='ClickHouse functional tests')
    parser.add_argument('-q', '--queries', help='Path to queries dir')
    parser.add_argument('--tmp', help='Path to tmp dir')

    parser.add_argument('-b', '--binary', default='clickhouse',
        help='Path to clickhouse (if monolithic build, clickhouse-server otherwise) binary or name of binary in PATH')

    parser.add_argument('-c', '--client',
        help='Path to clickhouse-client (if split build, useless otherwise) binary or name of binary in PATH')

    parser.add_argument('--extract_from_config', help='extract-from-config program')
    parser.add_argument('--configclient', help='Client config (if you use not default ports)')
    parser.add_argument('--configserver', default='/etc/clickhouse-server/config.xml', help='Preprocessed server config')
    parser.add_argument('-o', '--output', help='Output xUnit compliant test report directory')
    parser.add_argument('-t', '--timeout', type=int, default=600, help='Timeout for each test case in seconds')
    parser.add_argument('--global_time_limit', type=int, help='Stop if executing more than specified time (after current test finished)')
    parser.add_argument('test', nargs='*', help='Optional test case name regex')
    parser.add_argument('-d', '--disabled', action='store_true', default=False, help='Also run disabled tests')
    parser.add_argument('--stop', action='store_true', default=None, dest='stop', help='Stop on network errors')
    parser.add_argument('--order', default='desc', choices=['asc', 'desc', 'random'], help='Run order')
    parser.add_argument('--testname', action='store_true', default=None, dest='testname', help='Make query with test name before test run')
    parser.add_argument('--hung-check', action='store_true', default=False)
    parser.add_argument('--force-color', action='store_true', default=False)
    parser.add_argument('--database', help='Database for tests (random name test_XXXXXX by default)')
    parser.add_argument('--no-drop-if-fail', action='store_true', help='Do not drop database for test if test has failed')
    parser.add_argument('--hide-db-name', action='store_true', help='Replace random database name with "default" in stderr')
    parser.add_argument('--parallel', default='1/1', help='One parallel test run number/total')
    parser.add_argument('-j', '--jobs', default=1, nargs='?', type=int, help='Run all tests in parallel')
    parser.add_argument('--test-runs', default=1, nargs='?', type=int, help='Run each test many times (useful for e.g. flaky check)')
    parser.add_argument('-U', '--unified', default=3, type=int, help='output NUM lines of unified context')
    parser.add_argument('-r', '--server-check-retries', default=180, type=int, help='Num of tries to execute SELECT 1 before tests started')
    parser.add_argument('--db-engine', help='Database engine name')
    parser.add_argument('--replicated-database', action='store_true', default=False, help='Run tests with Replicated database engine')
    parser.add_argument('--fast-tests-only', action='store_true', default=False, help='Run only fast tests (the tests without the "no-fasttest" tag)')
    parser.add_argument('--no-stateless', action='store_true', help='Disable all stateless tests')
    parser.add_argument('--no-stateful', action='store_true', help='Disable all stateful tests')
    parser.add_argument('--skip', nargs='+', help="Skip these tests")
    parser.add_argument('--sequential', nargs='+', help="Run these tests sequentially even if --parallel specified")
    parser.add_argument('--no-long', action='store_true', dest='no_long', help='Do not run long tests')
    parser.add_argument('--client-option', nargs='+', help='Specify additional client argument')
    parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time')
    parser.add_argument('--check-zookeeper-session', action='store_true', help='Check ZooKeeper session uptime to determine if failed test should be retried')
    parser.add_argument('--s3-storage', action='store_true', default=False, help='Run tests over s3 storage')

    parser.add_argument('--run-by-hash-num', type=int, help='Run tests matching crc32(test_name) %% run_by_hash_total == run_by_hash_num')
    parser.add_argument('--run-by-hash-total', type=int, help='Total test groups for crc32(test_name) %% run_by_hash_total == run_by_hash_num')

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--zookeeper', action='store_true', default=None, dest='zookeeper', help='Run zookeeper related tests')
    group.add_argument('--no-zookeeper', action='store_false', default=None, dest='zookeeper', help='Do not run zookeeper related tests')

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--shard', action='store_true', default=None, dest='shard', help='Run sharding related tests (required to clickhouse-server listen 127.0.0.2 127.0.0.3)')
    group.add_argument('--no-shard', action='store_false', default=None, dest='shard', help='Do not run shard related tests')

    args = parser.parse_args()

    if args.queries and not os.path.isdir(args.queries):
        print(f"Cannot access the specified directory with queries ({args.queries})", file=sys.stderr)
        sys.exit(1)

    # Autodetect the directory with queries if not specified
    if args.queries is None:
        args.queries = 'queries'

        if not os.path.isdir(args.queries):
            # If we're running from the repo
            args.queries = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'queries')

        if not os.path.isdir(args.queries):
            # Next we're going to try some system directories, don't write 'stdout' files into them.
            if args.tmp is None:
                args.tmp = '/tmp/clickhouse-test'

            args.queries = '/usr/local/share/clickhouse-test/queries'

        if not os.path.isdir(args.queries):
            args.queries = '/usr/share/clickhouse-test/queries'

        if not os.path.isdir(args.queries):
            print("Failed to detect path to the queries directory. Please specify it with '--queries' option.", file=sys.stderr)
            sys.exit(1)

    print("Using queries from '" + args.queries + "' directory")

    if args.tmp is None:
        args.tmp = args.queries

    # Choose the client program if it was not specified
    if args.client is None:
        if find_binary(args.binary + '-client'):
            args.client = args.binary + '-client'

            print("Using " + args.client + " as client program (expecting split build)")
        elif find_binary(args.binary):
            args.client = args.binary + ' client'

            print("Using " + args.client + " as client program (expecting monolithic build)")
        else:
            print("No 'clickhouse' or 'clickhouse-client' client binary found", file=sys.stderr)
            parser.print_help()
            sys.exit(1)

    if args.configclient:
        args.client += ' --config-file=' + args.configclient

    # Connection parameters are taken from the environment,
    # with fallback to the default values
    tcp_host = os.getenv("CLICKHOUSE_HOST")
    if tcp_host is not None:
        args.tcp_host = tcp_host
        args.client += f' --host={tcp_host}'
    else:
        args.tcp_host = 'localhost'

    tcp_port = os.getenv("CLICKHOUSE_PORT_TCP")
    if tcp_port is not None:
        args.tcp_port = int(tcp_port)
        args.client += f" --port={tcp_port}"
    else:
        args.tcp_port = 9000

    http_port = os.getenv("CLICKHOUSE_PORT_HTTP")
    if http_port is not None:
        args.http_port = int(http_port)
    else:
        args.http_port = 8123

    client_database = os.getenv("CLICKHOUSE_DATABASE")
    if client_database is not None:
        args.client += f' --database={client_database}'
        args.client_database = client_database
    else:
        args.client_database = 'default'

    if args.client_option:
        # Set options for client
        if 'CLICKHOUSE_CLIENT_OPT' in os.environ:
            os.environ['CLICKHOUSE_CLIENT_OPT'] += ' '
        else:
            os.environ['CLICKHOUSE_CLIENT_OPT'] = ''

        os.environ['CLICKHOUSE_CLIENT_OPT'] += get_additional_client_options(args)

        # Set options for curl
        if 'CLICKHOUSE_URL_PARAMS' in os.environ:
            os.environ['CLICKHOUSE_URL_PARAMS'] += '&'
        else:
            os.environ['CLICKHOUSE_URL_PARAMS'] = ''

        client_options_query_str = get_additional_client_options_url(args)
        args.client_options_query_str = client_options_query_str + '&'
        os.environ['CLICKHOUSE_URL_PARAMS'] += client_options_query_str
    else:
        args.client_options_query_str = ''

    if args.extract_from_config is None:
        if os.access(args.binary + '-extract-from-config', os.X_OK):
            args.extract_from_config = args.binary + '-extract-from-config'
        else:
            args.extract_from_config = args.binary + ' extract-from-config'

    # '-j' without a value means one job per CPU
    if args.jobs is None:
        args.jobs = multiprocessing.cpu_count()

    main(args)