mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
5a55913f45
Try to run stateful tests in parallel
1634 lines
64 KiB
Python
Executable File
1634 lines
64 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# pylint: disable=too-many-return-statements
|
|
# pylint: disable=global-variable-not-assigned
|
|
|
|
import enum
|
|
import shutil
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import signal
|
|
import re
|
|
import copy
|
|
import traceback
|
|
import math
|
|
# Not requests, to avoid requiring extra dependency.
|
|
import http.client
|
|
import urllib.parse
|
|
import json
|
|
# for crc32
|
|
import zlib
|
|
|
|
from argparse import ArgumentParser
|
|
from typing import Tuple, Union, Optional, Dict, Set, List
|
|
import subprocess
|
|
from subprocess import Popen
|
|
from subprocess import PIPE
|
|
from datetime import datetime
|
|
from time import time, sleep
|
|
from errno import ESRCH
|
|
|
|
try:
|
|
import termcolor
|
|
except ImportError:
|
|
termcolor = None
|
|
|
|
import random
|
|
import string
|
|
import multiprocessing
|
|
import socket
|
|
from contextlib import closing
|
|
|
|
USE_JINJA = True
|
|
try:
|
|
import jinja2
|
|
except ImportError:
|
|
USE_JINJA = False
|
|
print('WARNING: jinja2 not installed! Template tests will be skipped.')
|
|
|
|
# Substrings that mark a failure as transient: need_retry() looks for each of
# them in a test's stdout and stderr.
MESSAGES_TO_RETRY = [
    "ConnectionPoolWithFailover: Connection failed at try",
    "DB::Exception: New table appeared in database being dropped or detached. Try again",
    "is already started to be removing by another replica right now",
    "DB::Exception: Cannot enqueue query",
    # Something weird from bash internals, let's just retry
    "environment: line 1: wait_for: No record of process",
    # FIXME
    "is executing longer than distributed_ddl_task_timeout",
]

# Upper bound on the run count checked by TestResult.check_if_need_retry().
MAX_RETRIES = 3

# File extensions of test cases (presumably used when listing a suite directory
# -- the consumer is outside this part of the file).
TEST_FILE_EXTENSIONS = ['.sql', '.sql.j2', '.sh', '.py', '.expect']

# One to four dot-separated groups of digits, e.g. "21.8" or "21.8.1.1".
VERSION_PATTERN = r"^((\d+\.)?(\d+\.)?(\d+\.)?\d+)$"
|
|
|
|
|
|
def stringhash(s):
    """Return the CRC32 checksum of the string *s* encoded as UTF-8.

    The builtin hash() is consistent only during a single process invocation
    (https://stackoverflow.com/a/42089311), so it cannot be used where a
    stable value is needed.
    """
    encoded = s.encode('utf-8')
    return zlib.crc32(encoded)
|
|
|
|
|
|
class HTTPError(Exception):
    """Exception carrying an HTTP status code together with the response text."""

    def __init__(self, message=None, code=None):
        super().__init__(message)
        # Kept as plain attributes so that callers can inspect them directly.
        self.code = code
        self.message = message

    def __str__(self):
        return f'Code: {self.code}. {self.message}'
|
|
|
|
# Helpers to execute queries via HTTP interface.
|
|
def clickhouse_execute_http(base_args, query, timeout=30, settings=None, default_format=None):
    """Execute *query* through the server's HTTP interface and return the raw body.

    The query and all settings are passed as URL parameters of a POST request
    to ``base_args.tcp_host:base_args.http_port``.

    :param base_args: object providing ``tcp_host``, ``http_port`` and
        ``client_options_query_str`` (prepended verbatim to the query string).
    :param query: text of the query.
    :param timeout: socket timeout in seconds; its integer part is also sent
        as the server-side connect/receive/send timeouts.
    :param settings: optional dict of extra URL parameters; entries override
        the defaults built here.
    :param default_format: optional value of the ``default_format`` parameter.
    :return: response body as ``bytes``.
    :raises HTTPError: if the response status is not 200.
    """
    client = http.client.HTTPConnection(
        host=base_args.tcp_host,
        port=base_args.http_port,
        timeout=timeout)

    # The socket keeps the original (possibly fractional) timeout, the URL
    # parameters get an integer.
    timeout = int(timeout)
    params = {
        'query': query,

        # hung check in stress tests may remove the database,
        # hence we should use 'system'.
        'database': 'system',

        'connect_timeout': timeout,
        'receive_timeout': timeout,
        'send_timeout': timeout,

        'http_connection_timeout': timeout,
        'http_receive_timeout': timeout,
        'http_send_timeout': timeout,
    }
    if settings is not None:
        params.update(settings)
    if default_format is not None:
        params['default_format'] = default_format

    # Always release the socket, including when the request times out or the
    # server resets the connection.
    try:
        client.request('POST', '/?' + base_args.client_options_query_str + urllib.parse.urlencode(params))
        res = client.getresponse()
        data = res.read()
    finally:
        client.close()

    if res.status != 200:
        raise HTTPError(data.decode(), res.status)

    return data
|
|
|
|
def clickhouse_execute(base_args, query, timeout=30, settings=None):
    """Run *query* via clickhouse_execute_http() and return the body stripped of surrounding whitespace."""
    response = clickhouse_execute_http(base_args, query, timeout, settings)
    return response.strip()
|
|
|
|
def clickhouse_execute_json(base_args, query, timeout=60, settings=None):
    """Run *query* with the JSONEachRow default format and parse the result.

    :return: list with one decoded JSON object per output line, or None if
        the response body is empty.
    """
    payload = clickhouse_execute_http(base_args, query, timeout, settings, 'JSONEachRow')
    if not payload:
        return None
    return [json.loads(line) for line in payload.strip().splitlines()]
|
|
|
|
|
|
class Terminated(KeyboardInterrupt):
    """KeyboardInterrupt subclass raised by signal_handler() when a signal arrives."""
|
|
|
|
def signal_handler(sig, frame):
    """Signal handler: convert the received signal *sig* into a Terminated exception.

    *frame* is part of the handler signature and is not used.
    """
    message = f'Terminated with {sig} signal'
    raise Terminated(message)
|
|
|
|
def stop_tests():
    """Mark the test run as stopped and send SIGTERM to the whole process group.

    Only the first call (while the stop_tests_triggered event is not yet set)
    sends the signal; later calls just print the message.
    """
    global stop_tests_triggered_lock
    global stop_tests_triggered
    global restarted_tests

    with stop_tests_triggered_lock:
        print("Stopping tests")
        if stop_tests_triggered.is_set():
            return

        stop_tests_triggered.set()

        # materialize multiprocessing.Manager().list() object before
        # sending SIGTERM since this object is a proxy, that requires
        # communicating with manager thread, but after SIGTERM will be
        # send, this thread will die, and you will get
        # ConnectionRefusedError error for any access to "restarted_tests"
        # variable.
        restarted_tests = list(restarted_tests)

        # send signal to all processes in group to avoid hung check triggering
        # (to avoid terminating clickhouse-test itself, the signal should be ignored)
        own_group = os.getpgid(os.getpid())
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        os.killpg(own_group, signal.SIGTERM)
        # NOTE: the default disposition is restored here, not the handler
        # that was installed before the call.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
|
|
|
|
|
def get_db_engine(args, database_name):
    """Return the clause appended to CREATE DATABASE for the current mode.

    With ``args.replicated_database`` set, this is an ON CLUSTER / Replicated
    engine clause whose ZooKeeper path contains *database_name*. Otherwise it
    is ``" ENGINE=<args.db_engine>"`` when that option is set, and an empty
    string (default engine) when it is not.
    """
    if args.replicated_database:
        return f" ON CLUSTER test_cluster_database_replicated \
            ENGINE=Replicated('/test/clickhouse/db/{database_name}', \
            '{{shard}}', '{{replica}}')"

    engine = args.db_engine
    if not engine:
        return ""  # Will use default engine
    return " ENGINE=" + engine
|
|
|
|
|
|
def get_zookeeper_session_uptime(args):
    """Return the ZooKeeper session uptime reported by the server, as an int.

    With ``args.replicated_database`` set, the minimum over all replicas of
    the ``test_cluster_database_replicated`` cluster is returned.

    :return: the uptime value, or None if the query failed or its result
        could not be converted to int.
    """
    try:
        if args.replicated_database:
            return int(clickhouse_execute(args, """
            SELECT min(materialize(zookeeperSessionUptime()))
            FROM clusterAllReplicas('test_cluster_database_replicated', system.one)
            """))
        else:
            return int(clickhouse_execute(args, 'SELECT zookeeperSessionUptime()'))
    except Exception:
        # Best effort: any query/parsing error means "unknown". Only Exception
        # is caught so that KeyboardInterrupt (and the Terminated signal
        # exception derived from it) still propagates.
        return None
|
|
|
|
|
|
def need_retry(args, stdout, stderr, total_time):
    """Tell whether a failed test looks like a transient failure worth retrying.

    :param args: options object; ``check_zookeeper_session`` enables the
        session-uptime check.
    :param stdout: captured test stdout (str).
    :param stderr: captured test stderr (str).
    :param total_time: test duration in seconds.
    :return: True if the ZooKeeper session is younger than the test duration
        or if any MESSAGES_TO_RETRY substring occurs in stdout or stderr.
    """
    if args.check_zookeeper_session:
        # Sometimes we may get unexpected exception like "Replica is readonly" or "Shutdown is called for table"
        # instead of "Session expired" or "Connection loss"
        # Retry if session was expired during test execution.
        # get_zookeeper_session_uptime() returns None when the uptime query
        # fails; in that case this check is skipped.
        uptime = get_zookeeper_session_uptime(args)
        if uptime is not None and uptime < math.ceil(total_time):
            return True

    for output in (stdout, stderr):
        for message in MESSAGES_TO_RETRY:
            if message in output:
                return True
    return False
|
|
|
|
|
|
def get_processlist(args):
    """Return the list of running queries as parsed JSON rows (None if empty).

    With ``args.replicated_database`` set, system.processes of every replica
    of the test cluster is read (excluding queries that mention
    system.processes); otherwise SHOW PROCESSLIST is used.
    """
    if not args.replicated_database:
        return clickhouse_execute_json(args, 'SHOW PROCESSLIST')

    return clickhouse_execute_json(args, """
        SELECT materialize((hostName(), tcpPort())) as host, *
        FROM clusterAllReplicas('test_cluster_database_replicated', system.processes)
        WHERE query NOT LIKE '%system.processes%'
        """)
|
|
|
|
|
|
def get_processlist_after_test(args):
    """Return queries of the just-finished test that are still running.

    Rows of system.processes are matched by the ``log_comment`` setting
    (``args.testcase_basename``) and the current database
    (``args.testcase_database``). With ``args.replicated_database`` set, all
    replicas of the test cluster are inspected.

    :return: list of parsed JSON rows, or None if nothing is left.
    """
    log_comment = args.testcase_basename
    database = args.testcase_database

    if args.replicated_database:
        query = f"""
        SELECT materialize((hostName(), tcpPort())) as host, *
        FROM clusterAllReplicas('test_cluster_database_replicated', system.processes)
        WHERE
            query NOT LIKE '%system.processes%' AND
            Settings['log_comment'] = '{log_comment}' AND
            current_database = '{database}'
        """
    else:
        query = f"""
        SELECT *
        FROM system.processes
        WHERE
            query NOT LIKE '%system.processes%' AND
            Settings['log_comment'] = '{log_comment}' AND
            current_database = '{database}'
        """

    return clickhouse_execute_json(args, query)
|
|
|
|
|
|
# collect server stacktraces using gdb
|
|
def get_stacktraces_from_gdb(server_pid):
    """Attach gdb to *server_pid* and return the backtraces of all threads.

    :return: gdb output decoded as UTF-8, or None if running gdb or decoding
        its output failed (the error is printed).
    """
    try:
        gdb_command = f"gdb -batch -ex 'thread apply all backtrace' -p {server_pid}"
        raw_output = subprocess.check_output(gdb_command, shell=True)
        return raw_output.decode('utf-8')
    except Exception as e:
        print(f"Error occurred while receiving stack traces from gdb: {e}")
        return None
|
|
|
|
|
|
# collect server stacktraces from system.stack_trace table
|
|
# it does not work in Sandbox
|
|
def get_stacktraces_from_clickhouse(args):
    """Collect symbolized stack traces by querying system.stack_trace with the client.

    The client binary (``args.client``) is run through the shell with
    introspection functions enabled. With ``args.replicated_database`` set,
    the table is read from every replica of the test cluster.

    :return: combined stdout/stderr of the client decoded as UTF-8, or None
        if the command failed (the error is printed).
    """
    client_settings = ' '.join([
        get_additional_client_options(args),
        '--allow_introspection_functions=1',
        '--skip_unavailable_shards=1',
    ])

    if args.replicated_database:
        command_template = \
            "{} {} --query \
            \"SELECT materialize((hostName(), tcpPort())) as host, thread_id, \
            arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), \
            arrayMap(x -> addressToLine(x), trace), \
            arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace \
            FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') \
            ORDER BY host, thread_id FORMAT Vertical\""
    else:
        command_template = \
            "{} {} --query \
            \"SELECT arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), \
            arrayMap(x -> addressToLine(x), trace), \
            arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace \
            FROM system.stack_trace FORMAT Vertical\""

    command = command_template.format(args.client, client_settings)

    try:
        raw_output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT)
        return raw_output.decode('utf-8')
    except Exception as e:
        print(f"Error occurred while receiving stack traces from client: {e}")
        return None
|
|
|
|
|
|
def print_stacktraces() -> None:
    """Print stack traces of the server, preferring gdb over system.stack_trace.

    gdb is used only when the server process was located and the run is not
    in replicated-database mode. If gdb fails or returns a suspiciously
    short result, the traces are requested from the server itself. If nothing
    could be collected, a message saying that the server was not found is
    printed instead.

    Reads the module-level ``args``.
    """
    server_pid = get_server_pid()

    bt = None

    if server_pid and not args.replicated_database:
        print("")
        print(f"Located ClickHouse server process {server_pid} listening at TCP port {args.tcp_port}")
        print("Collecting stacktraces from all running threads with gdb:")

        bt = get_stacktraces_from_gdb(server_pid)

        # get_stacktraces_from_gdb() returns None on failure, so the length
        # may only be checked for a real result.
        if bt is not None and len(bt) < 1000:
            print("Got suspiciously small stacktraces: ", bt)
            bt = None

    if bt is None:
        print("\nCollecting stacktraces from system.stacktraces table:")

        bt = get_stacktraces_from_clickhouse(args)

    if bt is not None:
        print(bt)
        return

    print(colored(
        f"\nUnable to locate ClickHouse server process listening at TCP port {args.tcp_port}. "
        "It must have crashed or exited prematurely!",
        args, "red", attrs=["bold"]))
|
|
|
|
|
|
def get_server_pid():
    """Return the pid of the ClickHouse server, or None if it cannot be found.

    Two shell commands are tried in order: ``lsof`` on the TCP port taken
    from the module-level ``args``, then ``pidof``. The first non-empty
    output that parses as an integer wins; failures are printed and the next
    command is tried.
    """
    # lsof does not work in stress tests for some reason
    lsof_command = f"lsof -i tcp:{args.tcp_port} -s tcp:LISTEN -Fp | awk '/^p[0-9]+$/{{print substr($0, 2)}}'"
    pidof_command = "pidof -s clickhouse-server"

    output = None

    for cmd in (lsof_command, pidof_command):
        try:
            output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True)
            if output:
                return int(output)
        except Exception as e:
            # `output` may still hold the result of the previous command here.
            print(f"Cannot get server pid with {cmd}, got {output}: {e}")

    return None  # most likely server is dead
|
|
|
|
|
|
def colored(text, args, color=None, on_color=None, attrs=None):
    """Return *text* wrapped by termcolor.colored(), or unchanged when coloring is off.

    Coloring is applied only if the termcolor module is available and either
    stdout is a terminal or ``args.force_color`` is set.
    """
    coloring_enabled = termcolor and (sys.stdout.isatty() or args.force_color)
    if not coloring_enabled:
        return text
    return termcolor.colored(text, color, on_color, attrs)
|
|
|
|
|
|
class TestStatus(enum.Enum):
    """Outcome category of a single test run; every value equals its name."""

    FAIL = "FAIL"
    UNKNOWN = "UNKNOWN"
    OK = "OK"
    SKIPPED = "SKIPPED"
|
|
|
|
|
|
class FailureReason(enum.Enum):
    """Reason attached to a test result; values are the human-readable texts.

    Members are grouped by the TestStatus they accompany.
    """

    # --- reasons used with FAIL ---
    TIMEOUT = "Timeout!"
    SERVER_DIED = "server died"
    EXIT_CODE = "return code: "
    STDERR = "having stderror: "
    # NOTE(review): the doubled "having" is kept as is, since this text is
    # printed in test reports.
    EXCEPTION = "having having exception in stdout: "
    RESULT_DIFF = "result differs with reference: "
    TOO_LONG = "Test runs too long (> 60s). Make it faster."
    INTERNAL_QUERY_FAIL = "Internal query (CREATE/DROP DATABASE) failed:"

    # --- reasons used with SKIPPED ---
    DISABLED = "disabled"
    SKIP = "skip"
    NO_JINJA = "no jinja"
    NO_ZOOKEEPER = "no zookeeper"
    NO_SHARD = "no shard"
    FAST_ONLY = "running fast tests only"
    NO_LONG = "not running long tests"
    REPLICATED_DB = "replicated-database"
    S3_STORAGE = "s3-storage"
    BUILD = "not running for current build"
    BACKWARD_INCOMPATIBLE = "test is backward incompatible"

    # --- reasons used with UNKNOWN ---
    NO_REFERENCE = "no reference file"
    INTERNAL_ERROR = "Test internal error: "
|
|
|
|
|
|
class SettingsRandomizer:
    """Generator of randomized values for a fixed set of settings."""

    # Setting name -> zero-argument callable producing a fresh random value.
    settings = {
        "max_insert_threads": lambda: 0 if random.random() < 0.5 else random.randint(1, 16),
        "group_by_two_level_threshold": lambda: 1 if random.random() < 0.1 else 2 ** 60 if random.random() < 0.11 else 100000,
        "group_by_two_level_threshold_bytes": lambda: 1 if random.random() < 0.1 else 2 ** 60 if random.random() < 0.11 else 50000000,
        "distributed_aggregation_memory_efficient": lambda: random.randint(0, 1),
        "fsync_metadata": lambda: random.randint(0, 1),
        "priority": lambda: int(abs(random.gauss(0, 2))),
        "output_format_parallel_formatting": lambda: random.randint(0, 1),
        "input_format_parallel_parsing": lambda: random.randint(0, 1),
        "min_chunk_bytes_for_parallel_parsing": lambda: max(1024, int(random.gauss(10 * 1024 * 1024, 5 * 1000 * 1000))),
        "max_read_buffer_size": lambda: random.randint(500000, 1048576),
        "prefer_localhost_replica": lambda: random.randint(0, 1),
        "max_block_size": lambda: random.randint(8000, 100000),
        "max_threads": lambda: random.randint(1, 64),
    }

    @staticmethod
    def get_random_settings():
        """Return a list of ``"name=value"`` strings, one per entry of ``settings``.

        Values are drawn anew on every call; the order follows the dict.
        """
        return [
            name + "=" + str(make_value())
            for name, make_value in SettingsRandomizer.settings.items()
        ]
|
|
|
|
|
|
class TestResult:
    """Outcome of one test run: status, reason, duration and a text description."""

    def __init__(self, case_name: str, status: TestStatus, reason: Optional[FailureReason], total_time: float, description: str):
        self.case_name: str = case_name
        self.status: TestStatus = status
        self.reason: Optional[FailureReason] = reason
        self.total_time: float = total_time
        self.description: str = description
        # Set by check_if_need_retry().
        self.need_retry: bool = False

    def check_if_need_retry(self, args, stdout, stderr, runs_count):
        """Set ``self.need_retry`` if this failed run should be repeated.

        The flag is raised only when the status is FAIL, need_retry() reports
        a transient failure for the given output, and *runs_count* does not
        exceed MAX_RETRIES. It is never reset here.
        """
        if self.status != TestStatus.FAIL:
            return

        # need_retry() is evaluated before the retry limit, same as the
        # order of the individual checks.
        transient = need_retry(args, stdout, stderr, self.total_time)
        if transient and not MAX_RETRIES < runs_count:
            self.need_retry = True
|
|
|
|
|
|
class TestCase:
    """One test file of a suite together with everything needed to run it.

    An instance keeps the paths of the test, its reference and its
    stdout/stderr capture files, the test tags, and the randomized settings
    generated for it. ``run()`` is the entry point that executes the test
    and returns a TestResult.
    """

    @staticmethod
    def get_description_from_exception_info(exc_info):
        """Format a ``sys.exc_info()`` triple as a description string.

        The result is a newline followed by the exception class name, the
        exception value and up to 10 traceback entries.
        """
        exc_type, exc_value, tb = exc_info
        exc_name = exc_type.__name__
        traceback_str = "\n".join(traceback.format_tb(tb, 10))
        description = f"\n{exc_name}\n{exc_value}\n{traceback_str}"
        return description

    @staticmethod
    def get_reference_file(suite_dir, name):
        """
        Returns reference file name for specified test

        The ".gen" suffix is removed from *name* first; then
        "<name>.reference" and "<name>.gen.reference" are looked up in
        *suite_dir*, in this order. Returns None if neither file exists.
        """

        name = removesuffix(name, ".gen")
        for ext in ['.reference', '.gen.reference']:
            reference_file = os.path.join(suite_dir, name) + ext
            if os.path.isfile(reference_file):
                return reference_file
        return None

    @staticmethod
    def configure_testcase_args(args, case_file, suite_tmp_dir):
        """Build the per-test copy of *args* and prepare the test database.

        The copy gets ``testcase_start_time``, ``testcase_basename``,
        ``testcase_client`` (client command with ``--log_comment``) and
        ``testcase_database``. If ``--database`` was not given, a database
        with a random name is created on the server and a temporary
        directory of the same name is created under *suite_tmp_dir*
        (stored as ``test_tmp_dir``).

        Side effects: sets CLICKHOUSE_DATABASE and CLICKHOUSE_TMP in
        ``os.environ``.

        :return: the configured copy of *args*.
        """
        testcase_args = copy.deepcopy(args)

        testcase_args.testcase_start_time = datetime.now()
        testcase_basename = os.path.basename(case_file)
        testcase_args.testcase_client = f"{testcase_args.client} --log_comment '{testcase_basename}'"
        testcase_args.testcase_basename = testcase_basename

        if testcase_args.database:
            database = testcase_args.database
            os.environ.setdefault("CLICKHOUSE_DATABASE", database)
            os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir)
        else:
            # If --database is not specified, we will create temporary database with unique name
            # And we will recreate and drop it for each test
            def random_str(length=6):
                alphabet = string.ascii_lowercase + string.digits
                # NOTE: it is important not to use default random generator, since it shares state.
                return ''.join(random.SystemRandom().choice(alphabet) for _ in range(length))

            database = 'test_{suffix}'.format(suffix=random_str())

            clickhouse_execute(args, "CREATE DATABASE " + database + get_db_engine(testcase_args, database), settings={
                'log_comment': testcase_args.testcase_basename,
            })

            os.environ["CLICKHOUSE_DATABASE"] = database
            # Set temporary directory to match the randomly generated database,
            # because .sh tests also use it for temporary files and we want to avoid
            # collisions.
            testcase_args.test_tmp_dir = os.path.join(suite_tmp_dir, database)
            os.mkdir(testcase_args.test_tmp_dir)
            # Plain assignment, like CLICKHOUSE_DATABASE above: setdefault()
            # would keep the value of an earlier test (or a pre-set one) and
            # the per-test directory would never be used.
            os.environ["CLICKHOUSE_TMP"] = testcase_args.test_tmp_dir

        testcase_args.testcase_database = database

        return testcase_args

    def add_random_settings(self, args, client_options):
        """Append the randomized settings to *client_options* and export them.

        Nothing is changed if the test has the 'no-random-settings' tag or
        ``args.no_random_settings`` is set. Otherwise CLICKHOUSE_URL_PARAMS
        and CLICKHOUSE_CLIENT_OPT are extended from their base values.

        :return: the (possibly extended) client options string.
        """
        if self.tags and 'no-random-settings' in self.tags:
            return client_options
        if args.no_random_settings:
            return client_options

        if len(self.base_url_params) == 0:
            os.environ['CLICKHOUSE_URL_PARAMS'] = '&'.join(self.random_settings)
        else:
            os.environ['CLICKHOUSE_URL_PARAMS'] = self.base_url_params + '&' + '&'.join(self.random_settings)

        new_options = " --allow_repeated_settings --" + " --".join(self.random_settings)
        os.environ['CLICKHOUSE_CLIENT_OPT'] = self.base_client_options + new_options + ' '
        return client_options + new_options

    def remove_random_settings_from_env(self):
        """Restore CLICKHOUSE_URL_PARAMS and CLICKHOUSE_CLIENT_OPT to their base values."""
        os.environ['CLICKHOUSE_URL_PARAMS'] = self.base_url_params
        os.environ['CLICKHOUSE_CLIENT_OPT'] = self.base_client_options

    def add_info_about_settings(self, args, description):
        """Return *description* with the list of randomized settings appended.

        The description is returned unchanged when random settings are
        disabled for this test (same conditions as add_random_settings()).
        """
        if self.tags and 'no-random-settings' in self.tags:
            return description
        if args.no_random_settings:
            return description

        return description + "\n" + "Settings used in the test: " + "--" + " --".join(self.random_settings) + "\n"

    def __init__(self, suite, case: str, args, is_concurrent: bool):
        """Collect paths, tags and random settings for test file *case* of *suite*.

        :param suite: object providing ``all_tags``, ``suite_path`` and
            ``suite_tmp_path``.
        :param case: test file name inside the suite directory.
        :param args: options object; ``test_runs`` is read here.
        :param is_concurrent: if true and ``args.test_runs > 1``, the pid is
            added to the stdout/stderr file names.
        """
        self.case: str = case  # case file name
        self.tags: Set[str] = suite.all_tags[case] if case in suite.all_tags else set()

        # Tags from the comma-separated GLOBAL_TAGS variable apply to every
        # test. Empty items are ignored: "".split(",") yields [""], which
        # would otherwise add an empty tag when the variable is unset.
        for tag in os.getenv("GLOBAL_TAGS", "").split(","):
            tag = tag.strip()
            if tag:
                self.tags.add(tag)

        self.case_file: str = os.path.join(suite.suite_path, case)
        (self.name, self.ext) = os.path.splitext(case)

        file_suffix = ('.' + str(os.getpid())) if is_concurrent and args.test_runs > 1 else ''
        self.reference_file = self.get_reference_file(suite.suite_path, self.name)
        self.stdout_file = os.path.join(suite.suite_tmp_path, self.name) + file_suffix + '.stdout'
        self.stderr_file = os.path.join(suite.suite_tmp_path, self.name) + file_suffix + '.stderr'

        # Both are filled in by run().
        self.testcase_args = None
        self.runs_count = 0

        self.random_settings = SettingsRandomizer.get_random_settings()
        # Values present before the test; remove_random_settings_from_env()
        # writes them back.
        self.base_url_params = os.environ.get('CLICKHOUSE_URL_PARAMS', '')
        self.base_client_options = os.environ.get('CLICKHOUSE_CLIENT_OPT', '')

    # Check if test contains tag "no-backward-compatibility-check" and we should skip it
    def check_backward_incompatible_tag(self) -> bool:
        """Return True if a 'no-backward-compatibility-check[:version]' tag says to skip.

        Without a version, or with a version that does not match
        VERSION_PATTERN, the test is always skipped. Otherwise the server
        version is queried (using the module-level ``args``) and the test is
        skipped if it is less than or equal to the version from the tag.
        """
        for tag in self.tags:
            if tag.startswith("no-backward-compatibility-check"):
                split = tag.split(':')

                # If version is not specified in tag, always skip this test.
                if len(split) == 1:
                    return True
                version_from_tag = split[1]

                # Check if extracted string from tag is a real ClickHouse version, if not - always skip test.
                if re.match(VERSION_PATTERN, version_from_tag) is None:
                    return True

                server_version = str(clickhouse_execute(args, "SELECT version()").decode())
                # If server version is less or equal from the version specified in tag, we should skip this test.
                if list(map(int, server_version.split('.'))) <= list(map(int, version_from_tag.split('.'))):
                    return True

        return False

    # should skip test, should increment skipped_total, skip reason
    def should_skip_test(self, suite) -> Optional[FailureReason]:
        """Return the reason to skip this test, or None if it has to be run.

        The decision is based on the test tags, a "<name>.disabled" marker
        file in the suite directory and the options in the module-level
        ``args``. The first matching condition wins.
        """
        tags = self.tags

        if tags and ('disabled' in tags) and not args.disabled:
            return FailureReason.DISABLED

        elif os.path.exists(os.path.join(suite.suite_path, self.name) + '.disabled') and not args.disabled:
            return FailureReason.DISABLED

        elif args.skip and any(s in self.name for s in args.skip):
            return FailureReason.SKIP

        elif not USE_JINJA and self.ext.endswith("j2"):
            return FailureReason.NO_JINJA

        elif tags and (('zookeeper' in tags) or ('replica' in tags)) and not args.zookeeper:
            return FailureReason.NO_ZOOKEEPER

        elif tags and (('shard' in tags) or ('distributed' in tags) or ('global' in tags)) and not args.shard:
            return FailureReason.NO_SHARD

        elif tags and ('no-fasttest' in tags) and args.fast_tests_only:
            return FailureReason.FAST_ONLY

        elif tags and (('long' in tags) or ('deadlock' in tags) or ('race' in tags)) and args.no_long:
            # Tests for races and deadlocks usually are run in a loop for a significant amount of time
            return FailureReason.NO_LONG

        elif tags and ('no-replicated-database' in tags) and args.replicated_database:
            return FailureReason.REPLICATED_DB

        elif args.backward_compatibility_check and self.check_backward_incompatible_tag():
            return FailureReason.BACKWARD_INCOMPATIBLE

        elif tags and ('no-s3-storage' in tags) and args.s3_storage:
            return FailureReason.S3_STORAGE

        elif tags:
            # "no-<flag>" excludes builds having the flag ...
            for build_flag in args.build_flags:
                if 'no-' + build_flag in tags:
                    return FailureReason.BUILD
            # ... and "use-<something>" requires the corresponding flag.
            for tag in tags:
                tag = tag.replace('-', '_')
                if tag.startswith('use_') and tag not in args.build_flags:
                    return FailureReason.BUILD

        return None

    def process_result_impl(self, proc, stdout: str, stderr: str, total_time: float):
        """Classify a finished (or timed out) run and build its TestResult.

        Checks, in order: timeout (process still running; it is killed),
        non-zero exit code, non-empty stderr, 'Exception' in stdout, the
        '@@SKIP@@' marker in stdout, missing reference file, difference
        from the reference (via ``diff``), and too long run time when the
        test is run more than once. On success the stdout/stderr capture
        files are removed.

        :param proc: the Popen object of the test, or a false value if there
            is none (the exit-status checks are skipped then).
        :param stdout: contents of the stdout capture file.
        :param stderr: contents of the stderr capture file.
        :param total_time: duration of the run in seconds.
        :return: TestResult describing the run.
        """
        description = ""

        if proc:
            if proc.returncode is None:
                try:
                    proc.kill()
                except OSError as e:
                    # The process may have exited in the meantime.
                    if e.errno != ESRCH:
                        raise

                if stderr:
                    description += stderr
                return TestResult(self.name, TestStatus.FAIL, FailureReason.TIMEOUT, total_time, description)

            if proc.returncode != 0:
                reason = FailureReason.EXIT_CODE
                description += str(proc.returncode)

                if stderr:
                    description += "\n"
                    description += stderr

                # Stop on fatal errors like segmentation fault. They are sent to client via logs.
                if ' <Fatal> ' in stderr:
                    reason = FailureReason.SERVER_DIED

                if self.testcase_args.stop \
                        and ('Connection refused' in stderr or 'Attempt to read after eof' in stderr) \
                        and 'Received exception from server' not in stderr:
                    reason = FailureReason.SERVER_DIED

                if os.path.isfile(self.stdout_file):
                    description += ", result:\n\n"
                    # Close the file deterministically and tolerate output
                    # that is not valid UTF-8 (run_single_test() decodes the
                    # same file the same way).
                    with open(self.stdout_file, encoding='utf-8', errors='replace') as stdout_fd:
                        description += '\n'.join(stdout_fd.read().splitlines()[:100])
                    description += '\n'

                description += "\nstdout:\n{}\n".format(stdout)
                return TestResult(self.name, TestStatus.FAIL, reason, total_time, description)

        if stderr:
            description += "\n{}\n".format('\n'.join(stderr.splitlines()[:100]))
            description += "\nstdout:\n{}\n".format(stdout)
            return TestResult(self.name, TestStatus.FAIL, FailureReason.STDERR, total_time, description)

        if 'Exception' in stdout:
            description += "\n{}\n".format('\n'.join(stdout.splitlines()[:100]))
            return TestResult(self.name, TestStatus.FAIL, FailureReason.EXCEPTION, total_time, description)

        if '@@SKIP@@' in stdout:
            skip_reason = stdout.replace('@@SKIP@@', '').rstrip("\n")
            description += " - "
            description += skip_reason
            return TestResult(self.name, TestStatus.SKIPPED, FailureReason.SKIP, total_time, description)

        if self.reference_file is None:
            return TestResult(self.name, TestStatus.UNKNOWN, FailureReason.NO_REFERENCE, total_time, description)

        # `diff -q` exits with non-zero status when the files differ.
        result_is_different = subprocess.call(['diff', '-q', self.reference_file, self.stdout_file], stdout=PIPE)

        if result_is_different:
            diff = Popen(['diff', '-U', str(self.testcase_args.unified), self.reference_file, self.stdout_file], stdout=PIPE,
                         universal_newlines=True).communicate()[0]
            description += "\n{}\n".format(diff)
            return TestResult(self.name, TestStatus.FAIL, FailureReason.RESULT_DIFF, total_time, description)

        if self.testcase_args.test_runs > 1 and total_time > 60 and 'long' not in self.tags:
            # We're in Flaky Check mode, check the run time as well while we're at it.
            return TestResult(self.name, TestStatus.FAIL, FailureReason.TOO_LONG, total_time, description)

        if os.path.exists(self.stdout_file):
            os.remove(self.stdout_file)
        if os.path.exists(self.stderr_file):
            os.remove(self.stderr_file)

        return TestResult(self.name, TestStatus.OK, None, total_time, description)

    @staticmethod
    def print_test_time(test_time) -> str:
        """Return *test_time* formatted as " N.NN sec.", or '' unless ``args.print_time`` is set."""
        if args.print_time:
            return " {0:.2f} sec.".format(test_time)
        else:
            return ''

    def process_result(self, result: TestResult, messages):
        """Replace ``result.description`` with the full message to print.

        The message consists of ``messages[result.status]``, the optional
        run time, the reason text, the previous description and, for failed
        tests, the name of the test database.

        :return: the same *result* object.
        """
        description_full = messages[result.status]
        description_full += self.print_test_time(result.total_time)
        if result.reason is not None:
            description_full += " - "
            description_full += result.reason.value

        description_full += result.description
        description_full += "\n"

        # testcase_args is still None if the failure happened before
        # configure_testcase_args() returned (failed health check, failed
        # CREATE DATABASE).
        if result.status == TestStatus.FAIL and self.testcase_args is not None:
            description_full += 'Database: ' + self.testcase_args.testcase_database

        result.description = description_full
        return result

    @staticmethod
    def send_test_name_failed(suite: str, case: str) -> None:
        """Run a trivial query that mentions the test being started.

        Nothing is returned; a failure of the query shows up as an exception
        (run() uses this as a server health check).
        """
        pid = os.getpid()
        clickhouse_execute(args, f"SELECT 'Running test {suite}/{case} from pid={pid}'")

    def run_single_test(self, server_logs_level, client_options):
        """Execute the test file once and collect its output.

        The test is started through the shell with stdout/stderr redirected
        to the capture files, and polled until it exits or the timeout from
        ``self.testcase_args`` passes. Afterwards queries left in the
        processlist are reported, the temporary database is dropped (if one
        was created), and database and host names are normalized in the
        capture files with ``sed``.

        :return: tuple ``(proc, stdout, stderr, total_time)``; ``proc`` is
            None if dropping the database timed out.
        """
        args = self.testcase_args
        client = args.testcase_client
        start_time = args.testcase_start_time
        database = args.testcase_database

        # This is for .sh tests
        os.environ["CLICKHOUSE_LOG_COMMENT"] = args.testcase_basename

        params = {
            'client': client + ' --database=' + database,
            'logs_level': server_logs_level,
            'options': client_options,
            'test': self.case_file,
            'stdout': self.stdout_file,
            'stderr': self.stderr_file,
        }

        # Both capture files are truncated, with or without --database.
        pattern = '{test} > {stdout} 2> {stderr}'

        if self.ext == '.sql':
            pattern = "{client} --send_logs_level={logs_level} --testmode --multiquery {options} < " + pattern

        command = pattern.format(**params)

        proc = Popen(command, shell=True, env=os.environ)

        while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None:
            sleep(0.01)

        need_drop_database = not args.database
        if need_drop_database and args.no_drop_if_fail:
            # proc.stdout/proc.stderr are None here since no pipes were
            # requested, so this is effectively a check of the exit code.
            maybe_passed = (proc.returncode == 0) and (proc.stderr is None) and (
                proc.stdout is None or 'Exception' not in proc.stdout)
            # Keep the database of a test that did not pass.
            need_drop_database = maybe_passed

        left_queries_check = args.no_left_queries_check is False
        if self.tags and 'no-left-queries-check' in self.tags:
            left_queries_check = False
        if left_queries_check:
            processlist = get_processlist_after_test(args)
            if processlist:
                print(colored(f"\nFound queries left in processlist after running {args.testcase_basename} (database={database}):", args, "red", attrs=["bold"]))
                print(json.dumps(processlist, indent=4))
                exit_code.value = 1

        if need_drop_database:
            # Give DROP at least 20 seconds even if the test used up its timeout.
            seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20)
            try:
                clickhouse_execute(args, "DROP DATABASE " + database, timeout=seconds_left, settings={
                    'log_comment': args.testcase_basename,
                })
            except socket.timeout:
                total_time = (datetime.now() - start_time).total_seconds()
                return None, "", f"Timeout dropping database {database} after test", total_time
            shutil.rmtree(args.test_tmp_dir)

        total_time = (datetime.now() - start_time).total_seconds()

        # Normalize randomized database names in stdout, stderr files.
        os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=self.stdout_file))
        if args.hide_db_name:
            os.system(
                "LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=self.stderr_file))
        if args.replicated_database:
            os.system("LC_ALL=C sed -i -e 's|/auto_{{shard}}||g' {file}".format(file=self.stdout_file))
            os.system("LC_ALL=C sed -i -e 's|auto_{{replica}}||g' {file}".format(file=self.stdout_file))

        # Normalize hostname in stdout file.
        os.system("LC_ALL=C sed -i -e 's/{hostname}/localhost/g' {file}".format(hostname=socket.gethostname(),
                                                                             file=self.stdout_file))

        stdout = self._read_output_file(self.stdout_file)
        stderr = self._read_output_file(self.stderr_file)

        return proc, stdout, stderr, total_time

    @staticmethod
    def _read_output_file(path):
        """Return the contents of *path* decoded as UTF-8 (bad bytes replaced), or '' if it does not exist."""
        if not os.path.exists(path):
            return ''
        with open(path, 'rb') as output_fd:
            return str(output_fd.read(), errors='replace', encoding='utf-8')

    def run(self, args, suite, client_options, server_logs_level):
        """Run the test (unless it must be skipped) and return its TestResult.

        Errors of the runner itself are converted to results:
        HTTPError -> INTERNAL_QUERY_FAIL, refused/reset connection ->
        SERVER_DIED, anything else -> INTERNAL_ERROR. KeyboardInterrupt is
        re-raised. The environment variables changed for random settings are
        restored in all cases.
        """
        try:
            skip_reason = self.should_skip_test(suite)
            if skip_reason is not None:
                return TestResult(self.name, TestStatus.SKIPPED, skip_reason, 0., "")

            if args.testname:
                try:
                    self.send_test_name_failed(suite.suite, self.case)
                except Exception:
                    # Only Exception: a KeyboardInterrupt/Terminated must
                    # reach the handler below rather than be reported as a
                    # dead server.
                    return TestResult(self.name, TestStatus.FAIL, FailureReason.SERVER_DIED, 0.,
                                      "\nServer does not respond to health check\n")

            self.runs_count += 1
            self.testcase_args = self.configure_testcase_args(args, self.case_file, suite.suite_tmp_path)
            client_options = self.add_random_settings(args, client_options)
            proc, stdout, stderr, total_time = self.run_single_test(server_logs_level, client_options)

            result = self.process_result_impl(proc, stdout, stderr, total_time)
            result.check_if_need_retry(args, stdout, stderr, self.runs_count)
            if result.status == TestStatus.FAIL:
                result.description = self.add_info_about_settings(args, result.description)
            return result
        except KeyboardInterrupt as e:
            raise e
        except HTTPError:
            return TestResult(self.name, TestStatus.FAIL,
                              FailureReason.INTERNAL_QUERY_FAIL,
                              0.,
                              self.add_info_about_settings(args, self.get_description_from_exception_info(sys.exc_info())))
        except (ConnectionRefusedError, ConnectionResetError):
            return TestResult(self.name, TestStatus.FAIL,
                              FailureReason.SERVER_DIED,
                              0.,
                              self.add_info_about_settings(args, self.get_description_from_exception_info(sys.exc_info())))
        except:
            # KeyboardInterrupt was already handled above.
            return TestResult(self.name, TestStatus.UNKNOWN,
                              FailureReason.INTERNAL_ERROR,
                              0.,
                              self.get_description_from_exception_info(sys.exc_info()))
        finally:
            self.remove_random_settings_from_env()
|
|
|
|
|
|
class TestSuite:
|
|
@staticmethod
|
|
def tests_in_suite_key_func(item: str) -> int:
    """Sort key for test file names, driven by ``args.order``.

    'random' gives a random number (a float, despite the annotation). For
    the other orders the key is the numeric prefix before the first
    underscore, negated unless the order is 'asc'. Names without an
    underscore get 99998 and names with a non-numeric prefix get 99997.
    """
    order = args.order
    if order == 'random':
        return random.random()

    direction = 1 if order == 'asc' else -1

    if '_' not in item:
        return 99998

    number_part = item.split('_', 1)[0]

    try:
        return direction * int(number_part)
    except ValueError:
        return 99997
|
|
|
|
@staticmethod
|
|
def render_test_template(j2env, suite_dir, test_name):
|
|
"""
|
|
Render template for test and reference file if needed
|
|
"""
|
|
|
|
if j2env is None:
|
|
return test_name
|
|
|
|
test_base_name = removesuffix(test_name, ".sql.j2", ".sql")
|
|
|
|
reference_file_name = test_base_name + ".reference.j2"
|
|
reference_file_path = os.path.join(suite_dir, reference_file_name)
|
|
if os.path.isfile(reference_file_path):
|
|
tpl = j2env.get_template(reference_file_name)
|
|
tpl.stream().dump(os.path.join(suite_dir, test_base_name) + ".gen.reference")
|
|
|
|
if test_name.endswith(".sql.j2"):
|
|
tpl = j2env.get_template(test_name)
|
|
generated_test_name = test_base_name + ".gen.sql"
|
|
tpl.stream().dump(os.path.join(suite_dir, generated_test_name))
|
|
return generated_test_name
|
|
|
|
return test_name
|
|
|
|
@staticmethod
|
|
def read_test_tags(suite_dir: str, all_tests: List[str]) -> Dict[str, Set[str]]:
|
|
def get_comment_sign(filename):
|
|
if filename.endswith('.sql') or filename.endswith('.sql.j2'):
|
|
return '--'
|
|
elif filename.endswith('.sh') or filename.endswith('.py') or filename.endswith('.expect'):
|
|
return '#'
|
|
else:
|
|
raise Exception(f'Unknown file_extension: {filename}')
|
|
|
|
def parse_tags_from_line(line, comment_sign):
|
|
if not line.startswith(comment_sign):
|
|
return None
|
|
tags_str = line[len(comment_sign):].lstrip()
|
|
tags_prefix = "Tags:"
|
|
if not tags_str.startswith(tags_prefix):
|
|
return None
|
|
tags_str = tags_str[len(tags_prefix):]
|
|
tags = tags_str.split(',')
|
|
tags = {tag.strip() for tag in tags}
|
|
return tags
|
|
|
|
def is_shebang(line):
|
|
return line.startswith('#!')
|
|
|
|
def load_tags_from_file(filepath):
|
|
with open(filepath, 'r') as file:
|
|
try:
|
|
line = file.readline()
|
|
if is_shebang(line):
|
|
line = file.readline()
|
|
except UnicodeDecodeError:
|
|
return []
|
|
return parse_tags_from_line(line, get_comment_sign(filepath))
|
|
|
|
all_tags = {}
|
|
start_time = datetime.now()
|
|
for test_name in all_tests:
|
|
tags = load_tags_from_file(os.path.join(suite_dir, test_name))
|
|
if tags:
|
|
all_tags[test_name] = tags
|
|
elapsed = (datetime.now() - start_time).total_seconds()
|
|
if elapsed > 1:
|
|
print(f"Tags for suite {suite_dir} read in {elapsed:.2f} seconds")
|
|
return all_tags
|
|
|
|
def __init__(self, args, suite_path: str, suite_tmp_path: str, suite: str):
|
|
self.args = args
|
|
self.suite_path: str = suite_path
|
|
self.suite_tmp_path: str = suite_tmp_path
|
|
self.suite: str = suite
|
|
|
|
filter_func = lambda x: True
|
|
|
|
if args.run_by_hash_num is not None and args.run_by_hash_total is not None:
|
|
if args.run_by_hash_num > args.run_by_hash_total:
|
|
raise Exception(f"Incorrect run by hash, value {args.run_by_hash_num} bigger than total {args.run_by_hash_total}")
|
|
|
|
filter_func = lambda x: stringhash(x) % args.run_by_hash_total == args.run_by_hash_num
|
|
|
|
self.all_tests: List[str] = self.get_tests_list(self.tests_in_suite_key_func, filter_func)
|
|
self.all_tags: Dict[str, Set[str]] = self.read_test_tags(self.suite_path, self.all_tests)
|
|
|
|
self.sequential_tests = []
|
|
self.parallel_tests = []
|
|
for test_name in self.all_tests:
|
|
if self.is_sequential_test(test_name):
|
|
self.sequential_tests.append(test_name)
|
|
else:
|
|
self.parallel_tests.append(test_name)
|
|
|
|
def is_sequential_test(self, test_name):
|
|
if args.sequential:
|
|
if any(s in test_name for s in args.sequential):
|
|
return True
|
|
|
|
if test_name not in self.all_tags:
|
|
return False
|
|
|
|
return ('no-parallel' in self.all_tags[test_name]) or ('sequential' in self.all_tags[test_name])
|
|
|
|
def get_tests_list(self, sort_key, filter_func):
|
|
"""
|
|
Return list of tests file names to run
|
|
"""
|
|
|
|
all_tests = list(self.get_selected_tests(filter_func))
|
|
all_tests = all_tests * self.args.test_runs
|
|
all_tests.sort(key=sort_key)
|
|
return all_tests
|
|
|
|
def get_selected_tests(self, filter_func):
|
|
"""
|
|
Find all files with tests, filter, render templates
|
|
"""
|
|
|
|
j2env = jinja2.Environment(
|
|
loader=jinja2.FileSystemLoader(self.suite_path),
|
|
keep_trailing_newline=True,
|
|
) if USE_JINJA else None
|
|
|
|
for test_name in os.listdir(self.suite_path):
|
|
if not is_test_from_dir(self.suite_path, test_name):
|
|
continue
|
|
if self.args.test and not any(re.search(pattern, test_name) for pattern in self.args.test):
|
|
continue
|
|
if USE_JINJA and test_name.endswith(".gen.sql"):
|
|
continue
|
|
if not filter_func(test_name):
|
|
continue
|
|
test_name = self.render_test_template(j2env, self.suite_path, test_name)
|
|
yield test_name
|
|
|
|
@staticmethod
|
|
def read_test_suite(args, suite_dir_name: str):
|
|
def is_data_present():
|
|
return int(clickhouse_execute(args, 'EXISTS TABLE test.hits'))
|
|
|
|
base_dir = os.path.abspath(args.queries)
|
|
tmp_dir = os.path.abspath(args.tmp)
|
|
suite_path = os.path.join(base_dir, suite_dir_name)
|
|
|
|
suite_re_obj = re.search('^[0-9]+_(.*)$', suite_dir_name)
|
|
if not suite_re_obj: # skip .gitignore and so on
|
|
return None
|
|
|
|
suite_tmp_path = os.path.join(tmp_dir, suite_dir_name)
|
|
if not os.path.exists(suite_tmp_path):
|
|
os.makedirs(suite_tmp_path)
|
|
|
|
suite = suite_re_obj.group(1)
|
|
|
|
if not os.path.isdir(suite_path):
|
|
return None
|
|
|
|
if 'stateful' in suite and not args.no_stateful and not is_data_present():
|
|
print("Won't run stateful tests because test data wasn't loaded.")
|
|
return None
|
|
if 'stateless' in suite and args.no_stateless:
|
|
print("Won't run stateless tests because they were manually disabled.")
|
|
return None
|
|
if 'stateful' in suite and args.no_stateful:
|
|
print("Won't run stateful tests because they were manually disabled.")
|
|
return None
|
|
|
|
return TestSuite(args, suite_path, suite_tmp_path, suite)
|
|
|
|
|
|
# Shared run state. These are only placeholders: the `__main__` section
# rebinds every one of them before main() is called.

# Deadline for the whole run (set in main() from --global_time_limit).
stop_time = None
# Process exit status shared between workers.
exit_code = None
# Set once a test fails with FailureReason.SERVER_DIED.
server_died = None

stop_tests_triggered_lock = None
stop_tests_triggered = None

# Hands test names to the worker processes.
queue = None

# Manager that owns the shared `restarted_tests` list.
multiprocessing_manager = None
restarted_tests = None
|
|
|
|
# def run_tests_array(all_tests: List[str], num_tests: int, test_suite: TestSuite):
|
|
def run_tests_array(all_tests_with_params):
    """
    Run a batch of tests and print a summary for it.

    `all_tests_with_params` is a tuple `(all_tests, num_tests, test_suite)`.
    In the main process the test names are popped from `all_tests` (the list
    is consumed). In a worker process they are taken from the shared `queue`
    until a falsy item arrives, and `num_tests` is only used for the banner.

    The loop stops early when the server died, the global time limit is
    exceeded or 20 tests failed in a row. Sets `exit_code` to 1 when at least
    one test failed.
    """
    all_tests, num_tests, test_suite = all_tests_with_params

    bracket_open = colored("[", args, attrs=['bold'])
    bracket_close = colored("]", args, attrs=['bold'])

    def status_label(text, color):
        return bracket_open + colored(text, args, color, attrs=['bold']) + bracket_close

    MESSAGES = {
        TestStatus.FAIL: status_label(" FAIL ", "red"),
        TestStatus.UNKNOWN: status_label(" UNKNOWN ", "yellow"),
        TestStatus.OK: status_label(" OK ", "green"),
        TestStatus.SKIPPED: status_label(" SKIPPED ", "cyan"),
    }

    passed_total = 0
    skipped_total = 0
    failures_total = 0
    failures_chain = 0
    start_time = datetime.now()

    is_concurrent = multiprocessing.current_process().name != "MainProcess"

    client_options = get_additional_client_options(args)

    if num_tests > 0:
        about = 'about ' if is_concurrent else ''
        proc_name = multiprocessing.current_process().name
        print(f"\nRunning {about}{num_tests} {test_suite.suite} tests ({proc_name}).\n")

    while True:
        # Fetch the next test name, or leave the loop when there is none.
        if is_concurrent:
            case = queue.get(timeout=args.timeout * 1.1)
            if not case:
                break
        elif all_tests:
            case = all_tests.pop(0)
        else:
            break

        if server_died.is_set():
            stop_tests()
            break

        if stop_time and time() > stop_time:
            print("\nStop tests run because global time limit is exceeded.\n")
            stop_tests()
            break

        test_case = TestCase(test_suite, case, args, is_concurrent)

        try:
            title = "{0:72}".format(removesuffix(test_case.name, ".gen", ".sql") + ": ")
            if is_concurrent:
                # Keep the name in the buffered description so that lines of
                # different workers don't mix.
                description = title
            else:
                description = ''
                sys.stdout.flush()
                sys.stdout.write(title)
                # This flush is needed so you can see the test name of the long
                # running test before it will finish.
                sys.stdout.flush()

            # Re-run the test for as long as its result asks for a retry.
            while True:
                test_result = test_case.run(args, test_suite, client_options, server_logs_level)
                test_result = test_case.process_result(test_result, MESSAGES)
                if not test_result.need_retry:
                    break
                restarted_tests.append(test_result)

            status = test_result.status
            if status == TestStatus.OK:
                passed_total += 1
                failures_chain = 0
            elif status == TestStatus.FAIL:
                failures_total += 1
                failures_chain += 1
                if test_result.reason == FailureReason.SERVER_DIED:
                    server_died.set()
                    stop_tests()
            elif status == TestStatus.SKIPPED:
                skipped_total += 1

            description += test_result.description

            if description and not description.endswith('\n'):
                description += '\n'

            sys.stdout.write(description)
            sys.stdout.flush()
        except KeyboardInterrupt as e:
            print(colored("Break tests execution", args, "red"))
            stop_tests()
            raise e

        if failures_chain >= 20:
            stop_tests()
            break

    elapsed = (datetime.now() - start_time).total_seconds()
    worker_name = multiprocessing.current_process().name

    if failures_total > 0:
        summary = (f"\nHaving {failures_total} errors! {passed_total} tests passed."
                   f" {skipped_total} tests skipped. {elapsed:.2f} s elapsed"
                   f' ({worker_name}).')
        print(colored(summary, args, "red", attrs=["bold"]))
        exit_code.value = 1
    else:
        summary = (f"\n{passed_total} tests passed. {skipped_total} tests skipped."
                   f" {elapsed:.2f} s elapsed"
                   f' ({worker_name}).')
        print(colored(summary, args, "green", attrs=["bold"]))

    sys.stdout.flush()
|
|
|
|
|
|
# Server log level: passed to TestCase.run() by run_tests_array() and used by
# main() as the default for CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL.
server_logs_level = "warning"
|
|
|
|
|
|
def check_server_started(args):
    """
    Wait until the server answers `SELECT 1`.

    Makes up to `args.server_check_retries` attempts, sleeping 0.5 seconds
    after every refused or reset connection. Returns True on the first
    successful query and False when all attempts failed.
    """
    print("Connecting to ClickHouse server...", end='')
    sys.stdout.flush()

    for _ in range(args.server_check_retries):
        try:
            clickhouse_execute(args, 'SELECT 1')
        except (ConnectionRefusedError, ConnectionResetError):
            # Not listening yet: print a progress dot and try again.
            print('.', end='')
            sys.stdout.flush()
            sleep(0.5)
        else:
            print(" OK")
            sys.stdout.flush()
            return True

    print('\nAll connection tries failed')
    sys.stdout.flush()
    return False
|
|
|
|
|
|
class BuildFlags:
    """Names of the build/server properties reported by collect_build_flags()."""

    # Sanitizer found in CXX_FLAGS
    THREAD = 'tsan'
    ADDRESS = 'asan'
    UNDEFINED = 'ubsan'
    MEMORY = 'msan'

    # BUILD_TYPE
    DEBUG = 'debug'
    RELEASE = 'release'

    # Server settings
    ORDINARY_DATABASE = 'ordinary-database'
    POLYMORPHIC_PARTS = 'polymorphic-parts'
|
|
|
|
|
|
def collect_build_flags(args):
    """
    Query the server and return a list of flags describing its build.

    The list contains, in this order: at most one sanitizer flag, at most one
    build type flag, 'ordinary-database' / 'polymorphic-parts' when the
    corresponding settings match, the lowercased name of every enabled
    `USE_*` build option and `cpu-<processor>` when SYSTEM_PROCESSOR is set.
    """
    flags = []

    # Only the first matching sanitizer is reported.
    sanitizer_markers = (
        (b'-fsanitize=thread', BuildFlags.THREAD),
        (b'-fsanitize=address', BuildFlags.ADDRESS),
        (b'-fsanitize=undefined', BuildFlags.UNDEFINED),
        (b'-fsanitize=memory', BuildFlags.MEMORY),
    )
    cxx_flags = clickhouse_execute(args, "SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
    for marker, flag in sanitizer_markers:
        if marker in cxx_flags:
            flags.append(flag)
            break

    build_type = clickhouse_execute(args, "SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'")
    if b'Debug' in build_type:
        flags.append(BuildFlags.DEBUG)
    elif b'RelWithDebInfo' in build_type or b'Release' in build_type:
        flags.append(BuildFlags.RELEASE)

    database_engine = clickhouse_execute(args, "SELECT value FROM system.settings WHERE name = 'default_database_engine'")
    if database_engine == b'Ordinary':
        flags.append(BuildFlags.ORDINARY_DATABASE)

    min_bytes_for_wide_part = int(clickhouse_execute(args, "SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'"))
    if min_bytes_for_wide_part == 0:
        flags.append(BuildFlags.POLYMORPHIC_PARTS)

    enabled_options = clickhouse_execute(args, "SELECT name FROM system.build_options WHERE name like 'USE_%' AND value in ('ON', '1')")
    flags.extend(option.decode().lower() for option in enabled_options.strip().splitlines())

    processor = clickhouse_execute(args, "SELECT value FROM system.build_options WHERE name = 'SYSTEM_PROCESSOR' LIMIT 1").strip()
    if processor:
        flags.append(f'cpu-{processor.decode().lower()}')

    return flags
|
|
|
|
|
|
def suite_key_func(item: str) -> Union[int, Tuple[int, str]]:
    """
    Sort key for suite directory names.

    With `args.order == 'random'` a random number is returned. Otherwise the
    key is `(numeric prefix, rest of the name)`; names without '_' map to
    `(99998, '')` and names with a non-numeric prefix to `(99997, '')`.
    """
    if args.order == 'random':
        return random.random()

    prefix, separator, suffix = item.partition('_')
    if not separator:
        return 99998, ''

    try:
        number = int(prefix)
    except ValueError:
        return 99997, ''
    return number, suffix
|
|
|
|
|
|
def extract_key(key: str) -> str:
    """
    Run the extract-from-config program on the server config and return its
    output. `key` is appended to the command line as is, so it may contain
    extra options and a shell pipeline.
    """
    command = "".join([
        args.extract_from_config,
        " --try --config ",
        args.configserver,
        key,
    ])
    _status, output = subprocess.getstatusoutput(command)
    return output
|
|
|
|
|
|
def do_run_tests(jobs, test_suite: TestSuite, parallel):
    """
    Run all tests of `test_suite` and return how many tests were scheduled.

    With `jobs > 1` and at least one parallel test, the parallel tests are fed
    through the shared `queue` to a pool of worker processes, then the
    sequential tests run in the current process. Otherwise everything runs
    in the current process.

    `parallel` (the `--parallel` value) is accepted for compatibility with
    existing callers; it does not influence scheduling.
    """
    if jobs > 1 and len(test_suite.parallel_tests) > 0:
        # Count before running: run_tests_array() pops the tests from the
        # list it is given, so the sequential list is empty afterwards.
        sequential_n = len(test_suite.sequential_tests)
        parallel_n = len(test_suite.parallel_tests)

        print("Found", parallel_n, "parallel tests and", sequential_n, "sequential tests")

        # No point in starting more workers than there are tests.
        jobs = min(jobs, parallel_n)

        batch_size = max(1, parallel_n // jobs)
        parallel_tests_array = [(None, batch_size, test_suite) for _ in range(jobs)]

        with closing(multiprocessing.Pool(processes=jobs)) as pool:
            pool.map_async(run_tests_array, parallel_tests_array)

            for suit in test_suite.parallel_tests:
                queue.put(suit, timeout=args.timeout * 1.1)

            # One stop marker per worker.
            for _ in range(jobs):
                queue.put(None, timeout=args.timeout * 1.1)

            queue.close()

        # The pool is closed by closing() above; join() requires that.
        pool.join()

        run_tests_array((test_suite.sequential_tests, sequential_n, test_suite))
        return sequential_n + parallel_n
    else:
        num_tests = len(test_suite.all_tests)
        run_tests_array((test_suite.all_tests, num_tests, test_suite))
        return num_tests
|
|
|
|
|
|
def is_test_from_dir(suite_dir, case):
    """
    Tell whether `case` is a test file of the suite: a regular file in
    `suite_dir` whose name ends with one of TEST_FILE_EXTENSIONS.
    """
    case_file = os.path.join(suite_dir, case)
    if not os.path.isfile(case_file):
        return False
    # We could also test for executable files (os.access(case_file, os.X_OK),
    # but it interferes with 01610_client_spawn_editor.editor, which is invoked
    # as a query editor in the test, and must be marked as executable.
    return case_file.endswith(tuple(TEST_FILE_EXTENSIONS))
|
|
|
|
|
|
def removesuffix(text, *suffixes):
    """
    Strip the first matching suffix from `text`.

    Similar to str.removesuffix added in python 3.9
    (https://www.python.org/dev/peps/pep-0616/), but takes several candidate
    suffixes. They are tried in the given order and only one is removed;
    empty suffixes are ignored. Returns `text` unchanged when nothing matches.
    """
    matched = next((candidate for candidate in suffixes
                    if candidate and text.endswith(candidate)), None)
    if matched is None:
        return text
    return text[:len(text) - len(matched)]
|
|
|
|
|
|
def main(args):
    """
    Run every test suite found in `args.queries` and exit the process.

    Steps: wait for the server, collect build flags, export the default
    environment for shell tests, detect zookeeper/shard support when not
    given explicitly, create the common databases, run the suites, optionally
    check for hung queries and report restarted tests.

    Always finishes with sys.exit(): status 1 when no tests were run,
    otherwise the shared `exit_code` value.

    Raises:
        Exception: the server did not answer `SELECT 1`.
    """
    global stop_time

    if not check_server_started(args):
        # Adjacent literals instead of a backslash continuation, so that the
        # source indentation does not leak into the message.
        msg = ("Server is not responding. Cannot execute 'SELECT 1' query. "
               "If you are using split build, you have to specify -c option.")
        if args.hung_check:
            print(msg)
            pid = get_server_pid()
            print("Got server pid", pid)
            print_stacktraces()
        raise Exception(msg)

    args.build_flags = collect_build_flags(args)

    if args.skip:
        args.skip = set(args.skip)

    base_dir = os.path.abspath(args.queries)

    # Keep same default values as in queries/shell_config.sh
    os.environ.setdefault("CLICKHOUSE_BINARY", args.binary)
    # os.environ.setdefault("CLICKHOUSE_CLIENT", args.client)
    os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)

    if args.configclient:
        os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)

    # Force to print server warnings in stderr
    # Shell scripts could change logging level
    os.environ.setdefault("CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL", server_logs_level)

    # This code is bad as the time is not monotonic
    if args.global_time_limit:
        stop_time = time() + args.global_time_limit

    if args.zookeeper is None:
        try:
            args.zookeeper = int(extract_key(" --key zookeeper | grep . | wc -l")) > 0
        except ValueError:
            args.zookeeper = False

    if args.shard is None:
        args.shard = bool(extract_key(' --key listen_host | grep -E "127.0.0.2|::"'))

    def create_common_database(args, db_name):
        """Create `db_name` if it is missing, retrying retryable HTTP errors."""
        for _ in range(MAX_RETRIES):
            start_time = datetime.now()
            try:
                clickhouse_execute(args, "CREATE DATABASE IF NOT EXISTS " + db_name + get_db_engine(args, db_name))
            except HTTPError as e:
                total_time = (datetime.now() - start_time).total_seconds()
                if not need_retry(args, e.message, e.message, total_time):
                    break
            else:
                # Created (or already exists): no reason to send the query again.
                break

    if args.database and args.database != "test":
        create_common_database(args, args.database)

    create_common_database(args, "test")

    total_tests_run = 0

    for suite in sorted(os.listdir(base_dir), key=suite_key_func):
        if server_died.is_set():
            break

        test_suite = TestSuite.read_test_suite(args, suite)
        if test_suite is None:
            continue

        total_tests_run += do_run_tests(args.jobs, test_suite, args.parallel)

    if server_died.is_set():
        exit_code.value = 1

    if args.hung_check:

        # Some queries may execute in background for some time after test was finished. This is normal.
        for _ in range(1, 60):
            processlist = get_processlist(args)
            if not processlist:
                break
            sleep(1)

        if processlist:
            print(colored("\nFound hung queries in processlist:", args, "red", attrs=["bold"]))
            print(json.dumps(processlist, indent=4))

            print_stacktraces()
            exit_code.value = 1
        else:
            print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))

    if len(restarted_tests) > 0:
        print("\nSome tests were restarted:\n")

        for test_result in restarted_tests:
            print("\n{0:72}: ".format(test_result.case_name))
            # replace it with lowercase to avoid parsing retried tests as failed
            for status in TestStatus:
                test_result.description = test_result.description.replace(status.value, status.value.lower())
            print(test_result.description)

    if total_tests_run == 0:
        print("No tests were run.")
        sys.exit(1)
    else:
        print("All tests have finished.")

    sys.exit(exit_code.value)
|
|
|
|
|
|
def find_binary(name):
    """
    Return True when `name` can be executed.

    `name` is checked as a path first, then inside every directory of PATH
    and finally in /usr/local/bin and /usr/bin. A missing PATH variable is
    treated as an empty one.
    """
    if os.path.exists(name) and os.access(name, os.X_OK):
        return True

    # PATH may be absent from the environment (e.g. under `env -i`).
    paths = os.environ.get("PATH", "").split(':')
    for path in paths:
        if os.access(os.path.join(path, name), os.X_OK):
            return True

    # maybe it wasn't in PATH
    return any(os.access(os.path.join(directory, name), os.X_OK)
               for directory in ('/usr/local/bin', '/usr/bin'))
|
|
|
|
|
|
def get_additional_client_options(args):
    """
    Turn the `--client-option` values into a command line fragment:
    every value is prefixed with '--' and the results are joined with spaces.
    Returns an empty string when no options were given.
    """
    options = args.client_option
    if not options:
        return ''
    return ' '.join(['--' + option for option in options])
|
|
|
|
def get_additional_client_options_url(args):
    """
    Turn the `--client-option` values into a URL query fragment by joining
    them with '&'. Returns an empty string when no options were given.
    """
    options = args.client_option
    return '&'.join(options) if options else ''
|
|
|
|
if __name__ == '__main__':
    # Script entry point: create the state shared with worker processes,
    # parse the command line, fill in the derived arguments and run main().
    stop_time = None
    exit_code = multiprocessing.Value("i", 0)
    server_died = multiprocessing.Event()
    stop_tests_triggered_lock = multiprocessing.Lock()
    stop_tests_triggered = multiprocessing.Event()
    queue = multiprocessing.Queue(maxsize=1)
    multiprocessing_manager = multiprocessing.Manager()
    restarted_tests = multiprocessing_manager.list()

    # Move to a new process group and kill it at exit so that we don't have any
    # infinite tests processes left
    # (new process group is required to avoid killing some parent processes)
    os.setpgid(0, 0)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler)

    parser = ArgumentParser(description='ClickHouse functional tests')
    parser.add_argument('-q', '--queries', help='Path to queries dir')
    parser.add_argument('--tmp', help='Path to tmp dir')

    parser.add_argument('-b', '--binary', default='clickhouse',
        help='Path to clickhouse (if monolithic build, clickhouse-server otherwise) binary or name of binary in PATH')

    parser.add_argument('-c', '--client',
        help='Path to clickhouse-client (if split build, useless otherwise) binary of name of binary in PATH')

    parser.add_argument('--extract_from_config', help='extract-from-config program')
    parser.add_argument('--configclient', help='Client config (if you use not default ports)')
    parser.add_argument('--configserver', default='/etc/clickhouse-server/config.xml', help='Preprocessed server config')
    parser.add_argument('-o', '--output', help='Output xUnit compliant test report directory')
    parser.add_argument('-t', '--timeout', type=int, default=600, help='Timeout for each test case in seconds')
    parser.add_argument('--global_time_limit', type=int, help='Stop if executing more than specified time (after current test finished)')
    parser.add_argument('test', nargs='*', help='Optional test case name regex')
    parser.add_argument('-d', '--disabled', action='store_true', default=False, help='Also run disabled tests')
    parser.add_argument('--stop', action='store_true', default=None, dest='stop', help='Stop on network errors')
    parser.add_argument('--order', default='desc', choices=['asc', 'desc', 'random'], help='Run order')
    parser.add_argument('--testname', action='store_true', default=None, dest='testname', help='Make query with test name before test run')
    parser.add_argument('--hung-check', action='store_true', default=False)
    parser.add_argument('--no-left-queries-check', action='store_true', default=False)
    parser.add_argument('--force-color', action='store_true', default=False)
    parser.add_argument('--database', help='Database for tests (random name test_XXXXXX by default)')
    parser.add_argument('--no-drop-if-fail', action='store_true', help='Do not drop database for test if test has failed')
    parser.add_argument('--hide-db-name', action='store_true', help='Replace random database name with "default" in stderr')
    parser.add_argument('--parallel', default='1/1', help='One parallel test run number/total')
    parser.add_argument('-j', '--jobs', default=1, nargs='?', type=int, help='Run all tests in parallel')
    parser.add_argument('--test-runs', default=1, nargs='?', type=int, help='Run each test many times (useful for e.g. flaky check)')
    parser.add_argument('-U', '--unified', default=3, type=int, help='output NUM lines of unified context')
    parser.add_argument('-r', '--server-check-retries', default=180, type=int, help='Num of tries to execute SELECT 1 before tests started')
    parser.add_argument('--db-engine', help='Database engine name')
    parser.add_argument('--replicated-database', action='store_true', default=False, help='Run tests with Replicated database engine')
    parser.add_argument('--fast-tests-only', action='store_true', default=False, help='Run only fast tests (the tests without the "no-fasttest" tag)')
    parser.add_argument('--no-stateless', action='store_true', help='Disable all stateless tests')
    parser.add_argument('--no-stateful', action='store_true', help='Disable all stateful tests')
    parser.add_argument('--skip', nargs='+', help="Skip these tests")
    parser.add_argument('--sequential', nargs='+', help="Run these tests sequentially even if --parallel specified")
    parser.add_argument('--no-long', action='store_true', dest='no_long', help='Do not run long tests')
    parser.add_argument('--client-option', nargs='+', help='Specify additional client argument')
    parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time')
    parser.add_argument('--check-zookeeper-session', action='store_true', help='Check ZooKeeper session uptime to determine if failed test should be retried')
    parser.add_argument('--s3-storage', action='store_true', default=False, help='Run tests over s3 storage')
    parser.add_argument('--no-random-settings', action='store_true', default=False, help='Disable settings randomization')

    parser.add_argument('--run-by-hash-num', type=int, help='Run tests matching crc32(test_name) % run_by_hash_total == run_by_hash_num')
    parser.add_argument('--run-by-hash-total', type=int, help='Total test groups for crc32(test_name) % run_by_hash_total == run_by_hash_num')

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--zookeeper', action='store_true', default=None, dest='zookeeper', help='Run zookeeper related tests')
    group.add_argument('--no-zookeeper', action='store_false', default=None, dest='zookeeper', help='Do not run zookeeper related tests')

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--shard', action='store_true', default=None, dest='shard', help='Run sharding related tests (required to clickhouse-server listen 127.0.0.2 127.0.0.3)')
    group.add_argument('--no-shard', action='store_false', default=None, dest='shard', help='Do not run shard related tests')

    # Registered on the parser itself: this option is unrelated to
    # --shard/--no-shard and must not be mutually exclusive with them.
    parser.add_argument('--backward-compatibility-check', action='store_true', help='Run tests for further backwoard compatibility testing by ignoring all '
        'drop queries in tests for collecting data from new version of server')
    args = parser.parse_args()

    if args.queries and not os.path.isdir(args.queries):
        print(f"Cannot access the specified directory with queries ({args.queries})", file=sys.stderr)
        sys.exit(1)

    # Autodetect the directory with queries if not specified
    if args.queries is None:
        args.queries = 'queries'

        if not os.path.isdir(args.queries):
            # If we're running from the repo
            args.queries = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'queries')

        if not os.path.isdir(args.queries):
            # Next we're going to try some system directories, don't write 'stdout' files into them.
            if args.tmp is None:
                args.tmp = '/tmp/clickhouse-test'

            args.queries = '/usr/local/share/clickhouse-test/queries'

        if not os.path.isdir(args.queries):
            args.queries = '/usr/share/clickhouse-test/queries'

        if not os.path.isdir(args.queries):
            print("Failed to detect path to the queries directory. Please specify it with '--queries' option.", file=sys.stderr)
            sys.exit(1)

    print("Using queries from '" + args.queries + "' directory")

    if args.tmp is None:
        args.tmp = args.queries
    if args.client is None:
        if find_binary(args.binary + '-client'):
            args.client = args.binary + '-client'

            print("Using " + args.client + " as client program (expecting split build)")
        elif find_binary(args.binary):
            args.client = args.binary + ' client'

            print("Using " + args.client + " as client program (expecting monolithic build)")
        else:
            print("No 'clickhouse' or 'clickhouse-client' client binary found", file=sys.stderr)
            parser.print_help()
            sys.exit(1)

    if args.configclient:
        args.client += ' --config-file=' + args.configclient

    # Connection parameters come from the environment, with local defaults.
    tcp_host = os.getenv("CLICKHOUSE_HOST")
    if tcp_host is not None:
        args.tcp_host = tcp_host
        args.client += f' --host={tcp_host}'
    else:
        args.tcp_host = 'localhost'

    tcp_port = os.getenv("CLICKHOUSE_PORT_TCP")
    if tcp_port is not None:
        args.tcp_port = int(tcp_port)
        args.client += f" --port={tcp_port}"
    else:
        args.tcp_port = 9000

    http_port = os.getenv("CLICKHOUSE_PORT_HTTP")
    if http_port is not None:
        args.http_port = int(http_port)
    else:
        args.http_port = 8123

    client_database = os.getenv("CLICKHOUSE_DATABASE")
    if client_database is not None:
        args.client += f' --database={client_database}'
        args.client_database = client_database
    else:
        args.client_database = 'default'

    if args.backward_compatibility_check:
        args.client += ' --fake-drop'

    if args.client_option:
        # Set options for client
        if 'CLICKHOUSE_CLIENT_OPT' in os.environ:
            os.environ['CLICKHOUSE_CLIENT_OPT'] += ' '
        else:
            os.environ['CLICKHOUSE_CLIENT_OPT'] = ''

        os.environ['CLICKHOUSE_CLIENT_OPT'] += get_additional_client_options(args)

        # Set options for curl
        if 'CLICKHOUSE_URL_PARAMS' in os.environ:
            os.environ['CLICKHOUSE_URL_PARAMS'] += '&'
        else:
            os.environ['CLICKHOUSE_URL_PARAMS'] = ''

        client_options_query_str = get_additional_client_options_url(args)
        args.client_options_query_str = client_options_query_str + '&'
        os.environ['CLICKHOUSE_URL_PARAMS'] += client_options_query_str
    else:
        args.client_options_query_str = ''

    if args.extract_from_config is None:
        if os.access(args.binary + '-extract-from-config', os.X_OK):
            args.extract_from_config = args.binary + '-extract-from-config'
        else:
            args.extract_from_config = args.binary + ' extract-from-config'

    # `-j` without a value is parsed as None: use all available CPUs.
    if args.jobs is None:
        args.jobs = multiprocessing.cpu_count()

    main(args)
|